Aller au contenu principal
DATA ENGINEERING

Kafka Performance Tuning : Optimisation et Scalabilité

Florian Courouge
min de lecture
Kafka
Performance
Optimization
Scalability
Production
Niveau:
Intermédiaire

Kafka Performance Tuning : Optimisation et Scalabilité

Introduction

Apache Kafka est conçu pour gérer des millions de messages par seconde, mais atteindre ces performances en production nécessite une compréhension approfondie de son architecture et un tuning méticuleux. La performance de Kafka n'est pas magique : elle résulte d'optimisations à tous les niveaux du stack (système d'exploitation, réseau, stockage, JVM, configuration Kafka).

Dans ce guide exhaustif, nous explorerons toutes les dimensions de l'optimisation de Kafka :

  • Architecture haute performance
  • Tuning du système d'exploitation (Linux)
  • Optimisation des brokers
  • Configuration des producteurs et consommateurs
  • Stratégies de partitionnement
  • Monitoring et benchmarking

Comprendre les Limites de Performance

Modèle de Performance Kafka

Kafka utilise un modèle de séquentialité qui le rend exceptionnellement rapide :

┌────────────────────────────────────────────────────┐
│  Écriture Séquentielle (Append-Only Log)          │
│                                                    │
│  Disque SSD: ~500 MB/s en séquentiel              │
│  vs ~10 MB/s en aléatoire (50x plus lent)         │
│                                                    │
│  Zero-Copy Transfert (sendfile syscall)           │
│  Données: Disque → Socket réseau (pas de userspace)│
└────────────────────────────────────────────────────┘

Facteurs Limitants

  1. Réseau : Souvent le goulot d'étranglement

    • 1 Gbps = ~120 MB/s max théorique
    • 10 Gbps = ~1.2 GB/s max théorique
  2. Disque : I/O séquentielles

    • HDD: 100-200 MB/s
    • SSD SATA: 500-600 MB/s
    • NVMe SSD: 3-7 GB/s
  3. CPU : Compression/décompression

    • LZ4: ~500 MB/s par core
    • Snappy: ~250 MB/s par core
    • Gzip: ~100 MB/s par core
  4. Mémoire : Page cache et buffers

    • Plus de RAM = Plus de données en cache
    • Moins de disk I/O

Benchmarks de Référence

Capacité Théorique (Single Broker)

Configuration:
- CPU: 16 cores
- RAM: 64 GB
- Disque: NVMe SSD (3 GB/s)
- Réseau: 10 Gbps

Messages: 1 KB, réplication factor 3, acks=all

Throughput max:
- Producteurs: ~800K msg/s (~800 MB/s)
- Consommateurs: ~1M msg/s (~1 GB/s)

Latence Typique

Percentile    Latency
─────────────────────
P50           3-5 ms
P95           10-15 ms
P99           20-30 ms
P99.9         50-100 ms

Optimisation du Système d'Exploitation

Linux Kernel Tuning

1. File Descriptors

Kafka ouvre beaucoup de fichiers (logs, sockets).

# /etc/security/limits.conf
kafka soft nofile 100000
kafka hard nofile 100000

# Vérification
ulimit -n

2. Virtual Memory

# /etc/sysctl.conf

# Swappiness (éviter le swap)
vm.swappiness=1

# Dirty pages (contrôle du flush)
vm.dirty_ratio=80
vm.dirty_background_ratio=5
vm.dirty_expire_centisecs=12000

# Page cache
vm.vfs_cache_pressure=50

# Appliquer
sysctl -p

Explication des Paramètres

vm.swappiness=1
→ Ne swap que si absolument nécessaire
→ Kafka préfère RAM pour page cache

vm.dirty_ratio=80
→ Max 80% de RAM pour pages "sales" avant flush synchrone
→ Évite les pauses I/O soudaines

vm.dirty_background_ratio=5
→ Démarrer flush background à 5% de RAM sale
→ Flush progressif et continu

vm.dirty_expire_centisecs=12000
→ Flush pages sales après 120 secondes
→ Balance entre latence et throughput

3. Réseau (TCP Tuning)

# /etc/sysctl.conf

# TCP buffers (important pour hauts débits)
net.core.rmem_default=262144
net.core.rmem_max=134217728
net.core.wmem_default=262144
net.core.wmem_max=134217728
net.core.optmem_max=16777216

# TCP window scaling
net.ipv4.tcp_rmem=4096 87380 134217728
net.ipv4.tcp_wmem=4096 65536 134217728

# TCP connection queue
net.core.netdev_max_backlog=5000
net.core.somaxconn=4096

# TCP keepalive
net.ipv4.tcp_keepalive_time=600
net.ipv4.tcp_keepalive_intvl=60
net.ipv4.tcp_keepalive_probes=3

# Désactiver slow start après idle
net.ipv4.tcp_slow_start_after_idle=0

4. Système de Fichiers

XFS (Recommandé)

# Mount avec options optimisées
/dev/nvme0n1 /var/lib/kafka xfs noatime,nodiratime 0 0

# Dans /etc/fstab
UUID=xxx /var/lib/kafka xfs noatime,nodiratime,nobarrier 0 0

Options Expliquées

noatime : Ne pas mettre à jour access time
→ Réduit les écritures inutiles

nodiratime : Ne pas mettre à jour directory access time
→ Économise encore plus d'I/O

nobarrier : Désactive write barriers (si UPS présent)
→ +20-30% throughput écriture
→ ⚠️ DANGER si pas de protection coupure électrique

EXT4 (Alternative)

# Mount
/dev/sda1 /var/lib/kafka ext4 noatime,nodiratime,data=writeback 0 0

# data=writeback : Pas d'ordre garanti entre metadata et data
# → Meilleure performance mais moins de garantie en cas de crash

JVM Tuning

Garbage Collection (G1GC)

# Kafka broker JVM options
export KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"

export KAFKA_JVM_PERFORMANCE_OPTS="
-XX:+UseG1GC
-XX:MaxGCPauseMillis=20
-XX:InitiatingHeapOccupancyPercent=35
-XX:G1HeapRegionSize=16M
-XX:MinMetaspaceSize=96m
-XX:MaxMetaspaceSize=256m
-XX:+ParallelRefProcEnabled
-XX:+ExplicitGCInvokesConcurrent
"

Taille de Heap

Règles:
1. Heap size = 25-50% de RAM totale
2. Max 6-8 GB (au-delà, GC pause augmente)
3. Reste de RAM → Page cache (crucial!)

Exemple avec 32 GB RAM:
- Heap: 6 GB
- Page cache: ~24 GB
- OS + autres: ~2 GB

GC Logging

export KAFKA_GC_LOG_OPTS="
-Xlog:gc*:file=/var/log/kafka/gc.log:time,tags:filecount=10,filesize=100M
"

Analyse des Logs GC

# Identifier les longues pauses
grep "Pause" gc.log | awk '{print $NF}' | sort -n | tail -20

# Moyenne des pauses
grep "Pause" gc.log | awk '{sum+=$NF; count++} END {print sum/count}'

Optimisation des Brokers

Configuration Broker Critique

1. Réplication et Durabilité

# server.properties

######################## Réplication ########################

# Nombre de brokers dans le cluster
broker.id=1
num.network.threads=8
num.io.threads=16

# Réplication factor par défaut (créations auto topics)
default.replication.factor=3

# Min ISR pour accepter écritures (balance durabilité/disponibilité)
min.insync.replicas=2

# Nombre de replicas pour offset commits (consumer groups)
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2

# Leader imbalance (redistribution automatique)
auto.leader.rebalance.enable=true
leader.imbalance.per.broker.percentage=10
leader.imbalance.check.interval.seconds=300

Explication ISR (In-Sync Replicas)

Topic: payments (replication.factor=3, min.insync.replicas=2)

Scenario 1: Tous replicas synchronisés
Leader: Broker-1 ✅
ISR: [Broker-1, Broker-2, Broker-3]
→ Écriture OK

Scenario 2: Un replica en retard
Leader: Broker-1 ✅
ISR: [Broker-1, Broker-2]
Follower hors ISR: Broker-3 (lag > replica.lag.time.max.ms)
→ Écriture OK (min.insync.replicas=2 respecté)

Scenario 3: Deux replicas down
Leader: Broker-1 ✅
ISR: [Broker-1]
→ Écriture REFUSÉE (NotEnoughReplicasException)
→ Garantit durabilité

2. Log Segments et Retention

######################## Log Management ########################

# Taille max d'un segment (défaut: 1 GB)
log.segment.bytes=1073741824

# Temps avant rotation d'un segment (défaut: 7 jours)
log.roll.hours=168

# Retention par temps (7 jours)
log.retention.hours=168

# Retention par taille (ex: 500 GB par partition)
log.retention.bytes=536870912000

# Fréquence de vérification pour suppression (5 min)
log.retention.check.interval.ms=300000

# Compression des anciens segments
log.cleanup.policy=delete
# Ou pour compaction (topics avec clés)
# log.cleanup.policy=compact

# Index interval (balance RAM vs lookup speed)
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760

Impact de la Taille des Segments

Segment Size = 1 GB (défaut)
├─ Avantages:
│  ├─ Moins de fichiers ouverts
│  └─ Moins de overhead metadata
└─ Inconvénients:
   ├─ Retention moins granulaire
   └─ Plus long à compacter

Segment Size = 256 MB (haute volumétrie)
├─ Avantages:
│  ├─ Rotation fréquente
│  ├─ Deletion/Compaction plus rapide
│  └─ Meilleure granularité retention
└─ Inconvénients:
   └─ Plus de fichiers (attention file descriptors!)

3. Flush et Durabilité

######################## Flush Configuration ########################

# NE PAS flush après chaque message (laisse OS gérer)
log.flush.interval.messages=9223372036854775807
log.flush.interval.ms=

# Laisse le page cache faire le travail
# Kafka fait confiance à:
# 1. Réplication (min.insync.replicas)
# 2. OS page cache flush
# 3. Filesystem journaling

# ⚠️ Ne configurer flush que si vraiment nécessaire:
# log.flush.interval.messages=10000
# → Impact négatif sur performance

4. Compression

######################## Compression ########################

# Type de compression par défaut
compression.type=producer

# Options: none, gzip, snappy, lz4, zstd
# Recommandation: lz4 (bon ratio compression/CPU)

Comparaison des Algorithmes

| Algo | Ratio Compression | Vitesse Compress | Vitesse Decompress | CPU Usage | |------|-------------------|------------------|---------------------|-----------| | none | 1.0x | N/A | N/A | Aucun | | lz4 | 2.0x | ⭐⭐⭐⭐⭐ 500MB/s | ⭐⭐⭐⭐⭐ 2GB/s | Faible | | snappy | 2.5x | ⭐⭐⭐⭐ 250MB/s | ⭐⭐⭐⭐ 500MB/s | Moyen | | zstd | 3.5x | ⭐⭐⭐ 150MB/s | ⭐⭐⭐⭐ 600MB/s | Moyen | | gzip | 4.0x | ⭐⭐ 100MB/s | ⭐⭐⭐ 300MB/s | Élevé |

Recommandation

Cas d'usage           Compression
────────────────────────────────────
Logs JSON             lz4 ou zstd
Données binaires      lz4
Très haut débit       none ou lz4
Stockage limité       zstd ou gzip
CPU limité            lz4 ou none

5. Networking

######################## Network Threading ########################

# Threads réseau (acceptent connexions)
num.network.threads=8

# Threads I/O (lecture/écriture disque)
num.io.threads=16

# Règle générale:
# num.network.threads = nombre de cores / 2
# num.io.threads = nombre de cores

# Taille des buffers socket
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400

# Max taille requête (protection DoS)
socket.request.max.bytes=104857600

# Queued requests max
queued.max.requests=500

# Nombre de connections max par IP
max.connections.per.ip=2147483647

# Connections max totales
max.connections=10000

Stratégies de Partitionnement

Nombre Optimal de Partitions

Formule de départ:
Partitions = max(T_producer / T_p, T_consumer / T_c)

Où:
T_producer = Throughput cible producteur (MB/s)
T_p = Throughput max par partition producteur (~10-30 MB/s)
T_consumer = Throughput cible consommateur (MB/s)
T_c = Throughput max par partition consommateur (~50-100 MB/s)

Exemple:
Besoin: 300 MB/s en écriture
T_p = 15 MB/s (mesuré)
Partitions = 300 / 15 = 20 partitions

Impacts du Nombre de Partitions

Trop peu de partitions (ex: 3):
❌ Parallélisme limité
❌ Consumer groups limités à 3 instances max
❌ Goulot d'étranglement throughput

Trop de partitions (ex: 500):
❌ Overhead metadata (ZooKeeper/KRaft)
❌ Leader election plus lente
❌ Plus de fichiers ouverts
❌ End-to-end latency augmentée

Bonnes Pratiques

# Topic haute volumétrie (logs)
kafka-topics --create \
  --topic app-logs \
  --partitions 50 \
  --replication-factor 3

# Topic transactionnel (orders)
kafka-topics --create \
  --topic orders \
  --partitions 12 \
  --replication-factor 3

# Topic low-volume (config)
kafka-topics --create \
  --topic app-config \
  --partitions 1 \
  --replication-factor 3

Répartition des Partitions

# Distribution optimale sur 5 brokers
kafka-topics --create \
  --topic users \
  --partitions 30 \
  --replication-factor 3 \
  --replica-assignment \
  "0:1:2,1:2:3,2:3:4,3:4:0,4:0:1,
   0:2:3,1:3:4,2:4:0,3:0:1,4:1:2,
   ..."

# Ou laisser Kafka auto-assigner (recommandé)
kafka-topics --create \
  --topic users \
  --partitions 30 \
  --replication-factor 3

Optimisation des Producteurs

Configuration Producteur Haute Performance

Properties props = new Properties();

// =============== Connexion ===============
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "high-perf-producer-1");

