Your First Kafka Producer/Consumer in Python
Python is one of the most popular languages for working with Apache Kafka, whether for data engineering, machine learning, or microservices. This tutorial walks you step by step through writing your first Kafka producer and consumer in Python with the confluent-kafka library.
Prerequisites
Before starting, make sure you have:
- Python 3.8+ installed
- Docker (the easiest way to run Kafka locally)
- A code editor (VSCode, PyCharm, etc.)
Checking your environment
# Check Python
python --version
# Python 3.10.12
# Check pip
pip --version
# pip 23.0.1
Step 1: Start Kafka with Docker
For this tutorial we use Docker Compose to spin up a local Kafka cluster.
docker-compose.yml
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:7.5.0
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
- "9093:9093"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
  # Web UI to browse Kafka (optional but handy)
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
depends_on:
- kafka
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
Starting Kafka
# Start the services
docker-compose up -d
# Check that everything is running
docker-compose ps
# Expected output:
# NAME STATUS
# zookeeper Up
# kafka Up
# kafka-ui Up
# Give Kafka a few seconds to become ready
sleep 10
# Create a test topic
docker exec kafka kafka-topics --create \
  --bootstrap-server localhost:9092 \
  --topic mon-premier-topic \
  --partitions 3 \
  --replication-factor 1
# List the topics
docker exec kafka kafka-topics --list \
  --bootstrap-server localhost:9092
You can now open the Kafka UI at http://localhost:8080
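If you prefer to verify connectivity from Python rather than through the UI, here is a minimal sketch (assuming the cluster defined in the docker-compose.yml above, reachable on localhost:9093) that lists the topics with confluent-kafka's AdminClient:
# check_kafka.py -- illustrative connectivity check, not part of the tutorial files
from confluent_kafka.admin import AdminClient

# localhost:9093 is the PLAINTEXT_HOST listener exposed by the docker-compose.yml above
admin = AdminClient({'bootstrap.servers': 'localhost:9093'})

# list_topics() raises an exception if the broker cannot be reached within the timeout
metadata = admin.list_topics(timeout=5)
print("Topics:", list(metadata.topics.keys()))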
Step 2: Setting Up the Python Project
Creating the project
# Create the project directory
mkdir kafka-python-demo
cd kafka-python-demo
# Create a virtual environment
python -m venv venv
# Activate it
# Linux/Mac:
source venv/bin/activate
# Windows:
# venv\Scripts\activate
# Install the dependencies
pip install confluent-kafka python-dotenv
Project structure
kafka-python-demo/
├── venv/
├── .env
├── config.py
├── producer.py
├── consumer.py
├── producer_advanced.py
├── consumer_advanced.py
└── requirements.txt
requirements.txt
confluent-kafka>=2.3.0
python-dotenv>=1.0.0
.env
# Kafka configuration
KAFKA_BOOTSTRAP_SERVERS=localhost:9093
KAFKA_TOPIC=mon-premier-topic
# Optional configuration
KAFKA_GROUP_ID=mon-groupe-python
config.py
"""Configuration centralisee pour Kafka."""
import os
from dotenv import load_dotenv
load_dotenv()
# Configuration de base
BOOTSTRAP_SERVERS = os.getenv('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9093')
TOPIC = os.getenv('KAFKA_TOPIC', 'mon-premier-topic')
GROUP_ID = os.getenv('KAFKA_GROUP_ID', 'mon-groupe-python')
# Configuration Producer
PRODUCER_CONFIG = {
'bootstrap.servers': BOOTSTRAP_SERVERS,
'client.id': 'python-producer-demo',
}
# Configuration Consumer
CONSUMER_CONFIG = {
'bootstrap.servers': BOOTSTRAP_SERVERS,
'group.id': GROUP_ID,
'auto.offset.reset': 'earliest', # Lire depuis le debut si nouveau groupe
}
print(f"Configuration Kafka:")
print(f" Bootstrap: {BOOTSTRAP_SERVERS}")
print(f" Topic: {TOPIC}")
print(f" Group ID: {GROUP_ID}")
Step 3: First Producer (Simple)
Let's start with a basic producer that sends messages to Kafka.
producer.py
"""
Premier Producer Kafka en Python.
Envoie des messages simples vers un topic Kafka.
"""
from confluent_kafka import Producer
import json
import time
from config import PRODUCER_CONFIG, TOPIC
def delivery_callback(err, msg):
"""
Callback appele quand un message est delivre (ou echoue).
Args:
err: Erreur eventuelle
msg: Message envoye
"""
if err is not None:
print(f'Erreur de livraison: {err}')
else:
print(f'Message livre: topic={msg.topic()}, '
f'partition={msg.partition()}, '
f'offset={msg.offset()}')
def create_producer():
"""Cree et retourne un producer Kafka."""
return Producer(PRODUCER_CONFIG)
def send_message(producer, topic, key, value):
"""
Envoie un message a Kafka.
Args:
producer: Instance du Producer
topic: Nom du topic
key: Cle du message (pour partitioning)
value: Contenu du message
"""
# Serialiser en JSON si c'est un dictionnaire
if isinstance(value, dict):
value = json.dumps(value)
# Encoder en bytes
if isinstance(value, str):
value = value.encode('utf-8')
if key and isinstance(key, str):
key = key.encode('utf-8')
# Envoyer le message de maniere asynchrone
producer.produce(
topic=topic,
key=key,
value=value,
callback=delivery_callback
)
# Declencher l'envoi des messages en attente
producer.poll(0)
def main():
"""Fonction principale."""
print("=== Demarrage du Producer Kafka ===")
# Creer le producer
producer = create_producer()
try:
# Envoyer 10 messages de test
for i in range(10):
message = {
"id": i,
"message": f"Hello Kafka #{i}",
"timestamp": time.time()
}
# Utiliser l'ID comme cle (garantit l'ordre par cle)
key = f"key-{i % 3}" # 3 cles differentes pour 3 partitions
print(f"Envoi: key={key}, value={message}")
send_message(producer, TOPIC, key, message)
# Petite pause pour visualiser
time.sleep(0.5)
# Attendre que tous les messages soient envoyes
print("\nAttente de la confirmation de tous les messages...")
producer.flush(timeout=10)
print("Tous les messages ont ete envoyes!")
except KeyboardInterrupt:
print("\nInterruption par l'utilisateur")
finally:
print("Producer termine.")
if __name__ == "__main__":
main()
Running the producer
# Activate the virtual environment if needed
source venv/bin/activate
# Run the producer
python producer.py
Expected output:
Kafka configuration:
  Bootstrap: localhost:9093
  Topic: mon-premier-topic
  Group ID: mon-groupe-python
=== Starting the Kafka Producer ===
Sending: key=key-0, value={'id': 0, 'message': 'Hello Kafka #0', 'timestamp': 1704456789.123}
Message delivered: topic=mon-premier-topic, partition=1, offset=0
Sending: key=key-1, value={'id': 1, 'message': 'Hello Kafka #1', 'timestamp': 1704456789.623}
Message delivered: topic=mon-premier-topic, partition=2, offset=0
...
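The key determines the partition: two messages with the same key always land in the same partition, which is what guarantees per-key ordering. A small illustrative sketch, reusing create_producer and send_message from the producer.py above, lets you check this through the delivery reports:
# same_key_demo.py -- illustrative sketch, assumes producer.py and config.py above
from producer import create_producer, send_message
from config import TOPIC

producer = create_producer()
for i in range(5):
    # Same key for every message: all delivery reports should show the same partition
    send_message(producer, TOPIC, "same-key", {"id": i, "note": "keyed message"})
producer.flush(timeout=10)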
Step 4: First Consumer (Simple)
Now let's write a consumer to read those messages back.
consumer.py
"""
Premier Consumer Kafka en Python.
Lit les messages depuis un topic Kafka.
"""
from confluent_kafka import Consumer, KafkaError, KafkaException
import json
import sys
from config import CONSUMER_CONFIG, TOPIC
def create_consumer():
"""Cree et retourne un consumer Kafka."""
return Consumer(CONSUMER_CONFIG)
def process_message(msg):
"""
Traite un message recu.
Args:
msg: Message Kafka
Returns:
dict: Message decode
"""
# Decoder la cle
key = msg.key().decode('utf-8') if msg.key() else None
# Decoder la valeur
try:
value = json.loads(msg.value().decode('utf-8'))
except json.JSONDecodeError:
value = msg.value().decode('utf-8')
return {
'topic': msg.topic(),
'partition': msg.partition(),
'offset': msg.offset(),
'key': key,
'value': value
}
def main():
"""Fonction principale."""
print("=== Demarrage du Consumer Kafka ===")
# Creer le consumer
consumer = create_consumer()
# S'abonner au topic
consumer.subscribe([TOPIC])
print(f"Abonne au topic: {TOPIC}")
print("En attente de messages... (Ctrl+C pour arreter)\n")
try:
while True:
# Attendre un message (timeout 1 seconde)
msg = consumer.poll(timeout=1.0)
if msg is None:
# Pas de message disponible
continue
if msg.error():
# Gestion des erreurs
if msg.error().code() == KafkaError._PARTITION_EOF:
# Fin de partition atteinte (normal)
print(f"Fin de partition {msg.partition()}")
else:
raise KafkaException(msg.error())
else:
# Message recu avec succes
data = process_message(msg)
print(f"Recu: partition={data['partition']}, "
f"offset={data['offset']}, "
f"key={data['key']}")
print(f" -> {data['value']}")
print()
except KeyboardInterrupt:
print("\nInterruption par l'utilisateur")
finally:
# Fermer proprement le consumer
print("Fermeture du consumer...")
consumer.close()
print("Consumer termine.")
if __name__ == "__main__":
main()
Running the consumer
# In a new terminal
source venv/bin/activate
python consumer.py
Expected output:
Kafka configuration:
  Bootstrap: localhost:9093
  Topic: mon-premier-topic
  Group ID: mon-groupe-python
=== Starting the Kafka Consumer ===
Subscribed to topic: mon-premier-topic
Waiting for messages... (Ctrl+C to stop)
Received: partition=1, offset=0, key=key-0
  -> {'id': 0, 'message': 'Hello Kafka #0', 'timestamp': 1704456789.123}
Received: partition=2, offset=0, key=key-1
  -> {'id': 1, 'message': 'Hello Kafka #1', 'timestamp': 1704456789.623}
...
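Because the group stores its committed offsets, restarting consumer.py with the same group.id picks up where it left off. To re-read the topic from the beginning, the simplest trick is a fresh group.id combined with auto.offset.reset='earliest'. A minimal sketch, assuming the config.py above (the replay group name is made up for the demo):
# replay_demo.py -- illustrative sketch, assumes config.py above
import time
from confluent_kafka import Consumer
from config import CONSUMER_CONFIG, TOPIC

config = dict(CONSUMER_CONFIG)
config['group.id'] = f"replay-group-{int(time.time())}"  # brand-new group => starts at 'earliest'

consumer = Consumer(config)
consumer.subscribe([TOPIC])
try:
    for _ in range(30):  # poll a few times, enough for a quick demo
        msg = consumer.poll(timeout=1.0)
        if msg is None or msg.error():
            continue
        print(msg.partition(), msg.offset(), msg.value())
finally:
    consumer.close()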
Step 5: Advanced Producer
Let's improve the producer with error handling, retries, and batching.
producer_advanced.py
"""
Producer Kafka avance avec gestion d'erreurs, retry et metriques.
"""
from confluent_kafka import Producer, KafkaError
import json
import time
import logging
from dataclasses import dataclass
from typing import Optional, Dict, Any
from config import BOOTSTRAP_SERVERS, TOPIC
# Configuration du logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
@dataclass
class ProducerMetrics:
"""Metriques du producer."""
messages_sent: int = 0
messages_failed: int = 0
total_latency_ms: float = 0.0
@property
def avg_latency_ms(self) -> float:
if self.messages_sent == 0:
return 0
return self.total_latency_ms / self.messages_sent
class KafkaProducerAdvanced:
    """Kafka producer with advanced features."""

    def __init__(self, bootstrap_servers: str, **kwargs):
        """
        Initializes the producer.

        Args:
            bootstrap_servers: Kafka brokers
            **kwargs: Extra configuration
        """
        # Production-ready configuration
        self.config = {
            'bootstrap.servers': bootstrap_servers,
            'client.id': 'python-producer-advanced',
            # Durability
            'acks': 'all',                 # Wait for acknowledgement from all in-sync replicas
            'enable.idempotence': True,    # Avoid duplicates
            # Performance
            'batch.size': 16384,           # 16KB per batch
            'linger.ms': 5,                # Wait up to 5ms to fill a batch
            'compression.type': 'snappy',  # Compression
            # Retries
            'retries': 3,
            'retry.backoff.ms': 100,
            # Timeouts
            'request.timeout.ms': 30000,
            'delivery.timeout.ms': 120000,
            **kwargs
        }
        self.producer = Producer(self.config)
        self.metrics = ProducerMetrics()
        logger.info(f"Producer initialized: {bootstrap_servers}")

    def _delivery_callback(self, err, msg, start_time: float):
        """Delivery callback that updates the metrics."""
        # Latency between produce() and the broker acknowledgement
        latency = (time.time() - start_time) * 1000
        if err is not None:
            self.metrics.messages_failed += 1
            logger.error(f"Delivery failed: {err}")
        else:
            self.metrics.messages_sent += 1
            self.metrics.total_latency_ms += latency
            logger.debug(
                f"Message delivered: topic={msg.topic()}, "
                f"partition={msg.partition()}, "
                f"offset={msg.offset()}, "
                f"latency={latency:.2f}ms"
            )
    def send(
        self,
        topic: str,
        value: Any,
        key: Optional[str] = None,
        headers: Optional[Dict[str, str]] = None
    ) -> None:
        """
        Sends a message to Kafka.

        Args:
            topic: Topic name
            value: Message value (serialized to JSON)
            key: Optional key
            headers: Optional headers
        """
        # Serialization
        if isinstance(value, (dict, list)):
            value = json.dumps(value).encode('utf-8')
        elif isinstance(value, str):
            value = value.encode('utf-8')
        if key:
            key = key.encode('utf-8')
        # Prepare the headers
        kafka_headers = None
        if headers:
            kafka_headers = [(k, v.encode('utf-8')) for k, v in headers.items()]
        # Capture the start time so the delivery callback can compute the latency
        start_time = time.time()
        report = lambda err, msg, t=start_time: self._delivery_callback(err, msg, t)
        # Send
        try:
            self.producer.produce(
                topic=topic,
                key=key,
                value=value,
                headers=kafka_headers,
                callback=report
            )
            # Serve delivery callbacks
            self.producer.poll(0)
        except BufferError:
            logger.warning("Local buffer full, waiting...")
            self.producer.poll(1)
            self.producer.produce(
                topic=topic,
                key=key,
                value=value,
                headers=kafka_headers,
                callback=report
            )
    def send_batch(self, topic: str, messages: list) -> None:
        """
        Sends a batch of messages.

        Args:
            topic: Topic name
            messages: List of (key, value) tuples, or plain values
        """
        for msg in messages:
            if isinstance(msg, tuple):
                key, value = msg
            else:
                key, value = None, msg
            self.send(topic, value, key)
        # Serve any pending delivery callbacks
        self.producer.poll(0)

    def flush(self, timeout: float = 10.0) -> int:
        """
        Waits until every message has been sent.

        Args:
            timeout: Timeout in seconds

        Returns:
            Number of messages still pending
        """
        return self.producer.flush(timeout)

    def get_metrics(self) -> Dict:
        """Returns the producer metrics."""
        return {
            'messages_sent': self.metrics.messages_sent,
            'messages_failed': self.metrics.messages_failed,
            'avg_latency_ms': round(self.metrics.avg_latency_ms, 2),
            'success_rate': round(
                self.metrics.messages_sent /
                max(1, self.metrics.messages_sent + self.metrics.messages_failed) * 100,
                2
            )
        }

    def close(self) -> None:
        """Closes the producer cleanly."""
        remaining = self.flush(timeout=30)
        if remaining > 0:
            logger.warning(f"{remaining} messages were not delivered")
        logger.info(f"Producer closed. Metrics: {self.get_metrics()}")
def main():
    """Demo of the advanced producer."""
    logger.info("=== Advanced Producer Demo ===")
    producer = KafkaProducerAdvanced(BOOTSTRAP_SERVERS)
    try:
        # Send individual messages
        for i in range(5):
            message = {
                "id": i,
                "type": "event",
                "data": f"Advanced message #{i}",
                "timestamp": time.time()
            }
            producer.send(
                topic=TOPIC,
                value=message,
                key=f"user-{i % 2}",
                headers={"source": "python-demo", "version": "2.0"}
            )
            time.sleep(0.1)
        # Send a batch
        logger.info("Sending a batch...")
        batch = [
            ("batch-key-1", {"type": "batch", "index": 0}),
            ("batch-key-2", {"type": "batch", "index": 1}),
            ("batch-key-1", {"type": "batch", "index": 2}),
        ]
        producer.send_batch(TOPIC, batch)
        # Wait for completion
        producer.flush()
        # Print the metrics
        logger.info(f"Final metrics: {producer.get_metrics()}")
    except KeyboardInterrupt:
        logger.info("Interrupted by user")
    finally:
        producer.close()


if __name__ == "__main__":
    main()
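The advanced producer attaches headers to each message, which the simple consumer from Step 4 ignores. A minimal sketch to inspect them, assuming the config.py above (the group.id is made up for the demo; msg.headers() returns a list of (name, bytes) tuples):
# headers_demo.py -- illustrative sketch, assumes config.py above
from confluent_kafka import Consumer
from config import CONSUMER_CONFIG, TOPIC

consumer = Consumer({**CONSUMER_CONFIG, 'group.id': 'headers-demo'})  # made-up group for the demo
consumer.subscribe([TOPIC])
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None or msg.error():
            continue
        # headers() returns None when the message was produced without headers
        headers = msg.headers() or []
        print({name: value.decode('utf-8') for name, value in headers})
finally:
    consumer.close()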
Step 6: Advanced Consumer
A consumer with manual commits, error handling, and a Dead Letter Queue.
consumer_advanced.py
"""
Consumer Kafka avance avec commit manuel, retry et DLQ.
"""
from confluent_kafka import Consumer, Producer, KafkaError, KafkaException
import json
import time
import logging
from typing import Callable, Optional, Dict, Any
from dataclasses import dataclass
from config import BOOTSTRAP_SERVERS, TOPIC, GROUP_ID
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
@dataclass
class ConsumerMetrics:
"""Metriques du consumer."""
messages_processed: int = 0
messages_failed: int = 0
processing_time_ms: float = 0.0
@property
def avg_processing_time_ms(self) -> float:
if self.messages_processed == 0:
return 0
return self.processing_time_ms / self.messages_processed
class KafkaConsumerAdvanced:
    """Kafka consumer with advanced features."""

    def __init__(
        self,
        bootstrap_servers: str,
        group_id: str,
        topics: list,
        dlq_topic: Optional[str] = None,
        **kwargs
    ):
        """
        Initializes the consumer.

        Args:
            bootstrap_servers: Kafka brokers
            group_id: Consumer group ID
            topics: Topics to consume
            dlq_topic: Dead Letter Queue topic (optional)
            **kwargs: Extra configuration
        """
        self.topics = topics
        self.dlq_topic = dlq_topic
        # Consumer configuration
        self.config = {
            'bootstrap.servers': bootstrap_servers,
            'group.id': group_id,
            'auto.offset.reset': 'earliest',
            # Manual commits for finer control
            'enable.auto.commit': False,
            # Performance
            'fetch.min.bytes': 1024,         # At least 1KB per fetch
            'fetch.wait.max.ms': 500,
            'max.poll.interval.ms': 300000,  # At most 5 minutes between polls
            # Heartbeat
            'session.timeout.ms': 30000,
            'heartbeat.interval.ms': 10000,
            **kwargs
        }
        self.consumer = Consumer(self.config)
        self.metrics = ConsumerMetrics()
        self._running = False
        # Producer for the DLQ, if configured
        self._dlq_producer = None
        if dlq_topic:
            self._dlq_producer = Producer({
                'bootstrap.servers': bootstrap_servers
            })
        logger.info(f"Consumer initialized: group={group_id}, topics={topics}")
    def _send_to_dlq(self, msg, error: str) -> None:
        """Sends a failed message to the DLQ."""
        if not self._dlq_producer or not self.dlq_topic:
            logger.warning("No DLQ configured, message dropped")
            return
        dlq_message = {
            'original_topic': msg.topic(),
            'original_partition': msg.partition(),
            'original_offset': msg.offset(),
            'original_key': msg.key().decode('utf-8') if msg.key() else None,
            # errors='replace' so a payload that failed UTF-8 decoding can still be stored
            'original_value': msg.value().decode('utf-8', errors='replace'),
            'error': error,
            'timestamp': time.time()
        }
        self._dlq_producer.produce(
            topic=self.dlq_topic,
            value=json.dumps(dlq_message).encode('utf-8'),
            key=msg.key()
        )
        self._dlq_producer.poll(0)
        logger.info(f"Message sent to DLQ: {self.dlq_topic}")
    def _process_message(
        self,
        msg,
        handler: Callable[[Dict], bool],
        max_retries: int = 3
    ) -> bool:
        """
        Processes a message, with retries.

        Args:
            msg: Kafka message
            handler: Processing function
            max_retries: Maximum number of attempts

        Returns:
            True if processed successfully
        """
        # Decode the message
        try:
            key = msg.key().decode('utf-8') if msg.key() else None
            value = json.loads(msg.value().decode('utf-8'))
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            logger.error(f"Decoding error: {e}")
            self._send_to_dlq(msg, f"Decode error: {e}")
            return False
        message_data = {
            'topic': msg.topic(),
            'partition': msg.partition(),
            'offset': msg.offset(),
            'key': key,
            'value': value,
            'headers': dict(msg.headers()) if msg.headers() else {}
        }
        # Retry logic
        last_error = None
        for attempt in range(max_retries):
            try:
                start_time = time.time()
                result = handler(message_data)
                processing_time = (time.time() - start_time) * 1000
                if result:
                    self.metrics.messages_processed += 1
                    self.metrics.processing_time_ms += processing_time
                    return True
                else:
                    raise ValueError("Handler returned False")
            except Exception as e:
                last_error = str(e)
                logger.warning(
                    f"Processing error (attempt {attempt + 1}/{max_retries}): {e}"
                )
                if attempt < max_retries - 1:
                    time.sleep(0.5 * (attempt + 1))  # Linear backoff between attempts
        # Every attempt failed
        self.metrics.messages_failed += 1
        self._send_to_dlq(msg, f"Max retries exceeded: {last_error}")
        return False
    def consume(
        self,
        handler: Callable[[Dict], bool],
        batch_size: int = 100,
        poll_timeout: float = 1.0
    ) -> None:
        """
        Starts consuming messages.

        Args:
            handler: Function called for every message
            batch_size: Number of messages between commits
            poll_timeout: Poll timeout in seconds
        """
        self.consumer.subscribe(self.topics)
        self._running = True
        messages_since_commit = 0
        logger.info(f"Starting consumption: {self.topics}")
        try:
            while self._running:
                msg = self.consumer.poll(timeout=poll_timeout)
                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        logger.debug(f"End of partition {msg.partition()}")
                    else:
                        logger.error(f"Kafka error: {msg.error()}")
                    continue
                # Process the message (failures end up in the DLQ)
                self._process_message(msg, handler)
                messages_since_commit += 1
                # Periodic commit
                if messages_since_commit >= batch_size:
                    self.consumer.commit(asynchronous=False)
                    logger.debug(f"Committed {messages_since_commit} messages")
                    messages_since_commit = 0
        except KeyboardInterrupt:
            logger.info("Interrupted by user")
        finally:
            # Final commit
            if messages_since_commit > 0:
                self.consumer.commit(asynchronous=False)
            self.close()
    def get_metrics(self) -> Dict:
        """Returns the consumer metrics."""
        return {
            'messages_processed': self.metrics.messages_processed,
            'messages_failed': self.metrics.messages_failed,
            'avg_processing_time_ms': round(self.metrics.avg_processing_time_ms, 2),
            'success_rate': round(
                self.metrics.messages_processed /
                max(1, self.metrics.messages_processed + self.metrics.messages_failed) * 100,
                2
            )
        }

    def stop(self) -> None:
        """Stops the consume loop."""
        self._running = False

    def close(self) -> None:
        """Closes the consumer cleanly."""
        self.consumer.close()
        if self._dlq_producer:
            self._dlq_producer.flush()
        logger.info(f"Consumer closed. Metrics: {self.get_metrics()}")
def message_handler(message: Dict) -> bool:
    """
    Example message handler.

    Args:
        message: Message data

    Returns:
        True if processing succeeded
    """
    logger.info(
        f"Processing: partition={message['partition']}, "
        f"offset={message['offset']}, "
        f"key={message['key']}"
    )
    logger.info(f"  Payload: {message['value']}")
    # Simulate some work
    time.sleep(0.1)
    # Simulate a random failure (to demo the retry logic)
    # if message['value'].get('id', 0) % 5 == 4:
    #     raise ValueError("Simulated error")
    return True
def main():
    """Demo of the advanced consumer."""
    logger.info("=== Advanced Consumer Demo ===")
    # DLQ topic name (the broker auto-creates it since auto.create.topics.enable is true)
    dlq_topic = f"{TOPIC}-dlq"
    consumer = KafkaConsumerAdvanced(
        bootstrap_servers=BOOTSTRAP_SERVERS,
        group_id=GROUP_ID,
        topics=[TOPIC],
        dlq_topic=dlq_topic
    )
    try:
        consumer.consume(
            handler=message_handler,
            batch_size=10,
            poll_timeout=1.0
        )
    except Exception as e:
        logger.error(f"Fatal error: {e}")
    finally:
        logger.info(f"Final metrics: {consumer.get_metrics()}")


if __name__ == "__main__":
    main()
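To check what ended up in the Dead Letter Queue, you can read the mon-premier-topic-dlq topic like any other. A minimal sketch, assuming the config.py above (the group.id here is made up for the demo):
# dlq_inspect.py -- illustrative sketch, assumes config.py above
import json
from confluent_kafka import Consumer
from config import BOOTSTRAP_SERVERS, TOPIC

consumer = Consumer({
    'bootstrap.servers': BOOTSTRAP_SERVERS,
    'group.id': 'dlq-inspector',        # made-up group for the demo
    'auto.offset.reset': 'earliest',
})
consumer.subscribe([f"{TOPIC}-dlq"])
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None or msg.error():
            continue
        dlq_entry = json.loads(msg.value().decode('utf-8'))
        # Each DLQ entry keeps the original coordinates plus the error message
        print(dlq_entry['error'], '->', dlq_entry['original_value'])
finally:
    consumer.close()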
Step 7: End-to-End Test
End-to-end test script
# test_kafka.py
"""End-to-end test of the producer and consumer."""
import threading
import time
from producer_advanced import KafkaProducerAdvanced
from consumer_advanced import KafkaConsumerAdvanced
from config import BOOTSTRAP_SERVERS, TOPIC, GROUP_ID

received_messages = []


def test_handler(message):
    """Test handler that collects the messages."""
    received_messages.append(message['value'])
    return True


def test_producer_consumer():
    """Producer/consumer integration test."""
    print("=== Kafka Integration Test ===\n")
    # 1. Start the consumer in a thread
    consumer = KafkaConsumerAdvanced(
        bootstrap_servers=BOOTSTRAP_SERVERS,
        group_id=f"{GROUP_ID}-test-{time.time()}",  # Unique group
        topics=[TOPIC]
    )
    consumer_thread = threading.Thread(
        target=consumer.consume,
        kwargs={'handler': test_handler, 'batch_size': 5}
    )
    consumer_thread.daemon = True
    consumer_thread.start()
    # Give the consumer time to join the group
    time.sleep(2)
    # 2. Send messages with the producer
    producer = KafkaProducerAdvanced(BOOTSTRAP_SERVERS)
    test_messages = [
        {"test_id": 1, "data": "First message"},
        {"test_id": 2, "data": "Second message"},
        {"test_id": 3, "data": "Third message"},
    ]
    print("Sending the test messages...")
    for msg in test_messages:
        producer.send(TOPIC, msg, key=f"test-{msg['test_id']}")
    producer.flush()
    print(f"  {len(test_messages)} messages sent")
    # 3. Wait for them to arrive
    print("\nWaiting for delivery...")
    time.sleep(5)
    # 4. Check the results
    consumer.stop()
    time.sleep(1)
    print("\nResults:")
    print(f"  Messages sent: {len(test_messages)}")
    print(f"  Messages received: {len(received_messages)}")
    print(f"  Producer metrics: {producer.get_metrics()}")
    print(f"  Consumer metrics: {consumer.get_metrics()}")
    # Verdict
    success = len(received_messages) >= len(test_messages)
    print(f"\n{'SUCCESS' if success else 'FAILED'}")
    producer.close()


if __name__ == "__main__":
    test_producer_consumer()
Best Practices
1. Error Handling
# Always wrap Kafka operations
from confluent_kafka import KafkaException

try:
    producer.send(topic, message)
except BufferError:
    # Local buffer full: serve callbacks, then retry
    producer.poll(1)
    producer.send(topic, message)
except KafkaException as e:
    # Kafka error: log it and handle it
    logger.error(f"Kafka error: {e}")
2. Serialization
# Use a reusable serializer
import json
from typing import Any

def serialize(data: Any) -> bytes:
    """Serializes data to JSON bytes."""
    if isinstance(data, bytes):
        return data
    if isinstance(data, str):
        return data.encode('utf-8')
    return json.dumps(data, default=str).encode('utf-8')
3. Production Configuration
# Recommended production configuration
PROD_PRODUCER_CONFIG = {
    'bootstrap.servers': 'kafka1:9092,kafka2:9092,kafka3:9092',
    'acks': 'all',
    'enable.idempotence': True,
    'retries': 5,
    'retry.backoff.ms': 200,
    'compression.type': 'lz4',
    'batch.size': 32768,
    'linger.ms': 10,
}
PROD_CONSUMER_CONFIG = {
    'bootstrap.servers': 'kafka1:9092,kafka2:9092,kafka3:9092',
    'group.id': 'my-app-consumer',
    'enable.auto.commit': False,
    'auto.offset.reset': 'earliest',
    # Note: max.poll.records is a Java-client setting; confluent-kafka (librdkafka) does not support it
    'max.poll.interval.ms': 300000,
    'session.timeout.ms': 30000,
}
Key Takeaways
Producer
- Always use acks='all' in production
- Enable idempotence to avoid duplicates
- Use delivery callbacks to confirm delivery
- Call flush() before shutting down
Consumer
- Prefer manual commits for finer control
- Implement a Dead Letter Queue for failed messages
- Handle shutdowns gracefully (see the sketch after this list)
- Use consumer groups to scale out
General
- Test locally with Docker before going to production
- Monitor your metrics (lag, throughput, errors)
- Document your message schemas
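To illustrate the graceful-shutdown point above, here is a minimal sketch, assuming the KafkaConsumerAdvanced class and message_handler from Step 6, that stops the consume loop cleanly on SIGINT/SIGTERM instead of killing it mid-commit:
# run_consumer.py -- illustrative sketch, assumes consumer_advanced.py and config.py above
import signal
from consumer_advanced import KafkaConsumerAdvanced, message_handler
from config import BOOTSTRAP_SERVERS, TOPIC, GROUP_ID

consumer = KafkaConsumerAdvanced(
    bootstrap_servers=BOOTSTRAP_SERVERS,
    group_id=GROUP_ID,
    topics=[TOPIC],
)

def _shutdown(signum, frame):
    # Ask the consume() loop to exit; it commits pending offsets and close()s on the way out
    consumer.stop()

signal.signal(signal.SIGINT, _shutdown)
signal.signal(signal.SIGTERM, _shutdown)

consumer.consume(handler=message_handler)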
Need to Go Further?
This tutorial gave you the basics of working with Kafka from Python. To dig deeper:
- Kafka Python training: Avro schemas, Kafka Streams, advanced patterns
- Project support: integrating Kafka into your architecture
- Code audit: review of your Kafka implementation
Contact me to discuss your needs
This tutorial is part of my Apache Kafka series. Also check out my guides on monitoring, architecture, and production best practices.