KAFKA
Avancé

Atteindre 1 Million de Messages par Seconde avec Kafka

Atteindre 1 Million de Messages par Seconde avec Kafka

Guide expert pour atteindre des performances extrêmes avec Apache Kafka : benchmarks détaillés, configurations matérielles optimales, tuning avancé des brokers, producteurs et consommateurs pour le million de messages par seconde.

Florian Courouge
35 min de lecture
3,515 mots
0 vues
Kafka
Performance
Benchmark
High-Throughput
Production
Scalability

Atteindre 1 Million de Messages par Seconde avec Kafka

Introduction

Atteindre 1 million de messages par seconde avec Apache Kafka n'est pas un mythe marketing. C'est un objectif atteignable avec une compréhension approfondie de l'architecture, un matériel adapté et un tuning méticuleux. Ce guide vous accompagne étape par étape pour atteindre ces performances extrêmes.

Dans cet article, nous explorerons :

  • La méthodologie de benchmark rigoureuse
  • Les configurations matérielles recommandées
  • Le tuning avancé à tous les niveaux du stack
  • Les résultats réels et reproductibles

Architecture Haute Performance Kafka

Prérequis et Contexte

Comprendre les Unités de Mesure

Avant tout, clarifions les métriques :

Messages/seconde (msg/s) : Nombre de messages
├─ 1M msg/s avec messages de 100 bytes = 100 MB/s
├─ 1M msg/s avec messages de 1 KB = 1 GB/s
└─ 1M msg/s avec messages de 10 KB = 10 GB/s

Throughput (MB/s ou GB/s) : Volume de données
├─ Plus représentatif de la charge réelle
└─ Détermine les besoins réseau/disque

Facteurs Limitants Identifiés

┌─────────────────────────────────────────────────────────────┐
│                 GOULOTS D'ÉTRANGLEMENT                       │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  1. RÉSEAU (souvent le premier)                             │
│     └─ 10 Gbps = ~1.2 GB/s théorique                        │
│                                                              │
│  2. DISQUE (I/O séquentielles)                              │
│     └─ NVMe SSD = 3-7 GB/s                                  │
│                                                              │
│  3. CPU (compression/sérialisation)                         │
│     └─ LZ4 = ~500 MB/s par core                             │
│                                                              │
│  4. MÉMOIRE (page cache)                                    │
│     └─ Plus de RAM = moins de disk I/O                      │
│                                                              │
└─────────────────────────────────────────────────────────────┘

Méthodologie de Benchmark

Principes Fondamentaux

Un benchmark Kafka fiable doit respecter ces règles :

1. Isoler les Variables

# Test un paramètre à la fois
# Baseline → Change batch.size → Mesure → Restore → Change linger.ms → Mesure

2. Warm-up Obligatoire

# Les premières minutes ne comptent pas
# - JIT compilation (JVM)
# - Page cache population
# - Connection pool warmup

3. Durée Suffisante

# Minimum 5-10 minutes par test
# Évite les pics transitoires
# Capture les variations GC

4. Mesures Multiples

# Au moins 3 runs par configuration
# Moyenne + écart-type
# Éliminer les outliers

Environnement de Benchmark

Infrastructure de Test

# Architecture recommandée pour benchmark
Cluster Kafka:
  Brokers: 3
  Hardware par broker:
    CPU: 16+ cores (AMD EPYC ou Intel Xeon)
    RAM: 64 GB
    Disque: 2x NVMe SSD 1TB (RAID 0)
    Réseau: 25 Gbps

Machines de Test:
  Producers: 3 machines dédiées
  Consumers: 3 machines dédiées
  Hardware:
    CPU: 8+ cores
    RAM: 32 GB
    Réseau: 25 Gbps

Configuration Réseau

# Vérifier la bande passante disponible
iperf3 -s  # Sur broker
iperf3 -c broker-ip -t 60 -P 4  # Sur client

# Résultat attendu: ~24 Gbps sur 25 Gbps interface

Outils de Benchmark

kafka-producer-perf-test (Natif)