// =============== Sérialisation ===============
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

// =============== Durabilité vs Performance ===============
// acks=all (plus sûr, latence +2-3ms)
props.put(ProducerConfig.ACKS_CONFIG, "all");
// acks=1 (leader seulement, latence minimale)
// props.put(ProducerConfig.ACKS_CONFIG, "1");
// acks=0 (fire-and-forget, max throughput)
// props.put(ProducerConfig.ACKS_CONFIG, "0");

// =============== Batching (CRITIQUE!) ===============
// Taille max d'un batch (16 KB → 1 MB pour high throughput)
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1048576); // 1 MB

// Délai d'attente avant envoi batch (0 → 100ms)
// Trade-off latence vs throughput
props.put(ProducerConfig.LINGER_MS_CONFIG, 10); // 10ms

// =============== Compression ===============
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

// =============== Buffer Memory ===============
// Mémoire totale pour buffering (32 MB → 256 MB)
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64 MB

// =============== Retry et Timeout ===============
// Retry automatique (2147483647 = infini)
props.put(ProducerConfig.RETRIES_CONFIG, 2147483647);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

// Enable idempotence (évite duplicates en cas de retry)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

// Timeouts
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);

// =============== Partitioning ===============
// Partitioner par défaut (hash de la clé)
// props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class.getName());

KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

Impact du Batching

Sans Batching Optimisé

batch.size=16384 (16 KB)
linger.ms=0 (envoi immédiat)

Résultat:
- Throughput: 50K msg/s
- Latency P99: 5ms
- Network: Beaucoup de petites requêtes

Avec Batching Optimisé

batch.size=1048576 (1 MB)
linger.ms=10 (attente 10ms)

Résultat:
- Throughput: 500K msg/s (10x!)
- Latency P99: 15ms (+10ms acceptable)
- Network: Peu de grosses requêtes

Tuning selon Cas d'Usage

// CAS 1: Ultra-faible latence (trading, alertes)
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);    // 16 KB
props.put(ProducerConfig.LINGER_MS_CONFIG, 0);         // Immédiat
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");

// CAS 2: Haut débit (logs, événements)
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1048576);  // 1 MB
props.put(ProducerConfig.LINGER_MS_CONFIG, 100);       // 100ms
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

// CAS 3: Balance (transactionnel)
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 262144);   // 256 KB
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);        // 10ms
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

Producteur Asynchrone vs Synchrone

Producteur Synchrone (Lent)

// ❌ Bloque sur chaque send()
for (int i = 0; i < 1000; i++) {
    ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
    Future<RecordMetadata> future = producer.send(record);
    future.get(); // BLOQUE ici ! (~5-10ms par message)
}
// Throughput: ~100-200 msg/s (terrible)

