Aller au contenu principal
KAFKA

Kafka Streams : Stream Processing avec l'API Streams

Maîtrisez Kafka Streams pour le traitement de flux en temps réel : architecture, patterns, stateful processing, windowing, joins et déploiement en production.

Florian Courouge
28 min de lecture
Kafka
Kafka Streams
Stream Processing
Real-time
Microservices
Event Processing
Niveau:
Avancé

Kafka Streams : Stream Processing avec l'API Streams

Kafka Streams est une bibliothèque client Java/Scala pour construire des applications de traitement de flux en temps réel. Contrairement à des frameworks comme Apache Flink ou Spark Streaming, Kafka Streams est une simple bibliothèque qui s'intègre directement dans votre application, sans nécessiter de cluster séparé.

Kafka Streams Architecture

Pourquoi Kafka Streams ?

Comparaison avec les Alternatives

| Caractéristique | Kafka Streams | Apache Flink | Spark Streaming | |----------------|--------------|--------------|-----------------| | Déploiement | Simple JAR | Cluster dédié | Cluster dédié | | Scalabilité | Partitions Kafka | Task slots | Executors | | State Management | RocksDB local | Flink state backend | RDD/DataFrame | | Latence | < 10ms | < 10ms | ~100ms (micro-batch) | | Exactly-once | Oui (natif) | Oui | Oui (avec checkpoint) | | Complexité ops | Faible | Moyenne | Élevée |

Avantages de Kafka Streams

1. Simplicité opérationnelle

  • Pas de cluster à gérer
  • Déploiement comme n'importe quelle application Java
  • Scalabilité via le nombre d'instances

2. Intégration native avec Kafka

  • Lecture/écriture optimisée
  • Exactly-once semantics
  • Gestion automatique des offsets

3. State management puissant

  • State stores locaux (RocksDB)
  • Changelog topics pour durabilité
  • Standby replicas pour HA

4. API riche et expressive

  • DSL de haut niveau
  • Processor API de bas niveau
  • Support complet des windowing operations

Architecture de Kafka Streams

Concepts Fondamentaux

┌─────────────────────────────────────────────────────────────┐
│                 KAFKA STREAMS APPLICATION                    │
│                                                              │
│  ┌────────────────────────────────────────────────────────┐│
│  │              Stream Processing Topology                ││
│  │                                                        ││
│  │   ┌──────────┐   ┌───────────┐   ┌──────────┐       ││
│  │   │  Source  │──▶│ Processor │──▶│   Sink   │       ││
│  │   │  (Kafka) │   │  (Logic)  │   │ (Kafka)  │       ││
│  │   └──────────┘   └─────┬─────┘   └──────────┘       ││
│  │                         │                             ││
│  │                         ▼                             ││
│  │                 ┌──────────────┐                     ││
│  │                 │ State Stores │                     ││
│  │                 │  (RocksDB)   │                     ││
│  │                 └──────────────┘                     ││
│  └────────────────────────────────────────────────────────┘│
│                                                              │
│  Scalabilité = Nombre de partitions Kafka                   │
└─────────────────────────────────────────────────────────────┘

Stream Processing Topology

Une topologie Kafka Streams définit le flux de traitement :

// Topologie simple : lire, transformer, écrire
StreamsBuilder builder = new StreamsBuilder();

// Source: lire depuis un topic
KStream<String, String> source = builder.stream("input-topic");

// Processor: transformer
KStream<String, String> processed = source
    .filter((key, value) -> value != null)
    .mapValues(value -> value.toUpperCase());

// Sink: écrire vers un topic
processed.to("output-topic");

// Construire la topologie
Topology topology = builder.build();

Partitioning et Parallelism

Input Topic: user-events (3 partitions)

┌──────────────┐  ┌──────────────┐  ┌──────────────┐
│ Partition 0  │  │ Partition 1  │  │ Partition 2  │
└──────┬───────┘  └──────┬───────┘  └──────┬───────┘
       │                 │                 │
       ▼                 ▼                 ▼