# Test baseline
kafka-producer-perf-test \
  --topic benchmark-topic \
  --num-records 10000000 \
  --record-size 1024 \
  --throughput -1 \
  --producer-props \
    bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092 \
    acks=1 \
    batch.size=16384 \
    linger.ms=0

# Output:
# 10000000 records sent, 523456.7 records/sec (511.18 MB/sec),
# 45.2 ms avg latency, 234.0 ms max latency

OpenMessaging Benchmark (Avancé)

# Clone du benchmark framework
git clone https://github.com/openmessaging/benchmark.git
cd benchmark

# Build
mvn clean install -DskipTests

# Run avec driver Kafka
bin/benchmark \
  --drivers driver-kafka/kafka.yaml \
  --workloads workloads/1-topic-16-partitions-1kb.yaml

Script de Benchmark Personnalisé

#!/bin/bash
# benchmark-kafka.sh

BOOTSTRAP="kafka1:9092,kafka2:9092,kafka3:9092"
TOPIC="benchmark-1m"
RECORDS=50000000
RECORD_SIZE=1024

# Créer le topic
kafka-topics --create \
  --bootstrap-server $BOOTSTRAP \
  --topic $TOPIC \
  --partitions 100 \
  --replication-factor 3 \
  --config min.insync.replicas=2

# Test avec différentes configurations
for BATCH_SIZE in 16384 65536 262144 1048576; do
  for LINGER_MS in 0 5 10 50 100; do
    echo "Testing batch.size=$BATCH_SIZE, linger.ms=$LINGER_MS"

    kafka-producer-perf-test \
      --topic $TOPIC \
      --num-records $RECORDS \
      --record-size $RECORD_SIZE \
      --throughput -1 \
      --producer-props \
        bootstrap.servers=$BOOTSTRAP \
        acks=1 \
        batch.size=$BATCH_SIZE \
        linger.ms=$LINGER_MS \
        compression.type=lz4 \
      2>&1 | tee results/batch${BATCH_SIZE}_linger${LINGER_MS}.log

    sleep 30  # Cooldown
  done
done

Configuration Matérielle Optimale

Dimensionnement CPU

Besoins CPU par composant:
┌──────────────────────────────────────────────────────┐
│ Broker Kafka                                          │
├──────────────────────────────────────────────────────┤
│ Network threads: 1 core par thread                   │
│ I/O threads: 1 core par thread                       │
│ Compression: 1-2 cores supplémentaires               │
│ GC: 1-2 cores pour G1GC                              │
│                                                       │
│ Recommandation: 16-32 cores par broker               │
└──────────────────────────────────────────────────────┘

Configuration Recommandée

# server.properties
# Pour 32 cores
num.network.threads=16
num.io.threads=32
num.replica.fetchers=4

Dimensionnement Mémoire

Distribution RAM (64 GB):
┌──────────────────────────────────────────────────────┐
│                                                       │
│  JVM Heap: 6-8 GB                                    │
│  ├─ Au-delà, GC pauses augmentent                    │
│  └─ Kafka n'a pas besoin de plus                     │
│                                                       │
│  Page Cache: ~54 GB                                  │
│  ├─ Cache des segments de log                        │
│  ├─ Évite les disk reads                             │
│  └─ CRUCIAL pour performance                         │
│                                                       │
│  OS + Overhead: ~2-4 GB                              │
│                                                       │
└──────────────────────────────────────────────────────┘

Règle d'Or

# Page cache doit contenir au minimum:
# - Active segments de tous les topics
# - Index files
# - Messages "chauds" (récemment écrits)

# Calcul approximatif:
# RAM_page_cache = partitions * segment_size * 2
# Ex: 100 partitions * 1 GB * 2 = 200 GB (idéal)
# Minimum: 32-64 GB de page cache

Dimensionnement Stockage

NVMe SSD : Le Standard

# Caractéristiques requises
IOPS séquentielles: >500K read, >400K write
Throughput: >3 GB/s read, >2 GB/s write
Endurance: >1 DWPD (Drive Writes Per Day)
Capacité: Selon retention (ex: 7 jours * 1 GB/s = 604 TB)