Producteur Asynchrone (Rapide)

// ✅ Fire-and-forget avec callback
for (int i = 0; i < 1000000; i++) {
    ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");

    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            // Log erreur
            log.error("Send failed", exception);
        } else {
            // Success (optionnel)
            // log.debug("Sent to partition {} offset {}", metadata.partition(), metadata.offset());
        }
    });
}
// Throughput: 500K+ msg/s

Custom Partitioner

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class CustomPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                        Object value, byte[] valueBytes, Cluster cluster) {

        int numPartitions = cluster.partitionCountForTopic(topic);

        if (key == null) {
            // Round-robin pour messages sans clé
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }

        // Exemple: Router VIP users vers partition dédiée
        String keyString = (String) key;
        if (keyString.startsWith("vip-")) {
            return 0; // Partition 0 réservée VIP
        }

        // Hash standard pour autres
        return Math.abs(keyString.hashCode()) % numPartitions;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

// Utilisation
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());

Optimisation des Consommateurs

Configuration Consommateur Haute Performance

Properties props = new Properties();

// =============== Connexion ===============
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "high-perf-consumer-group");
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-1");

// =============== Sérialisation ===============
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

// =============== Fetch Configuration (CRITIQUE!) ===============
// Taille min avant retour (1 byte → 1 MB)
// Augmenter = moins de requêtes, meilleur throughput
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1048576); // 1 MB