┌──────────────┐  ┌──────────────┐  ┌──────────────┐
│   Stream     │  │   Stream     │  │   Stream     │
│   Task 0     │  │   Task 1     │  │   Task 2     │
│              │  │              │  │              │
│ [RocksDB 0]  │  │ [RocksDB 1]  │  │ [RocksDB 2]  │
└──────┬───────┘  └──────┬───────┘  └──────┬───────┘
       │                 │                 │
       ▼                 ▼                 ▼
┌──────────────┐  ┌──────────────┐  ┌──────────────┐
│ Partition 0  │  │ Partition 1  │  │ Partition 2  │
└──────────────┘  └──────────────┘  └──────────────┘
Output Topic: processed-events (3 partitions)

// Scalabilité : 1 task par partition
// Pour 3 partitions → déployer 1 à 3 instances

Setup et Configuration

Dépendances Maven

<dependencies>
    <!-- Kafka Streams -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>3.6.0</version>
    </dependency>

    <!-- Sérialisation JSON (optionnel) -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.15.2</version>
    </dependency>

    <!-- Avro (optionnel) -->
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-streams-avro-serde</artifactId>
        <version>7.5.0</version>
    </dependency>

    <!-- Testing -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams-test-utils</artifactId>
        <version>3.6.0</version>
        <scope>test</scope>
    </dependency>
</dependencies>

Configuration de Base

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import java.util.Properties;

public class StreamsApplication {

    public static Properties getStreamsConfig() {
        Properties props = new Properties();

        // Application ID (= consumer group ID)
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");

        // Bootstrap servers
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Sérialisation par défaut
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                  Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                  Serdes.String().getClass().getName());

        // State store directory
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");

        // Processing guarantee
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
                  StreamsConfig.EXACTLY_ONCE_V2);

        // Commit interval (pour at-least-once)
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);

        // Number of threads (= parallelism par instance)
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);

        // Cache size (pour optimiser les aggregations)
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024);

        // RocksDB configuration
        props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
                  CustomRocksDBConfigSetter.class);

        return props;
    }

    public static void main(String[] args) {
        // Créer la topologie
        StreamsBuilder builder = new StreamsBuilder();
        // ... définir la topologie ...

        // Démarrer l'application
        KafkaStreams streams = new KafkaStreams(builder.build(), getStreamsConfig());

        // Gérer le shutdown gracefully
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

        // Démarrer
        streams.start();
    }
}

Stateless Transformations

Filter et FilterNot

KStream<String, Order> orders = builder.stream("orders");

// Filtrer les commandes > 100€
KStream<String, Order> bigOrders = orders
    .filter((key, order) -> order.getAmount() > 100.0);

// Exclure les commandes annulées
KStream<String, Order> activeOrders = orders
    .filterNot((key, order) -> order.getStatus().equals("CANCELLED"));

Map et MapValues

// Map: transformer key ET value
KStream<String, String> upperCased = orders
    .map((key, order) ->
        KeyValue.pair(key.toUpperCase(), order.toUpperCase())
    );

// MapValues: transformer SEULEMENT value (plus performant)
KStream<String, OrderSummary> summaries = orders
    .mapValues(order -> new OrderSummary(
        order.getId(),
        order.getAmount(),
        order.getCustomerId()
    ));

// FlatMapValues: 1 input → N outputs
KStream<String, String> items = orders
    .flatMapValues(order -> order.getItems()); // List<String>

Branch: Router vers Plusieurs Streams

// Séparer les commandes par montant
Map<String, KStream<String, Order>> branches = orders
    .split(Named.as("order-"))
    .branch((key, order) -> order.getAmount() < 50,
            Branched.as("small"))
    .branch((key, order) -> order.getAmount() >= 50 && order.getAmount() < 200,
            Branched.as("medium"))
    .branch((key, order) -> order.getAmount() >= 200,
            Branched.as("large"))
    .noDefaultBranch();

// Traiter chaque branche séparément
branches.get("order-small").to("small-orders");
branches.get("order-medium").to("medium-orders");
branches.get("order-large").to("large-orders");

SelectKey: Rechanger la Clé

// Re-partitionner par customer ID au lieu de order ID
KStream<String, Order> rekeyed = orders
    .selectKey((orderId, order) -> order.getCustomerId());
// ⚠️ Ceci déclenche un repartitioning (coûteux)

Stateful Processing avec KTable

KStream vs KTable

