
Topic-as-a-Service: Industrializing Kafka for Your Teams

How to build a self-service Kafka platform for your teams: a provisioning API, approval workflows, automated quotas, and built-in monitoring.

Florian Courouge
18 min read
4,125 words
Kafka
Platform Engineering
Self-Service
Governance
DevOps
Automation
API
GitOps

As Kafka adoption grows within an organization, manual topic management becomes a bottleneck. Teams wait days to get a topic, naming conventions go unenforced, and the platform team drowns in requests. The solution? Topic-as-a-Service (TaaS): a self-service approach that democratizes access to Kafka while preserving governance and security.

Topic-as-a-Service Architecture

Why Topic-as-a-Service?

The Pain Points of the "Ticket" Model

┌─────────────────────────────────────────────────────────────────┐
│              TRADITIONAL WORKFLOW (TO AVOID)                     │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  Dev Team        Ticket System       Platform Team    Kafka     │
│     │                 │                    │            │       │
│     │── Create Ticket ▶                    │            │       │
│     │                 │── Assign ─────────▶│            │       │
│     │                 │                    │── Review ──│       │
│     │                 │                    │            │       │
│     │                 │◀── Questions ──────│            │       │
│     │◀── Questions ───│                    │            │       │
│     │── Answers ──────▶                    │            │       │
│     │                 │── Forward ────────▶│            │       │
│     │                 │                    │── Create ──▶       │
│     │                 │◀── Done ───────────│            │       │
│     │◀── Resolved ────│                    │            │       │
│     │                                                           │
│     │        Average lead time: 3-5 days                        │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Identified problems:

  • Provisioning lead time: 3-5 days on average
  • Platform team load: 40% of its time spent on tickets
  • Inconsistencies: naming, partitioning, and retention all vary
  • No visibility: who is using what?

The Topic-as-a-Service Vision

┌─────────────────────────────────────────────────────────────────┐
│              TOPIC-AS-A-SERVICE WORKFLOW                         │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  Dev Team        TaaS Portal          Automation      Kafka     │
│     │                 │                    │            │       │
│     │── Request ─────▶│                    │            │       │
│     │                 │── Validate ───────▶│            │       │
│     │                 │◀── OK ─────────────│            │       │
│     │                 │── Create Topic ────────────────▶│       │
│     │                 │── Configure ACLs ──────────────▶│       │
│     │                 │── Set Quotas ─────────────────▶│       │
│     │◀── Ready ───────│                    │            │       │
│     │                                                           │
│     │        Average lead time: 5 minutes                       │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

TaaS Platform Architecture

Overview

┌─────────────────────────────────────────────────────────────────┐
│                    TOPIC-AS-A-SERVICE PLATFORM                   │
│                                                                  │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                    SELF-SERVICE PORTAL                    │   │
│  │  - Topic Request Form                                     │   │
│  │  - Schema Designer                                        │   │
│  │  - Topic Catalog Browser                                  │   │
│  │  - Usage Dashboard                                        │   │
│  └─────────────────────────────────────────────────────────┘   │
│                              │                                   │
│                              ▼                                   │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                    API GATEWAY                            │   │
│  │  - Authentication (OAuth2/OIDC)                          │   │
│  │  - Rate Limiting                                          │   │
│  │  - Request Validation                                     │   │
│  └─────────────────────────────────────────────────────────┘   │
│                              │                                   │
│                              ▼                                   │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                 PROVISIONING ENGINE                       │   │
│  │  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐       │   │
│  │  │  Validator  │ │  Approver   │ │  Executor   │       │   │
│  │  └─────────────┘ └─────────────┘ └─────────────┘       │   │
│  └─────────────────────────────────────────────────────────┘   │
│                              │                                   │
│         ┌────────────────────┼────────────────────┐             │
│         ▼                    ▼                    ▼             │
│  ┌─────────────┐     ┌─────────────┐     ┌─────────────┐       │
│  │   Kafka     │     │   Schema    │     │   IAM       │       │
│  │  Clusters   │     │  Registry   │     │  (ACLs)     │       │
│  └─────────────┘     └─────────────┘     └─────────────┘       │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Key Components

Component            Role                              Technologies
Portal               Self-service user interface       React, Vue.js
API Gateway          Authentication, rate limiting     Kong, Traefik
Provisioning Engine  Orchestrates topic creation       Go, Python, Java
Topic Catalog        Inventory and metadata            PostgreSQL, Elasticsearch
Approval Workflow    Automatic/manual validation       Temporal, Argo Workflows
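To make the Provisioning Engine's shape concrete, here is a minimal sketch of a Validator → Approver → Executor chain. The class and function names are illustrative assumptions, not the platform's actual code:

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Tuple

@dataclass
class PipelineResult:
    steps: List[Tuple[str, str]] = field(default_factory=list)
    ok: bool = True

def run_pipeline(request: Dict,
                 stages: List[Tuple[str, Callable[[Dict], bool]]]) -> PipelineResult:
    """Run each stage in order, stopping at the first failure."""
    result = PipelineResult()
    for name, stage in stages:
        if stage(request):
            result.steps.append((name, "ok"))
        else:
            result.ok = False
            result.steps.append((name, "failed"))
            break
    return result
```

Each stage stays independently testable, and a failed stage short-circuits the rest, which is what lets the engine report precisely where a request was stopped.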

Designing the TaaS API

OpenAPI Specification

# openapi.yaml
openapi: 3.0.3
info:
  title: Topic-as-a-Service API
  version: 1.0.0
  description: Self-service Kafka topic provisioning API

paths:
  /api/v1/topics:
    post:
      summary: Request a new topic
      operationId: createTopicRequest
      security:
        - oauth2: [topics:write]
      requestBody:
        required: true
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/TopicRequest'
      responses:
        '202':
          description: Request accepted
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/TopicRequestResponse'
        '400':
          description: Validation error
        '403':
          description: Insufficient permissions

    get:
      summary: List topics owned by requester
      operationId: listTopics
      security:
        - oauth2: [topics:read]
      parameters:
        - name: owner
          in: query
          schema:
            type: string
        - name: environment
          in: query
          schema:
            type: string
            enum: [dev, staging, prod]
      responses:
        '200':
          description: List of topics
          content:
            application/json:
              schema:
                type: array
                items:
                  $ref: '#/components/schemas/Topic'

  /api/v1/topics/{topicName}:
    parameters:
      - name: topicName
        in: path
        required: true
        schema:
          type: string
    get:
      summary: Get topic details
      operationId: getTopic
      responses:
        '200':
          description: Topic details
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/TopicDetails'

    delete:
      summary: Request topic deletion
      operationId: deleteTopic
      responses:
        '202':
          description: Deletion request accepted

  /api/v1/topics/{topicName}/acls:
    parameters:
      - name: topicName
        in: path
        required: true
        schema:
          type: string
    post:
      summary: Grant access to topic
      operationId: grantAccess
      requestBody:
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/ACLRequest'
      responses:
        '200':
          description: Access granted

components:
  schemas:
    TopicRequest:
      type: object
      required:
        - name
        - domain
        - owner
        - environment
        - description
      properties:
        name:
          type: string
          pattern: '^[a-z][a-z0-9-]*$'
          minLength: 3
          maxLength: 64
          description: Topic name (without domain prefix)
        domain:
          type: string
          enum: [banking, marketing, analytics, operations]
        subdomain:
          type: string
        owner:
          type: string
          description: Team or service account owner
        environment:
          type: string
          enum: [dev, staging, prod]
        description:
          type: string
          minLength: 20
        partitions:
          type: integer
          minimum: 1
          maximum: 100
          default: 6
        replicationFactor:
          type: integer
          enum: [1, 2, 3]
          default: 3
        retentionMs:
          type: integer
          description: Retention in milliseconds
          default: 604800000  # 7 days
        schemaType:
          type: string
          enum: [avro, json, protobuf, none]
          default: avro
        schema:
          type: string
          description: Avro/JSON/Protobuf schema definition
        classification:
          type: string
          enum: [public, internal, confidential, restricted]
          default: internal
        criticality:
          type: string
          enum: [low, medium, high, critical]
          default: medium

    TopicRequestResponse:
      type: object
      properties:
        requestId:
          type: string
          format: uuid
        status:
          type: string
          enum: [pending, approved, provisioning, ready, rejected]
        topicName:
          type: string
        estimatedReadyTime:
          type: string
          format: date-time
        approvalRequired:
          type: boolean

    TopicDetails:
      type: object
      properties:
        name:
          type: string
        fullName:
          type: string
          description: Full topic name with domain prefix
        partitions:
          type: integer
        replicationFactor:
          type: integer
        config:
          type: object
          additionalProperties:
            type: string
        owner:
          type: string
        createdAt:
          type: string
          format: date-time
        metrics:
          $ref: '#/components/schemas/TopicMetrics'

    TopicMetrics:
      type: object
      properties:
        messagesPerSecond:
          type: number
        bytesPerSecond:
          type: number
        consumerGroups:
          type: integer
        avgLag:
          type: integer
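As an illustration of the contract above, the following sketch builds a request body matching the TopicRequest schema and pre-validates it client-side before sending. The payload values and the validate_request helper are hypothetical:

```python
import json
import re
from typing import Dict, List

# Hypothetical request body for POST /api/v1/topics, matching the
# TopicRequest schema above.
topic_request: Dict = {
    "name": "orders",
    "domain": "banking",
    "subdomain": "payments",
    "owner": "payments-team",
    "environment": "prod",
    "description": "Order events produced by the payments service.",
    "partitions": 12,
    "replicationFactor": 3,
    "retentionMs": 604800000,
    "schemaType": "avro",
    "classification": "internal",
    "criticality": "high",
}

def validate_request(req: Dict) -> List[str]:
    """Client-side pre-validation mirroring the OpenAPI constraints."""
    errors = []
    name = req.get("name", "")
    if not (3 <= len(name) <= 64) or not re.fullmatch(r"[a-z][a-z0-9-]*", name):
        errors.append("invalid name")
    if req.get("domain") not in {"banking", "marketing", "analytics", "operations"}:
        errors.append("invalid domain")
    if len(req.get("description", "")) < 20:
        errors.append("description too short")
    if not (1 <= req.get("partitions", 6) <= 100):
        errors.append("invalid partitions")
    return errors

body = json.dumps(topic_request)  # what the portal would send over the wire
```

Validating on the client as well as the server gives developers the "immediate feedback on errors" the portal aims for, without a round trip.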

Python Implementation (FastAPI)

# app/api/topics.py
from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks
from pydantic import BaseModel, Field, validator
from typing import Optional, List
from enum import Enum
import uuid
from datetime import datetime, timedelta

from app.auth import get_current_user, User
from app.services.kafka_provisioner import KafkaProvisioner
from app.services.schema_registry import SchemaRegistryClient
from app.services.approval_workflow import ApprovalWorkflow
from app.models import TopicRequest, TopicStatus

router = APIRouter(prefix="/api/v1/topics", tags=["topics"])

class Environment(str, Enum):
    DEV = "dev"
    STAGING = "staging"
    PROD = "prod"

class Domain(str, Enum):
    BANKING = "banking"
    MARKETING = "marketing"
    ANALYTICS = "analytics"
    OPERATIONS = "operations"

class Criticality(str, Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

class TopicRequestModel(BaseModel):
    name: str = Field(..., min_length=3, max_length=64, regex="^[a-z][a-z0-9-]*$")
    domain: Domain
    subdomain: Optional[str] = None
    owner: str
    environment: Environment
    description: str = Field(..., min_length=20)
    partitions: int = Field(default=6, ge=1, le=100)
    replication_factor: int = Field(default=3, ge=1, le=3)
    retention_ms: int = Field(default=604800000)  # 7 days
    schema_type: str = Field(default="avro")
    schema_definition: Optional[str] = None
    criticality: Criticality = Criticality.MEDIUM

    @validator('replication_factor')
    def validate_replication(cls, v, values):
        if values.get('environment') == Environment.PROD and v < 3:
            raise ValueError('Production topics must have replication factor >= 3')
        return v

    @validator('criticality')
    def validate_partitions(cls, v, values):
        # criticality is declared after partitions, so the already-validated
        # partition count is available in `values` here (Pydantic v1 runs
        # validators in field-declaration order)
        if v == Criticality.CRITICAL and values.get('partitions', 0) < 6:
            raise ValueError('Critical topics must have at least 6 partitions')
        return v


@router.post("/", status_code=202)
async def create_topic_request(
    request: TopicRequestModel,
    background_tasks: BackgroundTasks,
    current_user: User = Depends(get_current_user),
    provisioner: KafkaProvisioner = Depends(),
    approval_workflow: ApprovalWorkflow = Depends()
):
    """
    Submit a new topic creation request.
    Topics in dev/staging are auto-approved.
    Production topics require approval for critical/high criticality.
    """
    # Generate full topic name
    full_topic_name = generate_topic_name(request)

    # Check if topic already exists
    if await provisioner.topic_exists(full_topic_name, request.environment):
        raise HTTPException(status_code=409, detail="Topic already exists")

    # Create request record
    request_id = str(uuid.uuid4())
    topic_request = TopicRequest(
        id=request_id,
        full_name=full_topic_name,
        requester=current_user.username,
        request_data=request.dict(),
        status=TopicStatus.PENDING,
        created_at=datetime.utcnow()
    )

    # Determine if approval is required
    requires_approval = (
        request.environment == Environment.PROD and
        request.criticality in [Criticality.HIGH, Criticality.CRITICAL]
    )

    if requires_approval:
        # Submit to approval workflow
        await approval_workflow.submit(topic_request)
        return {
            "request_id": request_id,
            "status": "pending_approval",
            "topic_name": full_topic_name,
            "approval_required": True,
            "estimated_ready_time": (datetime.utcnow() + timedelta(hours=24)).isoformat()
        }
    else:
        # Auto-approve and provision in background
        background_tasks.add_task(
            provision_topic,
            topic_request,
            provisioner
        )
        return {
            "request_id": request_id,
            "status": "provisioning",
            "topic_name": full_topic_name,
            "approval_required": False,
            "estimated_ready_time": (datetime.utcnow() + timedelta(minutes=5)).isoformat()
        }


def generate_topic_name(request: TopicRequestModel) -> str:
    """Generate standardized topic name."""
    parts = [request.domain.value]
    if request.subdomain:
        parts.append(request.subdomain)
    parts.append(request.name)
    return ".".join(parts)


async def provision_topic(
    topic_request: TopicRequest,
    provisioner: KafkaProvisioner
):
    """Background task to provision topic."""
    try:
        # Create topic
        await provisioner.create_topic(
            name=topic_request.full_name,
            partitions=topic_request.request_data['partitions'],
            replication_factor=topic_request.request_data['replication_factor'],
            config={
                'retention.ms': str(topic_request.request_data['retention_ms']),
                'cleanup.policy': 'delete'
            },
            environment=topic_request.request_data['environment']
        )

        # Register schema if provided
        if topic_request.request_data.get('schema_definition'):
            schema_client = SchemaRegistryClient()
            await schema_client.register_schema(
                subject=f"{topic_request.full_name}-value",
                schema=topic_request.request_data['schema_definition'],
                schema_type=topic_request.request_data['schema_type']
            )

        # Set up ACLs
        await provisioner.create_acls(
            topic=topic_request.full_name,
            owner=topic_request.request_data['owner'],
            environment=topic_request.request_data['environment']
        )

        # Update request status
        topic_request.status = TopicStatus.READY
        topic_request.provisioned_at = datetime.utcnow()

    except Exception as e:
        topic_request.status = TopicStatus.FAILED
        topic_request.error_message = str(e)
        raise


@router.get("/{topic_name}")
async def get_topic(
    topic_name: str,
    environment: Environment,
    current_user: User = Depends(get_current_user),
    provisioner: KafkaProvisioner = Depends()
):
    """Get topic details including metrics."""
    topic = await provisioner.get_topic_details(topic_name, environment)
    if not topic:
        raise HTTPException(status_code=404, detail="Topic not found")

    # Get metrics
    metrics = await provisioner.get_topic_metrics(topic_name, environment)

    return {
        "name": topic_name,
        "environment": environment,
        "partitions": topic.partitions,
        "replication_factor": topic.replication_factor,
        "config": topic.config,
        "metrics": metrics
    }


@router.get("/")
async def list_topics(
    owner: Optional[str] = None,
    environment: Optional[Environment] = None,
    domain: Optional[Domain] = None,
    current_user: User = Depends(get_current_user),
    provisioner: KafkaProvisioner = Depends()
):
    """List topics with optional filters."""
    filters = {}
    if owner:
        filters['owner'] = owner
    if environment:
        filters['environment'] = environment.value
    if domain:
        filters['domain'] = domain.value

    topics = await provisioner.list_topics(filters)
    return topics

Approval Workflows

Approval Matrix

┌─────────────────────────────────────────────────────────────────┐
│                    APPROVAL MATRIX                               │
├─────────────────┬─────────────┬─────────────┬───────────────────┤
│ Criticality     │ Dev         │ Staging     │ Production        │
├─────────────────┼─────────────┼─────────────┼───────────────────┤
│ Low             │ Auto        │ Auto        │ Auto              │
│ Medium          │ Auto        │ Auto        │ Team Lead         │
│ High            │ Auto        │ Team Lead   │ Platform + Data   │
│ Critical        │ Team Lead   │ Platform    │ Platform + CISO   │
└─────────────────┴─────────────┴─────────────┴───────────────────┘
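The matrix above translates directly into a lookup table that the provisioning engine can consult before deciding whether to auto-approve. A minimal sketch, where group names like "team-lead" and "ciso" are illustrative and an empty list means auto-approval:

```python
from typing import Dict, List, Tuple

# One possible encoding of the approval matrix:
# (criticality, environment) -> required approver groups.
APPROVAL_MATRIX: Dict[Tuple[str, str], List[str]] = {
    ("low", "dev"): [],      ("low", "staging"): [],          ("low", "prod"): [],
    ("medium", "dev"): [],   ("medium", "staging"): [],       ("medium", "prod"): ["team-lead"],
    ("high", "dev"): [],     ("high", "staging"): ["team-lead"],
    ("high", "prod"): ["platform", "data"],
    ("critical", "dev"): ["team-lead"],
    ("critical", "staging"): ["platform"],
    ("critical", "prod"): ["platform", "ciso"],
}

def required_approvers(criticality: str, environment: str) -> List[str]:
    """Look up who must approve a request; a KeyError means invalid input."""
    return APPROVAL_MATRIX[(criticality, environment)]
```

Keeping the policy in data rather than in branching code makes it easy to review, version, and change without touching the workflow engine.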

Temporal Workflow Implementation

# workflows/approval_workflow.py
from temporalio import workflow, activity
from temporalio.common import RetryPolicy
from datetime import timedelta
from typing import List
import asyncio

@activity.defn
async def notify_approvers(request_id: str, approvers: List[str]) -> None:
    """Send notification to approvers via Slack/Email."""
    from app.services.notifications import NotificationService
    notification_service = NotificationService()

    for approver in approvers:
        await notification_service.send(
            recipient=approver,
            template="topic_approval_request",
            data={
                "request_id": request_id,
                "approval_url": f"https://kafka-portal.company.com/approvals/{request_id}"
            }
        )

@activity.defn
async def check_approval_status(request_id: str) -> dict:
    """Check if request has been approved/rejected."""
    from app.repositories.approval_repository import ApprovalRepository
    repo = ApprovalRepository()
    return await repo.get_status(request_id)

@activity.defn
async def provision_topic(request_id: str) -> dict:
    """Provision the approved topic."""
    from app.services.kafka_provisioner import KafkaProvisioner
    from app.repositories.topic_request_repository import TopicRequestRepository

    repo = TopicRequestRepository()
    request = await repo.get(request_id)

    provisioner = KafkaProvisioner()
    result = await provisioner.provision(request)

    return {"topic_name": result.topic_name, "status": "provisioned"}

@activity.defn
async def notify_requester(request_id: str, status: str, message: str) -> None:
    """Notify the requester of the outcome."""
    from app.services.notifications import NotificationService
    from app.repositories.topic_request_repository import TopicRequestRepository

    repo = TopicRequestRepository()
    request = await repo.get(request_id)

    notification_service = NotificationService()
    await notification_service.send(
        recipient=request.requester,
        template=f"topic_request_{status}",
        data={
            "request_id": request_id,
            "topic_name": request.full_name,
            "message": message
        }
    )


@workflow.defn
class TopicApprovalWorkflow:
    """Workflow for topic creation with approval."""

    def __init__(self):
        self.approved = False
        self.rejected = False
        self.rejection_reason = ""

    @workflow.signal
    async def approve(self, approver: str) -> None:
        """Signal to approve the request."""
        self.approved = True

    @workflow.signal
    async def reject(self, approver: str, reason: str) -> None:
        """Signal to reject the request."""
        self.rejected = True
        self.rejection_reason = reason

    @workflow.run
    async def run(self, request_id: str, approvers: List[str]) -> dict:
        """Execute the approval workflow."""

        # Step 1: Notify approvers
        await workflow.execute_activity(
            notify_approvers,
            args=[request_id, approvers],
            start_to_close_timeout=timedelta(minutes=5),
            retry_policy=RetryPolicy(maximum_attempts=3)
        )

        # Step 2: Wait for approval (max 7 days)
        try:
            await workflow.wait_condition(
                lambda: self.approved or self.rejected,
                timeout=timedelta(days=7)
            )
        except asyncio.TimeoutError:
            # Auto-reject after timeout
            await workflow.execute_activity(
                notify_requester,
                args=[request_id, "expired", "Request expired after 7 days without approval"],
                start_to_close_timeout=timedelta(minutes=5)
            )
            return {"status": "expired", "request_id": request_id}

        # Step 3: Handle decision
        if self.rejected:
            await workflow.execute_activity(
                notify_requester,
                args=[request_id, "rejected", self.rejection_reason],
                start_to_close_timeout=timedelta(minutes=5)
            )
            return {"status": "rejected", "reason": self.rejection_reason}

        # Step 4: Provision topic
        result = await workflow.execute_activity(
            provision_topic,
            args=[request_id],
            start_to_close_timeout=timedelta(minutes=30),
            retry_policy=RetryPolicy(maximum_attempts=3)
        )

        # Step 5: Notify success
        await workflow.execute_activity(
            notify_requester,
            args=[request_id, "approved", f"Topic {result['topic_name']} is ready"],
            start_to_close_timeout=timedelta(minutes=5)
        )

        return {"status": "provisioned", **result}

Quota Management

Quota Architecture

┌─────────────────────────────────────────────────────────────────┐
│                    QUOTA MANAGEMENT                              │
│                                                                  │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                 QUOTA TIERS                               │   │
│  ├─────────────┬─────────────┬─────────────┬───────────────┤   │
│  │ Tier        │ Producer    │ Consumer    │ Request Rate  │   │
│  ├─────────────┼─────────────┼─────────────┼───────────────┤   │
│  │ Bronze      │ 5 MB/s      │ 10 MB/s     │ 100 req/s     │   │
│  │ Silver      │ 20 MB/s     │ 40 MB/s     │ 500 req/s     │   │
│  │ Gold        │ 50 MB/s     │ 100 MB/s    │ 2000 req/s    │   │
│  │ Platinum    │ Unlimited   │ Unlimited   │ Unlimited     │   │
│  └─────────────┴─────────────┴─────────────┴───────────────┘   │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Automated Quota Configuration

# services/quota_manager.py
from dataclasses import dataclass
from typing import Dict, Optional
from enum import Enum
from confluent_kafka.admin import AdminClient

class QuotaTier(Enum):
    BRONZE = "bronze"
    SILVER = "silver"
    GOLD = "gold"
    PLATINUM = "platinum"

@dataclass
class QuotaConfig:
    producer_byte_rate: Optional[int]   # bytes/sec (None = unlimited)
    consumer_byte_rate: Optional[int]   # bytes/sec (None = unlimited)
    request_percentage: Optional[int]   # CPU percentage (None = unlimited)

QUOTA_TIERS: Dict[QuotaTier, QuotaConfig] = {
    QuotaTier.BRONZE: QuotaConfig(
        producer_byte_rate=5 * 1024 * 1024,      # 5 MB/s
        consumer_byte_rate=10 * 1024 * 1024,     # 10 MB/s
        request_percentage=10
    ),
    QuotaTier.SILVER: QuotaConfig(
        producer_byte_rate=20 * 1024 * 1024,     # 20 MB/s
        consumer_byte_rate=40 * 1024 * 1024,     # 40 MB/s
        request_percentage=25
    ),
    QuotaTier.GOLD: QuotaConfig(
        producer_byte_rate=50 * 1024 * 1024,     # 50 MB/s
        consumer_byte_rate=100 * 1024 * 1024,    # 100 MB/s
        request_percentage=50
    ),
    QuotaTier.PLATINUM: QuotaConfig(
        producer_byte_rate=None,  # Unlimited
        consumer_byte_rate=None,  # Unlimited
        request_percentage=None   # Unlimited
    )
}


class QuotaManager:
    def __init__(self, bootstrap_servers: str):
        self.bootstrap_servers = bootstrap_servers
        self.admin_client = AdminClient({
            'bootstrap.servers': bootstrap_servers
        })

    async def set_user_quota(self, user: str, tier: QuotaTier) -> None:
        """Set quota for a user based on their tier."""
        quota_config = QUOTA_TIERS[tier]

        configs = {}
        if quota_config.producer_byte_rate is not None:
            configs['producer_byte_rate'] = str(quota_config.producer_byte_rate)
        if quota_config.consumer_byte_rate is not None:
            configs['consumer_byte_rate'] = str(quota_config.consumer_byte_rate)
        if quota_config.request_percentage is not None:
            configs['request_percentage'] = str(quota_config.request_percentage)

        if not configs:
            return  # Platinum tier: no limits to apply

        # Apply the quota via the kafka-configs CLI
        import subprocess
        cmd = [
            'kafka-configs.sh',
            '--bootstrap-server', self.bootstrap_servers,
            '--alter',
            '--entity-type', 'users',
            '--entity-name', user,
            '--add-config', ','.join(f'{k}={v}' for k, v in configs.items())
        ]
        subprocess.run(cmd, check=True)

    async def get_user_quota(self, user: str) -> Dict:
        """Get current quota for a user."""
        import subprocess
        cmd = [
            'kafka-configs.sh',
            '--bootstrap-server', self.bootstrap_servers,
            '--describe',
            '--entity-type', 'users',
            '--entity-name', user
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        return self._parse_quota_output(result.stdout)

    @staticmethod
    def _parse_quota_output(output: str) -> Dict:
        """Parse 'key=value' tokens from kafka-configs describe output."""
        quotas = {}
        for token in output.replace(',', ' ').split():
            if '=' in token:
                key, _, value = token.partition('=')
                quotas[key] = value
        return quotas

    async def check_quota_usage(self, user: str) -> Dict:
        """Check current quota usage via broker metrics.

        The exact metric names depend on your JMX exporter configuration;
        the queries below are illustrative.
        """
        from app.services.prometheus_client import PrometheusClient
        prom = PrometheusClient()

        producer_rate = await prom.query(
            f'kafka_server_brokertopicmetrics_bytesinpersec{{user="{user}"}}'
        )
        consumer_rate = await prom.query(
            f'kafka_server_brokertopicmetrics_bytesoutpersec{{user="{user}"}}'
        )

        return {
            "user": user,
            "producer_rate_bytes": producer_rate,
            "consumer_rate_bytes": consumer_rate
        }
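A small companion helper the portal could use when onboarding a team is picking the cheapest tier that covers their expected throughput. This is a hypothetical sketch, with producer limits taken from the tier table above:

```python
from typing import Dict

# Producer limits per tier in MB/s (platinum is unlimited, so it is the fallback)
TIER_PRODUCER_MBPS: Dict[str, int] = {"bronze": 5, "silver": 20, "gold": 50}

def smallest_tier_for(producer_mbps: float) -> str:
    """Return the cheapest tier whose producer limit covers the requested rate."""
    for tier, limit in TIER_PRODUCER_MBPS.items():  # dict preserves insertion order
        if producer_mbps <= limit:
            return tier
    return "platinum"
```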

Monitoring and Observability

Self-Service Dashboard

┌─────────────────────────────────────────────────────────────────┐
│                    MY TOPICS DASHBOARD                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  Topics Owned: 12          Active Consumers: 8                  │
│  Total Partitions: 72      Avg Lag: 234                         │
│                                                                  │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │ Topic                    │ Msgs/s │ Lag    │ Status       │  │
│  ├──────────────────────────┼────────┼────────┼──────────────┤  │
│  │ banking.payments.orders  │ 1,234  │ 45     │ Healthy      │  │
│  │ banking.payments.refunds │ 456    │ 12     │ Healthy      │  │
│  │ banking.fraud.alerts     │ 89     │ 2,345  │ Warning      │  │
│  │ analytics.events.clicks  │ 5,678  │ 0      │ Healthy      │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                                                  │
│  Quota Usage (Silver Tier)                                      │
│  Producer: ████████░░░░ 67%  │  Consumer: ███░░░░░░░░░ 23%     │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Self-Service Alerts

# alert_rules.yaml (user-configurable)
apiVersion: v1
kind: TopicAlertRule
metadata:
  name: high-lag-alert
  owner: payments-team
spec:
  topics:
    - banking.payments.orders
    - banking.payments.refunds
  conditions:
    - metric: consumer_lag
      operator: ">"
      threshold: 1000
      duration: 5m
  actions:
    - type: slack
      channel: "#payments-alerts"
    - type: pagerduty
      severity: warning
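Evaluating such a rule boils down to comparing a metric sample against the declared threshold with the declared operator. A minimal sketch, assuming conditions arrive as already-parsed dicts:

```python
import operator
from typing import Dict

# Map the YAML "operator" field to Python comparison functions
OPS = {">": operator.gt, ">=": operator.ge, "<": operator.lt, "<=": operator.le}

def condition_breached(condition: Dict, observed_value: float) -> bool:
    """True when the observed metric value violates the rule's threshold."""
    return OPS[condition["operator"]](observed_value, condition["threshold"])

# The consumer-lag condition from the rule above, as a parsed dict
lag_condition = {"metric": "consumer_lag", "operator": ">",
                 "threshold": 1000, "duration": "5m"}
```

A real evaluator would also enforce the `duration` field (the condition must hold for 5 minutes before firing), typically by delegating to Prometheus's `for:` clause rather than reimplementing it.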

The GitOps Approach

Topic Definitions as Code

# topics/banking/payments/orders.yaml
apiVersion: kafka.company.com/v1
kind: Topic
metadata:
  name: banking.payments.orders
  labels:
    domain: banking
    subdomain: payments
    owner: payments-team
    criticality: critical
spec:
  partitions: 12
  replicationFactor: 3
  config:
    retention.ms: "2592000000"  # 30 days
    cleanup.policy: "delete"
    min.insync.replicas: "2"
    compression.type: "lz4"
  schema:
    type: avro
    compatibility: BACKWARD
    file: ./schemas/order-event.avsc
  acls:
    - principal: "User:payments-service"
      operations: [READ, WRITE]
    - principal: "User:analytics-service"
      operations: [READ]
    - principal: "User:audit-service"
      operations: [READ]
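A CI "validate" step can enforce policy on these definitions before anything reaches the brokers. Here is a sketch of such checks; the definition is shown as the dict a YAML loader would produce, and the policy rules are illustrative:

```python
from typing import Dict, List

# The topic definition above, as an already-parsed dict
topic_def: Dict = {
    "metadata": {
        "name": "banking.payments.orders",
        "labels": {"domain": "banking", "criticality": "critical"},
    },
    "spec": {
        "partitions": 12,
        "replicationFactor": 3,
        "config": {"min.insync.replicas": "2"},
    },
}

def policy_errors(defn: Dict) -> List[str]:
    """Return a list of policy violations (empty list = definition passes)."""
    errors = []
    meta, spec = defn["metadata"], defn["spec"]
    if not meta["name"].startswith(meta["labels"]["domain"] + "."):
        errors.append("topic name must be prefixed with its domain")
    if meta["labels"]["criticality"] == "critical":
        if spec["replicationFactor"] < 3:
            errors.append("critical topics need replicationFactor >= 3")
        if int(spec["config"].get("min.insync.replicas", "1")) < 2:
            errors.append("critical topics need min.insync.replicas >= 2")
    return errors
```

Failing the pipeline on a non-empty error list keeps bad definitions out of `main`, which is what makes the subsequent `apply` job safe to automate.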

CI/CD Pipeline

# .github/workflows/kafka-topics.yml
name: Kafka Topics Deployment

on:
  push:
    branches: [main]
    paths:
      - 'topics/**'
  pull_request:
    branches: [main]
    paths:
      - 'topics/**'

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Validate topic definitions
        run: |
          kafka-gitops validate --directory topics/

      - name: Check schema compatibility
        run: |
          kafka-gitops schema-check --directory topics/

  plan:
    needs: validate
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Generate plan
        run: |
          kafka-gitops plan --directory topics/ --output plan.json

      - name: Comment PR with plan
        if: github.event_name == 'pull_request'
        uses: actions/github-script@v6
        with:
          script: |
            const plan = require('./plan.json');
            github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: '## Kafka Changes Plan\n```json\n' + JSON.stringify(plan, null, 2) + '\n```'
            });

  apply:
    needs: plan
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    environment: production
    steps:
      - uses: actions/checkout@v3

      - name: Apply topic changes
        run: |
          kafka-gitops apply --directory topics/ --environment prod
        env:
          KAFKA_BOOTSTRAP_SERVERS: ${{ secrets.KAFKA_BOOTSTRAP_SERVERS }}
          KAFKA_SASL_USERNAME: ${{ secrets.KAFKA_SASL_USERNAME }}
          KAFKA_SASL_PASSWORD: ${{ secrets.KAFKA_SASL_PASSWORD }}

Key Takeaways

Benefits of Topic-as-a-Service

Metric                      Before         After            Improvement
Provisioning lead time      3-5 days       5 minutes        -99%
Platform team workload      40% tickets    5% exceptions    -87%
Naming compliance           60%            100%             +40 pts
Documented topics           30%            100%             +70 pts

Key Success Factors

1. Developer Experience

  • Intuitive interface
  • Built-in documentation
  • Immediate feedback on errors

2. Automated Governance

  • Naming-convention validation
  • Context-aware approvals
  • Complete audit trail

3. Native Observability

  • Real-time metrics
  • Self-service alerts
  • Custom dashboards

4. Continuous Evolution

  • Listen to user feedback
  • Iterate on the workflows
  • Automate recurring cases

Take Action

Want to build a Topic-as-a-Service platform in your organization? I can help with:

  • An audit of your current Kafka organization
  • Designing a TaaS architecture suited to your context
  • Implementing and deploying the platform
  • Training your platform and development teams

Contact me to discuss your project


This article is part of a series on industrializing Kafka in the enterprise. Also see my retrospective on 5 years of Kafka at Credit Agricole and my Kafka audit methodology.

Florian Courouge

DevOps & Kafka expert | Freelance consultant specializing in distributed architectures and data streaming.