// Temps d'attente max si fetch.min.bytes pas atteint
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); // 500ms

// Taille max d'un fetch (50 MB → 100 MB)
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 52428800); // 50 MB

// Taille max par partition dans un fetch (1 MB → 10 MB)
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576); // 1 MB

// =============== Poll Configuration ===============
// Nombre max de records par poll() (500 → 5000)
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5000);

// Timeout max entre deux poll() avant considéré mort
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 5 min

// =============== Offset Management ===============
// Auto-commit des offsets (false pour contrôle manuel)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

// Si auto-commit=true, fréquence de commit
// props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);

// Point de démarrage si pas d'offset
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Options: earliest, latest, none

// =============== Session et Heartbeat ===============
// Timeout session avant rebalance (10s → 30s)
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);

// Intervalle heartbeat (1/3 de session.timeout)
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);

Impact de fetch.min.bytes

fetch.min.bytes = 1 (défaut)

Comportement:
- Retourne données dès qu'au moins 1 byte disponible
- Beaucoup de requêtes réseau
- Latence minimale

Throughput: Moyen
Latence: Très faible (1-2ms)
CPU: Élevé (beaucoup d'appels système)

fetch.min.bytes = 1 MB

Comportement:
- Attend 1 MB ou fetch.max.wait.ms (500ms)
- Moins de requêtes réseau
- Batching côté broker

Throughput: Élevé (+50-100%)
Latence: Moyenne (+500ms max)
CPU: Faible

Trade-off Optimal

// Haute latence acceptable (batch processing)
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1048576); // 1 MB
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);

// Faible latence requise (real-time)
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 100);

Pattern de Consommation Optimisé

Consumer Loop Standard

consumer.subscribe(Arrays.asList("orders"));

try {
    while (true) {
        ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));

        for (ConsumerRecord<String, byte[]> record : records) {
            processRecord(record);
        }

        // Commit manuel synchrone (bloquant mais sûr)
        consumer.commitSync();
    }
} finally {
    consumer.close();
}

Consumer Asynchrone Haute Performance

consumer.subscribe(Arrays.asList("orders"));

ExecutorService executor = Executors.newFixedThreadPool(10);
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new ConcurrentHashMap<>();

try {
    while (true) {
        ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));

        if (records.isEmpty()) {
            continue;
        }

        // Process records en parallèle
        List<Future<?>> futures = new ArrayList<>();

        for (ConsumerRecord<String, byte[]> record : records) {
            futures.add(executor.submit(() -> {
                processRecord(record);

                // Track offset
                currentOffsets.put(
                    new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1)
                );
            }));
        }

        // Attendre fin du traitement
        for (Future<?> future : futures) {
            future.get();
        }

        // Commit asynchrone (non-bloquant)
        consumer.commitAsync(currentOffsets, (offsets, exception) -> {
            if (exception != null) {
                log.error("Commit failed", exception);
            }
        });
    }
} finally {
    executor.shutdown();
    consumer.close();
}

Consumer Groups et Parallélisme

Scaling Horizontal

Topic: orders (20 partitions, replication.factor=3)

Consumer Group: order-processors
├─ Instance 1: Partitions [0, 1, 2, 3, 4]
├─ Instance 2: Partitions [5, 6, 7, 8, 9]
├─ Instance 3: Partitions [10, 11, 12, 13, 14]
└─ Instance 4: Partitions [15, 16, 17, 18, 19]

Throughput total = 4 × throughput_instance