KStream: Flux d'événements (append-only)
┌─────┬─────┬─────┬─────┬─────┐
│ id:1│ id:2│ id:1│ id:3│ id:1│ (3 événements pour id:1)
└─────┴─────┴─────┴─────┴─────┘

KTable: État courant (upsert)
┌────────┬────────┬────────┐
│ id:1   │ id:2   │ id:3   │ (1 état par clé)
│ (last) │ (last) │ (last) │
└────────┴────────┴────────┘

Créer un KTable

// KTable depuis un topic compacté
KTable<String, User> users = builder.table(
    "users",
    Materialized.<String, User, KeyValueStore<Bytes, byte[]>>as("users-store")
        .withKeySerde(Serdes.String())
        .withValueSerde(userSerde)
);

// GlobalKTable: répliqué sur chaque instance
GlobalKTable<String, Product> products = builder.globalTable(
    "products",
    Materialized.as("products-global-store")
);

Aggregations avec KTable

// Compter les commandes par customer
KTable<String, Long> orderCountByCustomer = orders
    .groupBy((orderId, order) -> KeyValue.pair(order.getCustomerId(), order))
    .count(Materialized.as("customer-order-count"));

// Somme des montants par customer
KTable<String, Double> totalAmountByCustomer = orders
    .groupBy((orderId, order) -> KeyValue.pair(order.getCustomerId(), order))
    .aggregate(
        () -> 0.0,  // Initializer
        (customerId, order, total) -> total + order.getAmount(),  // Adder
        (customerId, order, total) -> total - order.getAmount(),  // Subtractor
        Materialized.as("customer-total-amount")
    );

// Reduce: combiner les valeurs
KTable<String, Order> latestOrderByCustomer = orders
    .groupBy((orderId, order) -> KeyValue.pair(order.getCustomerId(), order))
    .reduce(
        (oldOrder, newOrder) -> newOrder,  // Garder le plus récent
        Materialized.as("customer-latest-order")
    );

Windowing Operations

Types de Windows

import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import java.time.Duration;

// 1. Tumbling Window: fenêtres fixes non-chevauchantes
TimeWindows tumblingWindow = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5));

KTable<Windowed<String>, Long> clicksPerWindow = clicks
    .groupByKey()
    .windowedBy(tumblingWindow)
    .count();

// 2. Hopping Window: fenêtres fixes chevauchantes
TimeWindows hoppingWindow = TimeWindows
    .ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofSeconds(30))
    .advanceBy(Duration.ofMinutes(1));  // Avance toutes les 1 minute

KTable<Windowed<String>, Long> hoppingCounts = clicks
    .groupByKey()
    .windowedBy(hoppingWindow)
    .count();

// 3. Session Window: fenêtres basées sur l'inactivité
SessionWindows sessionWindow = SessionWindows
    .ofInactivityGapAndGrace(Duration.ofMinutes(30), Duration.ofMinutes(5));

KTable<Windowed<String>, Long> sessionsPerUser = userActivity
    .groupByKey()
    .windowedBy(sessionWindow)
    .count();

// 4. Sliding Window: fenêtres glissantes continues
SlidingWindows slidingWindow = SlidingWindows
    .ofTimeDifferenceAndGrace(Duration.ofMinutes(10), Duration.ofSeconds(30));

KTable<Windowed<String>, Long> slidingCounts = events
    .groupByKey()
    .windowedBy(slidingWindow)
    .count();

Exemple Pratique: Détection d'Anomalies

// Détecter les pics de requêtes par utilisateur
KStream<String, Long> requestCounts = requests
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
    .count()
    .toStream()
    .map((windowedKey, count) -> KeyValue.pair(windowedKey.key(), count));

// Alerter si > 100 requêtes/minute
KStream<String, Alert> alerts = requestCounts
    .filter((userId, count) -> count > 100)
    .mapValues((userId, count) -> new Alert(
        userId,
        "High request rate: " + count + " req/min",
        Instant.now()
    ));

alerts.to("security-alerts");

Joins

Stream-Stream Join

// Join entre deux streams dans une fenêtre temporelle
KStream<String, Order> orders = builder.stream("orders");
KStream<String, Payment> payments = builder.stream("payments");

