Les 10 Erreurs Courantes avec Kafka (et Comment les Eviter)
Introduction
Apache Kafka est puissant mais complexe. Apres avoir aide de nombreuses equipes a deployer Kafka en production, j'ai identifie les erreurs les plus frequentes qui causent des problemes de performance, de fiabilite ou de maintenabilite.
Ce guide presente ces erreurs courantes avec :
- Une description du probleme
- Les consequences en production
- La solution recommandee
- Des exemples de code corriges
Erreur 1 : Pas Assez de Partitions
Le Probleme
Creer des topics avec trop peu de partitions limite le parallelisme et la scalabilite.
# Erreur courante : 1 seule partition
kafka-topics --create --topic orders --partitions 1 --replication-factor 3
Consequences
Topic "orders" avec 1 partition:
┌────────────────────────────────────┐
│ Partition 0 │
│ [msg1][msg2][msg3][msg4][msg5] │
└────────────────────────────────────┘
│
▼
┌─────────────┐
│ Consumer 1 │ ← Seul consumer possible!
└─────────────┘
Consumer 2 ? → IDLE (pas de partition disponible)
Consumer 3 ? → IDLE
Problemes:
├─ Impossible de scaler les consumers
├─ Throughput limite par un seul consumer
└─ Goulot d'etranglement sur la partition
Solution
# Bonne pratique : Dimensionner selon les besoins
# Regle: partitions >= nb max de consumers attendus
kafka-topics --create \
--topic orders \
--partitions 12 \ # Permet 12 consumers en parallele
--replication-factor 3
# Calcul recommande:
# partitions = max(throughput_target / throughput_partition, nb_consumers_max)
# throughput_partition ≈ 10-30 MB/s selon le cas
Attention
⚠️ Augmenter le nombre de partitions apres creation est possible
mais peut casser l'ordre des messages par cle!
# Avant: hash("user-123") % 6 = partition 2
# Apres: hash("user-123") % 12 = partition 8 (differente!)
Erreur 2 : Producer Synchrone en Production
Le Probleme
Utiliser send().get() de maniere synchrone bloque le thread a chaque message.
// ERREUR : Appel synchrone bloquant
for (Order order : orders) {
ProducerRecord<String, String> record =
new ProducerRecord<>("orders", order.getId(), order.toJson());
producer.send(record).get(); // BLOQUE ~5-10ms par message!
}
// Throughput: ~100-200 msg/s (terrible)
Consequences
Temps d'execution avec 10,000 messages:
Mode Synchrone:
├─ 10,000 msg × 5ms = 50 secondes
└─ Throughput: 200 msg/s
Mode Asynchrone:
├─ Batching + parallelisme
└─ Throughput: 100,000+ msg/s
Solution
// CORRECT : Mode asynchrone avec callback
AtomicLong successCount = new AtomicLong(0);
AtomicLong errorCount = new AtomicLong(0);
for (Order order : orders) {
ProducerRecord<String, String> record =
new ProducerRecord<>("orders", order.getId(), order.toJson());
producer.send(record, (metadata, exception) -> {
if (exception != null) {
errorCount.incrementAndGet();
log.error("Failed to send order {}: {}", order.getId(), exception.getMessage());
// Gerer l'erreur (retry, DLQ, etc.)
} else {
successCount.incrementAndGet();
}
});
}
// Attendre la fin si necessaire
producer.flush();
// OU avec CompletableFuture pour traitement avance
CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
producer.send(record, (metadata, exception) -> {
if (exception != null) {
future.completeExceptionally(exception);
} else {
future.complete(metadata);
}
});
Erreur 3 : Ignorer les Configurations de Batch
Le Probleme
Laisser les configurations de batching par defaut reduit drastiquement le throughput.
// Configuration par defaut (non optimale)
props.put("batch.size", 16384); // 16 KB seulement
props.put("linger.ms", 0); // Pas d'attente
// Resultat: Beaucoup de petites requetes
Consequences
Sans batching optimise:
┌────────────────────────────────────────────┐
│ Request 1: [msg1] │
│ Request 2: [msg2] │
│ Request 3: [msg3] │
│ Request 4: [msg4] │
│ ... (une requete par message) │
└────────────────────────────────────────────┘
Overhead reseau: ENORME
Avec batching:
┌────────────────────────────────────────────┐
│ Request 1: [msg1, msg2, msg3, ..., msg1000]│
└────────────────────────────────────────────┘
Overhead reseau: MINIMAL
Solution
// Configuration optimisee pour le throughput
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 262144); // 256 KB
props.put(ProducerConfig.LINGER_MS_CONFIG, 10); // Attend 10ms
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64 MB buffer
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
// Trade-off:
// - linger.ms=0 → Latence minimale, throughput faible
// - linger.ms=10 → +10ms latence, throughput x10
// - linger.ms=100 → +100ms latence, throughput max
Erreur 4 : Consumer Commit Auto sans Comprendre
Le Probleme
Le commit automatique (enable.auto.commit=true) peut causer des pertes de messages ou des doublons.
// Configuration risquee
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000"); // Commit toutes les 5s
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processRecord(record); // Si crash ici, messages perdus!
}
// Auto-commit se fait en arriere-plan, pas apres le traitement
}
Consequences
Scenario de perte de messages:
1. Poll: recoit messages [1, 2, 3, 4, 5]
2. Auto-commit: offset = 5 (en arriere-plan)
3. Traitement: messages 1, 2, 3 traites
4. CRASH!
5. Redemarrage: reprend a offset 5
6. Messages 4, 5 PERDUS!
Scenario de doublons:
1. Poll: recoit messages [1, 2, 3, 4, 5]
2. Traitement: tous traites
3. CRASH avant auto-commit!
4. Redemarrage: reprend a offset 0
5. Messages 1-5 traites DEUX FOIS!
Solution
// CORRECT : Commit manuel apres traitement
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
processRecord(record);
} catch (Exception e) {
// Gerer l'erreur (DLQ, retry, etc.)
handleError(record, e);
}
}
// Commit APRES traitement reussi
consumer.commitSync(); // Synchrone (plus sur)
// ou consumer.commitAsync(); // Asynchrone (plus rapide)
}
// ENCORE MIEUX : Commit par partition
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
processRecord(record);
}
// Commit uniquement cette partition
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(
partition,
new OffsetAndMetadata(lastOffset + 1)
));
}
Erreur 5 : Utiliser le Mauvais Niveau de acks
Le Probleme
Choisir acks sans comprendre les implications sur la durabilite et la performance.
// Erreur 1: acks=0 en production (donnees critiques)
props.put("acks", "0"); // Fire-and-forget, risque de perte!
// Erreur 2: acks=all sans min.insync.replicas
props.put("acks", "all");
// Mais min.insync.replicas=1 sur le topic (defaut)!
Consequences
acks=0 (Fire-and-forget):
Producer ──► Broker (pas de confirmation)
├─ Pas de garantie de reception
├─ Message peut etre perdu sans le savoir
└─ Usage: Logs non critiques, metriques
acks=1 (Leader only):
Producer ──► Broker (leader confirme)
├─ Perte si leader crash avant replication
├─ Bon compromis perf/fiabilite
└─ Usage: Cas generaux
acks=all SANS min.insync.replicas correct:
┌────────────────────────────────────────────┐
│ Topic: orders (replication.factor=3) │
│ min.insync.replicas=1 (DEFAUT!) │
│ │
│ Si ISR = [broker-1] seulement: │
│ acks=all = acks=1 (une seule confirmation) │
│ → Faux sentiment de securite! │
└────────────────────────────────────────────┘
Solution
// PRODUCTION : Configuration robuste
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
// + Configuration du topic
kafka-configs --alter --entity-type topics --entity-name orders \
--add-config min.insync.replicas=2
// Resultat:
// - acks=all attend que 2+ replicas confirment
// - Idempotence evite les duplicates
// - Si ISR < 2, les ecritures sont REFUSEES (NotEnoughReplicasException)
Choix recommandes:
Donnees critiques (paiements, commandes):
├─ acks=all
├─ min.insync.replicas=2
├─ enable.idempotence=true
└─ replication.factor=3
Donnees importantes (events applicatifs):
├─ acks=1
├─ replication.factor=3
└─ Acceptable: perte rare
Donnees non critiques (logs, metriques):
├─ acks=0 ou acks=1
├─ replication.factor=2
└─ Perte acceptable
Erreur 6 : Cles de Partitionnement Mal Choisies
Le Probleme
Choisir une cle qui cree un desequilibre entre partitions (hot partition).
// ERREUR : Cle avec faible cardinalite
// Seulement 3 valeurs possibles → max 3 partitions utilisees
producer.send(new ProducerRecord<>("orders", order.getStatus(), order.toJson()));
// status = "pending", "processing", "completed"
// ERREUR : Cle avec distribution non uniforme
producer.send(new ProducerRecord<>("orders", order.getCountry(), order.toJson()));
// 60% des commandes viennent de France → 60% sur une partition
Consequences
Topic avec 12 partitions, cle = status:
Partition 0: ████████████████████████ (40% - "pending")
Partition 1:
Partition 2:
Partition 3: ████████████████ (35% - "processing")
Partition 4:
Partition 5:
Partition 6: ███████████ (25% - "completed")
Partition 7:
...
Partition 11:
Problemes:
├─ 9 partitions inutilisees
├─ 3 consumers font tout le travail
├─ Hot partition = goulot d'etranglement
└─ Scalabilite compromise
Solution
// CORRECT : Cle avec haute cardinalite et distribution uniforme
// Utiliser un ID unique (customer_id, order_id, etc.)
producer.send(new ProducerRecord<>("orders", order.getCustomerId(), order.toJson()));
// Si pas de cle naturelle, utiliser null pour round-robin
producer.send(new ProducerRecord<>("orders", null, order.toJson()));
// OU combiner plusieurs champs pour meilleure distribution
String key = order.getCustomerId() + "-" + order.getOrderId();
producer.send(new ProducerRecord<>("orders", key, order.toJson()));
// Verifier la distribution
kafka-run-class kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic orders \
--time -1 # Offset le plus recent par partition
Bonne distribution:
Partition 0: ████████
Partition 1: ████████
Partition 2: █████████
Partition 3: ████████
Partition 4: ████████
Partition 5: █████████
Partition 6: ████████
Partition 7: ████████
...
Partition 11: ████████
→ Charge equilibree, tous les consumers travaillent
Erreur 7 : Ignorer le Consumer Lag
Le Probleme
Ne pas surveiller le lag des consumers conduit a des problemes non detectes.
# Pas de monitoring en place
# Consumer ralentit sans que personne ne s'en rende compte
# Donnees "fraiches" de plus en plus anciennes
Consequences
Consumer lag non surveille:
Jour 1: lag = 100 messages (OK)
Jour 2: lag = 10,000 messages (probleme naissant)
Jour 3: lag = 500,000 messages (probleme serieux)
Jour 4: lag = 10,000,000 messages (CRISE!)
└─ Donnees de 2 jours de retard
└─ Disk full imminent
└─ Retention va supprimer des messages non traites!
Solution
# Monitoring du lag avec kafka-consumer-groups
kafka-consumer-groups --bootstrap-server localhost:9092 \
--describe --group my-consumer-group
# Output:
# GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# my-consumer orders 0 1000 1500 500
# my-consumer orders 1 2000 2100 100
# my-consumer orders 2 1500 1600 100
// Monitoring programmatique avec AdminClient
AdminClient admin = AdminClient.create(props);
Map<TopicPartition, OffsetAndMetadata> offsets =
admin.listConsumerGroupOffsets("my-consumer-group")
.partitionsToOffsetAndMetadata().get();
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
admin.listOffsets(/* end offsets request */).all().get();
for (TopicPartition tp : offsets.keySet()) {
long currentOffset = offsets.get(tp).offset();
long endOffset = endOffsets.get(tp).offset();
long lag = endOffset - currentOffset;
metrics.gauge("kafka.consumer.lag", lag,
"topic", tp.topic(),
"partition", String.valueOf(tp.partition()));
}
# Alerte Prometheus
- alert: KafkaConsumerLagHigh
expr: sum(kafka_consumergroup_lag) by (consumergroup, topic) > 10000
for: 5m
labels:
severity: warning
annotations:
summary: "Consumer lag > 10000 for {{ $labels.consumergroup }}"
Erreur 8 : Pas de Strategie de Dead Letter Queue
Le Probleme
Ignorer les messages en erreur ou bloquer indefiniment sur un message defaillant.
// ERREUR : Boucle infinie sur message invalide
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processRecord(record); // Si toujours en echec, on ne progresse jamais!
}
consumer.commitSync();
}
// ERREUR : Ignorer silencieusement les erreurs
try {
processRecord(record);
} catch (Exception e) {
// Rien... message perdu
}
Consequences
Sans DLQ:
Scenario 1 - Retry infini:
├─ Message invalide bloque le consumer
├─ Lag augmente indefiniment
└─ Systeme entier bloque
Scenario 2 - Ignore silencieux:
├─ Messages perdus sans trace
├─ Donnees incoherentes
└─ Impossible a debugger
Solution
// CORRECT : Pattern Dead Letter Queue
public class RobustConsumer {
private static final int MAX_RETRIES = 3;
private final KafkaProducer<String, String> dlqProducer;
private final String dlqTopic;
public void consume() {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processWithRetry(record);
}
consumer.commitSync();
}
}
private void processWithRetry(ConsumerRecord<String, String> record) {
int attempts = 0;
Exception lastException = null;
while (attempts < MAX_RETRIES) {
try {
processRecord(record);
return; // Succes!
} catch (RetryableException e) {
attempts++;
lastException = e;
log.warn("Retry {}/{} for record offset {}",
attempts, MAX_RETRIES, record.offset());
sleep(100 * attempts); // Backoff
} catch (NonRetryableException e) {
// Pas de retry, directement vers DLQ
sendToDLQ(record, e);
return;
}
}
// Max retries atteint
sendToDLQ(record, lastException);
}
private void sendToDLQ(ConsumerRecord<String, String> original, Exception error) {
ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
dlqTopic,
original.key(),
original.value()
);
// Ajouter des headers avec le contexte d'erreur
dlqRecord.headers().add("original-topic", original.topic().getBytes());
dlqRecord.headers().add("original-partition",
String.valueOf(original.partition()).getBytes());
dlqRecord.headers().add("original-offset",
String.valueOf(original.offset()).getBytes());
dlqRecord.headers().add("error-message", error.getMessage().getBytes());
dlqRecord.headers().add("error-timestamp",
String.valueOf(System.currentTimeMillis()).getBytes());
dlqProducer.send(dlqRecord);
log.error("Sent to DLQ: topic={}, partition={}, offset={}, error={}",
original.topic(), original.partition(), original.offset(), error.getMessage());
}
}
Architecture DLQ:
┌────────────┐ ┌────────────┐ ┌────────────┐
│ Producer │────►│ orders │────►│ Consumer │
└────────────┘ └────────────┘ └─────┬──────┘
│
┌────────▼────────┐
│ Traitement OK? │
└────────┬────────┘
Oui ──────────────┴──────────── Non
│ │
▼ ▼
┌─────────────┐ ┌──────────────┐
│ Commit │ │ orders-dlq │
└─────────────┘ └──────┬───────┘
│
▼
┌─────────────┐
│ Analyse & │
│ Replay │
└─────────────┘
Erreur 9 : Schemas Non Versionnes
Le Probleme
Envoyer des messages sans schema ou avec des schemas incompatibles casse les consumers.
// Version 1 du message
{"orderId": "123", "amount": 99.99}
// Version 2 : Champ renomme (INCOMPATIBLE!)
{"order_id": "123", "total_amount": 99.99}
// Consumer v1 ne comprend plus les messages v2!
Consequences
Sans gestion des schemas:
1. Producer A deploie v2 avec champ renomme
2. Consumer B (non mis a jour) lit les messages
3. NPE ou donnees manquantes
4. Erreurs silencieuses ou crash
5. Donnees corrompues en base
Timeline:
─────────────────────────────────────────────────
Producer v1 → Producer v2 deploye
│
Consumer v1 ────────┼───────► CRASH!
│
Messages incompatibles
Solution
// Utiliser Schema Registry avec Avro/Protobuf
// 1. Definir le schema Avro
{
"type": "record",
"name": "Order",
"namespace": "com.example.orders",
"fields": [
{"name": "orderId", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "currency", "type": "string", "default": "EUR"} // Avec default!
]
}
// 2. Configuration producer avec Schema Registry
props.put("schema.registry.url", "http://schema-registry:8081");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
// 3. Utiliser les classes generees
Order order = Order.newBuilder()
.setOrderId("123")
.setAmount(99.99)
.build();
producer.send(new ProducerRecord<>("orders", order.getOrderId(), order));
// 4. Configurer la compatibilite
curl -X PUT http://schema-registry:8081/config/orders-value \
-H "Content-Type: application/json" \
-d '{"compatibility": "BACKWARD"}'
Regles de compatibilite:
BACKWARD (recommande):
├─ Nouveau schema peut lire anciens messages
├─ Ajouter champs avec default OK
├─ Supprimer champs OK
└─ Consumer peut etre mis a jour avant Producer
FORWARD:
├─ Ancien schema peut lire nouveaux messages
├─ Ajouter champs OK
├─ Supprimer champs avec default OK
└─ Producer peut etre mis a jour avant Consumer
FULL:
├─ Combine BACKWARD + FORWARD
└─ Plus restrictif, plus sur
Erreur 10 : Sous-Estimer les Besoins en Monitoring
Le Probleme
Deployer Kafka en production sans monitoring adequat.
# "Ca marche" ≠ "Ca fonctionne bien"
# Sans monitoring:
# - Pas de visibilite sur la sante du cluster
# - Problemes detectes par les utilisateurs
# - Debugging en aveugle
Consequences
Sans monitoring:
Lundi 9h: Kafka "fonctionne"
Lundi 10h: Disk 85% (non detecte)
Lundi 14h: Disk 95% (non detecte)
Lundi 15h: Disk FULL!
├─ Brokers crash
├─ Donnees perdues (retention forcee)
├─ Incident critique
└─ Plusieurs heures de downtime
Avec monitoring:
Lundi 10h: Alerte "Disk > 80%"
Lundi 10h15: Augmentation retention ou cleanup
Lundi 10h30: Situation resolue
└─ Zero downtime
Solution
# Metriques essentielles a surveiller
# 1. SANTE CLUSTER
- name: kafka.controller.activecontrollercount
alert_if: != 1
severity: critical
- name: kafka.server.replicamanager.underreplicatedpartitions
alert_if: > 0
severity: warning
- name: kafka.controller.offlinepartitionscount
alert_if: > 0
severity: critical
# 2. PERFORMANCE
- name: kafka.server.brokertopicmetrics.messagesinpersec
dashboard: throughput
- name: kafka.network.requestmetrics.totaltimems
percentiles: [p50, p95, p99]
alert_if_p99: > 100ms
# 3. RESSOURCES
- name: kafka.log.size (par topic/partition)
alert_if: > 80% capacity
- name: jvm.memory.heap.used
alert_if: > 80%
- name: jvm.gc.pause
alert_if_p99: > 100ms
# 4. CONSUMERS
- name: kafka.consumer.lag
alert_if: > 10000 for 5min
severity: warning
- name: kafka.consumer.lag
alert_if: > 100000 for 5min
severity: critical
# Stack de monitoring recommandee
# 1. JMX Exporter sur chaque broker
KAFKA_JMX_OPTS="-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7071:/opt/jmx_exporter/kafka.yml"
# 2. Prometheus pour collecter
# 3. Grafana pour visualiser
# 4. Alertmanager pour alerter
# Dashboard essentiels:
# - Cluster Overview
# - Topic Details
# - Consumer Group Lag
# - Broker Performance
# - JVM Metrics
Resume : Checklist Anti-Erreurs
┌─────────────────────────────────────────────────────────────────┐
│ CHECKLIST KAFKA PRODUCTION │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Configuration: │
│ [ ] Partitions suffisantes (>= nb consumers max) │
│ [ ] Replication factor >= 3 │
│ [ ] min.insync.replicas >= 2 │
│ [ ] acks=all pour donnees critiques │
│ │
│ Producer: │
│ [ ] Mode asynchrone avec callbacks │
│ [ ] Batching configure (batch.size, linger.ms) │
│ [ ] Compression activee (lz4) │
│ [ ] Cles de partitionnement bien choisies │
│ │
│ Consumer: │
│ [ ] Auto-commit desactive OU compris │
│ [ ] Dead Letter Queue implementee │
│ [ ] Retry avec backoff │
│ [ ] Lag surveille │
│ │
│ Schemas: │
│ [ ] Schema Registry utilise │
│ [ ] Compatibilite configuree (BACKWARD) │
│ [ ] Versionning des schemas │
│ │
│ Monitoring: │
│ [ ] Metriques JMX exportees │
│ [ ] Dashboards Grafana │
│ [ ] Alertes configurees │
│ [ ] Consumer lag surveille │
│ │
└─────────────────────────────────────────────────────────────────┘
A Retenir
- Dimensionnez les partitions des le depart (difficile a changer apres)
- Utilisez le mode asynchrone pour les producers en production
- Configurez le batching pour maximiser le throughput
- Comprenez les implications de acks et min.insync.replicas
- Choisissez bien vos cles pour eviter les hot partitions
- Committez manuellement pour controler la semantique de livraison
- Surveillez le consumer lag comme indicateur de sante
- Implementez une DLQ pour gerer les erreurs proprement
- Versionnez vos schemas avec Schema Registry
- Monitorez tout : metriques, logs, alertes
Vous rencontrez des problemes avec votre cluster Kafka ? Contactez-moi pour un audit de votre configuration et des recommandations personnalisees pour optimiser votre infrastructure.