Rebalancing (Coût)

Événement déclencheur:
- Nouveau consumer rejoint
- Consumer quitte (crash, shutdown)
- Nouveau partitions ajoutées

Durée rebalance:
- 3-10 secondes (cluster normal)
- 30-60 secondes (gros cluster, 100+ consumers)

Impact:
- STOP de tous les consumers du groupe
- Pas de consommation pendant rebalance
- Perte temporaire de throughput

Minimiser Rebalancing

// Augmenter session timeout
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000); // 60s

// Augmenter max.poll.interval (si traitement lent)
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000); // 10min

// Sticky assignor (minimise reassignment)
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
          StickyAssignor.class.getName());

Monitoring et Benchmarking

Métriques JMX Critiques

Broker Metrics

# Messages in per second
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec

# Bytes in/out per second
kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec

# Request latency (produce, fetch)
kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer

# Under-replicated partitions (CRITIQUE!)
kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions

# Offline partitions (ALERTE!)
kafka.controller:type=KafkaController,name=OfflinePartitionsCount

# Active controller count (doit être 1)
kafka.controller:type=KafkaController,name=ActiveControllerCount

Producer Metrics

// Accès aux métriques du producer
Map<MetricName, ? extends Metric> metrics = producer.metrics();

// Métriques clés:
// - record-send-rate: Messages/s
// - record-send-total: Total messages
// - batch-size-avg: Taille moyenne des batches
// - compression-rate-avg: Ratio compression
// - record-error-rate: Taux d'erreur
// - buffer-available-bytes: Mémoire buffer disponible

for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
    System.out.printf("%s: %s%n", entry.getKey().name(), entry.getValue().metricValue());
}

Consumer Metrics

// Métriques clés:
// - records-consumed-rate: Records/s
// - fetch-latency-avg: Latence moyenne fetch
// - fetch-size-avg: Taille moyenne fetch
// - records-lag-max: Lag max sur partitions
// - commit-latency-avg: Latence commit

Map<MetricName, ? extends Metric> metrics = consumer.metrics();

Kafka Performance Test Tool

kafka-producer-perf-test

# Test production: 1M messages, 1KB chacun
kafka-producer-perf-test \
  --topic perf-test \
  --num-records 1000000 \
  --record-size 1024 \
  --throughput -1 \
  --producer-props \
    bootstrap.servers=localhost:9092 \
    acks=all \
    batch.size=1048576 \
    linger.ms=10 \
    compression.type=lz4

# Résultat:
# 500000 records sent, 99850.0 records/sec (97.51 MB/sec),
# 1250.2 ms avg latency, 2531.0 ms max latency

Paramètres de Test

# Ultra-haute performance (acks=1, gros batches)
kafka-producer-perf-test \
  --topic perf-test \
  --num-records 10000000 \
  --record-size 1024 \
  --throughput -1 \
  --producer-props \
    bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092 \
    acks=1 \
    batch.size=1048576 \
    linger.ms=100 \
    compression.type=lz4 \
    buffer.memory=67108864

# Durabilité max (acks=all, petits batches)
kafka-producer-perf-test \
  --topic perf-test \
  --num-records 1000000 \
  --record-size 1024 \
  --throughput -1 \
  --producer-props \
    bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092 \
    acks=all \
    batch.size=16384 \
    linger.ms=0 \
    compression.type=none

kafka-consumer-perf-test

# Test consommation
kafka-consumer-perf-test \
  --broker-list localhost:9092 \
  --topic perf-test \
  --messages 1000000 \
  --threads 1 \
  --consumer-props \
    fetch.min.bytes=1048576 \
    fetch.max.wait.ms=500

# Résultat:
# data.consumed.in.MB, MB.sec, data.consumed.in.nMsg,
# nMsg.sec, rebalance.time.ms, fetch.time.ms, ...
# 976.5625, 195.3125, 1000000, 200000, 3024, 5000

Dashboard Grafana

Query Prometheus

# Throughput entrant (MB/s)
sum(rate(kafka_server_brokertopicmetrics_bytesinpersec_count[1m])) / 1024 / 1024

# Throughput sortant (MB/s)
sum(rate(kafka_server_brokertopicmetrics_bytesoutpersec_count[1m])) / 1024 / 1024

# Latency P99 produce
histogram_quantile(0.99,
  sum(rate(kafka_network_requestmetrics_totaltimems_bucket{request="Produce"}[5m]))
  by (le)
)