Configuration RAID

# Option 1: RAID 0 (Performance max)
# ⚠️ Risque: Perte totale si un disque fail
# ✅ Acceptable car Kafka réplique déjà
mdadm --create /dev/md0 --level=0 --raid-devices=2 /dev/nvme0n1 /dev/nvme1n1

# Option 2: JBOD (Just a Bunch Of Disks)
# Kafka gère le striping via log.dirs
log.dirs=/data/kafka1,/data/kafka2

Mount Options Optimales

# /etc/fstab
/dev/md0 /var/lib/kafka xfs noatime,nodiratime,nobarrier,allocsize=64m 0 0

# Vérification
mount | grep kafka
# /dev/md0 on /var/lib/kafka type xfs (rw,noatime,nodiratime,nobarrier,allocsize=64m)

Dimensionnement Réseau

Calcul bande passante:
┌──────────────────────────────────────────────────────┐
│ 1M msg/s * 1 KB = 1 GB/s en entrée                   │
│                                                       │
│ Avec réplication factor 3:                           │
│ - Inter-broker: 1 GB/s * 2 = 2 GB/s                  │
│ - Consommation: 1 GB/s (si 1x)                       │
│                                                       │
│ Total par broker: ~4 GB/s = 32 Gbps                  │
│                                                       │
│ Recommandation: 25 Gbps minimum, 100 Gbps idéal      │
└──────────────────────────────────────────────────────┘

Tuning Niveau Système (Linux)

Optimisation Kernel

# /etc/sysctl.conf

# === MÉMOIRE ===
vm.swappiness=1
vm.dirty_ratio=80
vm.dirty_background_ratio=5
vm.dirty_expire_centisecs=12000
vm.dirty_writeback_centisecs=500
vm.vfs_cache_pressure=50

# === RÉSEAU ===
# TCP buffers (pour 25 Gbps)
net.core.rmem_default=134217728
net.core.rmem_max=134217728
net.core.wmem_default=134217728
net.core.wmem_max=134217728
net.core.optmem_max=40960

net.ipv4.tcp_rmem=4096 87380 134217728
net.ipv4.tcp_wmem=4096 65536 134217728

# Connection handling
net.core.netdev_max_backlog=50000
net.core.somaxconn=65535
net.ipv4.tcp_max_syn_backlog=30000
net.ipv4.tcp_max_tw_buckets=2000000
net.ipv4.tcp_tw_reuse=1
net.ipv4.tcp_fin_timeout=10

# Disable slow start after idle
net.ipv4.tcp_slow_start_after_idle=0

# === FILE DESCRIPTORS ===
fs.file-max=2097152
fs.nr_open=2097152

# Appliquer
sysctl -p

Limites Système

# /etc/security/limits.conf
kafka soft nofile 1048576
kafka hard nofile 1048576
kafka soft nproc 65536
kafka hard nproc 65536
kafka soft memlock unlimited
kafka hard memlock unlimited

# Vérification
su - kafka -c "ulimit -n"
# 1048576

Désactiver Transparent Huge Pages

# THP cause des latency spikes avec Kafka
echo never > /sys/kernel/mm/transparent_hugepage/enabled
echo never > /sys/kernel/mm/transparent_hugepage/defrag

# Permanent via GRUB
# /etc/default/grub
GRUB_CMDLINE_LINUX="transparent_hugepage=never"
update-grub

Tuning JVM Avancé

Configuration G1GC Optimisée

# kafka-server-start.sh ou via KAFKA_HEAP_OPTS

export KAFKA_HEAP_OPTS="-Xms8g -Xmx8g"

export KAFKA_JVM_PERFORMANCE_OPTS="
-server
-XX:+UseG1GC
-XX:MaxGCPauseMillis=20
-XX:InitiatingHeapOccupancyPercent=35
-XX:G1HeapRegionSize=16m
-XX:MinMetaspaceFreeRatio=50
-XX:MaxMetaspaceFreeRatio=80
-XX:+ParallelRefProcEnabled
-XX:+ExplicitGCInvokesConcurrent
-XX:+AlwaysPreTouch
-XX:-UseBiasedLocking
-Djava.awt.headless=true
"