// Inner join: garder seulement si les deux existent
KStream<String, OrderWithPayment> joined = orders.join(
    payments,
    (order, payment) -> new OrderWithPayment(order, payment),
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
    StreamJoined.with(
        Serdes.String(),
        orderSerde,
        paymentSerde
    )
);

// Left join: garder toutes les commandes
KStream<String, OrderWithPayment> leftJoined = orders.leftJoin(
    payments,
    (order, payment) -> new OrderWithPayment(order, payment),
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5))
);

// Outer join: garder tout
KStream<String, OrderWithPayment> outerJoined = orders.outerJoin(
    payments,
    (order, payment) -> new OrderWithPayment(order, payment),
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5))
);

Stream-Table Join (Enrichment Pattern)

// Enrichir les commandes avec les données utilisateur
KStream<String, Order> orders = builder.stream("orders");
KTable<String, User> users = builder.table("users");

// Re-key orders par customer ID
KStream<String, Order> ordersByCustomer = orders
    .selectKey((orderId, order) -> order.getCustomerId());

// Join: enrichir avec les données utilisateur
KStream<String, EnrichedOrder> enriched = ordersByCustomer.join(
    users,
    (order, user) -> new EnrichedOrder(
        order,
        user.getName(),
        user.getEmail(),
        user.getAddress()
    )
);

enriched.to("enriched-orders");

Table-Table Join

// Joindre deux tables stateful
KTable<String, UserProfile> profiles = builder.table("user-profiles");
KTable<String, UserSettings> settings = builder.table("user-settings");

KTable<String, CompleteUserData> completeData = profiles.join(
    settings,
    (profile, setting) -> new CompleteUserData(profile, setting)
);

// Écrire dans un topic compacté
completeData.toStream().to("complete-user-data");

GlobalKTable Join: Éviter le Repartitioning

// GlobalKTable: pas besoin de co-partitioning
GlobalKTable<String, Product> products = builder.globalTable("products");
KStream<String, Order> orders = builder.stream("orders");

// Join avec clé différente (product ID depuis order)
KStream<String, EnrichedOrder> enriched = orders.join(
    products,
    (orderId, order) -> order.getProductId(),  // Extrait la foreign key
    (order, product) -> new EnrichedOrder(order, product)
);

State Stores Avancés

Types de State Stores

import org.apache.kafka.streams.state.*;

// 1. KeyValueStore: stockage clé-valeur
StoreBuilder<KeyValueStore<String, Long>> keyValueStoreBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("my-kv-store"),
        Serdes.String(),
        Serdes.Long()
    );

// 2. WindowStore: stockage avec fenêtres temporelles
StoreBuilder<WindowStore<String, Long>> windowStoreBuilder =
    Stores.windowStoreBuilder(
        Stores.persistentWindowStore(
            "my-window-store",
            Duration.ofDays(1),  // Retention
            Duration.ofMinutes(5),  // Window size
            false  // Retain duplicates
        ),
        Serdes.String(),
        Serdes.Long()
    );

// 3. SessionStore: stockage de sessions
StoreBuilder<SessionStore<String, Long>> sessionStoreBuilder =
    Stores.sessionStoreBuilder(
        Stores.persistentSessionStore(
            "my-session-store",
            Duration.ofHours(1)  // Retention
        ),
        Serdes.String(),
        Serdes.Long()
    );

Interactive Queries: Interroger les State Stores

// Application Kafka Streams exposant un REST API
public class StateStoreQueryService {

    private final KafkaStreams streams;

    public StateStoreQueryService(KafkaStreams streams) {
        this.streams = streams;
    }

    // Requête locale
    public Long getCount(String key) {
        ReadOnlyKeyValueStore<String, Long> store = streams.store(
            StoreQueryParameters.fromNameAndType(
                "customer-order-count",
                QueryableStoreTypes.keyValueStore()
            )
        );

        return store.get(key);
    }

    // Requête distribuée (peut être sur une autre instance)
    public Long getCountDistributed(String key) {
        // Trouver quelle instance possède cette clé
        StreamsMetadata metadata = streams.queryMetadataForKey(
            "customer-order-count",
            key,
            Serdes.String().serializer()
        );

        if (metadata.equals(StreamsMetadata.NOT_AVAILABLE)) {
            return null;  // Store pas encore prêt
        }

        // Si c'est local, requête locale
        if (metadata.host().equals(getThisInstanceHost())) {
            return getCount(key);
        }

        // Sinon, faire un appel HTTP vers l'autre instance
        return fetchFromRemoteInstance(metadata.host(), metadata.port(), key);
    }

