KAFKA
Beginner

Your First Kafka Producer/Consumer in Python

Hands-on tutorial to build your first Kafka producer and consumer in Python with confluent-kafka: installation, complete code, error handling, and best practices.

Florian Courouge
12 min read
4,183 words
Kafka
Python
Tutorial
Producer
Consumer
confluent-kafka
Beginner

Your First Kafka Producer/Consumer in Python

Python is one of the most popular languages for working with Apache Kafka, whether for data engineering, machine learning, or microservices. This tutorial walks you step by step through building your first Kafka producer and consumer in Python with the confluent-kafka library.

Python Kafka Tutorial

Prerequisites

Before you start, make sure you have:

  • Python 3.8+ installed
  • Docker (to run Kafka easily)
  • A code editor (VSCode, PyCharm, etc.)

Checking your environment

# Check Python
python --version
# Python 3.10.12

# Check pip
pip --version
# pip 23.0.1

Step 1: Start Kafka with Docker

For this tutorial, we use Docker Compose to start a local Kafka cluster.

docker-compose.yml

version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9093:9093"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

  # Web UI to browse the Kafka cluster (optional but handy)
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092

Start Kafka

# Start the services
docker-compose up -d

# Check that everything is running
docker-compose ps

# Expected result:
# NAME        STATUS
# zookeeper   Up
# kafka       Up
# kafka-ui    Up

# Wait a few seconds for Kafka to be ready
sleep 10

# Create a test topic
docker exec kafka kafka-topics --create \
  --bootstrap-server localhost:9092 \
  --topic mon-premier-topic \
  --partitions 3 \
  --replication-factor 1

# List the topics
docker exec kafka kafka-topics --list \
  --bootstrap-server localhost:9092

You can now open the Kafka UI at http://localhost:8080

Step 2: Set Up the Python Project

Create the project

# Create the project directory
mkdir kafka-python-demo
cd kafka-python-demo

# Create a virtual environment
python -m venv venv

# Activate the environment
# Linux/Mac:
source venv/bin/activate
# Windows:
# venv\Scripts\activate

# Install the dependencies
pip install confluent-kafka python-dotenv

Project structure

kafka-python-demo/
├── venv/
├── .env
├── config.py
├── producer.py
├── consumer.py
├── producer_advanced.py
├── consumer_advanced.py
└── requirements.txt

requirements.txt

confluent-kafka>=2.3.0
python-dotenv>=1.0.0

.env

# Kafka configuration
KAFKA_BOOTSTRAP_SERVERS=localhost:9093
KAFKA_TOPIC=mon-premier-topic

# Optional configuration
KAFKA_GROUP_ID=mon-groupe-python

config.py

"""Configuration centralisee pour Kafka."""
import os
from dotenv import load_dotenv

load_dotenv()

# Configuration de base
BOOTSTRAP_SERVERS = os.getenv('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9093')
TOPIC = os.getenv('KAFKA_TOPIC', 'mon-premier-topic')
GROUP_ID = os.getenv('KAFKA_GROUP_ID', 'mon-groupe-python')

# Configuration Producer
PRODUCER_CONFIG = {
    'bootstrap.servers': BOOTSTRAP_SERVERS,
    'client.id': 'python-producer-demo',
}

# Configuration Consumer
CONSUMER_CONFIG = {
    'bootstrap.servers': BOOTSTRAP_SERVERS,
    'group.id': GROUP_ID,
    'auto.offset.reset': 'earliest',  # Lire depuis le debut si nouveau groupe
}

print(f"Configuration Kafka:")
print(f"  Bootstrap: {BOOTSTRAP_SERVERS}")
print(f"  Topic: {TOPIC}")
print(f"  Group ID: {GROUP_ID}")

Step 3: First Producer (Simple)

Let's start with a basic producer that sends messages to Kafka.

producer.py

"""
Premier Producer Kafka en Python.
Envoie des messages simples vers un topic Kafka.
"""
from confluent_kafka import Producer
import json
import time
from config import PRODUCER_CONFIG, TOPIC


def delivery_callback(err, msg):
    """
    Callback invoked when a message is delivered (or fails).

    Args:
        err: Error, if any
        msg: The message that was sent
    """
    if err is not None:
        print(f'Delivery failed: {err}')
    else:
        print(f'Message delivered: topic={msg.topic()}, '
              f'partition={msg.partition()}, '
              f'offset={msg.offset()}')


def create_producer():
    """Create and return a Kafka producer."""
    return Producer(PRODUCER_CONFIG)


def send_message(producer, topic, key, value):
    """
    Send a message to Kafka.

    Args:
        producer: Producer instance
        topic: Topic name
        key: Message key (used for partitioning)
        value: Message payload
    """
    # Serialize to JSON if it is a dictionary
    if isinstance(value, dict):
        value = json.dumps(value)

    # Encode to bytes
    if isinstance(value, str):
        value = value.encode('utf-8')
    if key and isinstance(key, str):
        key = key.encode('utf-8')

    # Send the message asynchronously
    producer.produce(
        topic=topic,
        key=key,
        value=value,
        callback=delivery_callback
    )

    # Serve delivery callbacks for previously sent messages
    producer.poll(0)


def main():
    """Main entry point."""
    print("=== Starting Kafka Producer ===")

    # Create the producer
    producer = create_producer()

    try:
        # Send 10 test messages
        for i in range(10):
            message = {
                "id": i,
                "message": f"Hello Kafka #{i}",
                "timestamp": time.time()
            }

            # Use the id as key (ordering is guaranteed per key)
            key = f"key-{i % 3}"  # 3 different keys across 3 partitions

            print(f"Sending: key={key}, value={message}")
            send_message(producer, TOPIC, key, message)

            # Small pause so you can watch the output
            time.sleep(0.5)

        # Wait until every message has been acknowledged
        print("\nWaiting for all messages to be acknowledged...")
        producer.flush(timeout=10)
        print("All messages have been sent!")

    except KeyboardInterrupt:
        print("\nInterrupted by user")
    finally:
        print("Producer done.")


if __name__ == "__main__":
    main()

Running the Producer

# Activate the virtual environment if needed
source venv/bin/activate

# Run the producer
python producer.py

Expected output:

Kafka configuration:
  Bootstrap: localhost:9093
  Topic: mon-premier-topic
  Group ID: mon-groupe-python
=== Starting Kafka Producer ===
Sending: key=key-0, value={'id': 0, 'message': 'Hello Kafka #0', 'timestamp': 1704456789.123}
Message delivered: topic=mon-premier-topic, partition=1, offset=0
Sending: key=key-1, value={'id': 1, 'message': 'Hello Kafka #1', 'timestamp': 1704456789.623}
Message delivered: topic=mon-premier-topic, partition=2, offset=0
...

Step 4: First Consumer (Simple)

Now let's create a consumer to read the messages back.

consumer.py

"""
Premier Consumer Kafka en Python.
Lit les messages depuis un topic Kafka.
"""
from confluent_kafka import Consumer, KafkaError, KafkaException
import json
from config import CONSUMER_CONFIG, TOPIC


def create_consumer():
    """Cree et retourne un consumer Kafka."""
    return Consumer(CONSUMER_CONFIG)


def process_message(msg):
    """
    Process a received message.

    Args:
        msg: Kafka message

    Returns:
        dict: Decoded message
    """
    # Decode the key
    key = msg.key().decode('utf-8') if msg.key() else None

    # Decode the value
    try:
        value = json.loads(msg.value().decode('utf-8'))
    except json.JSONDecodeError:
        value = msg.value().decode('utf-8')

    return {
        'topic': msg.topic(),
        'partition': msg.partition(),
        'offset': msg.offset(),
        'key': key,
        'value': value
    }


def main():
    """Main entry point."""
    print("=== Starting Kafka Consumer ===")

    # Create the consumer
    consumer = create_consumer()

    # Subscribe to the topic
    consumer.subscribe([TOPIC])
    print(f"Subscribed to topic: {TOPIC}")
    print("Waiting for messages... (Ctrl+C to stop)\n")

    try:
        while True:
            # Wait for a message (1 second timeout)
            msg = consumer.poll(timeout=1.0)

            if msg is None:
                # No message available
                continue

            if msg.error():
                # Error handling
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition reached (not an error)
                    print(f"End of partition {msg.partition()}")
                else:
                    raise KafkaException(msg.error())
            else:
                # Message received successfully
                data = process_message(msg)
                print(f"Received: partition={data['partition']}, "
                      f"offset={data['offset']}, "
                      f"key={data['key']}")
                print(f"  -> {data['value']}")
                print()

    except KeyboardInterrupt:
        print("\nInterrupted by user")
    finally:
        # Close the consumer cleanly
        print("Closing consumer...")
        consumer.close()
        print("Consumer done.")


if __name__ == "__main__":
    main()

Running the Consumer

# In a new terminal
source venv/bin/activate
python consumer.py

Expected output:

Kafka configuration:
  Bootstrap: localhost:9093
  Topic: mon-premier-topic
  Group ID: mon-groupe-python
=== Starting Kafka Consumer ===
Subscribed to topic: mon-premier-topic
Waiting for messages... (Ctrl+C to stop)

Received: partition=1, offset=0, key=key-0
  -> {'id': 0, 'message': 'Hello Kafka #0', 'timestamp': 1704456789.123}

Received: partition=2, offset=0, key=key-1
  -> {'id': 1, 'message': 'Hello Kafka #1', 'timestamp': 1704456789.623}
...

Step 5: Advanced Producer

Let's improve the producer with error handling, retries, batching, and metrics.

producer_advanced.py

"""
Producer Kafka avance avec gestion d'erreurs, retry et metriques.
"""
from confluent_kafka import Producer, KafkaError
import json
import time
import logging
from dataclasses import dataclass
from typing import Optional, Dict, Any
from config import BOOTSTRAP_SERVERS, TOPIC

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


@dataclass
class ProducerMetrics:
    """Metriques du producer."""
    messages_sent: int = 0
    messages_failed: int = 0
    total_latency_ms: float = 0.0

    @property
    def avg_latency_ms(self) -> float:
        if self.messages_sent == 0:
            return 0
        return self.total_latency_ms / self.messages_sent


class KafkaProducerAdvanced:
    """Kafka producer with advanced features."""

    def __init__(self, bootstrap_servers: str, **kwargs):
        """
        Initialize the producer.

        Args:
            bootstrap_servers: Kafka bootstrap servers
            **kwargs: Additional configuration
        """
        # Production-ready configuration
        self.config = {
            'bootstrap.servers': bootstrap_servers,
            'client.id': 'python-producer-advanced',

            # Durability
            'acks': 'all',  # Wait for acknowledgement from all in-sync replicas
            'enable.idempotence': True,  # Avoid duplicates on retry

            # Performance
            'batch.size': 16384,  # 16KB per batch
            'linger.ms': 5,  # Wait up to 5ms to fill a batch
            'compression.type': 'snappy',  # Compression

            # Retries
            'retries': 3,
            'retry.backoff.ms': 100,

            # Timeouts
            'request.timeout.ms': 30000,
            'delivery.timeout.ms': 120000,

            **kwargs
        }

        self.producer = Producer(self.config)
        self.metrics = ProducerMetrics()

        logger.info(f"Producer initialized: {bootstrap_servers}")

    def _delivery_callback(self, err, msg, produce_time):
        """Delivery callback that records metrics."""
        # Latency measured from produce() to broker acknowledgement
        latency = (time.time() - produce_time) * 1000

        if err is not None:
            self.metrics.messages_failed += 1
            logger.error(f"Delivery failed: {err}")
        else:
            self.metrics.messages_sent += 1
            self.metrics.total_latency_ms += latency
            logger.debug(
                f"Message delivered: topic={msg.topic()}, "
                f"partition={msg.partition()}, "
                f"offset={msg.offset()}, "
                f"latency={latency:.2f}ms"
            )

    def send(
        self,
        topic: str,
        value: Any,
        key: Optional[str] = None,
        headers: Optional[Dict[str, str]] = None
    ) -> None:
        """
        Send a message to Kafka.

        Args:
            topic: Topic name
            value: Message value (serialized to JSON if dict or list)
            key: Optional key
            headers: Optional headers
        """
        # Serialization
        if isinstance(value, (dict, list)):
            value = json.dumps(value).encode('utf-8')
        elif isinstance(value, str):
            value = value.encode('utf-8')

        if key:
            key = key.encode('utf-8')

        # Prepare the headers
        kafka_headers = None
        if headers:
            kafka_headers = [(k, v.encode('utf-8')) for k, v in headers.items()]

        # Capture the produce time so the callback can compute latency
        produce_time = time.time()

        def on_delivery(err, msg):
            self._delivery_callback(err, msg, produce_time)

        # Send
        try:
            self.producer.produce(
                topic=topic,
                key=key,
                value=value,
                headers=kafka_headers,
                callback=on_delivery
            )
            # Serve delivery callbacks
            self.producer.poll(0)
        except BufferError:
            logger.warning("Local queue full, waiting...")
            self.producer.poll(1)
            self.producer.produce(
                topic=topic,
                key=key,
                value=value,
                headers=kafka_headers,
                callback=on_delivery
            )

    def send_batch(self, topic: str, messages: list) -> None:
        """
        Send a batch of messages.

        Args:
            topic: Topic name
            messages: List of (key, value) tuples or plain values
        """
        for msg in messages:
            if isinstance(msg, tuple):
                key, value = msg
            else:
                key, value = None, msg
            self.send(topic, value, key)

        # Serve any pending delivery callbacks
        self.producer.poll(0)

    def flush(self, timeout: float = 10.0) -> int:
        """
        Wait until all queued messages have been delivered.

        Args:
            timeout: Timeout in seconds

        Returns:
            Number of messages still waiting to be delivered
        """
        return self.producer.flush(timeout)

    def get_metrics(self) -> Dict:
        """Return the producer metrics."""
        return {
            'messages_sent': self.metrics.messages_sent,
            'messages_failed': self.metrics.messages_failed,
            'avg_latency_ms': round(self.metrics.avg_latency_ms, 2),
            'success_rate': round(
                self.metrics.messages_sent /
                max(1, self.metrics.messages_sent + self.metrics.messages_failed) * 100,
                2
            )
        }

    def close(self) -> None:
        """Close the producer cleanly."""
        remaining = self.flush(timeout=30)
        if remaining > 0:
            logger.warning(f"{remaining} messages were not delivered")
        logger.info(f"Producer closed. Metrics: {self.get_metrics()}")


def main():
    """Demo of the advanced producer."""
    logger.info("=== Advanced Producer Demo ===")

    producer = KafkaProducerAdvanced(BOOTSTRAP_SERVERS)

    try:
        # Send individual messages
        for i in range(5):
            message = {
                "id": i,
                "type": "event",
                "data": f"Advanced message #{i}",
                "timestamp": time.time()
            }
            producer.send(
                topic=TOPIC,
                value=message,
                key=f"user-{i % 2}",
                headers={"source": "python-demo", "version": "2.0"}
            )
            time.sleep(0.1)

        # Send a batch
        logger.info("Sending a batch...")
        batch = [
            ("batch-key-1", {"type": "batch", "index": 0}),
            ("batch-key-2", {"type": "batch", "index": 1}),
            ("batch-key-1", {"type": "batch", "index": 2}),
        ]
        producer.send_batch(TOPIC, batch)

        # Wait for delivery
        producer.flush()

        # Print the metrics
        logger.info(f"Final metrics: {producer.get_metrics()}")

    except KeyboardInterrupt:
        logger.info("Interrupted by user")
    finally:
        producer.close()


if __name__ == "__main__":
    main()

Step 6: Advanced Consumer

A consumer with manual commits, error handling, and a Dead Letter Queue.

consumer_advanced.py

"""
Consumer Kafka avance avec commit manuel, retry et DLQ.
"""
from confluent_kafka import Consumer, Producer, KafkaError, KafkaException
import json
import time
import logging
from typing import Callable, Optional, Dict, Any
from dataclasses import dataclass
from config import BOOTSTRAP_SERVERS, TOPIC, GROUP_ID

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


@dataclass
class ConsumerMetrics:
    """Metriques du consumer."""
    messages_processed: int = 0
    messages_failed: int = 0
    processing_time_ms: float = 0.0

    @property
    def avg_processing_time_ms(self) -> float:
        if self.messages_processed == 0:
            return 0
        return self.processing_time_ms / self.messages_processed


class KafkaConsumerAdvanced:
    """Kafka consumer with advanced features."""

    def __init__(
        self,
        bootstrap_servers: str,
        group_id: str,
        topics: list,
        dlq_topic: Optional[str] = None,
        **kwargs
    ):
        """
        Initialize the consumer.

        Args:
            bootstrap_servers: Kafka bootstrap servers
            group_id: Consumer group ID
            topics: List of topics to consume
            dlq_topic: Dead Letter Queue topic (optional)
            **kwargs: Additional configuration
        """
        self.topics = topics
        self.dlq_topic = dlq_topic

        # Consumer configuration
        self.config = {
            'bootstrap.servers': bootstrap_servers,
            'group.id': group_id,
            'auto.offset.reset': 'earliest',

            # Manual commits for finer control
            'enable.auto.commit': False,

            # Performance
            'fetch.min.bytes': 1024,  # At least 1KB per fetch
            'fetch.wait.max.ms': 500,
            'max.poll.interval.ms': 300000,  # At most 5 minutes between polls

            # Heartbeat
            'session.timeout.ms': 30000,
            'heartbeat.interval.ms': 10000,

            **kwargs
        }

        self.consumer = Consumer(self.config)
        self.metrics = ConsumerMetrics()
        self._running = False

        # DLQ producer, if configured
        self._dlq_producer = None
        if dlq_topic:
            self._dlq_producer = Producer({
                'bootstrap.servers': bootstrap_servers
            })

        logger.info(f"Consumer initialized: group={group_id}, topics={topics}")

    def _send_to_dlq(self, msg, error: str) -> None:
        """Send a failed message to the DLQ."""
        if not self._dlq_producer or not self.dlq_topic:
            logger.warning("No DLQ configured, message dropped")
            return

        dlq_message = {
            'original_topic': msg.topic(),
            'original_partition': msg.partition(),
            'original_offset': msg.offset(),
            'original_key': msg.key().decode('utf-8') if msg.key() else None,
            'original_value': msg.value().decode('utf-8'),
            'error': error,
            'timestamp': time.time()
        }

        self._dlq_producer.produce(
            topic=self.dlq_topic,
            value=json.dumps(dlq_message).encode('utf-8'),
            key=msg.key()
        )
        self._dlq_producer.poll(0)
        logger.info(f"Message sent to DLQ: {self.dlq_topic}")

    def _process_message(
        self,
        msg,
        handler: Callable[[Dict], bool],
        max_retries: int = 3
    ) -> bool:
        """
        Process a message with retries.

        Args:
            msg: Kafka message
            handler: Processing function
            max_retries: Maximum number of attempts

        Returns:
            True if processed successfully
        """
        # Decode the message
        try:
            key = msg.key().decode('utf-8') if msg.key() else None
            value = json.loads(msg.value().decode('utf-8'))
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            logger.error(f"Decoding error: {e}")
            self._send_to_dlq(msg, f"Decode error: {e}")
            return False

        message_data = {
            'topic': msg.topic(),
            'partition': msg.partition(),
            'offset': msg.offset(),
            'key': key,
            'value': value,
            'headers': dict(msg.headers()) if msg.headers() else {}
        }

        # Retry logic
        last_error = None
        for attempt in range(max_retries):
            try:
                start_time = time.time()
                result = handler(message_data)
                processing_time = (time.time() - start_time) * 1000

                if result:
                    self.metrics.messages_processed += 1
                    self.metrics.processing_time_ms += processing_time
                    return True
                else:
                    raise ValueError("Handler returned False")

            except Exception as e:
                last_error = str(e)
                logger.warning(
                    f"Processing error (attempt {attempt + 1}/{max_retries}): {e}"
                )
                if attempt < max_retries - 1:
                    time.sleep(0.5 * (attempt + 1))  # Linear backoff between attempts

        # All attempts failed
        self.metrics.messages_failed += 1
        self._send_to_dlq(msg, f"Max retries exceeded: {last_error}")
        return False

    def consume(
        self,
        handler: Callable[[Dict], bool],
        batch_size: int = 100,
        poll_timeout: float = 1.0
    ) -> None:
        """
        Start consuming messages.

        Args:
            handler: Function called for each message
            batch_size: Number of messages between commits
            poll_timeout: Poll timeout in seconds
        """
        self.consumer.subscribe(self.topics)
        self._running = True
        messages_since_commit = 0

        logger.info(f"Starting consumption: {self.topics}")

        try:
            while self._running:
                msg = self.consumer.poll(timeout=poll_timeout)

                if msg is None:
                    continue

                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        logger.debug(f"End of partition {msg.partition()}")
                    else:
                        logger.error(f"Kafka error: {msg.error()}")
                    continue

                # Process the message
                success = self._process_message(msg, handler)
                messages_since_commit += 1

                # Periodic commit
                if messages_since_commit >= batch_size:
                    self.consumer.commit(asynchronous=False)
                    logger.debug(f"Committed after {messages_since_commit} messages")
                    messages_since_commit = 0

        except KeyboardInterrupt:
            logger.info("Interrupted by user")
        finally:
            # Final commit
            if messages_since_commit > 0:
                self.consumer.commit(asynchronous=False)
            self.close()

    def get_metrics(self) -> Dict:
        """Return the consumer metrics."""
        return {
            'messages_processed': self.metrics.messages_processed,
            'messages_failed': self.metrics.messages_failed,
            'avg_processing_time_ms': round(self.metrics.avg_processing_time_ms, 2),
            'success_rate': round(
                self.metrics.messages_processed /
                max(1, self.metrics.messages_processed + self.metrics.messages_failed) * 100,
                2
            )
        }

    def stop(self) -> None:
        """Stop the consume loop."""
        self._running = False

    def close(self) -> None:
        """Close the consumer cleanly."""
        self.consumer.close()
        if self._dlq_producer:
            self._dlq_producer.flush()
        logger.info(f"Consumer closed. Metrics: {self.get_metrics()}")


def message_handler(message: Dict) -> bool:
    """
    Example message handler.

    Args:
        message: Message data

    Returns:
        True if processing succeeded
    """
    logger.info(
        f"Processing: partition={message['partition']}, "
        f"offset={message['offset']}, "
        f"key={message['key']}"
    )
    logger.info(f"  Payload: {message['value']}")

    # Simulate some work
    time.sleep(0.1)

    # Simulate a random failure (to demo the retry logic)
    # if message['value'].get('id', 0) % 5 == 4:
    #     raise ValueError("Simulated error")

    return True


def main():
    """Demo of the advanced consumer."""
    logger.info("=== Advanced Consumer Demo ===")

    # DLQ topic (created automatically, since topic auto-creation is enabled)
    dlq_topic = f"{TOPIC}-dlq"

    consumer = KafkaConsumerAdvanced(
        bootstrap_servers=BOOTSTRAP_SERVERS,
        group_id=GROUP_ID,
        topics=[TOPIC],
        dlq_topic=dlq_topic
    )

    try:
        consumer.consume(
            handler=message_handler,
            batch_size=10,
            poll_timeout=1.0
        )
    except Exception as e:
        logger.error(f"Fatal error: {e}")
    finally:
        logger.info(f"Final metrics: {consumer.get_metrics()}")


if __name__ == "__main__":
    main()

Step 7: End-to-End Test

End-to-end test script

# test_kafka.py
"""Test end-to-end du producer et consumer."""
import threading
import time
from producer_advanced import KafkaProducerAdvanced
from consumer_advanced import KafkaConsumerAdvanced
from config import BOOTSTRAP_SERVERS, TOPIC, GROUP_ID

received_messages = []

def test_handler(message):
    """Test handler that collects received messages."""
    received_messages.append(message['value'])
    return True

def test_producer_consumer():
    """Producer/consumer integration test."""
    print("=== Kafka Integration Test ===\n")

    # 1. Start the consumer in a background thread
    consumer = KafkaConsumerAdvanced(
        bootstrap_servers=BOOTSTRAP_SERVERS,
        group_id=f"{GROUP_ID}-test-{time.time()}",  # Unique group for each run
        topics=[TOPIC]
    )

    consumer_thread = threading.Thread(
        target=consumer.consume,
        kwargs={'handler': test_handler, 'batch_size': 5}
    )
    consumer_thread.daemon = True
    consumer_thread.start()

    # Wait for the consumer to be ready
    time.sleep(2)

    # 2. Send messages with the producer
    producer = KafkaProducerAdvanced(BOOTSTRAP_SERVERS)

    test_messages = [
        {"test_id": 1, "data": "First message"},
        {"test_id": 2, "data": "Second message"},
        {"test_id": 3, "data": "Third message"},
    ]

    print("Sending test messages...")
    for msg in test_messages:
        producer.send(TOPIC, msg, key=f"test-{msg['test_id']}")

    producer.flush()
    print(f"  {len(test_messages)} messages sent")

    # 3. Wait for the messages to arrive
    print("\nWaiting for the messages to arrive...")
    time.sleep(5)

    # 4. Check the results
    consumer.stop()
    time.sleep(1)

    print("\nResults:")
    print(f"  Messages sent: {len(test_messages)}")
    print(f"  Messages received: {len(received_messages)}")
    print(f"  Producer metrics: {producer.get_metrics()}")
    print(f"  Consumer metrics: {consumer.get_metrics()}")

    # Verify
    success = len(received_messages) >= len(test_messages)
    print(f"\n{'SUCCESS' if success else 'FAILED'}")

    producer.close()

if __name__ == "__main__":
    test_producer_consumer()

Best Practices

1. Error Handling

# Always wrap Kafka operations
try:
    producer.send(topic, message)
except BufferError:
    # Local queue full - wait and retry
    producer.poll(1)
    producer.send(topic, message)
except KafkaException as e:
    # Kafka error - log and handle it
    logger.error(f"Kafka error: {e}")

2. Serialization

# Use a reusable serializer
def serialize(data: Any) -> bytes:
    """Serialize data to JSON bytes."""
    if isinstance(data, bytes):
        return data
    if isinstance(data, str):
        return data.encode('utf-8')
    return json.dumps(data, default=str).encode('utf-8')
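
On the consumer side, a symmetric helper keeps decoding consistent. A minimal sketch that falls back to the raw string when the payload is not valid JSON:

def deserialize(data: bytes) -> Any:
    """Deserialize JSON bytes, falling back to a plain string."""
    text = data.decode('utf-8')
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return text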

3. Production Configuration

# Recommended production configuration
PROD_PRODUCER_CONFIG = {
    'bootstrap.servers': 'kafka1:9092,kafka2:9092,kafka3:9092',
    'acks': 'all',
    'enable.idempotence': True,
    'retries': 5,
    'retry.backoff.ms': 200,
    'compression.type': 'lz4',
    'batch.size': 32768,
    'linger.ms': 10,
}

PROD_CONSUMER_CONFIG = {
    'bootstrap.servers': 'kafka1:9092,kafka2:9092,kafka3:9092',
    'group.id': 'my-app-consumer',
    'enable.auto.commit': False,
    'auto.offset.reset': 'earliest',
    # Note: librdkafka has no 'max.poll.records'; cap work per cycle in your poll loop instead
    'max.poll.interval.ms': 300000,
    'session.timeout.ms': 30000,
}

Key Takeaways

Producer

  • Always use acks='all' in production
  • Enable idempotence to avoid duplicates
  • Use delivery callbacks to confirm delivery
  • Call flush() before shutting down (see the sketch right after this list)
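
To guarantee that final flush() even when a process exits unexpectedly, one option is to register it at interpreter shutdown. A minimal sketch (the atexit pattern is a suggestion, not part of the code above):

import atexit
from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9093', 'acks': 'all'})

# Flush any queued messages when the interpreter shuts down
atexit.register(lambda: producer.flush(10))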

Consumer

  • Prefer manual commits for finer control
  • Implement a Dead Letter Queue for failed messages
  • Handle shutdowns gracefully (see the sketch after this list)
  • Use consumer groups for scalability
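
In containers, a shutdown usually arrives as SIGTERM rather than Ctrl+C. A minimal sketch of a graceful stop, assuming the KafkaConsumerAdvanced class from Step 6:

import signal
from consumer_advanced import KafkaConsumerAdvanced
from config import BOOTSTRAP_SERVERS, TOPIC, GROUP_ID

consumer = KafkaConsumerAdvanced(BOOTSTRAP_SERVERS, GROUP_ID, [TOPIC])

# Ask the consume loop to exit cleanly (final commit + close) on SIGTERM or SIGINT
signal.signal(signal.SIGTERM, lambda signum, frame: consumer.stop())
signal.signal(signal.SIGINT, lambda signum, frame: consumer.stop())

consumer.consume(handler=lambda message: True)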

General

  • Test locally with Docker before going to production
  • Monitor your metrics (lag, throughput, errors); a lag-check sketch follows this list
  • Document your message schemas
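
Consumer lag is the first metric worth watching. A minimal sketch that computes it from committed offsets and the broker high watermarks, assuming the config module from this tutorial and the consumer group you want to inspect:

from confluent_kafka import Consumer
from config import CONSUMER_CONFIG, TOPIC

consumer = Consumer(CONSUMER_CONFIG)
consumer.subscribe([TOPIC])
consumer.poll(5.0)  # wait for the group to assign partitions

for tp in consumer.committed(consumer.assignment(), timeout=10):
    low, high = consumer.get_watermark_offsets(tp, timeout=10)
    committed = tp.offset if tp.offset >= 0 else low
    print(f"partition {tp.partition}: lag = {high - committed}")

consumer.close()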

Want to Go Further?

This tutorial gave you the foundations for working with Kafka in Python. To go further:

  • Kafka Python training: Avro schemas, Kafka Streams, advanced patterns
  • Project support: integrating Kafka into your architecture
  • Code audit: a review of your Kafka implementation

Contact me to discuss your needs


This tutorial is part of my Apache Kafka series. Also check out my guides on monitoring, architecture, and production best practices.


Florian Courouge

DevOps & Kafka Expert | Freelance consultant specializing in distributed architectures and data streaming.