Explication des Paramètres

-XX:MaxGCPauseMillis=20
├─ Objectif: pauses < 20ms
└─ G1 adapte son comportement pour respecter

-XX:InitiatingHeapOccupancyPercent=35
├─ Déclenche GC à 35% heap utilisé
└─ Évite les Full GC tardifs

-XX:G1HeapRegionSize=16m
├─ Taille des régions G1
└─ Optimal pour heap 6-8 GB

-XX:+AlwaysPreTouch
├─ Alloue toute la heap au démarrage
└─ Évite les page faults en runtime

-XX:-UseBiasedLocking
├─ Désactive biased locking
└─ Réduit les pauses de safepoint

GC Logging

export KAFKA_GC_LOG_OPTS="
-Xlog:gc*:file=/var/log/kafka/gc.log:time,uptime,level,tags:filecount=10,filesize=100M
-Xlog:safepoint:file=/var/log/kafka/safepoint.log:time,uptime,level,tags:filecount=5,filesize=50M
"

Configuration Broker pour 1M msg/s

server.properties Optimisé

############################# Server Basics #############################
broker.id=1
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://kafka1.example.com:9092

############################# Threading #############################
# Threads réseau (connections entrantes)
num.network.threads=16

# Threads I/O (disk operations)
num.io.threads=32

# Threads pour réplication
num.replica.fetchers=8

# Background threads
background.threads=10

############################# Socket Settings #############################
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
queued.max.requests=1000

############################# Log Basics #############################
log.dirs=/data/kafka1,/data/kafka2
num.partitions=12
default.replication.factor=3
min.insync.replicas=2

############################# Log Segment #############################
log.segment.bytes=1073741824
log.index.size.max.bytes=10485760
log.index.interval.bytes=4096

############################# Log Retention #############################
log.retention.hours=168
log.retention.bytes=-1
log.retention.check.interval.ms=300000

############################# Log Flush Policy #############################
# Laisser OS gérer (via page cache)
log.flush.interval.messages=9223372036854775807
log.flush.interval.ms=9223372036854775807

############################# Replication #############################
replica.lag.time.max.ms=30000
replica.fetch.max.bytes=10485760
replica.fetch.wait.max.ms=500
replica.socket.receive.buffer.bytes=1048576

############################# Compression #############################
compression.type=producer

############################# ZooKeeper (ou KRaft) #############################
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
zookeeper.connection.timeout.ms=18000
zookeeper.session.timeout.ms=18000

Configuration du Topic de Benchmark

# Topic optimisé pour haut débit
kafka-topics --create \
  --bootstrap-server kafka1:9092 \
  --topic high-throughput-topic \
  --partitions 100 \
  --replication-factor 3 \
  --config min.insync.replicas=2 \
  --config segment.bytes=536870912 \
  --config retention.ms=86400000 \
  --config compression.type=lz4

Configuration Producer pour 1M msg/s

Producer Java Optimisé

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;

public class HighThroughputProducer {

