Kafka Performance Tuning : Optimisation et Scalabilité
Introduction
Apache Kafka est conçu pour gérer des millions de messages par seconde, mais atteindre ces performances en production nécessite une compréhension approfondie de son architecture et un tuning méticuleux. La performance de Kafka n'est pas magique : elle résulte d'optimisations à tous les niveaux du stack (système d'exploitation, réseau, stockage, JVM, configuration Kafka).
Dans ce guide exhaustif, nous explorerons toutes les dimensions de l'optimisation de Kafka :
- Architecture haute performance
- Tuning du système d'exploitation (Linux)
- Optimisation des brokers
- Configuration des producteurs et consommateurs
- Stratégies de partitionnement
- Monitoring et benchmarking
Comprendre les Limites de Performance
Modèle de Performance Kafka
Kafka utilise un modèle de séquentialité qui le rend exceptionnellement rapide :
┌────────────────────────────────────────────────────┐
│ Écriture Séquentielle (Append-Only Log) │
│ │
│ Disque SSD: ~500 MB/s en séquentiel │
│ vs ~10 MB/s en aléatoire (50x plus lent) │
│ │
│ Zero-Copy Transfert (sendfile syscall) │
│ Données: Disque → Socket réseau (pas de userspace)│
└────────────────────────────────────────────────────┘
Facteurs Limitants
-
Réseau : Souvent le goulot d'étranglement
- 1 Gbps = ~120 MB/s max théorique
- 10 Gbps = ~1.2 GB/s max théorique
-
Disque : I/O séquentielles
- HDD: 100-200 MB/s
- SSD SATA: 500-600 MB/s
- NVMe SSD: 3-7 GB/s
-
CPU : Compression/décompression
- LZ4: ~500 MB/s par core
- Snappy: ~250 MB/s par core
- Gzip: ~100 MB/s par core
-
Mémoire : Page cache et buffers
- Plus de RAM = Plus de données en cache
- Moins de disk I/O
Benchmarks de Référence
Capacité Théorique (Single Broker)
Configuration:
- CPU: 16 cores
- RAM: 64 GB
- Disque: NVMe SSD (3 GB/s)
- Réseau: 10 Gbps
Messages: 1 KB, réplication factor 3, acks=all
Throughput max:
- Producteurs: ~800K msg/s (~800 MB/s)
- Consommateurs: ~1M msg/s (~1 GB/s)
Latence Typique
Percentile Latency
─────────────────────
P50 3-5 ms
P95 10-15 ms
P99 20-30 ms
P99.9 50-100 ms
Optimisation du Système d'Exploitation
Linux Kernel Tuning
1. File Descriptors
Kafka ouvre beaucoup de fichiers (logs, sockets).
# /etc/security/limits.conf
kafka soft nofile 100000
kafka hard nofile 100000
# Vérification
ulimit -n
2. Virtual Memory
# /etc/sysctl.conf
# Swappiness (éviter le swap)
vm.swappiness=1
# Dirty pages (contrôle du flush)
vm.dirty_ratio=80
vm.dirty_background_ratio=5
vm.dirty_expire_centisecs=12000
# Page cache
vm.vfs_cache_pressure=50
# Appliquer
sysctl -p
Explication des Paramètres
vm.swappiness=1
→ Ne swap que si absolument nécessaire
→ Kafka préfère RAM pour page cache
vm.dirty_ratio=80
→ Max 80% de RAM pour pages "sales" avant flush synchrone
→ Évite les pauses I/O soudaines
vm.dirty_background_ratio=5
→ Démarrer flush background à 5% de RAM sale
→ Flush progressif et continu
vm.dirty_expire_centisecs=12000
→ Flush pages sales après 120 secondes
→ Balance entre latence et throughput
3. Réseau (TCP Tuning)
# /etc/sysctl.conf
# TCP buffers (important pour hauts débits)
net.core.rmem_default=262144
net.core.rmem_max=134217728
net.core.wmem_default=262144
net.core.wmem_max=134217728
net.core.optmem_max=16777216
# TCP window scaling
net.ipv4.tcp_rmem=4096 87380 134217728
net.ipv4.tcp_wmem=4096 65536 134217728
# TCP connection queue
net.core.netdev_max_backlog=5000
net.core.somaxconn=4096
# TCP keepalive
net.ipv4.tcp_keepalive_time=600
net.ipv4.tcp_keepalive_intvl=60
net.ipv4.tcp_keepalive_probes=3
# Désactiver slow start après idle
net.ipv4.tcp_slow_start_after_idle=0
4. Système de Fichiers
XFS (Recommandé)
# Mount avec options optimisées
/dev/nvme0n1 /var/lib/kafka xfs noatime,nodiratime 0 0
# Dans /etc/fstab
UUID=xxx /var/lib/kafka xfs noatime,nodiratime,nobarrier 0 0
Options Expliquées
noatime : Ne pas mettre à jour access time
→ Réduit les écritures inutiles
nodiratime : Ne pas mettre à jour directory access time
→ Économise encore plus d'I/O
nobarrier : Désactive write barriers (si UPS présent)
→ +20-30% throughput écriture
→ ⚠️ DANGER si pas de protection coupure électrique
EXT4 (Alternative)
# Mount
/dev/sda1 /var/lib/kafka ext4 noatime,nodiratime,data=writeback 0 0
# data=writeback : Pas d'ordre garanti entre metadata et data
# → Meilleure performance mais moins de garantie en cas de crash
JVM Tuning
Garbage Collection (G1GC)
# Kafka broker JVM options
export KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"
export KAFKA_JVM_PERFORMANCE_OPTS="
-XX:+UseG1GC
-XX:MaxGCPauseMillis=20
-XX:InitiatingHeapOccupancyPercent=35
-XX:G1HeapRegionSize=16M
-XX:MinMetaspaceSize=96m
-XX:MaxMetaspaceSize=256m
-XX:+ParallelRefProcEnabled
-XX:+ExplicitGCInvokesConcurrent
"
Taille de Heap
Règles:
1. Heap size = 25-50% de RAM totale
2. Max 6-8 GB (au-delà, GC pause augmente)
3. Reste de RAM → Page cache (crucial!)
Exemple avec 32 GB RAM:
- Heap: 6 GB
- Page cache: ~24 GB
- OS + autres: ~2 GB
GC Logging
export KAFKA_GC_LOG_OPTS="
-Xlog:gc*:file=/var/log/kafka/gc.log:time,tags:filecount=10,filesize=100M
"
Analyse des Logs GC
# Identifier les longues pauses
grep "Pause" gc.log | awk '{print $NF}' | sort -n | tail -20
# Moyenne des pauses
grep "Pause" gc.log | awk '{sum+=$NF; count++} END {print sum/count}'
Optimisation des Brokers
Configuration Broker Critique
1. Réplication et Durabilité
# server.properties
######################## Réplication ########################
# Nombre de brokers dans le cluster
broker.id=1
num.network.threads=8
num.io.threads=16
# Réplication factor par défaut (créations auto topics)
default.replication.factor=3
# Min ISR pour accepter écritures (balance durabilité/disponibilité)
min.insync.replicas=2
# Nombre de replicas pour offset commits (consumer groups)
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
# Leader imbalance (redistribution automatique)
auto.leader.rebalance.enable=true
leader.imbalance.per.broker.percentage=10
leader.imbalance.check.interval.seconds=300
Explication ISR (In-Sync Replicas)
Topic: payments (replication.factor=3, min.insync.replicas=2)
Scenario 1: Tous replicas synchronisés
Leader: Broker-1 ✅
ISR: [Broker-1, Broker-2, Broker-3]
→ Écriture OK
Scenario 2: Un replica en retard
Leader: Broker-1 ✅
ISR: [Broker-1, Broker-2]
Follower hors ISR: Broker-3 (lag > replica.lag.time.max.ms)
→ Écriture OK (min.insync.replicas=2 respecté)
Scenario 3: Deux replicas down
Leader: Broker-1 ✅
ISR: [Broker-1]
→ Écriture REFUSÉE (NotEnoughReplicasException)
→ Garantit durabilité
2. Log Segments et Retention
######################## Log Management ########################
# Taille max d'un segment (défaut: 1 GB)
log.segment.bytes=1073741824
# Temps avant rotation d'un segment (défaut: 7 jours)
log.roll.hours=168
# Retention par temps (7 jours)
log.retention.hours=168
# Retention par taille (ex: 500 GB par partition)
log.retention.bytes=536870912000
# Fréquence de vérification pour suppression (5 min)
log.retention.check.interval.ms=300000
# Compression des anciens segments
log.cleanup.policy=delete
# Ou pour compaction (topics avec clés)
# log.cleanup.policy=compact
# Index interval (balance RAM vs lookup speed)
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
Impact de la Taille des Segments
Segment Size = 1 GB (défaut)
├─ Avantages:
│ ├─ Moins de fichiers ouverts
│ └─ Moins de overhead metadata
└─ Inconvénients:
├─ Retention moins granulaire
└─ Plus long à compacter
Segment Size = 256 MB (haute volumétrie)
├─ Avantages:
│ ├─ Rotation fréquente
│ ├─ Deletion/Compaction plus rapide
│ └─ Meilleure granularité retention
└─ Inconvénients:
└─ Plus de fichiers (attention file descriptors!)
3. Flush et Durabilité
######################## Flush Configuration ########################
# NE PAS flush après chaque message (laisse OS gérer)
log.flush.interval.messages=9223372036854775807
log.flush.interval.ms=
# Laisse le page cache faire le travail
# Kafka fait confiance à:
# 1. Réplication (min.insync.replicas)
# 2. OS page cache flush
# 3. Filesystem journaling
# ⚠️ Ne configurer flush que si vraiment nécessaire:
# log.flush.interval.messages=10000
# → Impact négatif sur performance
4. Compression
######################## Compression ########################
# Type de compression par défaut
compression.type=producer
# Options: none, gzip, snappy, lz4, zstd
# Recommandation: lz4 (bon ratio compression/CPU)
Comparaison des Algorithmes
| Algo | Ratio Compression | Vitesse Compress | Vitesse Decompress | CPU Usage | |------|-------------------|------------------|---------------------|-----------| | none | 1.0x | N/A | N/A | Aucun | | lz4 | 2.0x | ⭐⭐⭐⭐⭐ 500MB/s | ⭐⭐⭐⭐⭐ 2GB/s | Faible | | snappy | 2.5x | ⭐⭐⭐⭐ 250MB/s | ⭐⭐⭐⭐ 500MB/s | Moyen | | zstd | 3.5x | ⭐⭐⭐ 150MB/s | ⭐⭐⭐⭐ 600MB/s | Moyen | | gzip | 4.0x | ⭐⭐ 100MB/s | ⭐⭐⭐ 300MB/s | Élevé |
Recommandation
Cas d'usage Compression
────────────────────────────────────
Logs JSON lz4 ou zstd
Données binaires lz4
Très haut débit none ou lz4
Stockage limité zstd ou gzip
CPU limité lz4 ou none
5. Networking
######################## Network Threading ########################
# Threads réseau (acceptent connexions)
num.network.threads=8
# Threads I/O (lecture/écriture disque)
num.io.threads=16
# Règle générale:
# num.network.threads = nombre de cores / 2
# num.io.threads = nombre de cores
# Taille des buffers socket
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
# Max taille requête (protection DoS)
socket.request.max.bytes=104857600
# Queued requests max
queued.max.requests=500
# Nombre de connections max par IP
max.connections.per.ip=2147483647
# Connections max totales
max.connections=10000
Stratégies de Partitionnement
Nombre Optimal de Partitions
Formule de départ:
Partitions = max(T_producer / T_p, T_consumer / T_c)
Où:
T_producer = Throughput cible producteur (MB/s)
T_p = Throughput max par partition producteur (~10-30 MB/s)
T_consumer = Throughput cible consommateur (MB/s)
T_c = Throughput max par partition consommateur (~50-100 MB/s)
Exemple:
Besoin: 300 MB/s en écriture
T_p = 15 MB/s (mesuré)
Partitions = 300 / 15 = 20 partitions
Impacts du Nombre de Partitions
Trop peu de partitions (ex: 3):
❌ Parallélisme limité
❌ Consumer groups limités à 3 instances max
❌ Goulot d'étranglement throughput
Trop de partitions (ex: 500):
❌ Overhead metadata (ZooKeeper/KRaft)
❌ Leader election plus lente
❌ Plus de fichiers ouverts
❌ End-to-end latency augmentée
Bonnes Pratiques
# Topic haute volumétrie (logs)
kafka-topics --create \
--topic app-logs \
--partitions 50 \
--replication-factor 3
# Topic transactionnel (orders)
kafka-topics --create \
--topic orders \
--partitions 12 \
--replication-factor 3
# Topic low-volume (config)
kafka-topics --create \
--topic app-config \
--partitions 1 \
--replication-factor 3
Répartition des Partitions
# Distribution optimale sur 5 brokers
kafka-topics --create \
--topic users \
--partitions 30 \
--replication-factor 3 \
--replica-assignment \
"0:1:2,1:2:3,2:3:4,3:4:0,4:0:1,
0:2:3,1:3:4,2:4:0,3:0:1,4:1:2,
..."
# Ou laisser Kafka auto-assigner (recommandé)
kafka-topics --create \
--topic users \
--partitions 30 \
--replication-factor 3
Optimisation des Producteurs
Configuration Producteur Haute Performance
Properties props = new Properties();
// =============== Connexion ===============
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "high-perf-producer-1");
// =============== Sérialisation ===============
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
// =============== Durabilité vs Performance ===============
// acks=all (plus sûr, latence +2-3ms)
props.put(ProducerConfig.ACKS_CONFIG, "all");
// acks=1 (leader seulement, latence minimale)
// props.put(ProducerConfig.ACKS_CONFIG, "1");
// acks=0 (fire-and-forget, max throughput)
// props.put(ProducerConfig.ACKS_CONFIG, "0");
// =============== Batching (CRITIQUE!) ===============
// Taille max d'un batch (16 KB → 1 MB pour high throughput)
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1048576); // 1 MB
// Délai d'attente avant envoi batch (0 → 100ms)
// Trade-off latence vs throughput
props.put(ProducerConfig.LINGER_MS_CONFIG, 10); // 10ms
// =============== Compression ===============
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
// =============== Buffer Memory ===============
// Mémoire totale pour buffering (32 MB → 256 MB)
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64 MB
// =============== Retry et Timeout ===============
// Retry automatique (2147483647 = infini)
props.put(ProducerConfig.RETRIES_CONFIG, 2147483647);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
// Enable idempotence (évite duplicates en cas de retry)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Timeouts
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
// =============== Partitioning ===============
// Partitioner par défaut (hash de la clé)
// props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class.getName());
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
Impact du Batching
Sans Batching Optimisé
batch.size=16384 (16 KB)
linger.ms=0 (envoi immédiat)
Résultat:
- Throughput: 50K msg/s
- Latency P99: 5ms
- Network: Beaucoup de petites requêtes
Avec Batching Optimisé
batch.size=1048576 (1 MB)
linger.ms=10 (attente 10ms)
Résultat:
- Throughput: 500K msg/s (10x!)
- Latency P99: 15ms (+10ms acceptable)
- Network: Peu de grosses requêtes
Tuning selon Cas d'Usage
// CAS 1: Ultra-faible latence (trading, alertes)
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 16 KB
props.put(ProducerConfig.LINGER_MS_CONFIG, 0); // Immédiat
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
// CAS 2: Haut débit (logs, événements)
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1048576); // 1 MB
props.put(ProducerConfig.LINGER_MS_CONFIG, 100); // 100ms
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
// CAS 3: Balance (transactionnel)
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 262144); // 256 KB
props.put(ProducerConfig.LINGER_MS_CONFIG, 10); // 10ms
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
Producteur Asynchrone vs Synchrone
Producteur Synchrone (Lent)
// ❌ Bloque sur chaque send()
for (int i = 0; i < 1000; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
Future<RecordMetadata> future = producer.send(record);
future.get(); // BLOQUE ici ! (~5-10ms par message)
}
// Throughput: ~100-200 msg/s (terrible)
Producteur Asynchrone (Rapide)
// ✅ Fire-and-forget avec callback
for (int i = 0; i < 1000000; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
producer.send(record, (metadata, exception) -> {
if (exception != null) {
// Log erreur
log.error("Send failed", exception);
} else {
// Success (optionnel)
// log.debug("Sent to partition {} offset {}", metadata.partition(), metadata.offset());
}
});
}
// Throughput: 500K+ msg/s
Custom Partitioner
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
int numPartitions = cluster.partitionCountForTopic(topic);
if (key == null) {
// Round-robin pour messages sans clé
return ThreadLocalRandom.current().nextInt(numPartitions);
}
// Exemple: Router VIP users vers partition dédiée
String keyString = (String) key;
if (keyString.startsWith("vip-")) {
return 0; // Partition 0 réservée VIP
}
// Hash standard pour autres
return Math.abs(keyString.hashCode()) % numPartitions;
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
// Utilisation
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
Optimisation des Consommateurs
Configuration Consommateur Haute Performance
Properties props = new Properties();
// =============== Connexion ===============
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "high-perf-consumer-group");
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-1");
// =============== Sérialisation ===============
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
// =============== Fetch Configuration (CRITIQUE!) ===============
// Taille min avant retour (1 byte → 1 MB)
// Augmenter = moins de requêtes, meilleur throughput
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1048576); // 1 MB
// Temps d'attente max si fetch.min.bytes pas atteint
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); // 500ms
// Taille max d'un fetch (50 MB → 100 MB)
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 52428800); // 50 MB
// Taille max par partition dans un fetch (1 MB → 10 MB)
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576); // 1 MB
// =============== Poll Configuration ===============
// Nombre max de records par poll() (500 → 5000)
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5000);
// Timeout max entre deux poll() avant considéré mort
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 5 min
// =============== Offset Management ===============
// Auto-commit des offsets (false pour contrôle manuel)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// Si auto-commit=true, fréquence de commit
// props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
// Point de démarrage si pas d'offset
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Options: earliest, latest, none
// =============== Session et Heartbeat ===============
// Timeout session avant rebalance (10s → 30s)
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
// Intervalle heartbeat (1/3 de session.timeout)
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
Impact de fetch.min.bytes
fetch.min.bytes = 1 (défaut)
Comportement:
- Retourne données dès qu'au moins 1 byte disponible
- Beaucoup de requêtes réseau
- Latence minimale
Throughput: Moyen
Latence: Très faible (1-2ms)
CPU: Élevé (beaucoup d'appels système)
fetch.min.bytes = 1 MB
Comportement:
- Attend 1 MB ou fetch.max.wait.ms (500ms)
- Moins de requêtes réseau
- Batching côté broker
Throughput: Élevé (+50-100%)
Latence: Moyenne (+500ms max)
CPU: Faible
Trade-off Optimal
// Haute latence acceptable (batch processing)
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1048576); // 1 MB
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
// Faible latence requise (real-time)
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 100);
Pattern de Consommation Optimisé
Consumer Loop Standard
consumer.subscribe(Arrays.asList("orders"));
try {
while (true) {
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, byte[]> record : records) {
processRecord(record);
}
// Commit manuel synchrone (bloquant mais sûr)
consumer.commitSync();
}
} finally {
consumer.close();
}
Consumer Asynchrone Haute Performance
consumer.subscribe(Arrays.asList("orders"));
ExecutorService executor = Executors.newFixedThreadPool(10);
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new ConcurrentHashMap<>();
try {
while (true) {
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
if (records.isEmpty()) {
continue;
}
// Process records en parallèle
List<Future<?>> futures = new ArrayList<>();
for (ConsumerRecord<String, byte[]> record : records) {
futures.add(executor.submit(() -> {
processRecord(record);
// Track offset
currentOffsets.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
);
}));
}
// Attendre fin du traitement
for (Future<?> future : futures) {
future.get();
}
// Commit asynchrone (non-bloquant)
consumer.commitAsync(currentOffsets, (offsets, exception) -> {
if (exception != null) {
log.error("Commit failed", exception);
}
});
}
} finally {
executor.shutdown();
consumer.close();
}
Consumer Groups et Parallélisme
Scaling Horizontal
Topic: orders (20 partitions, replication.factor=3)
Consumer Group: order-processors
├─ Instance 1: Partitions [0, 1, 2, 3, 4]
├─ Instance 2: Partitions [5, 6, 7, 8, 9]
├─ Instance 3: Partitions [10, 11, 12, 13, 14]
└─ Instance 4: Partitions [15, 16, 17, 18, 19]
Throughput total = 4 × throughput_instance
Rebalancing (Coût)
Événement déclencheur:
- Nouveau consumer rejoint
- Consumer quitte (crash, shutdown)
- Nouveau partitions ajoutées
Durée rebalance:
- 3-10 secondes (cluster normal)
- 30-60 secondes (gros cluster, 100+ consumers)
Impact:
- STOP de tous les consumers du groupe
- Pas de consommation pendant rebalance
- Perte temporaire de throughput
Minimiser Rebalancing
// Augmenter session timeout
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000); // 60s
// Augmenter max.poll.interval (si traitement lent)
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000); // 10min
// Sticky assignor (minimise reassignment)
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
StickyAssignor.class.getName());
Monitoring et Benchmarking
Métriques JMX Critiques
Broker Metrics
# Messages in per second
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
# Bytes in/out per second
kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
# Request latency (produce, fetch)
kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer
# Under-replicated partitions (CRITIQUE!)
kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
# Offline partitions (ALERTE!)
kafka.controller:type=KafkaController,name=OfflinePartitionsCount
# Active controller count (doit être 1)
kafka.controller:type=KafkaController,name=ActiveControllerCount
Producer Metrics
// Accès aux métriques du producer
Map<MetricName, ? extends Metric> metrics = producer.metrics();
// Métriques clés:
// - record-send-rate: Messages/s
// - record-send-total: Total messages
// - batch-size-avg: Taille moyenne des batches
// - compression-rate-avg: Ratio compression
// - record-error-rate: Taux d'erreur
// - buffer-available-bytes: Mémoire buffer disponible
for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
System.out.printf("%s: %s%n", entry.getKey().name(), entry.getValue().metricValue());
}
Consumer Metrics
// Métriques clés:
// - records-consumed-rate: Records/s
// - fetch-latency-avg: Latence moyenne fetch
// - fetch-size-avg: Taille moyenne fetch
// - records-lag-max: Lag max sur partitions
// - commit-latency-avg: Latence commit
Map<MetricName, ? extends Metric> metrics = consumer.metrics();
Kafka Performance Test Tool
kafka-producer-perf-test
# Test production: 1M messages, 1KB chacun
kafka-producer-perf-test \
--topic perf-test \
--num-records 1000000 \
--record-size 1024 \
--throughput -1 \
--producer-props \
bootstrap.servers=localhost:9092 \
acks=all \
batch.size=1048576 \
linger.ms=10 \
compression.type=lz4
# Résultat:
# 500000 records sent, 99850.0 records/sec (97.51 MB/sec),
# 1250.2 ms avg latency, 2531.0 ms max latency
Paramètres de Test
# Ultra-haute performance (acks=1, gros batches)
kafka-producer-perf-test \
--topic perf-test \
--num-records 10000000 \
--record-size 1024 \
--throughput -1 \
--producer-props \
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092 \
acks=1 \
batch.size=1048576 \
linger.ms=100 \
compression.type=lz4 \
buffer.memory=67108864
# Durabilité max (acks=all, petits batches)
kafka-producer-perf-test \
--topic perf-test \
--num-records 1000000 \
--record-size 1024 \
--throughput -1 \
--producer-props \
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092 \
acks=all \
batch.size=16384 \
linger.ms=0 \
compression.type=none
kafka-consumer-perf-test
# Test consommation
kafka-consumer-perf-test \
--broker-list localhost:9092 \
--topic perf-test \
--messages 1000000 \
--threads 1 \
--consumer-props \
fetch.min.bytes=1048576 \
fetch.max.wait.ms=500
# Résultat:
# data.consumed.in.MB, MB.sec, data.consumed.in.nMsg,
# nMsg.sec, rebalance.time.ms, fetch.time.ms, ...
# 976.5625, 195.3125, 1000000, 200000, 3024, 5000
Dashboard Grafana
Query Prometheus
# Throughput entrant (MB/s)
sum(rate(kafka_server_brokertopicmetrics_bytesinpersec_count[1m])) / 1024 / 1024
# Throughput sortant (MB/s)
sum(rate(kafka_server_brokertopicmetrics_bytesoutpersec_count[1m])) / 1024 / 1024
# Latency P99 produce
histogram_quantile(0.99,
sum(rate(kafka_network_requestmetrics_totaltimems_bucket{request="Produce"}[5m]))
by (le)
)
# Under-replicated partitions (alerte si > 0)
kafka_server_replicamanager_underreplicatedpartitions
# Consumer lag max
max(kafka_consumer_fetch_manager_metrics_records_lag_max)
Cas d'Usage et Configurations
Haute Disponibilité (Financial Trading)
Contraintes
- Latence P99 < 10ms
- Zéro perte de données
- Availability > 99.99%
Configuration
# Broker
min.insync.replicas=2
default.replication.factor=3
unclean.leader.election.enable=false
# Producer
acks=all
enable.idempotence=true
max.in.flight.requests.per.connection=5
retries=2147483647
# Consumer
enable.auto.commit=false
isolation.level=read_committed
Haut Débit (Log Aggregation)
Contraintes
- Throughput > 500 MB/s
- Latence acceptable (< 100ms P99)
- Retention courte (24h)
Configuration
# Broker
num.network.threads=16
num.io.threads=32
log.segment.bytes=268435456 # 256 MB
log.retention.hours=24
# Producer
acks=1
batch.size=1048576 # 1 MB
linger.ms=100
compression.type=lz4
buffer.memory=134217728 # 128 MB
# Consumer
fetch.min.bytes=1048576 # 1 MB
fetch.max.wait.ms=500
max.poll.records=10000
IoT / Edge Computing
Contraintes
- Bandwidth limité
- CPU limité
- Stockage limité
Configuration
# Broker
log.retention.bytes=10737418240 # 10 GB max
log.segment.bytes=104857600 # 100 MB
compression.type=zstd # Meilleur ratio
# Producer
acks=1
batch.size=524288 # 512 KB
linger.ms=1000 # 1s (plus de batching)
compression.type=zstd
# Consumer
fetch.min.bytes=524288
fetch.max.wait.ms=1000
Checklist d'Optimisation
Niveau Système (Linux)
- [ ] File descriptors: ulimit -n ≥ 100000
- [ ] vm.swappiness = 1
- [ ] vm.dirty_ratio = 80
- [ ] TCP buffers augmentés
- [ ] XFS avec noatime,nodiratime
- [ ] Désactiver THP (Transparent Huge Pages)
Niveau JVM
- [ ] Heap: 6-8 GB max
- [ ] G1GC configuré
- [ ] GC logs activés
- [ ] -XX:MaxGCPauseMillis=20
Niveau Broker
- [ ] num.io.threads = nombre cores
- [ ] num.network.threads = cores / 2
- [ ] log.segment.bytes approprié
- [ ] Compression activée (lz4)
- [ ] Réplication factor ≥ 3
Niveau Topic
- [ ] Partitions = throughput / 10-30 MB/s
- [ ] min.insync.replicas ≥ 2
- [ ] Retention appropriée
Niveau Producteur
- [ ] batch.size ≥ 256 KB
- [ ] linger.ms = 10-100ms
- [ ] acks selon besoins
- [ ] Compression lz4
- [ ] Asynchrone avec callbacks
Niveau Consommateur
- [ ] fetch.min.bytes ≥ 1 MB
- [ ] max.poll.records optimisé
- [ ] Traitement parallèle
- [ ] Commit asynchrone
Monitoring
- [ ] JMX metrics exportées
- [ ] Grafana dashboards
- [ ] Alertes configurées
- [ ] Consumer lag surveillé
Conclusion
L'optimisation de Kafka est un processus itératif qui nécessite :
- Comprendre votre workload : Throughput cible, latence acceptable, durabilité requise
- Mesurer d'abord : Benchmarker avant toute optimisation
- Optimiser par couches : OS → JVM → Broker → Application
- Tester en production : Les benchmarks ne remplacent pas le réel
- Monitorer en continu : Les performances dégradent avec le temps
Gains Typiques
Configuration Naïve:
- 50K msg/s
- P99 latency: 50ms
Configuration Optimisée:
- 500K msg/s (10x)
- P99 latency: 15ms (3x mieux)
Les optimisations décrites dans ce guide vous permettront d'atteindre ces performances. Kafka peut gérer des millions de messages par seconde quand correctement configuré.
Ressources
Bonne optimisation ! 🚀