Topic-as-a-Service: Industrializing Kafka for Teams
As Kafka adoption grows across an organization, manual topic management becomes a bottleneck: teams wait days to get a topic, naming conventions go unenforced, and the platform team drowns in requests. The solution? Topic-as-a-Service (TaaS): a self-service approach that democratizes access to Kafka while preserving governance and security.
Why Topic-as-a-Service?
The Pain of the "Ticket" Model
┌─────────────────────────────────────────────────────────────────┐
│                 TRADITIONAL WORKFLOW (TO AVOID)                 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Dev Team Ticket System Platform Team Kafka │
│ │ │ │ │ │
│ │── Create Ticket ▶ │ │ │
│ │ │── Assign ─────────▶│ │ │
│ │ │ │── Review ──│ │
│ │ │ │ │ │
│ │ │◀── Questions ──────│ │ │
│ │◀── Questions ───│ │ │ │
│ │── Answers ──────▶ │ │ │
│ │ │── Forward ────────▶│ │ │
│ │ │ │── Create ──▶ │
│ │ │◀── Done ───────────│ │ │
│ │◀── Resolved ────│ │ │ │
│ │ │
│                   Average lead time: 3-5 days                   │
│ │
└─────────────────────────────────────────────────────────────────┘
Identified problems:
- Provisioning lead time: 3-5 days on average
- Platform team load: 40% of their time spent on tickets
- Inconsistencies: naming, partitioning, and retention all vary
- No visibility: who uses what?
The Topic-as-a-Service Vision
┌─────────────────────────────────────────────────────────────────┐
│                   TOPIC-AS-A-SERVICE WORKFLOW                   │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Dev Team TaaS Portal Automation Kafka │
│ │ │ │ │ │
│ │── Request ─────▶│ │ │ │
│ │ │── Validate ───────▶│ │ │
│ │ │◀── OK ─────────────│ │ │
│ │ │── Create Topic ────────────────▶│ │
│ │ │── Configure ACLs ──────────────▶│ │
│ │ │── Set Quotas ─────────────────▶│ │
│ │◀── Ready ───────│ │ │ │
│ │ │
│                  Average lead time: 5 minutes                   │
│ │
└─────────────────────────────────────────────────────────────────┘
TaaS Platform Architecture
Overview
┌─────────────────────────────────────────────────────────────────┐
│ TOPIC-AS-A-SERVICE PLATFORM │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ SELF-SERVICE PORTAL │ │
│ │ - Topic Request Form │ │
│ │ - Schema Designer │ │
│ │ - Topic Catalog Browser │ │
│ │ - Usage Dashboard │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ API GATEWAY │ │
│ │ - Authentication (OAuth2/OIDC) │ │
│ │ - Rate Limiting │ │
│ │ - Request Validation │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ PROVISIONING ENGINE │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Validator │ │ Approver │ │ Executor │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌────────────────────┼────────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Kafka │ │ Schema │ │ IAM │ │
│ │ Clusters │ │ Registry │ │ (ACLs) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Key Components
| Component | Role | Technologies |
|---|---|---|
| Portal | Self-service user interface | React, Vue.js |
| API Gateway | Authentication, rate limiting | Kong, Traefik |
| Provisioning Engine | Orchestrates topic creation | Go, Python, Java |
| Topic Catalog | Inventory and metadata | PostgreSQL, Elasticsearch |
| Approval Workflow | Automatic/manual validation | Temporal, Argo Workflows |
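To make the Validator component concrete, here is a minimal sketch of the convention checks it can enforce at request time (the module name and exact rule set are illustrative):
# services/topic_validator.py -- illustrative sketch of the Validator component
import re

NAME_PATTERN = re.compile(r'^[a-z][a-z0-9-]*$')
ALLOWED_DOMAINS = {'banking', 'marketing', 'analytics', 'operations'}

def validate_request(name: str, domain: str, partitions: int) -> list:
    """Return a list of human-readable violations (empty list = valid)."""
    errors = []
    if domain not in ALLOWED_DOMAINS:
        errors.append(f"unknown domain '{domain}'")
    if not NAME_PATTERN.match(name):
        errors.append('name must match ^[a-z][a-z0-9-]*$')
    if not 1 <= partitions <= 100:
        errors.append('partitions must be between 1 and 100')
    return errors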
TaaS API Design
OpenAPI Specification
# openapi.yaml
openapi: 3.0.3
info:
title: Topic-as-a-Service API
version: 1.0.0
description: Self-service Kafka topic provisioning API
paths:
/api/v1/topics:
post:
summary: Request a new topic
operationId: createTopicRequest
security:
- oauth2: [topics:write]
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/TopicRequest'
responses:
'202':
description: Request accepted
content:
application/json:
schema:
$ref: '#/components/schemas/TopicRequestResponse'
'400':
description: Validation error
'403':
description: Insufficient permissions
get:
summary: List topics owned by requester
operationId: listTopics
security:
- oauth2: [topics:read]
parameters:
- name: owner
in: query
schema:
type: string
- name: environment
in: query
schema:
type: string
enum: [dev, staging, prod]
responses:
'200':
description: List of topics
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/Topic'
/api/v1/topics/{topicName}:
get:
summary: Get topic details
operationId: getTopic
parameters:
- name: topicName
in: path
required: true
schema:
type: string
responses:
'200':
description: Topic details
content:
application/json:
schema:
$ref: '#/components/schemas/TopicDetails'
    delete:
      summary: Request topic deletion
      operationId: deleteTopic
      parameters:
        - name: topicName
          in: path
          required: true
          schema:
            type: string
      responses:
        '202':
          description: Deletion request accepted
  /api/v1/topics/{topicName}/acls:
    post:
      summary: Grant access to topic
      operationId: grantAccess
      parameters:
        - name: topicName
          in: path
          required: true
          schema:
            type: string
      requestBody:
        required: true
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/ACLRequest'
      responses:
        '200':
          description: Access granted
components:
schemas:
TopicRequest:
type: object
required:
- name
- domain
- owner
- environment
- description
properties:
name:
type: string
pattern: '^[a-z][a-z0-9-]*$'
minLength: 3
maxLength: 64
description: Topic name (without domain prefix)
domain:
type: string
enum: [banking, marketing, analytics, operations]
subdomain:
type: string
owner:
type: string
description: Team or service account owner
environment:
type: string
enum: [dev, staging, prod]
description:
type: string
minLength: 20
partitions:
type: integer
minimum: 1
maximum: 100
default: 6
replicationFactor:
type: integer
enum: [1, 2, 3]
default: 3
retentionMs:
type: integer
description: Retention in milliseconds
default: 604800000 # 7 days
schemaType:
type: string
enum: [avro, json, protobuf, none]
default: avro
schema:
type: string
description: Avro/JSON/Protobuf schema definition
classification:
type: string
enum: [public, internal, confidential, restricted]
default: internal
criticality:
type: string
enum: [low, medium, high, critical]
default: medium
TopicRequestResponse:
type: object
properties:
requestId:
type: string
format: uuid
status:
type: string
enum: [pending, approved, provisioning, ready, rejected]
topicName:
type: string
estimatedReadyTime:
type: string
format: date-time
approvalRequired:
type: boolean
TopicDetails:
type: object
properties:
name:
type: string
fullName:
type: string
description: Full topic name with domain prefix
partitions:
type: integer
replicationFactor:
type: integer
config:
type: object
additionalProperties:
type: string
owner:
type: string
createdAt:
type: string
format: date-time
metrics:
$ref: '#/components/schemas/TopicMetrics'
TopicMetrics:
type: object
properties:
messagesPerSecond:
type: number
bytesPerSecond:
type: number
consumerGroups:
type: integer
avgLag:
type: integer
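Before diving into the implementation, here is what a client call against this spec could look like (the host and bearer token are placeholders):
# Submitting a topic request (host and token are placeholders)
import httpx

response = httpx.post(
    'https://kafka-portal.company.com/api/v1/topics',
    headers={'Authorization': 'Bearer <token>'},
    json={
        'name': 'orders',
        'domain': 'banking',
        'subdomain': 'payments',
        'owner': 'payments-team',
        'environment': 'dev',
        'description': 'Payment order events emitted by the checkout service',
        'partitions': 6,
    },
)
print(response.status_code)                   # 202: request accepted
print(response.json().get('requestId'))       # track provisioning with this id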
Python Implementation (FastAPI)
# app/api/topics.py
from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks
from pydantic import BaseModel, Field, validator, root_validator
from typing import Optional, List
from enum import Enum
import uuid
from datetime import datetime, timedelta
from app.auth import get_current_user, User
from app.services.kafka_provisioner import KafkaProvisioner
from app.services.schema_registry import SchemaRegistryClient
from app.services.approval_workflow import ApprovalWorkflow
from app.models import TopicRequest, TopicStatus
router = APIRouter(prefix="/api/v1/topics", tags=["topics"])
class Environment(str, Enum):
DEV = "dev"
STAGING = "staging"
PROD = "prod"
class Domain(str, Enum):
BANKING = "banking"
MARKETING = "marketing"
ANALYTICS = "analytics"
OPERATIONS = "operations"
class Criticality(str, Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class TopicRequestModel(BaseModel):
name: str = Field(..., min_length=3, max_length=64, regex="^[a-z][a-z0-9-]*$")
domain: Domain
subdomain: Optional[str] = None
owner: str
environment: Environment
description: str = Field(..., min_length=20)
partitions: int = Field(default=6, ge=1, le=100)
replication_factor: int = Field(default=3, ge=1, le=3)
retention_ms: int = Field(default=604800000) # 7 days
schema_type: str = Field(default="avro")
schema_definition: Optional[str] = None
criticality: Criticality = Criticality.MEDIUM
@validator('replication_factor')
def validate_replication(cls, v, values):
if values.get('environment') == Environment.PROD and v < 3:
raise ValueError('Production topics must have replication factor >= 3')
return v
    # criticality is declared after partitions, so a field-level validator on
    # partitions would not yet see it in `values`; a root validator runs once
    # every field has been parsed.
    @root_validator
    def validate_partitions(cls, values):
        if values.get('criticality') == Criticality.CRITICAL and values.get('partitions', 6) < 6:
            raise ValueError('Critical topics must have at least 6 partitions')
        return values
@router.post("/", status_code=202)
async def create_topic_request(
request: TopicRequestModel,
background_tasks: BackgroundTasks,
current_user: User = Depends(get_current_user),
provisioner: KafkaProvisioner = Depends(),
approval_workflow: ApprovalWorkflow = Depends()
):
"""
Submit a new topic creation request.
Topics in dev/staging are auto-approved.
Production topics require approval for critical/high criticality.
"""
# Generate full topic name
full_topic_name = generate_topic_name(request)
# Check if topic already exists
if await provisioner.topic_exists(full_topic_name, request.environment):
raise HTTPException(status_code=409, detail="Topic already exists")
# Create request record
request_id = str(uuid.uuid4())
topic_request = TopicRequest(
id=request_id,
full_name=full_topic_name,
requester=current_user.username,
request_data=request.dict(),
status=TopicStatus.PENDING,
created_at=datetime.utcnow()
)
# Determine if approval is required
requires_approval = (
request.environment == Environment.PROD and
request.criticality in [Criticality.HIGH, Criticality.CRITICAL]
)
if requires_approval:
# Submit to approval workflow
await approval_workflow.submit(topic_request)
return {
"request_id": request_id,
"status": "pending_approval",
"topic_name": full_topic_name,
"approval_required": True,
"estimated_ready_time": (datetime.utcnow() + timedelta(hours=24)).isoformat()
}
else:
# Auto-approve and provision in background
background_tasks.add_task(
provision_topic,
topic_request,
provisioner
)
return {
"request_id": request_id,
"status": "provisioning",
"topic_name": full_topic_name,
"approval_required": False,
"estimated_ready_time": (datetime.utcnow() + timedelta(minutes=5)).isoformat()
}
def generate_topic_name(request: TopicRequestModel) -> str:
"""Generate standardized topic name."""
parts = [request.domain.value]
if request.subdomain:
parts.append(request.subdomain)
parts.append(request.name)
return ".".join(parts)
async def provision_topic(
topic_request: TopicRequest,
provisioner: KafkaProvisioner
):
"""Background task to provision topic."""
try:
# Create topic
await provisioner.create_topic(
name=topic_request.full_name,
partitions=topic_request.request_data['partitions'],
replication_factor=topic_request.request_data['replication_factor'],
config={
'retention.ms': str(topic_request.request_data['retention_ms']),
'cleanup.policy': 'delete'
},
environment=topic_request.request_data['environment']
)
# Register schema if provided
if topic_request.request_data.get('schema_definition'):
schema_client = SchemaRegistryClient()
await schema_client.register_schema(
subject=f"{topic_request.full_name}-value",
schema=topic_request.request_data['schema_definition'],
schema_type=topic_request.request_data['schema_type']
)
# Set up ACLs
await provisioner.create_acls(
topic=topic_request.full_name,
owner=topic_request.request_data['owner'],
environment=topic_request.request_data['environment']
)
# Update request status
topic_request.status = TopicStatus.READY
topic_request.provisioned_at = datetime.utcnow()
except Exception as e:
topic_request.status = TopicStatus.FAILED
topic_request.error_message = str(e)
raise
@router.get("/{topic_name}")
async def get_topic(
topic_name: str,
environment: Environment,
current_user: User = Depends(get_current_user),
provisioner: KafkaProvisioner = Depends()
):
"""Get topic details including metrics."""
topic = await provisioner.get_topic_details(topic_name, environment)
if not topic:
raise HTTPException(status_code=404, detail="Topic not found")
# Get metrics
metrics = await provisioner.get_topic_metrics(topic_name, environment)
return {
"name": topic_name,
"environment": environment,
"partitions": topic.partitions,
"replication_factor": topic.replication_factor,
"config": topic.config,
"metrics": metrics
}
@router.get("/")
async def list_topics(
owner: Optional[str] = None,
environment: Optional[Environment] = None,
domain: Optional[Domain] = None,
current_user: User = Depends(get_current_user),
provisioner: KafkaProvisioner = Depends()
):
"""List topics with optional filters."""
filters = {}
if owner:
filters['owner'] = owner
if environment:
filters['environment'] = environment.value
if domain:
filters['domain'] = domain.value
topics = await provisioner.list_topics(filters)
return topics
Approval Workflows
Approval Matrix
┌─────────────────────────────────────────────────────────────────┐
│ APPROVAL MATRIX │
├─────────────────┬─────────────┬─────────────┬───────────────────┤
│ Criticality │ Dev │ Staging │ Production │
├─────────────────┼─────────────┼─────────────┼───────────────────┤
│ Low │ Auto │ Auto │ Auto │
│ Medium │ Auto │ Auto │ Team Lead │
│ High │ Auto │ Team Lead │ Platform + Data │
│ Critical │ Team Lead │ Platform │ Platform + CISO │
└─────────────────┴─────────────┴─────────────┴───────────────────┘
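This matrix is easier to maintain as data than as branching logic; a minimal sketch (the approver group names are illustrative):
# approval_matrix.py -- the matrix above as data (group names illustrative)
APPROVAL_MATRIX = {
    # (criticality, environment) -> approver groups; empty list = auto-approve
    ('low', 'dev'): [],      ('low', 'staging'): [],      ('low', 'prod'): [],
    ('medium', 'dev'): [],   ('medium', 'staging'): [],   ('medium', 'prod'): ['team-lead'],
    ('high', 'dev'): [],     ('high', 'staging'): ['team-lead'],
    ('high', 'prod'): ['platform', 'data'],
    ('critical', 'dev'): ['team-lead'],
    ('critical', 'staging'): ['platform'],
    ('critical', 'prod'): ['platform', 'ciso'],
}

def required_approvers(criticality: str, environment: str) -> list:
    """Look up approver groups for a request; [] means auto-approval."""
    return APPROVAL_MATRIX[(criticality, environment)]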
Temporal Workflow Implementation
# workflows/approval_workflow.py
from temporalio import workflow, activity
from temporalio.common import RetryPolicy
from datetime import timedelta
from typing import List
import asyncio
@activity.defn
async def notify_approvers(request_id: str, approvers: List[str]) -> None:
"""Send notification to approvers via Slack/Email."""
from app.services.notifications import NotificationService
notification_service = NotificationService()
for approver in approvers:
await notification_service.send(
recipient=approver,
template="topic_approval_request",
data={
"request_id": request_id,
"approval_url": f"https://kafka-portal.company.com/approvals/{request_id}"
}
)
@activity.defn
async def check_approval_status(request_id: str) -> dict:
"""Check if request has been approved/rejected."""
from app.repositories.approval_repository import ApprovalRepository
repo = ApprovalRepository()
return await repo.get_status(request_id)
@activity.defn
async def provision_topic(request_id: str) -> dict:
"""Provision the approved topic."""
from app.services.kafka_provisioner import KafkaProvisioner
from app.repositories.topic_request_repository import TopicRequestRepository
repo = TopicRequestRepository()
request = await repo.get(request_id)
provisioner = KafkaProvisioner()
result = await provisioner.provision(request)
return {"topic_name": result.topic_name, "status": "provisioned"}
@activity.defn
async def notify_requester(request_id: str, status: str, message: str) -> None:
"""Notify the requester of the outcome."""
from app.services.notifications import NotificationService
from app.repositories.topic_request_repository import TopicRequestRepository
repo = TopicRequestRepository()
request = await repo.get(request_id)
notification_service = NotificationService()
await notification_service.send(
recipient=request.requester,
template=f"topic_request_{status}",
data={
"request_id": request_id,
"topic_name": request.full_name,
"message": message
}
)
@workflow.defn
class TopicApprovalWorkflow:
"""Workflow for topic creation with approval."""
def __init__(self):
self.approved = False
self.rejected = False
self.rejection_reason = ""
@workflow.signal
async def approve(self, approver: str) -> None:
"""Signal to approve the request."""
self.approved = True
@workflow.signal
async def reject(self, approver: str, reason: str) -> None:
"""Signal to reject the request."""
self.rejected = True
self.rejection_reason = reason
@workflow.run
async def run(self, request_id: str, approvers: List[str]) -> dict:
"""Execute the approval workflow."""
# Step 1: Notify approvers
await workflow.execute_activity(
notify_approvers,
args=[request_id, approvers],
start_to_close_timeout=timedelta(minutes=5),
retry_policy=RetryPolicy(maximum_attempts=3)
)
# Step 2: Wait for approval (max 7 days)
try:
await workflow.wait_condition(
lambda: self.approved or self.rejected,
timeout=timedelta(days=7)
)
except asyncio.TimeoutError:
# Auto-reject after timeout
await workflow.execute_activity(
notify_requester,
args=[request_id, "expired", "Request expired after 7 days without approval"],
start_to_close_timeout=timedelta(minutes=5)
)
return {"status": "expired", "request_id": request_id}
# Step 3: Handle decision
if self.rejected:
await workflow.execute_activity(
notify_requester,
args=[request_id, "rejected", self.rejection_reason],
start_to_close_timeout=timedelta(minutes=5)
)
return {"status": "rejected", "reason": self.rejection_reason}
# Step 4: Provision topic
result = await workflow.execute_activity(
provision_topic,
args=[request_id],
start_to_close_timeout=timedelta(minutes=30),
retry_policy=RetryPolicy(maximum_attempts=3)
)
# Step 5: Notify success
await workflow.execute_activity(
notify_requester,
args=[request_id, "approved", f"Topic {result['topic_name']} is ready"],
start_to_close_timeout=timedelta(minutes=5)
)
return {"status": "provisioned", **result}
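On the caller side, the portal backend starts this workflow and routes approval clicks to it as signals. A sketch, assuming the Temporal server address, workflow id scheme, and task queue name below:
# Starting and signaling the workflow from the portal backend
# (server address, workflow id scheme and task queue are illustrative)
from temporalio.client import Client

async def submit_for_approval(request_id: str, approvers: list) -> None:
    client = await Client.connect('temporal.company.com:7233')
    await client.start_workflow(
        TopicApprovalWorkflow.run,
        args=[request_id, approvers],
        id=f'topic-approval-{request_id}',
        task_queue='taas-approvals',
    )

async def approve_request(request_id: str, approver: str) -> None:
    client = await Client.connect('temporal.company.com:7233')
    handle = client.get_workflow_handle(f'topic-approval-{request_id}')
    await handle.signal(TopicApprovalWorkflow.approve, approver)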
Quota Management
Quota Architecture
┌─────────────────────────────────────────────────────────────────┐
│ QUOTA MANAGEMENT │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ QUOTA TIERS │ │
│ ├─────────────┬─────────────┬─────────────┬───────────────┤ │
│ │ Tier │ Producer │ Consumer │ Request Rate │ │
│ ├─────────────┼─────────────┼─────────────┼───────────────┤ │
│ │ Bronze │ 5 MB/s │ 10 MB/s │ 100 req/s │ │
│ │ Silver │ 20 MB/s │ 40 MB/s │ 500 req/s │ │
│ │ Gold │ 50 MB/s │ 100 MB/s │ 2000 req/s │ │
│ │ Platinum │ Unlimited │ Unlimited │ Unlimited │ │
│ └─────────────┴─────────────┴─────────────┴───────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Automated Quota Configuration
# services/quota_manager.py
import subprocess
from dataclasses import dataclass
from enum import Enum
from typing import Dict, Optional
class QuotaTier(Enum):
BRONZE = "bronze"
SILVER = "silver"
GOLD = "gold"
PLATINUM = "platinum"
@dataclass
class QuotaConfig:
    producer_byte_rate: Optional[int]   # bytes/sec (None = unlimited)
    consumer_byte_rate: Optional[int]   # bytes/sec (None = unlimited)
    request_percentage: Optional[int]   # CPU percentage (None = unlimited)
QUOTA_TIERS: Dict[QuotaTier, QuotaConfig] = {
QuotaTier.BRONZE: QuotaConfig(
producer_byte_rate=5 * 1024 * 1024, # 5 MB/s
consumer_byte_rate=10 * 1024 * 1024, # 10 MB/s
request_percentage=10
),
QuotaTier.SILVER: QuotaConfig(
producer_byte_rate=20 * 1024 * 1024, # 20 MB/s
consumer_byte_rate=40 * 1024 * 1024, # 40 MB/s
request_percentage=25
),
QuotaTier.GOLD: QuotaConfig(
producer_byte_rate=50 * 1024 * 1024, # 50 MB/s
consumer_byte_rate=100 * 1024 * 1024, # 100 MB/s
request_percentage=50
),
QuotaTier.PLATINUM: QuotaConfig(
producer_byte_rate=None, # Unlimited
consumer_byte_rate=None, # Unlimited
request_percentage=None # Unlimited
)
}
class QuotaManager:
    """Applies per-user quotas via the kafka-configs CLI, since the Python
    AdminClient does not expose the client-quota API."""

    def __init__(self, bootstrap_servers: str):
        self.bootstrap_servers = bootstrap_servers
async def set_user_quota(self, user: str, tier: QuotaTier) -> None:
"""Set quota for a user based on their tier."""
quota_config = QUOTA_TIERS[tier]
        configs = {}
        if quota_config.producer_byte_rate is not None:
            configs['producer_byte_rate'] = str(quota_config.producer_byte_rate)
        if quota_config.consumer_byte_rate is not None:
            configs['consumer_byte_rate'] = str(quota_config.consumer_byte_rate)
        if quota_config.request_percentage is not None:
            configs['request_percentage'] = str(quota_config.request_percentage)
        if not configs:
            return  # Platinum tier: nothing to apply
        # Apply the quota configuration via the kafka-configs CLI
        cmd = [
            'kafka-configs.sh',
            '--bootstrap-server', self.bootstrap_servers,
            '--alter',
            '--entity-type', 'users',
            '--entity-name', user,
            '--add-config', ','.join(f'{k}={v}' for k, v in configs.items())
        ]
        subprocess.run(cmd, check=True)
async def get_user_quota(self, user: str) -> Dict:
"""Get current quota for a user."""
        cmd = [
            'kafka-configs.sh',
            '--bootstrap-server', self.bootstrap_servers,
            '--describe',
            '--entity-type', 'users',
            '--entity-name', user
        ]
result = subprocess.run(cmd, capture_output=True, text=True)
# Parse output and return quota info
return self._parse_quota_output(result.stdout)
    async def check_quota_usage(self, user: str) -> Dict:
        """Check current quota usage via broker metrics.

        The metric names below are placeholders: the broker exposes per-user
        produce/fetch byte rates under kafka.server:type=ClientQuotaManager,
        but the Prometheus names depend on your JMX exporter mapping.
        """
        from app.services.prometheus_client import PrometheusClient
        prom = PrometheusClient()
        producer_rate = await prom.query(
            f'kafka_server_client_quota_byte_rate{{quota_type="produce",user="{user}"}}'
        )
        consumer_rate = await prom.query(
            f'kafka_server_client_quota_byte_rate{{quota_type="fetch",user="{user}"}}'
        )
return {
"user": user,
"producer_rate_bytes": producer_rate,
"consumer_rate_bytes": consumer_rate
}
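Assigning a tier then becomes a single call in the provisioning flow (the broker address and service account name are illustrative):
# Hypothetical usage inside the provisioning flow
manager = QuotaManager('kafka-1.company.com:9092')
await manager.set_user_quota('svc-payments-producer', QuotaTier.SILVER)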
Monitoring and Observability
Self-Service Dashboard
┌─────────────────────────────────────────────────────────────────┐
│ MY TOPICS DASHBOARD │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Topics Owned: 12 Active Consumers: 8 │
│ Total Partitions: 72 Avg Lag: 234 │
│ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ Topic │ Msgs/s │ Lag │ Status │ │
│ ├──────────────────────────┼────────┼────────┼──────────────┤ │
│ │ banking.payments.orders │ 1,234 │ 45 │ Healthy │ │
│ │ banking.payments.refunds │ 456 │ 12 │ Healthy │ │
│ │ banking.fraud.alerts │ 89 │ 2,345 │ Warning │ │
│ │ analytics.events.clicks │ 5,678 │ 0 │ Healthy │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │
│ Quota Usage (Silver Tier) │
│ Producer: ████████░░░░ 67% │ Consumer: ███░░░░░░░░░ 23% │
│ │
└─────────────────────────────────────────────────────────────────┘
Self-Service Alerts
# alert_rules.yaml (user-configurable)
apiVersion: v1
kind: TopicAlertRule
metadata:
name: high-lag-alert
owner: payments-team
spec:
topics:
- banking.payments.orders
- banking.payments.refunds
conditions:
- metric: consumer_lag
operator: ">"
threshold: 1000
duration: 5m
actions:
- type: slack
channel: "#payments-alerts"
- type: pagerduty
severity: warning
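Under the hood, such user-defined rules can be compiled into standard Prometheus alerting rules; a sketch, assuming a kafka-exporter-style kafka_consumergroup_lag metric:
# Compiling a TopicAlertRule into a Prometheus alerting rule (sketch;
# kafka_consumergroup_lag assumes a kafka-exporter-style metric name)
def to_prometheus_rule(rule: dict) -> dict:
    cond = rule['spec']['conditions'][0]
    topics = '|'.join(rule['spec']['topics'])
    return {
        'alert': rule['metadata']['name'],
        'expr': f'kafka_consumergroup_lag{{topic=~"{topics}"}} '
                f'{cond["operator"]} {cond["threshold"]}',
        'for': cond['duration'],
        'labels': {'owner': rule['metadata']['owner']},
    }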
GitOps Approach
Topic Definitions as Code
# topics/banking/payments/orders.yaml
apiVersion: kafka.company.com/v1
kind: Topic
metadata:
name: banking.payments.orders
labels:
domain: banking
subdomain: payments
owner: payments-team
criticality: critical
spec:
partitions: 12
replicationFactor: 3
config:
retention.ms: "2592000000" # 30 days
cleanup.policy: "delete"
min.insync.replicas: "2"
compression.type: "lz4"
schema:
type: avro
compatibility: BACKWARD
file: ./schemas/order-event.avsc
acls:
- principal: "User:payments-service"
operations: [READ, WRITE]
- principal: "User:analytics-service"
operations: [READ]
- principal: "User:audit-service"
operations: [READ]
CI/CD Pipeline
# .github/workflows/kafka-topics.yml
name: Kafka Topics Deployment
on:
push:
branches: [main]
paths:
- 'topics/**'
pull_request:
branches: [main]
paths:
- 'topics/**'
jobs:
validate:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Validate topic definitions
run: |
kafka-gitops validate --directory topics/
- name: Check schema compatibility
run: |
kafka-gitops schema-check --directory topics/
plan:
needs: validate
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Generate plan
run: |
kafka-gitops plan --directory topics/ --output plan.json
- name: Comment PR with plan
if: github.event_name == 'pull_request'
uses: actions/github-script@v6
with:
script: |
const plan = require('./plan.json');
github.rest.issues.createComment({
issue_number: context.issue.number,
owner: context.repo.owner,
repo: context.repo.repo,
body: '## Kafka Changes Plan\n```json\n' + JSON.stringify(plan, null, 2) + '\n```'
});
apply:
needs: plan
if: github.ref == 'refs/heads/main'
runs-on: ubuntu-latest
environment: production
steps:
- uses: actions/checkout@v3
- name: Apply topic changes
run: |
kafka-gitops apply --directory topics/ --environment prod
env:
KAFKA_BOOTSTRAP_SERVERS: ${{ secrets.KAFKA_BOOTSTRAP_SERVERS }}
KAFKA_SASL_USERNAME: ${{ secrets.KAFKA_SASL_USERNAME }}
KAFKA_SASL_PASSWORD: ${{ secrets.KAFKA_SASL_PASSWORD }}
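Tools like kafka-gitops implement this reconciliation; at its core, the plan step boils down to a diff between the desired state in topics/ and the live cluster, roughly:
# Sketch of the plan step: diff desired definitions against the live cluster
# (assumes confluent-kafka; 'desired' is parsed from topics/*.yaml)
from confluent_kafka.admin import AdminClient

def plan(desired: dict, bootstrap_servers: str) -> dict:
    admin = AdminClient({'bootstrap.servers': bootstrap_servers})
    existing = admin.list_topics(timeout=10).topics
    to_create = [name for name in desired if name not in existing]
    unmanaged = [name for name in existing
                 if name not in desired and not name.startswith('_')]
    return {'create': to_create, 'unmanaged': unmanaged}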
Key Takeaways
Benefits of Topic-as-a-Service
| Metric | Before | After | Improvement |
|---|---|---|---|
| Provisioning lead time | 3-5 days | 5 minutes | 99% |
| Platform team load | 40% on tickets | 5% on exceptions | 87% |
| Naming compliance | 60% | 100% | +40 pts |
| Documented topics | 30% | 100% | +70 pts |
Key Success Factors
1. Developer Experience
- Intuitive interface
- Built-in documentation
- Immediate feedback on errors
2. Automated Governance
- Convention validation
- Context-aware approvals
- Complete audit trail
3. Native Observability
- Real-time metrics
- Self-service alerts
- Custom dashboards
4. Continuous Evolution
- Listen to user feedback
- Iterate on workflows
- Automate recurring patterns
Take Action
Want to build a Topic-as-a-Service platform in your organization? I can help with:
- An audit of your current Kafka organization
- The design of a TaaS architecture tailored to your context
- Implementation and deployment of the platform
- Training for your platform and development teams
Contact me to discuss your project
This article is part of a series on industrializing Kafka in the enterprise. Also see my retrospective on 5 years of Kafka at Credit Agricole and my Kafka audit methodology.