    public static KafkaProducer<String, byte[]> createProducer() {
        Properties props = new Properties();

        // === Connexion ===
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                  "kafka1:9092,kafka2:9092,kafka3:9092");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "high-throughput-producer");

        // === Sérialisation ===
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  ByteArraySerializer.class.getName());

        // === Durabilité (trade-off avec performance) ===
        // acks=1 : Leader only (meilleur throughput)
        // acks=all : Tous ISR (meilleure durabilité)
        props.put(ProducerConfig.ACKS_CONFIG, "1");

        // === Batching (CRITIQUE!) ===
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1048576); // 1 MB
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10); // 10ms max wait

        // === Compression ===
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

        // === Buffer ===
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 268435456); // 256 MB
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);

        // === In-flight Requests ===
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

        // === Retry ===
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);

        // === Timeout ===
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);

        return new KafkaProducer<>(props);
    }

    public static void main(String[] args) throws Exception {
        KafkaProducer<String, byte[]> producer = createProducer();
        String topic = "high-throughput-topic";
        byte[] payload = new byte[1024]; // 1 KB message

        AtomicLong successCount = new AtomicLong(0);
        AtomicLong errorCount = new AtomicLong(0);

        long startTime = System.currentTimeMillis();
        long totalMessages = 10_000_000;

        for (long i = 0; i < totalMessages; i++) {
            ProducerRecord<String, byte[]> record =
                new ProducerRecord<>(topic, null, payload);

            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    successCount.incrementAndGet();
                } else {
                    errorCount.incrementAndGet();
                }
            });

            // Progress logging
            if (i % 1_000_000 == 0 && i > 0) {
                double elapsed = (System.currentTimeMillis() - startTime) / 1000.0;
                double rate = i / elapsed;
                System.out.printf("Sent %d messages, %.0f msg/s%n", i, rate);
            }
        }

        producer.flush();
        producer.close();

        long endTime = System.currentTimeMillis();
        double totalSeconds = (endTime - startTime) / 1000.0;
        double messagesPerSecond = totalMessages / totalSeconds;

        System.out.printf("Total: %d messages in %.2f seconds%n",
                          totalMessages, totalSeconds);
        System.out.printf("Throughput: %.0f msg/s (%.2f MB/s)%n",
                          messagesPerSecond, messagesPerSecond * 1024 / 1_000_000);
        System.out.printf("Success: %d, Errors: %d%n",
                          successCount.get(), errorCount.get());
    }
}

Multi-Threaded Producer

import java.util.concurrent.*;

public class MultiThreadedProducer {

    private static final int NUM_THREADS = 10;
    private static final int MESSAGES_PER_THREAD = 1_000_000;

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
        CountDownLatch latch = new CountDownLatch(NUM_THREADS);
        AtomicLong totalMessages = new AtomicLong(0);

        long startTime = System.currentTimeMillis();

        for (int t = 0; t < NUM_THREADS; t++) {
            final int threadId = t;
            executor.submit(() -> {
                try {
                    KafkaProducer<String, byte[]> producer = createProducer();
                    byte[] payload = new byte[1024];

                    for (int i = 0; i < MESSAGES_PER_THREAD; i++) {
                        producer.send(new ProducerRecord<>("topic", null, payload));
                        totalMessages.incrementAndGet();
                    }

                    producer.flush();
                    producer.close();
                } finally {
                    latch.countDown();
                }
            });
        }

        latch.await();
        executor.shutdown();

        long endTime = System.currentTimeMillis();
        double seconds = (endTime - startTime) / 1000.0;
        double rate = totalMessages.get() / seconds;

        System.out.printf("Total: %d msg in %.2f s = %.0f msg/s%n",
                          totalMessages.get(), seconds, rate);
    }
}

Configuration Consumer pour 1M msg/s

Consumer Haute Performance

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

public class HighThroughputConsumer {

    public static KafkaConsumer<String, byte[]> createConsumer() {
        Properties props = new Properties();

        // === Connexion ===
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                  "kafka1:9092,kafka2:9092,kafka3:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "high-throughput-group");
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-1");

        // === Désérialisation ===
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  ByteArrayDeserializer.class.getName());

        // === Fetch Configuration (CRITIQUE!) ===
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1048576); // 1 MB
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 52428800); // 50 MB
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10485760); // 10 MB

        // === Poll Configuration ===
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);

        // === Offset Management ===
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // === Session ===
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);

        return new KafkaConsumer<>(props);
    }

    public static void main(String[] args) {
        KafkaConsumer<String, byte[]> consumer = createConsumer();
        consumer.subscribe(Collections.singletonList("high-throughput-topic"));

        long messageCount = 0;
        long byteCount = 0;
        long startTime = System.currentTimeMillis();

        try {
            while (true) {
                ConsumerRecords<String, byte[]> records =
                    consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, byte[]> record : records) {
                    messageCount++;
                    byteCount += record.value().length;
                }

                // Progress logging
                if (messageCount % 1_000_000 == 0 && messageCount > 0) {
                    double elapsed = (System.currentTimeMillis() - startTime) / 1000.0;
                    double msgRate = messageCount / elapsed;
                    double mbRate = byteCount / elapsed / 1_000_000;
                    System.out.printf("Consumed %d messages, %.0f msg/s, %.2f MB/s%n",
                                      messageCount, msgRate, mbRate);
                }
            }
        } finally {
            consumer.close();
        }
    }
}

