Atteindre 1 Million de Messages par Seconde avec Kafka
Introduction
Atteindre 1 million de messages par seconde avec Apache Kafka n'est pas un mythe marketing. C'est un objectif atteignable avec une compréhension approfondie de l'architecture, un matériel adapté et un tuning méticuleux. Ce guide vous accompagne étape par étape pour atteindre ces performances extrêmes.
Dans cet article, nous explorerons :
- La méthodologie de benchmark rigoureuse
- Les configurations matérielles recommandées
- Le tuning avancé à tous les niveaux du stack
- Les résultats réels et reproductibles
Prérequis et Contexte
Comprendre les Unités de Mesure
Avant tout, clarifions les métriques :
Messages/seconde (msg/s) : Nombre de messages
├─ 1M msg/s avec messages de 100 bytes = 100 MB/s
├─ 1M msg/s avec messages de 1 KB = 1 GB/s
└─ 1M msg/s avec messages de 10 KB = 10 GB/s
Throughput (MB/s ou GB/s) : Volume de données
├─ Plus représentatif de la charge réelle
└─ Détermine les besoins réseau/disque
Facteurs Limitants Identifiés
┌─────────────────────────────────────────────────────────────┐
│ GOULOTS D'ÉTRANGLEMENT │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. RÉSEAU (souvent le premier) │
│ └─ 10 Gbps = ~1.2 GB/s théorique │
│ │
│ 2. DISQUE (I/O séquentielles) │
│ └─ NVMe SSD = 3-7 GB/s │
│ │
│ 3. CPU (compression/sérialisation) │
│ └─ LZ4 = ~500 MB/s par core │
│ │
│ 4. MÉMOIRE (page cache) │
│ └─ Plus de RAM = moins de disk I/O │
│ │
└─────────────────────────────────────────────────────────────┘
Méthodologie de Benchmark
Principes Fondamentaux
Un benchmark Kafka fiable doit respecter ces règles :
1. Isoler les Variables
# Test un paramètre à la fois
# Baseline → Change batch.size → Mesure → Restore → Change linger.ms → Mesure
2. Warm-up Obligatoire
# Les premières minutes ne comptent pas
# - JIT compilation (JVM)
# - Page cache population
# - Connection pool warmup
3. Durée Suffisante
# Minimum 5-10 minutes par test
# Évite les pics transitoires
# Capture les variations GC
4. Mesures Multiples
# Au moins 3 runs par configuration
# Moyenne + écart-type
# Éliminer les outliers
Environnement de Benchmark
Infrastructure de Test
# Architecture recommandée pour benchmark
Cluster Kafka:
Brokers: 3
Hardware par broker:
CPU: 16+ cores (AMD EPYC ou Intel Xeon)
RAM: 64 GB
Disque: 2x NVMe SSD 1TB (RAID 0)
Réseau: 25 Gbps
Machines de Test:
Producers: 3 machines dédiées
Consumers: 3 machines dédiées
Hardware:
CPU: 8+ cores
RAM: 32 GB
Réseau: 25 Gbps
Configuration Réseau
# Vérifier la bande passante disponible
iperf3 -s # Sur broker
iperf3 -c broker-ip -t 60 -P 4 # Sur client
# Résultat attendu: ~24 Gbps sur 25 Gbps interface
Outils de Benchmark
kafka-producer-perf-test (Natif)
# Test baseline
kafka-producer-perf-test \
--topic benchmark-topic \
--num-records 10000000 \
--record-size 1024 \
--throughput -1 \
--producer-props \
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092 \
acks=1 \
batch.size=16384 \
linger.ms=0
# Output:
# 10000000 records sent, 523456.7 records/sec (511.18 MB/sec),
# 45.2 ms avg latency, 234.0 ms max latency
OpenMessaging Benchmark (Avancé)
# Clone du benchmark framework
git clone https://github.com/openmessaging/benchmark.git
cd benchmark
# Build
mvn clean install -DskipTests
# Run avec driver Kafka
bin/benchmark \
--drivers driver-kafka/kafka.yaml \
--workloads workloads/1-topic-16-partitions-1kb.yaml
Script de Benchmark Personnalisé
#!/bin/bash
# benchmark-kafka.sh
BOOTSTRAP="kafka1:9092,kafka2:9092,kafka3:9092"
TOPIC="benchmark-1m"
RECORDS=50000000
RECORD_SIZE=1024
# Créer le topic
kafka-topics --create \
--bootstrap-server $BOOTSTRAP \
--topic $TOPIC \
--partitions 100 \
--replication-factor 3 \
--config min.insync.replicas=2
# Test avec différentes configurations
for BATCH_SIZE in 16384 65536 262144 1048576; do
for LINGER_MS in 0 5 10 50 100; do
echo "Testing batch.size=$BATCH_SIZE, linger.ms=$LINGER_MS"
kafka-producer-perf-test \
--topic $TOPIC \
--num-records $RECORDS \
--record-size $RECORD_SIZE \
--throughput -1 \
--producer-props \
bootstrap.servers=$BOOTSTRAP \
acks=1 \
batch.size=$BATCH_SIZE \
linger.ms=$LINGER_MS \
compression.type=lz4 \
2>&1 | tee results/batch${BATCH_SIZE}_linger${LINGER_MS}.log
sleep 30 # Cooldown
done
done
Configuration Matérielle Optimale
Dimensionnement CPU
Besoins CPU par composant:
┌──────────────────────────────────────────────────────┐
│ Broker Kafka │
├──────────────────────────────────────────────────────┤
│ Network threads: 1 core par thread │
│ I/O threads: 1 core par thread │
│ Compression: 1-2 cores supplémentaires │
│ GC: 1-2 cores pour G1GC │
│ │
│ Recommandation: 16-32 cores par broker │
└──────────────────────────────────────────────────────┘
Configuration Recommandée
# server.properties
# Pour 32 cores
num.network.threads=16
num.io.threads=32
num.replica.fetchers=4
Dimensionnement Mémoire
Distribution RAM (64 GB):
┌──────────────────────────────────────────────────────┐
│ │
│ JVM Heap: 6-8 GB │
│ ├─ Au-delà, GC pauses augmentent │
│ └─ Kafka n'a pas besoin de plus │
│ │
│ Page Cache: ~54 GB │
│ ├─ Cache des segments de log │
│ ├─ Évite les disk reads │
│ └─ CRUCIAL pour performance │
│ │
│ OS + Overhead: ~2-4 GB │
│ │
└──────────────────────────────────────────────────────┘
Règle d'Or
# Page cache doit contenir au minimum:
# - Active segments de tous les topics
# - Index files
# - Messages "chauds" (récemment écrits)
# Calcul approximatif:
# RAM_page_cache = partitions * segment_size * 2
# Ex: 100 partitions * 1 GB * 2 = 200 GB (idéal)
# Minimum: 32-64 GB de page cache
Dimensionnement Stockage
NVMe SSD : Le Standard
# Caractéristiques requises
IOPS séquentielles: >500K read, >400K write
Throughput: >3 GB/s read, >2 GB/s write
Endurance: >1 DWPD (Drive Writes Per Day)
Capacité: Selon retention (ex: 7 jours * 1 GB/s = 604 TB)
Configuration RAID
# Option 1: RAID 0 (Performance max)
# ⚠️ Risque: Perte totale si un disque fail
# ✅ Acceptable car Kafka réplique déjà
mdadm --create /dev/md0 --level=0 --raid-devices=2 /dev/nvme0n1 /dev/nvme1n1
# Option 2: JBOD (Just a Bunch Of Disks)
# Kafka gère le striping via log.dirs
log.dirs=/data/kafka1,/data/kafka2
Mount Options Optimales
# /etc/fstab
/dev/md0 /var/lib/kafka xfs noatime,nodiratime,nobarrier,allocsize=64m 0 0
# Vérification
mount | grep kafka
# /dev/md0 on /var/lib/kafka type xfs (rw,noatime,nodiratime,nobarrier,allocsize=64m)
Dimensionnement Réseau
Calcul bande passante:
┌──────────────────────────────────────────────────────┐
│ 1M msg/s * 1 KB = 1 GB/s en entrée │
│ │
│ Avec réplication factor 3: │
│ - Inter-broker: 1 GB/s * 2 = 2 GB/s │
│ - Consommation: 1 GB/s (si 1x) │
│ │
│ Total par broker: ~4 GB/s = 32 Gbps │
│ │
│ Recommandation: 25 Gbps minimum, 100 Gbps idéal │
└──────────────────────────────────────────────────────┘
Tuning Niveau Système (Linux)
Optimisation Kernel
# /etc/sysctl.conf
# === MÉMOIRE ===
vm.swappiness=1
vm.dirty_ratio=80
vm.dirty_background_ratio=5
vm.dirty_expire_centisecs=12000
vm.dirty_writeback_centisecs=500
vm.vfs_cache_pressure=50
# === RÉSEAU ===
# TCP buffers (pour 25 Gbps)
net.core.rmem_default=134217728
net.core.rmem_max=134217728
net.core.wmem_default=134217728
net.core.wmem_max=134217728
net.core.optmem_max=40960
net.ipv4.tcp_rmem=4096 87380 134217728
net.ipv4.tcp_wmem=4096 65536 134217728
# Connection handling
net.core.netdev_max_backlog=50000
net.core.somaxconn=65535
net.ipv4.tcp_max_syn_backlog=30000
net.ipv4.tcp_max_tw_buckets=2000000
net.ipv4.tcp_tw_reuse=1
net.ipv4.tcp_fin_timeout=10
# Disable slow start after idle
net.ipv4.tcp_slow_start_after_idle=0
# === FILE DESCRIPTORS ===
fs.file-max=2097152
fs.nr_open=2097152
# Appliquer
sysctl -p
Limites Système
# /etc/security/limits.conf
kafka soft nofile 1048576
kafka hard nofile 1048576
kafka soft nproc 65536
kafka hard nproc 65536
kafka soft memlock unlimited
kafka hard memlock unlimited
# Vérification
su - kafka -c "ulimit -n"
# 1048576
Désactiver Transparent Huge Pages
# THP cause des latency spikes avec Kafka
echo never > /sys/kernel/mm/transparent_hugepage/enabled
echo never > /sys/kernel/mm/transparent_hugepage/defrag
# Permanent via GRUB
# /etc/default/grub
GRUB_CMDLINE_LINUX="transparent_hugepage=never"
update-grub
Tuning JVM Avancé
Configuration G1GC Optimisée
# kafka-server-start.sh ou via KAFKA_HEAP_OPTS
export KAFKA_HEAP_OPTS="-Xms8g -Xmx8g"
export KAFKA_JVM_PERFORMANCE_OPTS="
-server
-XX:+UseG1GC
-XX:MaxGCPauseMillis=20
-XX:InitiatingHeapOccupancyPercent=35
-XX:G1HeapRegionSize=16m
-XX:MinMetaspaceFreeRatio=50
-XX:MaxMetaspaceFreeRatio=80
-XX:+ParallelRefProcEnabled
-XX:+ExplicitGCInvokesConcurrent
-XX:+AlwaysPreTouch
-XX:-UseBiasedLocking
-Djava.awt.headless=true
"
Explication des Paramètres
-XX:MaxGCPauseMillis=20
├─ Objectif: pauses < 20ms
└─ G1 adapte son comportement pour respecter
-XX:InitiatingHeapOccupancyPercent=35
├─ Déclenche GC à 35% heap utilisé
└─ Évite les Full GC tardifs
-XX:G1HeapRegionSize=16m
├─ Taille des régions G1
└─ Optimal pour heap 6-8 GB
-XX:+AlwaysPreTouch
├─ Alloue toute la heap au démarrage
└─ Évite les page faults en runtime
-XX:-UseBiasedLocking
├─ Désactive biased locking
└─ Réduit les pauses de safepoint
GC Logging
export KAFKA_GC_LOG_OPTS="
-Xlog:gc*:file=/var/log/kafka/gc.log:time,uptime,level,tags:filecount=10,filesize=100M
-Xlog:safepoint:file=/var/log/kafka/safepoint.log:time,uptime,level,tags:filecount=5,filesize=50M
"
Configuration Broker pour 1M msg/s
server.properties Optimisé
############################# Server Basics #############################
broker.id=1
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://kafka1.example.com:9092
############################# Threading #############################
# Threads réseau (connections entrantes)
num.network.threads=16
# Threads I/O (disk operations)
num.io.threads=32
# Threads pour réplication
num.replica.fetchers=8
# Background threads
background.threads=10
############################# Socket Settings #############################
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
queued.max.requests=1000
############################# Log Basics #############################
log.dirs=/data/kafka1,/data/kafka2
num.partitions=12
default.replication.factor=3
min.insync.replicas=2
############################# Log Segment #############################
log.segment.bytes=1073741824
log.index.size.max.bytes=10485760
log.index.interval.bytes=4096
############################# Log Retention #############################
log.retention.hours=168
log.retention.bytes=-1
log.retention.check.interval.ms=300000
############################# Log Flush Policy #############################
# Laisser OS gérer (via page cache)
log.flush.interval.messages=9223372036854775807
log.flush.interval.ms=9223372036854775807
############################# Replication #############################
replica.lag.time.max.ms=30000
replica.fetch.max.bytes=10485760
replica.fetch.wait.max.ms=500
replica.socket.receive.buffer.bytes=1048576
############################# Compression #############################
compression.type=producer
############################# ZooKeeper (ou KRaft) #############################
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
zookeeper.connection.timeout.ms=18000
zookeeper.session.timeout.ms=18000
Configuration du Topic de Benchmark
# Topic optimisé pour haut débit
kafka-topics --create \
--bootstrap-server kafka1:9092 \
--topic high-throughput-topic \
--partitions 100 \
--replication-factor 3 \
--config min.insync.replicas=2 \
--config segment.bytes=536870912 \
--config retention.ms=86400000 \
--config compression.type=lz4
Configuration Producer pour 1M msg/s
Producer Java Optimisé
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
public class HighThroughputProducer {
public static KafkaProducer<String, byte[]> createProducer() {
Properties props = new Properties();
// === Connexion ===
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"kafka1:9092,kafka2:9092,kafka3:9092");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "high-throughput-producer");
// === Sérialisation ===
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName());
// === Durabilité (trade-off avec performance) ===
// acks=1 : Leader only (meilleur throughput)
// acks=all : Tous ISR (meilleure durabilité)
props.put(ProducerConfig.ACKS_CONFIG, "1");
// === Batching (CRITIQUE!) ===
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1048576); // 1 MB
props.put(ProducerConfig.LINGER_MS_CONFIG, 10); // 10ms max wait
// === Compression ===
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
// === Buffer ===
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 268435456); // 256 MB
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
// === In-flight Requests ===
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
// === Retry ===
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
// === Timeout ===
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
return new KafkaProducer<>(props);
}
public static void main(String[] args) throws Exception {
KafkaProducer<String, byte[]> producer = createProducer();
String topic = "high-throughput-topic";
byte[] payload = new byte[1024]; // 1 KB message
AtomicLong successCount = new AtomicLong(0);
AtomicLong errorCount = new AtomicLong(0);
long startTime = System.currentTimeMillis();
long totalMessages = 10_000_000;
for (long i = 0; i < totalMessages; i++) {
ProducerRecord<String, byte[]> record =
new ProducerRecord<>(topic, null, payload);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
successCount.incrementAndGet();
} else {
errorCount.incrementAndGet();
}
});
// Progress logging
if (i % 1_000_000 == 0 && i > 0) {
double elapsed = (System.currentTimeMillis() - startTime) / 1000.0;
double rate = i / elapsed;
System.out.printf("Sent %d messages, %.0f msg/s%n", i, rate);
}
}
producer.flush();
producer.close();
long endTime = System.currentTimeMillis();
double totalSeconds = (endTime - startTime) / 1000.0;
double messagesPerSecond = totalMessages / totalSeconds;
System.out.printf("Total: %d messages in %.2f seconds%n",
totalMessages, totalSeconds);
System.out.printf("Throughput: %.0f msg/s (%.2f MB/s)%n",
messagesPerSecond, messagesPerSecond * 1024 / 1_000_000);
System.out.printf("Success: %d, Errors: %d%n",
successCount.get(), errorCount.get());
}
}
Multi-Threaded Producer
import java.util.concurrent.*;
public class MultiThreadedProducer {
private static final int NUM_THREADS = 10;
private static final int MESSAGES_PER_THREAD = 1_000_000;
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
CountDownLatch latch = new CountDownLatch(NUM_THREADS);
AtomicLong totalMessages = new AtomicLong(0);
long startTime = System.currentTimeMillis();
for (int t = 0; t < NUM_THREADS; t++) {
final int threadId = t;
executor.submit(() -> {
try {
KafkaProducer<String, byte[]> producer = createProducer();
byte[] payload = new byte[1024];
for (int i = 0; i < MESSAGES_PER_THREAD; i++) {
producer.send(new ProducerRecord<>("topic", null, payload));
totalMessages.incrementAndGet();
}
producer.flush();
producer.close();
} finally {
latch.countDown();
}
});
}
latch.await();
executor.shutdown();
long endTime = System.currentTimeMillis();
double seconds = (endTime - startTime) / 1000.0;
double rate = totalMessages.get() / seconds;
System.out.printf("Total: %d msg in %.2f s = %.0f msg/s%n",
totalMessages.get(), seconds, rate);
}
}
Configuration Consumer pour 1M msg/s
Consumer Haute Performance
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
public class HighThroughputConsumer {
public static KafkaConsumer<String, byte[]> createConsumer() {
Properties props = new Properties();
// === Connexion ===
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"kafka1:9092,kafka2:9092,kafka3:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "high-throughput-group");
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-1");
// === Désérialisation ===
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
// === Fetch Configuration (CRITIQUE!) ===
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1048576); // 1 MB
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 52428800); // 50 MB
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10485760); // 10 MB
// === Poll Configuration ===
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
// === Offset Management ===
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// === Session ===
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
return new KafkaConsumer<>(props);
}
public static void main(String[] args) {
KafkaConsumer<String, byte[]> consumer = createConsumer();
consumer.subscribe(Collections.singletonList("high-throughput-topic"));
long messageCount = 0;
long byteCount = 0;
long startTime = System.currentTimeMillis();
try {
while (true) {
ConsumerRecords<String, byte[]> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, byte[]> record : records) {
messageCount++;
byteCount += record.value().length;
}
// Progress logging
if (messageCount % 1_000_000 == 0 && messageCount > 0) {
double elapsed = (System.currentTimeMillis() - startTime) / 1000.0;
double msgRate = messageCount / elapsed;
double mbRate = byteCount / elapsed / 1_000_000;
System.out.printf("Consumed %d messages, %.0f msg/s, %.2f MB/s%n",
messageCount, msgRate, mbRate);
}
}
} finally {
consumer.close();
}
}
}
Résultats de Benchmark
Configuration de Test
Hardware:
- 3x Brokers: 32 cores, 128 GB RAM, 2x NVMe SSD
- 5x Producer machines: 16 cores, 64 GB RAM
- 5x Consumer machines: 16 cores, 64 GB RAM
- Network: 25 Gbps entre toutes les machines
Topic:
- Partitions: 100
- Replication factor: 3
- Message size: 1 KB
Résultats Production
┌─────────────────────────────────────────────────────────────┐
│ RÉSULTATS BENCHMARK │
├─────────────────────────────────────────────────────────────┤
│ │
│ Configuration: acks=1, batch=1MB, linger=10ms, lz4 │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Messages/sec: 1,247,000 │ │
│ │ Throughput: 1.19 GB/s │ │
│ │ Latency P50: 8 ms │ │
│ │ Latency P99: 23 ms │ │
│ │ Latency P99.9: 45 ms │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ Configuration: acks=all, batch=1MB, linger=10ms, lz4 │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Messages/sec: 892,000 │ │
│ │ Throughput: 871 MB/s │ │
│ │ Latency P50: 12 ms │ │
│ │ Latency P99: 35 ms │ │
│ │ Latency P99.9: 78 ms │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Résultats Consommation
┌─────────────────────────────────────────────────────────────┐
│ CONSOMMATION BENCHMARK │
├─────────────────────────────────────────────────────────────┤
│ │
│ Consumer Group: 10 instances, 100 partitions │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Messages/sec: 2,100,000 │ │
│ │ Throughput: 2.0 GB/s │ │
│ │ Fetch latency: 45 ms avg │ │
│ │ Consumer lag: < 10,000 messages │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ Note: Consommation > Production car lecture depuis cache │
│ │
└─────────────────────────────────────────────────────────────┘
Impact des Paramètres
Impact de batch.size:
┌────────────┬─────────────┬────────────┐
│ batch.size │ Throughput │ Latency │
├────────────┼─────────────┼────────────┤
│ 16 KB │ 180K msg/s │ P99: 8ms │
│ 64 KB │ 420K msg/s │ P99: 12ms │
│ 256 KB │ 780K msg/s │ P99: 18ms │
│ 1 MB │ 1.2M msg/s │ P99: 23ms │
└────────────┴─────────────┴────────────┘
Impact de linger.ms:
┌────────────┬─────────────┬────────────┐
│ linger.ms │ Throughput │ Latency │
├────────────┼─────────────┼────────────┤
│ 0 │ 650K msg/s │ P99: 5ms │
│ 5 │ 950K msg/s │ P99: 12ms │
│ 10 │ 1.2M msg/s │ P99: 18ms │
│ 50 │ 1.3M msg/s │ P99: 60ms │
│ 100 │ 1.35M msg/s │ P99: 110ms │
└────────────┴─────────────┴────────────┘
Impact de compression:
┌────────────┬─────────────┬────────────┐
│ Compression│ Throughput │ CPU Usage │
├────────────┼─────────────┼────────────┤
│ none │ 1.0M msg/s │ 15% │
│ lz4 │ 1.2M msg/s │ 25% │
│ snappy │ 1.1M msg/s │ 30% │
│ zstd │ 1.0M msg/s │ 45% │
│ gzip │ 0.6M msg/s │ 70% │
└────────────┴─────────────┴────────────┘
Troubleshooting Performances
Diagnostic des Goulots d'Étranglement
# 1. Vérifier utilisation réseau
sar -n DEV 1 5
# Si >80% utilisation → Goulot réseau
# 2. Vérifier I/O disque
iostat -x 1 5
# await >10ms ou %util >80% → Goulot disque
# 3. Vérifier CPU
mpstat -P ALL 1 5
# Si >80% sur cores → Goulot CPU
# 4. Vérifier GC
grep "Pause" /var/log/kafka/gc.log | tail -20
# Pauses >100ms → Problème GC
# 5. Vérifier network threads
kafka-run-class kafka.tools.JmxTool \
--object-name 'kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent' \
--jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi
# <30% idle → Augmenter num.network.threads
Problèmes Courants
Symptôme: Throughput plafonne
├─ Vérifier: batch.size trop petit?
├─ Vérifier: linger.ms = 0?
├─ Vérifier: Compression désactivée?
└─ Solution: Augmenter batching
Symptôme: Latence élevée
├─ Vérifier: linger.ms trop élevé?
├─ Vérifier: fetch.min.bytes trop grand?
├─ Vérifier: GC pauses?
└─ Solution: Réduire batching ou tuner GC
Symptôme: Producer buffer full
├─ Vérifier: buffer.memory insuffisant?
├─ Vérifier: max.block.ms atteint?
└─ Solution: Augmenter buffer ou ralentir production
Symptôme: Consumer lag croissant
├─ Vérifier: max.poll.records trop bas?
├─ Vérifier: Traitement trop lent?
├─ Vérifier: Pas assez de partitions?
└─ Solution: Scale consumers ou optimiser traitement
Checklist pour 1M msg/s
Infrastructure
- 3+ brokers avec 16+ cores chacun
- 64+ GB RAM par broker (6-8 GB heap, reste page cache)
- NVMe SSD (>3 GB/s throughput)
- 25+ Gbps réseau entre machines
- XFS avec noatime, nodiratime
Système
- ulimit -n >= 1048576
- vm.swappiness=1
- vm.dirty_ratio=80
- TCP buffers augmentés
- THP désactivé
JVM
- Heap 6-8 GB (-Xms8g -Xmx8g)
- G1GC avec MaxGCPauseMillis=20
- GC logging activé
Broker
- num.io.threads = nombre cores
- num.network.threads = cores / 2
- socket buffers >= 1 MB
- Compression producer-side
Topic
- 50-100+ partitions
- Replication factor = 3
- min.insync.replicas = 2
Producer
- batch.size = 1 MB
- linger.ms = 10-50
- compression.type = lz4
- buffer.memory >= 256 MB
- Envoi asynchrone avec callbacks
Consumer
- fetch.min.bytes = 1 MB
- max.poll.records = 5000-10000
- 10+ instances par consumer group
A Retenir
- Le million est atteignable : Avec le bon matériel et tuning, 1M+ msg/s est réaliste
- Batching est roi : batch.size et linger.ms sont les paramètres les plus impactants
- Page cache > Heap : Laissez de la RAM pour le page cache Linux
- Compression LZ4 : Meilleur ratio performance/compression
- Partitions suffisantes : 50-100 partitions minimum pour parallélisme
- Mesurer avant d'optimiser : Benchmark rigoureux pour identifier les vrais goulots
- Trade-offs conscients : Latence vs throughput, durabilité vs performance
Besoin d'aide pour atteindre ces performances sur votre infrastructure Kafka ? Contactez-moi pour un accompagnement personnalisé sur vos problématiques de scalabilité et performance.