    // Requête range
    public Map<String, Long> getCountRange(String from, String to) {
        ReadOnlyKeyValueStore<String, Long> store = streams.store(
            StoreQueryParameters.fromNameAndType(
                "customer-order-count",
                QueryableStoreTypes.keyValueStore()
            )
        );

        Map<String, Long> results = new HashMap<>();
        try (KeyValueIterator<String, Long> iterator = store.range(from, to)) {
            while (iterator.hasNext()) {
                KeyValue<String, Long> entry = iterator.next();
                results.put(entry.key, entry.value);
            }
        }
        return results;
    }
}

// Endpoint REST avec Spring Boot
@RestController
@RequestMapping("/state")
public class StateStoreController {

    @Autowired
    private StateStoreQueryService queryService;

    @GetMapping("/count/{key}")
    public ResponseEntity<Long> getCount(@PathVariable String key) {
        Long count = queryService.getCountDistributed(key);
        return count != null
            ? ResponseEntity.ok(count)
            : ResponseEntity.notFound().build();
    }
}

Testing avec TopologyTestDriver

import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.TestRecord;
import org.junit.jupiter.api.Test;

public class StreamsApplicationTest {

    @Test
    public void testOrderProcessing() {
        // Créer la topologie
        StreamsBuilder builder = new StreamsBuilder();
        // ... définir la topologie ...

        // Configuration de test
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        // Test driver
        try (TopologyTestDriver testDriver = new TopologyTestDriver(
                builder.build(), props)) {

            // Topics de test
            TestInputTopic<String, Order> inputTopic = testDriver.createInputTopic(
                "orders",
                Serdes.String().serializer(),
                orderSerde.serializer()
            );

            TestOutputTopic<String, ProcessedOrder> outputTopic =
                testDriver.createOutputTopic(
                    "processed-orders",
                    Serdes.String().deserializer(),
                    processedOrderSerde.deserializer()
                );

            // Envoyer des messages de test
            Order order1 = new Order("1", "customer-1", 150.0);
            inputTopic.pipeInput("1", order1);

            Order order2 = new Order("2", "customer-1", 75.0);
            inputTopic.pipeInput("2", order2);

            // Vérifier les résultats
            List<TestRecord<String, ProcessedOrder>> results =
                outputTopic.readRecordsToList();

            assertEquals(2, results.size());
            assertEquals("customer-1", results.get(0).getValue().getCustomerId());
        }
    }

    @Test
    public void testWindowing() {
        // Test avec fenêtres temporelles
        Instant baseTime = Instant.parse("2024-01-01T00:00:00Z");

        try (TopologyTestDriver testDriver = new TopologyTestDriver(
                builder.build(), props)) {

            TestInputTopic<String, Click> inputTopic = testDriver.createInputTopic(
                "clicks",
                Serdes.String().serializer(),
                clickSerde.serializer()
            );

            // Envoyer avec timestamps spécifiques
            inputTopic.pipeInput("user-1", new Click("user-1"), baseTime);
            inputTopic.pipeInput("user-1", new Click("user-1"),
                baseTime.plus(Duration.ofMinutes(1)));
            inputTopic.pipeInput("user-1", new Click("user-1"),
                baseTime.plus(Duration.ofMinutes(6))); // Nouvelle fenêtre

            // Vérifier les agrégations par fenêtre
            // ...
        }
    }
}

Déploiement en Production

Docker Compose

version: '3.8'

services:
  streams-app:
    build: .
    image: my-streams-app:1.0.0
    deploy:
      replicas: 3  # Pour parallélisme sur 3 partitions
      resources:
        limits:
          memory: 2G
          cpus: '1'
        reservations:
          memory: 1G
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
      STREAMS_APPLICATION_ID: order-processor
      STREAMS_NUM_THREADS: 2
      STREAMS_STATE_DIR: /tmp/kafka-streams
      JAVA_OPTS: >
        -Xmx1536m
        -Xms1536m
        -XX:+UseG1GC
        -XX:MaxGCPauseMillis=20
        -Dcom.sun.management.jmxremote
        -Dcom.sun.management.jmxremote.port=9999
        -Dcom.sun.management.jmxremote.authenticate=false
        -Dcom.sun.management.jmxremote.ssl=false
    volumes:
      - streams-state:/tmp/kafka-streams
    networks:
      - kafka-network