Résultats de Benchmark

Configuration de Test

Hardware:
- 3x Brokers: 32 cores, 128 GB RAM, 2x NVMe SSD
- 5x Producer machines: 16 cores, 64 GB RAM
- 5x Consumer machines: 16 cores, 64 GB RAM
- Network: 25 Gbps entre toutes les machines

Topic:
- Partitions: 100
- Replication factor: 3
- Message size: 1 KB

Résultats Production

┌─────────────────────────────────────────────────────────────┐
│                RÉSULTATS BENCHMARK                           │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  Configuration: acks=1, batch=1MB, linger=10ms, lz4         │
│                                                              │
│  ┌─────────────────────────────────────────────────────┐    │
│  │ Messages/sec:     1,247,000                          │    │
│  │ Throughput:       1.19 GB/s                          │    │
│  │ Latency P50:      8 ms                               │    │
│  │ Latency P99:      23 ms                              │    │
│  │ Latency P99.9:    45 ms                              │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                              │
│  Configuration: acks=all, batch=1MB, linger=10ms, lz4       │
│                                                              │
│  ┌─────────────────────────────────────────────────────┐    │
│  │ Messages/sec:     892,000                            │    │
│  │ Throughput:       871 MB/s                           │    │
│  │ Latency P50:      12 ms                              │    │
│  │ Latency P99:      35 ms                              │    │
│  │ Latency P99.9:    78 ms                              │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                              │
└─────────────────────────────────────────────────────────────┘

Résultats Consommation

┌─────────────────────────────────────────────────────────────┐
│                CONSOMMATION BENCHMARK                        │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  Consumer Group: 10 instances, 100 partitions               │
│                                                              │
│  ┌─────────────────────────────────────────────────────┐    │
│  │ Messages/sec:     2,100,000                          │    │
│  │ Throughput:       2.0 GB/s                           │    │
│  │ Fetch latency:    45 ms avg                          │    │
│  │ Consumer lag:     < 10,000 messages                  │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                              │
│  Note: Consommation > Production car lecture depuis cache   │
│                                                              │
└─────────────────────────────────────────────────────────────┘

Impact des Paramètres

Impact de batch.size:
┌────────────┬─────────────┬────────────┐
│ batch.size │ Throughput  │ Latency    │
├────────────┼─────────────┼────────────┤
│ 16 KB      │ 180K msg/s  │ P99: 8ms   │
│ 64 KB      │ 420K msg/s  │ P99: 12ms  │
│ 256 KB     │ 780K msg/s  │ P99: 18ms  │
│ 1 MB       │ 1.2M msg/s  │ P99: 23ms  │
└────────────┴─────────────┴────────────┘

Impact de linger.ms:
┌────────────┬─────────────┬────────────┐
│ linger.ms  │ Throughput  │ Latency    │
├────────────┼─────────────┼────────────┤
│ 0          │ 650K msg/s  │ P99: 5ms   │
│ 5          │ 950K msg/s  │ P99: 12ms  │
│ 10         │ 1.2M msg/s  │ P99: 18ms  │
│ 50         │ 1.3M msg/s  │ P99: 60ms  │
│ 100        │ 1.35M msg/s │ P99: 110ms │
└────────────┴─────────────┴────────────┘

Impact de compression:
┌────────────┬─────────────┬────────────┐
│ Compression│ Throughput  │ CPU Usage  │
├────────────┼─────────────┼────────────┤
│ none       │ 1.0M msg/s  │ 15%        │
│ lz4        │ 1.2M msg/s  │ 25%        │
│ snappy     │ 1.1M msg/s  │ 30%        │
│ zstd       │ 1.0M msg/s  │ 45%        │
│ gzip       │ 0.6M msg/s  │ 70%        │
└────────────┴─────────────┴────────────┘