# Under-replicated partitions (alerte si > 0)
kafka_server_replicamanager_underreplicatedpartitions

# Consumer lag max
max(kafka_consumer_fetch_manager_metrics_records_lag_max)

Cas d'Usage et Configurations

Haute Disponibilité (Financial Trading)

Contraintes

  • Latence P99 < 10ms
  • Zéro perte de données
  • Availability > 99.99%

Configuration

# Broker
min.insync.replicas=2
default.replication.factor=3
unclean.leader.election.enable=false

# Producer
acks=all
enable.idempotence=true
max.in.flight.requests.per.connection=5
retries=2147483647

# Consumer
enable.auto.commit=false
isolation.level=read_committed

Haut Débit (Log Aggregation)

Contraintes

  • Throughput > 500 MB/s
  • Latence acceptable (< 100ms P99)
  • Retention courte (24h)

Configuration

# Broker
num.network.threads=16
num.io.threads=32
log.segment.bytes=268435456  # 256 MB
log.retention.hours=24

# Producer
acks=1
batch.size=1048576  # 1 MB
linger.ms=100
compression.type=lz4
buffer.memory=134217728  # 128 MB

# Consumer
fetch.min.bytes=1048576  # 1 MB
fetch.max.wait.ms=500
max.poll.records=10000

IoT / Edge Computing

Contraintes

  • Bandwidth limité
  • CPU limité
  • Stockage limité

Configuration

# Broker
log.retention.bytes=10737418240  # 10 GB max
log.segment.bytes=104857600  # 100 MB
compression.type=zstd  # Meilleur ratio

# Producer
acks=1
batch.size=524288  # 512 KB
linger.ms=1000  # 1s (plus de batching)
compression.type=zstd

# Consumer
fetch.min.bytes=524288
fetch.max.wait.ms=1000

Checklist d'Optimisation

Niveau Système (Linux)

  • [ ] File descriptors: ulimit -n ≥ 100000
  • [ ] vm.swappiness = 1
  • [ ] vm.dirty_ratio = 80
  • [ ] TCP buffers augmentés
  • [ ] XFS avec noatime,nodiratime
  • [ ] Désactiver THP (Transparent Huge Pages)

Niveau JVM

  • [ ] Heap: 6-8 GB max
  • [ ] G1GC configuré
  • [ ] GC logs activés
  • [ ] -XX:MaxGCPauseMillis=20

Niveau Broker

  • [ ] num.io.threads = nombre cores
  • [ ] num.network.threads = cores / 2
  • [ ] log.segment.bytes approprié
  • [ ] Compression activée (lz4)
  • [ ] Réplication factor ≥ 3

Niveau Topic

  • [ ] Partitions = throughput / 10-30 MB/s
  • [ ] min.insync.replicas ≥ 2
  • [ ] Retention appropriée

Niveau Producteur

  • [ ] batch.size ≥ 256 KB
  • [ ] linger.ms = 10-100ms
  • [ ] acks selon besoins
  • [ ] Compression lz4
  • [ ] Asynchrone avec callbacks

Niveau Consommateur

  • [ ] fetch.min.bytes ≥ 1 MB
  • [ ] max.poll.records optimisé
  • [ ] Traitement parallèle
  • [ ] Commit asynchrone

Monitoring

  • [ ] JMX metrics exportées
  • [ ] Grafana dashboards
  • [ ] Alertes configurées
  • [ ] Consumer lag surveillé

Conclusion

L'optimisation de Kafka est un processus itératif qui nécessite :

  1. Comprendre votre workload : Throughput cible, latence acceptable, durabilité requise
  2. Mesurer d'abord : Benchmarker avant toute optimisation
  3. Optimiser par couches : OS → JVM → Broker → Application
  4. Tester en production : Les benchmarks ne remplacent pas le réel
  5. Monitorer en continu : Les performances dégradent avec le temps

Gains Typiques

Configuration Naïve:
- 50K msg/s
- P99 latency: 50ms

Configuration Optimisée:
- 500K msg/s (10x)
- P99 latency: 15ms (3x mieux)

Les optimisations décrites dans ce guide vous permettront d'atteindre ces performances. Kafka peut gérer des millions de messages par seconde quand correctement configuré.

Ressources

Bonne optimisation ! 🚀