volumes:
  streams-state:

networks:
  kafka-network:
    external: true

Kubernetes Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-streams-app
  labels:
    app: kafka-streams-app
spec:
  replicas: 3  # Matche le nombre de partitions
  selector:
    matchLabels:
      app: kafka-streams-app
  template:
    metadata:
      labels:
        app: kafka-streams-app
    spec:
      containers:
      - name: streams-app
        image: my-streams-app:1.0.0
        ports:
        - containerPort: 8080  # REST API pour interactive queries
        - containerPort: 9999  # JMX metrics
        env:
        - name: KAFKA_BOOTSTRAP_SERVERS
          value: "kafka-service:9092"
        - name: STREAMS_APPLICATION_ID
          value: "order-processor"
        - name: STREAMS_NUM_THREADS
          value: "2"
        - name: JAVA_OPTS
          value: "-Xmx1536m -Xms1536m -XX:+UseG1GC"
        resources:
          requests:
            memory: "1Gi"
            cpu: "500m"
          limits:
            memory: "2Gi"
            cpu: "1000m"
        volumeMounts:
        - name: state-store
          mountPath: /tmp/kafka-streams
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 60
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 5
      volumes:
      - name: state-store
        emptyDir: {}

---
apiVersion: v1
kind: Service
metadata:
  name: kafka-streams-service
spec:
  selector:
    app: kafka-streams-app
  ports:
  - name: http
    port: 8080
    targetPort: 8080
  - name: jmx
    port: 9999
    targetPort: 9999
  type: ClusterIP

Health Checks et Graceful Shutdown

import org.apache.kafka.streams.KafkaStreams.State;

public class StreamsHealthCheck {

    private final KafkaStreams streams;

    public StreamsHealthCheck(KafkaStreams streams) {
        this.streams = streams;

        // Listener pour état
        streams.setStateListener((newState, oldState) -> {
            System.out.println("State transition: " + oldState + " -> " + newState);

            if (newState == State.ERROR) {
                // Alerter, redémarrer, etc.
                System.err.println("Streams application in ERROR state!");
            }
        });

        // Uncaught exception handler
        streams.setUncaughtExceptionHandler((thread, throwable) -> {
            System.err.println("Uncaught exception in thread " + thread.getName());
            throwable.printStackTrace();

            // Décider si on continue ou on arrête
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
        });
    }

    // Health check endpoint
    public boolean isHealthy() {
        State state = streams.state();
        return state == State.RUNNING || state == State.REBALANCING;
    }

    // Readiness check
    public boolean isReady() {
        return streams.state() == State.RUNNING;
    }

    // Graceful shutdown
    public void shutdown() {
        System.out.println("Shutting down Kafka Streams gracefully...");
        streams.close(Duration.ofSeconds(30));
        System.out.println("Shutdown complete.");
    }
}

// Spring Boot actuator
@Component
public class StreamsHealthIndicator implements HealthIndicator {

    @Autowired
    private StreamsHealthCheck healthCheck;

    @Override
    public Health health() {
        return healthCheck.isHealthy()
            ? Health.up().build()
            : Health.down().build();
    }
}

Monitoring et Métriques

Métriques JMX Essentielles

// Métriques à surveiller :

// 1. Thread-level metrics
kafka.streams:type=stream-thread-metrics,thread-id=*
  - commit-latency-avg/max
  - poll-latency-avg/max
  - process-latency-avg/max
  - task-created-rate
  - task-closed-rate

// 2. Task-level metrics
kafka.streams:type=stream-task-metrics,thread-id=*,task-id=*
  - process-latency-avg/max
  - commit-latency-avg/max
  - dropped-records-rate

// 3. State store metrics
kafka.streams:type=state-store-metrics,rocksdb-state-id=*
  - put-latency-avg/max
  - get-latency-avg/max
  - flush-latency-avg/max
  - bytes-written-rate
  - bytes-read-rate