Troubleshooting Performances

Diagnostic des Goulots d'Étranglement

# 1. Vérifier utilisation réseau
sar -n DEV 1 5
# Si >80% utilisation → Goulot réseau

# 2. Vérifier I/O disque
iostat -x 1 5
# await >10ms ou %util >80% → Goulot disque

# 3. Vérifier CPU
mpstat -P ALL 1 5
# Si >80% sur cores → Goulot CPU

# 4. Vérifier GC
grep "Pause" /var/log/kafka/gc.log | tail -20
# Pauses >100ms → Problème GC

# 5. Vérifier network threads
kafka-run-class kafka.tools.JmxTool \
  --object-name 'kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent' \
  --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi
# <30% idle → Augmenter num.network.threads

Problèmes Courants

Symptôme: Throughput plafonne
├─ Vérifier: batch.size trop petit?
├─ Vérifier: linger.ms = 0?
├─ Vérifier: Compression désactivée?
└─ Solution: Augmenter batching

Symptôme: Latence élevée
├─ Vérifier: linger.ms trop élevé?
├─ Vérifier: fetch.min.bytes trop grand?
├─ Vérifier: GC pauses?
└─ Solution: Réduire batching ou tuner GC

Symptôme: Producer buffer full
├─ Vérifier: buffer.memory insuffisant?
├─ Vérifier: max.block.ms atteint?
└─ Solution: Augmenter buffer ou ralentir production

Symptôme: Consumer lag croissant
├─ Vérifier: max.poll.records trop bas?
├─ Vérifier: Traitement trop lent?
├─ Vérifier: Pas assez de partitions?
└─ Solution: Scale consumers ou optimiser traitement

Checklist pour 1M msg/s

Infrastructure

  • 3+ brokers avec 16+ cores chacun
  • 64+ GB RAM par broker (6-8 GB heap, reste page cache)
  • NVMe SSD (>3 GB/s throughput)
  • 25+ Gbps réseau entre machines
  • XFS avec noatime, nodiratime

Système

  • ulimit -n >= 1048576
  • vm.swappiness=1
  • vm.dirty_ratio=80
  • TCP buffers augmentés
  • THP désactivé

JVM

  • Heap 6-8 GB (-Xms8g -Xmx8g)
  • G1GC avec MaxGCPauseMillis=20
  • GC logging activé

Broker

  • num.io.threads = nombre cores
  • num.network.threads = cores / 2
  • socket buffers >= 1 MB
  • Compression producer-side

Topic

  • 50-100+ partitions
  • Replication factor = 3
  • min.insync.replicas = 2

Producer

  • batch.size = 1 MB
  • linger.ms = 10-50
  • compression.type = lz4
  • buffer.memory >= 256 MB
  • Envoi asynchrone avec callbacks

Consumer

  • fetch.min.bytes = 1 MB
  • max.poll.records = 5000-10000
  • 10+ instances par consumer group

A Retenir

  1. Le million est atteignable : Avec le bon matériel et tuning, 1M+ msg/s est réaliste
  2. Batching est roi : batch.size et linger.ms sont les paramètres les plus impactants
  3. Page cache > Heap : Laissez de la RAM pour le page cache Linux
  4. Compression LZ4 : Meilleur ratio performance/compression
  5. Partitions suffisantes : 50-100 partitions minimum pour parallélisme
  6. Mesurer avant d'optimiser : Benchmark rigoureux pour identifier les vrais goulots
  7. Trade-offs conscients : Latence vs throughput, durabilité vs performance

Besoin d'aide pour atteindre ces performances sur votre infrastructure Kafka ? Contactez-moi pour un accompagnement personnalisé sur vos problématiques de scalabilité et performance.

F

Florian Courouge

Expert DevOps & Kafka | Consultant freelance specialise dans les architectures distribuees et le streaming de donnees.

Articles similaires