Producers et Consumers Kafka
Les producers (producteurs) et consumers (consommateurs) sont les deux types de clients qui interagissent avec Kafka. Comprendre leur fonctionnement est essentiel pour construire des applications robustes.
Producers (Producteurs)
Rôle du producer
Un producer est une application qui envoie des messages vers Kafka. Son travail :
- Sérialiser les données en bytes
- Déterminer la partition cible
- Envoyer le message au broker leader
- Gérer les confirmations et les erreurs
Flux d'envoi d'un message
┌──────────────────────────────────────────────────────────────────┐
│ PRODUCER │
├──────────────────────────────────────────────────────────────────┤
│ │
│ Message │
│ │ │
│ ▼ │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ │
│ │ Serializer │───▶│Partitioner │───▶│ Buffer │ │
│ │ (Key+Value)│ │ │ │ (Batch) │ │
│ └────────────┘ └────────────┘ └────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────┐ │
│ │ Sender │ │
│ │ (Thread) │ │
│ └────────────┘ │
│ │ │
└────────────────────────────────────────────┼──────────────────────┘
│
▼
┌────────────┐
│ Broker │
│ (Leader) │
└────────────┘
Sérialisation
Kafka transporte des bytes. Le producer doit convertir les objets en bytes.
Sérialiseurs courants :
| Sérialiseur | Usage | Format |
|---|---|---|
StringSerializer |
Texte simple | UTF-8 |
IntegerSerializer |
Entiers | 4 bytes |
ByteArraySerializer |
Bytes bruts | Passthrough |
JsonSerializer |
Objets JSON | JSON string |
AvroSerializer |
Schémas évolutifs | Avro binaire |
Partitionnement
Le producer détermine dans quelle partition envoyer chaque message :
┌─────────────────────────────────────────────────────────────┐
│ STRATÉGIES DE PARTITIONNEMENT │
├─────────────────────────────────────────────────────────────┤
│ │
│ Message avec clé ? │
│ │ │
│ ┌────┴────┐ │
│ OUI NON │
│ │ │ │
│ ▼ ▼ │
│ hash(key) Round-robin │
│ % N ou Sticky │
│ │
│ Résultat: Partition 0..N-1 │
└─────────────────────────────────────────────────────────────┘
Avec clé : partition = hash(key) % nombre_partitions
- Les messages avec la même clé vont toujours dans la même partition
- Garantit l'ordre pour une entité donnée (ex: tous les événements d'un utilisateur)
Sans clé : Distribution équilibrée
- Round-robin : alternance entre partitions
- Sticky : batch vers une partition, puis change
Configuration importante du producer
| Paramètre | Défaut | Description |
|---|---|---|
bootstrap.servers |
- | Liste des brokers pour la connexion initiale |
key.serializer |
- | Classe de sérialisation pour les clés |
value.serializer |
- | Classe de sérialisation pour les valeurs |
acks |
all |
Niveau de confirmation requis |
retries |
2147483647 |
Nombre de tentatives en cas d'erreur |
batch.size |
16384 |
Taille du batch en bytes |
linger.ms |
0 |
Temps d'attente pour remplir un batch |
Niveaux d'acknowledgement (acks)
Le paramètre acks contrôle la durabilité des messages :
| Valeur | Comportement | Durabilité | Performance |
|---|---|---|---|
0 |
Pas de confirmation | Faible | Très haute |
1 |
Leader confirme | Moyenne | Haute |
all / -1 |
Toutes les ISR confirment | Haute | Plus faible |
acks=0 : Fire and forget
┌──────────┐ ┌──────────┐
│ Producer │────────▶│ Broker │
└──────────┘ └──────────┘
(pas d'attente)
acks=1 : Leader acknowledgement
┌──────────┐ ┌──────────┐
│ Producer │────────▶│ Leader │
└──────────┘◀────────└──────────┘
ACK
acks=all : Full ISR acknowledgement
┌──────────┐ ┌──────────┐
│ Producer │────────▶│ Leader │───▶ Réplication
└──────────┘◀────────└──────────┘◀─── ISR sync
ACK après réplication
Consumers (Consommateurs)
Rôle du consumer
Un consumer est une application qui lit des messages depuis Kafka. Son travail :
- S'abonner à un ou plusieurs topics
- Récupérer les messages par batch (poll)
- Désérialiser les données
- Traiter les messages
- Valider la progression (commit offset)
Flux de lecture
┌──────────────────────────────────────────────────────────────────┐
│ CONSUMER │
├──────────────────────────────────────────────────────────────────┤
│ │
│ ┌────────────┐ │
│ │ poll() │◀───── Appel périodique │
│ └────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ │
│ │ Broker │───▶│ Fetch │───▶│Deserializer│ │
│ │ │ │ Buffer │ │ │ │
│ └────────────┘ └────────────┘ └────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────┐ │
│ │ Process │ │
│ │ Messages │ │
│ └────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────┐ │
│ │ Commit │ │
│ │ Offset │ │
│ └────────────┘ │
│ │
└──────────────────────────────────────────────────────────────────┘
Le mécanisme de poll
Le consumer utilise un modèle pull : c'est lui qui demande les messages.
# Pseudo-code du pattern de consommation
while True:
records = consumer.poll(timeout=1000) # Attend jusqu'à 1 seconde
for record in records:
process(record)
consumer.commit() # Valide la progression
Pourquoi un modèle pull plutôt que push ?
- Le consumer contrôle son rythme de traitement
- Évite de surcharger le consumer
- Permet le traitement par batch
Positions de lecture
Un consumer peut commencer à lire à différentes positions :
| Configuration | Comportement |
|---|---|
earliest |
Depuis le début du topic |
latest |
Uniquement les nouveaux messages |
specific offset |
À partir d'un offset précis |
Partition avec messages :
┌────┬────┬────┬────┬────┬────┬────┬────┬────┬────┐
│ 0 │ 1 │ 2 │ 3 │ 4 │ 5 │ 6 │ 7 │ 8 │ 9 │
└────┴────┴────┴────┴────┴────┴────┴────┴────┴────┘
▲ ▲ ▲
│ │ │
earliest current offset latest
Configuration importante du consumer
| Paramètre | Défaut | Description |
|---|---|---|
bootstrap.servers |
- | Liste des brokers |
group.id |
- | Identifiant du consumer group |
key.deserializer |
- | Classe de désérialisation pour les clés |
value.deserializer |
- | Classe de désérialisation pour les valeurs |
auto.offset.reset |
latest |
Où commencer si pas d'offset sauvegardé |
enable.auto.commit |
true |
Commit automatique des offsets |
max.poll.records |
500 |
Nombre max de messages par poll |
Gestion des offsets
Qu'est-ce que le commit ?
Le commit d'offset indique à Kafka jusqu'où le consumer a traité les messages. En cas de redémarrage, le consumer reprend à partir du dernier offset commité.
Avant commit (offset 3) :
┌────┬────┬────┬────┬────┬────┐
│ 0 │ 1 │ 2 │ 3 │ 4 │ 5 │
└────┴────┴────┴────┴────┴────┘
▲
committed
Après traitement et commit (offset 6) :
┌────┬────┬────┬────┬────┬────┐
│ 0 │ 1 │ 2 │ 3 │ 4 │ 5 │
└────┴────┴────┴────┴────┴────┘
▲
committed
Stratégies de commit
1. Auto-commit (par défaut)
enable.auto.commit = true
auto.commit.interval.ms = 5000
- Simple à utiliser
- Risque de perte ou duplication en cas de crash
2. Commit synchrone
consumer.commit_sync()
- Bloque jusqu'à confirmation
- Garantit que l'offset est sauvegardé
- Impact sur les performances
3. Commit asynchrone
consumer.commit_async()
- Non-bloquant
- Meilleure performance
- Callbacks pour gérer les erreurs
4. Commit par message
for record in records:
process(record)
consumer.commit_sync(offsets={
TopicPartition(record.topic, record.partition):
OffsetAndMetadata(record.offset + 1)
})
- Contrôle fin
- Plus lent mais plus précis
Sémantiques de livraison
Le choix de la stratégie de commit impacte les garanties de livraison :
| Sémantique | Description | Risque |
|---|---|---|
| At-most-once | Commit avant traitement | Perte de messages |
| At-least-once | Commit après traitement | Doublons possibles |
| Exactly-once | Transactions Kafka | Complexité |
Gestion des erreurs
Erreurs côté producer
| Erreur | Cause | Action |
|---|---|---|
TimeoutException |
Broker ne répond pas | Retry automatique |
SerializationException |
Données invalides | Fix et retry |
RecordTooLargeException |
Message > 1MB | Compresser ou découper |
Erreurs côté consumer
| Erreur | Cause | Action |
|---|---|---|
DeserializationException |
Format invalide | Skip ou dead letter |
CommitFailedException |
Rebalance pendant commit | Retry |
WakeupException |
Arrêt demandé | Fermer proprement |
Pattern Dead Letter Queue
Pour les messages impossibles à traiter :
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Topic │────▶│ Consumer │────▶│ Process │
│ orders │ │ │ │ │
└──────────┘ └────┬─────┘ └──────────┘
│
│ Erreur de traitement
▼
┌──────────┐
│ DLQ │
│orders-dlq│
└──────────┘
Bonnes pratiques
Pour les producers
- Toujours spécifier une clé si l'ordre des messages compte
- Utiliser
acks=allen production pour la durabilité - Activer l'idempotence (
enable.idempotence=true) pour éviter les doublons - Configurer les retries avec backoff exponentiel
- Monitorer le lag et les erreurs d'envoi
Pour les consumers
- Traiter les messages de manière idempotente (supporter les doublons)
- Commit après traitement, pas avant
- Gérer les exceptions avec un pattern DLQ
- Configurer
max.poll.recordsselon la capacité de traitement - Implémenter un shutdown graceful
Résumé
| Composant | Responsabilité | Points clés |
|---|---|---|
| Producer | Envoyer des messages | Sérialisation, partitionnement, acks |
| Consumer | Lire des messages | Poll, désérialisation, commit |
| Offset | Position de lecture | Commit strategies, sémantiques |
Prochaines étapes
Pour aller plus loin, consultez l'article suivant :
- Consumer Groups : Traitement parallèle et coordination
Cet article fait partie de la série "Fondamentaux Kafka" qui explique les concepts de base de manière progressive et accessible.