// 4. RocksDB metrics
kafka.streams:type=rocksdb-metrics,rocksdb-state-id=*
  - number-open-files
  - num-immutable-mem-table
  - size-all-mem-tables
  - background-errors

Logging et Debugging

# log4j2.xml
log4j.logger.org.apache.kafka.streams=DEBUG
log4j.logger.org.apache.kafka.streams.processor=TRACE
log4j.logger.org.apache.kafka.streams.state=DEBUG

# Activer les métriques détaillées
metrics.recording.level=DEBUG

Bonnes Pratiques

1. Sérialisation

// Utiliser Avro pour schéma evolution
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
          GenericAvroSerde.class);
props.put("schema.registry.url", "http://schema-registry:8081");

// Ou JSON avec Jackson
public class JsonSerde<T> implements Serde<T> {
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final Class<T> targetType;

    public JsonSerde(Class<T> targetType) {
        this.targetType = targetType;
    }

    @Override
    public Serializer<T> serializer() {
        return (topic, data) -> {
            try {
                return objectMapper.writeValueAsBytes(data);
            } catch (Exception e) {
                throw new SerializationException("Error serializing JSON", e);
            }
        };
    }

    @Override
    public Deserializer<T> deserializer() {
        return (topic, data) -> {
            try {
                return objectMapper.readValue(data, targetType);
            } catch (Exception e) {
                throw new SerializationException("Error deserializing JSON", e);
            }
        };
    }
}

2. Error Handling

// Deserialization exception handler
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
          LogAndContinueExceptionHandler.class);

// Production exception handler
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
          DefaultProductionExceptionHandler.class);

// Custom error handling
KStream<String, Order> validOrders = orders
    .flatMap((key, value) -> {
        try {
            Order order = parseOrder(value);
            return Collections.singletonList(KeyValue.pair(key, order));
        } catch (Exception e) {
            // Log error, send to DLQ
            sendToDeadLetterQueue(key, value, e);
            return Collections.emptyList();
        }
    });

3. Exactly-Once Semantics

// Activer exactly-once (EOS)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
          StreamsConfig.EXACTLY_ONCE_V2);

// Requis pour EOS :
// - Kafka >= 3.0 (pour EOS v2)
// - Topics avec replication.factor >= 3
// - min.insync.replicas >= 2

4. State Store Tuning

// Custom RocksDB configuration
public class CustomRocksDBConfigSetter implements RocksDBConfigSetter {

    @Override
    public void setConfig(String storeName, Options options,
                         Map<String, Object> configs) {
        // Block cache (importante pour performances read)
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCache(new LRUCache(16 * 1024 * 1024)); // 16MB
        tableConfig.setFilterPolicy(new BloomFilter(10));
        options.setTableFormatConfig(tableConfig);

        // Write buffer
        options.setWriteBufferSize(16 * 1024 * 1024); // 16MB
        options.setMaxWriteBufferNumber(3);

        // Compaction
        options.setCompactionStyle(CompactionStyle.UNIVERSAL);
        options.setMaxBackgroundCompactions(2);

        // Compression
        options.setCompressionType(CompressionType.SNAPPY_COMPRESSION);
    }
}

Conclusion

Kafka Streams offre une solution puissante et élégante pour le stream processing :

Points Forts :

  • Simplicité opérationnelle (pas de cluster séparé)
  • Intégration native avec l'écosystème Kafka
  • Exactly-once semantics
  • State management robuste
  • API riche et expressive

Cas d'Usage Idéaux :

  • Enrichissement de données en temps réel
  • Agrégations et analytics streaming
  • Détection d'anomalies et alerting
  • ETL en temps réel
  • Event-driven microservices

Limitations :

  • Limité à Kafka comme source/sink
  • JVM uniquement (Java/Scala/Kotlin)
  • State stores limités par le disque local

Kafka Streams est le choix parfait pour construire des pipelines de traitement de flux robustes, scalables et production-ready, particulièrement dans des architectures déjà basées sur Kafka.


Cet article fait partie d'une série complète sur Apache Kafka. Consultez mes autres guides sur Kafka Connect, Schema Registry, et les patterns de déploiement en production.