Kafka Streams : Stream Processing avec l'API Streams
Kafka Streams est une bibliothèque client Java/Scala pour construire des applications de traitement de flux en temps réel. Contrairement à des frameworks comme Apache Flink ou Spark Streaming, Kafka Streams est une simple bibliothèque qui s'intègre directement dans votre application, sans nécessiter de cluster séparé.
Pourquoi Kafka Streams ?
Comparaison avec les Alternatives
| Caractéristique | Kafka Streams | Apache Flink | Spark Streaming | |----------------|--------------|--------------|-----------------| | Déploiement | Simple JAR | Cluster dédié | Cluster dédié | | Scalabilité | Partitions Kafka | Task slots | Executors | | State Management | RocksDB local | Flink state backend | RDD/DataFrame | | Latence | < 10ms | < 10ms | ~100ms (micro-batch) | | Exactly-once | Oui (natif) | Oui | Oui (avec checkpoint) | | Complexité ops | Faible | Moyenne | Élevée |
Avantages de Kafka Streams
1. Simplicité opérationnelle
- Pas de cluster à gérer
- Déploiement comme n'importe quelle application Java
- Scalabilité via le nombre d'instances
2. Intégration native avec Kafka
- Lecture/écriture optimisée
- Exactly-once semantics
- Gestion automatique des offsets
3. State management puissant
- State stores locaux (RocksDB)
- Changelog topics pour durabilité
- Standby replicas pour HA
4. API riche et expressive
- DSL de haut niveau
- Processor API de bas niveau
- Support complet des windowing operations
Architecture de Kafka Streams
Concepts Fondamentaux
┌─────────────────────────────────────────────────────────────┐
│ KAFKA STREAMS APPLICATION │
│ │
│ ┌────────────────────────────────────────────────────────┐│
│ │ Stream Processing Topology ││
│ │ ││
│ │ ┌──────────┐ ┌───────────┐ ┌──────────┐ ││
│ │ │ Source │──▶│ Processor │──▶│ Sink │ ││
│ │ │ (Kafka) │ │ (Logic) │ │ (Kafka) │ ││
│ │ └──────────┘ └─────┬─────┘ └──────────┘ ││
│ │ │ ││
│ │ ▼ ││
│ │ ┌──────────────┐ ││
│ │ │ State Stores │ ││
│ │ │ (RocksDB) │ ││
│ │ └──────────────┘ ││
│ └────────────────────────────────────────────────────────┘│
│ │
│ Scalabilité = Nombre de partitions Kafka │
└─────────────────────────────────────────────────────────────┘
Stream Processing Topology
Une topologie Kafka Streams définit le flux de traitement :
// Topologie simple : lire, transformer, écrire
StreamsBuilder builder = new StreamsBuilder();
// Source: lire depuis un topic
KStream<String, String> source = builder.stream("input-topic");
// Processor: transformer
KStream<String, String> processed = source
.filter((key, value) -> value != null)
.mapValues(value -> value.toUpperCase());
// Sink: écrire vers un topic
processed.to("output-topic");
// Construire la topologie
Topology topology = builder.build();
Partitioning et Parallelism
Input Topic: user-events (3 partitions)
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Partition 0 │ │ Partition 1 │ │ Partition 2 │
└──────┬───────┘ └──────┬───────┘ └──────┬───────┘
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Stream │ │ Stream │ │ Stream │
│ Task 0 │ │ Task 1 │ │ Task 2 │
│ │ │ │ │ │
│ [RocksDB 0] │ │ [RocksDB 1] │ │ [RocksDB 2] │
└──────┬───────┘ └──────┬───────┘ └──────┬───────┘
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Partition 0 │ │ Partition 1 │ │ Partition 2 │
└──────────────┘ └──────────────┘ └──────────────┘
Output Topic: processed-events (3 partitions)
// Scalabilité : 1 task par partition
// Pour 3 partitions → déployer 1 à 3 instances
Setup et Configuration
Dépendances Maven
<dependencies>
<!-- Kafka Streams -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.6.0</version>
</dependency>
<!-- Sérialisation JSON (optionnel) -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
<!-- Avro (optionnel) -->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-streams-avro-serde</artifactId>
<version>7.5.0</version>
</dependency>
<!-- Testing -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-test-utils</artifactId>
<version>3.6.0</version>
<scope>test</scope>
</dependency>
</dependencies>
Configuration de Base
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import java.util.Properties;
public class StreamsApplication {
public static Properties getStreamsConfig() {
Properties props = new Properties();
// Application ID (= consumer group ID)
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
// Bootstrap servers
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Sérialisation par défaut
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
// State store directory
props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
// Processing guarantee
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
// Commit interval (pour at-least-once)
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
// Number of threads (= parallelism par instance)
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
// Cache size (pour optimiser les aggregations)
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024);
// RocksDB configuration
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
CustomRocksDBConfigSetter.class);
return props;
}
public static void main(String[] args) {
// Créer la topologie
StreamsBuilder builder = new StreamsBuilder();
// ... définir la topologie ...
// Démarrer l'application
KafkaStreams streams = new KafkaStreams(builder.build(), getStreamsConfig());
// Gérer le shutdown gracefully
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
// Démarrer
streams.start();
}
}
Stateless Transformations
Filter et FilterNot
KStream<String, Order> orders = builder.stream("orders");
// Filtrer les commandes > 100€
KStream<String, Order> bigOrders = orders
.filter((key, order) -> order.getAmount() > 100.0);
// Exclure les commandes annulées
KStream<String, Order> activeOrders = orders
.filterNot((key, order) -> order.getStatus().equals("CANCELLED"));
Map et MapValues
// Map: transformer key ET value
KStream<String, String> upperCased = orders
.map((key, order) ->
KeyValue.pair(key.toUpperCase(), order.toUpperCase())
);
// MapValues: transformer SEULEMENT value (plus performant)
KStream<String, OrderSummary> summaries = orders
.mapValues(order -> new OrderSummary(
order.getId(),
order.getAmount(),
order.getCustomerId()
));
// FlatMapValues: 1 input → N outputs
KStream<String, String> items = orders
.flatMapValues(order -> order.getItems()); // List<String>
Branch: Router vers Plusieurs Streams
// Séparer les commandes par montant
Map<String, KStream<String, Order>> branches = orders
.split(Named.as("order-"))
.branch((key, order) -> order.getAmount() < 50,
Branched.as("small"))
.branch((key, order) -> order.getAmount() >= 50 && order.getAmount() < 200,
Branched.as("medium"))
.branch((key, order) -> order.getAmount() >= 200,
Branched.as("large"))
.noDefaultBranch();
// Traiter chaque branche séparément
branches.get("order-small").to("small-orders");
branches.get("order-medium").to("medium-orders");
branches.get("order-large").to("large-orders");
SelectKey: Rechanger la Clé
// Re-partitionner par customer ID au lieu de order ID
KStream<String, Order> rekeyed = orders
.selectKey((orderId, order) -> order.getCustomerId());
// ⚠️ Ceci déclenche un repartitioning (coûteux)
Stateful Processing avec KTable
KStream vs KTable
KStream: Flux d'événements (append-only)
┌─────┬─────┬─────┬─────┬─────┐
│ id:1│ id:2│ id:1│ id:3│ id:1│ (3 événements pour id:1)
└─────┴─────┴─────┴─────┴─────┘
KTable: État courant (upsert)
┌────────┬────────┬────────┐
│ id:1 │ id:2 │ id:3 │ (1 état par clé)
│ (last) │ (last) │ (last) │
└────────┴────────┴────────┘
Créer un KTable
// KTable depuis un topic compacté
KTable<String, User> users = builder.table(
"users",
Materialized.<String, User, KeyValueStore<Bytes, byte[]>>as("users-store")
.withKeySerde(Serdes.String())
.withValueSerde(userSerde)
);
// GlobalKTable: répliqué sur chaque instance
GlobalKTable<String, Product> products = builder.globalTable(
"products",
Materialized.as("products-global-store")
);
Aggregations avec KTable
// Compter les commandes par customer
KTable<String, Long> orderCountByCustomer = orders
.groupBy((orderId, order) -> KeyValue.pair(order.getCustomerId(), order))
.count(Materialized.as("customer-order-count"));
// Somme des montants par customer
KTable<String, Double> totalAmountByCustomer = orders
.groupBy((orderId, order) -> KeyValue.pair(order.getCustomerId(), order))
.aggregate(
() -> 0.0, // Initializer
(customerId, order, total) -> total + order.getAmount(), // Adder
(customerId, order, total) -> total - order.getAmount(), // Subtractor
Materialized.as("customer-total-amount")
);
// Reduce: combiner les valeurs
KTable<String, Order> latestOrderByCustomer = orders
.groupBy((orderId, order) -> KeyValue.pair(order.getCustomerId(), order))
.reduce(
(oldOrder, newOrder) -> newOrder, // Garder le plus récent
Materialized.as("customer-latest-order")
);
Windowing Operations
Types de Windows
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import java.time.Duration;
// 1. Tumbling Window: fenêtres fixes non-chevauchantes
TimeWindows tumblingWindow = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5));
KTable<Windowed<String>, Long> clicksPerWindow = clicks
.groupByKey()
.windowedBy(tumblingWindow)
.count();
// 2. Hopping Window: fenêtres fixes chevauchantes
TimeWindows hoppingWindow = TimeWindows
.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofSeconds(30))
.advanceBy(Duration.ofMinutes(1)); // Avance toutes les 1 minute
KTable<Windowed<String>, Long> hoppingCounts = clicks
.groupByKey()
.windowedBy(hoppingWindow)
.count();
// 3. Session Window: fenêtres basées sur l'inactivité
SessionWindows sessionWindow = SessionWindows
.ofInactivityGapAndGrace(Duration.ofMinutes(30), Duration.ofMinutes(5));
KTable<Windowed<String>, Long> sessionsPerUser = userActivity
.groupByKey()
.windowedBy(sessionWindow)
.count();
// 4. Sliding Window: fenêtres glissantes continues
SlidingWindows slidingWindow = SlidingWindows
.ofTimeDifferenceAndGrace(Duration.ofMinutes(10), Duration.ofSeconds(30));
KTable<Windowed<String>, Long> slidingCounts = events
.groupByKey()
.windowedBy(slidingWindow)
.count();
Exemple Pratique: Détection d'Anomalies
// Détecter les pics de requêtes par utilisateur
KStream<String, Long> requestCounts = requests
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
.count()
.toStream()
.map((windowedKey, count) -> KeyValue.pair(windowedKey.key(), count));
// Alerter si > 100 requêtes/minute
KStream<String, Alert> alerts = requestCounts
.filter((userId, count) -> count > 100)
.mapValues((userId, count) -> new Alert(
userId,
"High request rate: " + count + " req/min",
Instant.now()
));
alerts.to("security-alerts");
Joins
Stream-Stream Join
// Join entre deux streams dans une fenêtre temporelle
KStream<String, Order> orders = builder.stream("orders");
KStream<String, Payment> payments = builder.stream("payments");
// Inner join: garder seulement si les deux existent
KStream<String, OrderWithPayment> joined = orders.join(
payments,
(order, payment) -> new OrderWithPayment(order, payment),
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
StreamJoined.with(
Serdes.String(),
orderSerde,
paymentSerde
)
);
// Left join: garder toutes les commandes
KStream<String, OrderWithPayment> leftJoined = orders.leftJoin(
payments,
(order, payment) -> new OrderWithPayment(order, payment),
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5))
);
// Outer join: garder tout
KStream<String, OrderWithPayment> outerJoined = orders.outerJoin(
payments,
(order, payment) -> new OrderWithPayment(order, payment),
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5))
);
Stream-Table Join (Enrichment Pattern)
// Enrichir les commandes avec les données utilisateur
KStream<String, Order> orders = builder.stream("orders");
KTable<String, User> users = builder.table("users");
// Re-key orders par customer ID
KStream<String, Order> ordersByCustomer = orders
.selectKey((orderId, order) -> order.getCustomerId());
// Join: enrichir avec les données utilisateur
KStream<String, EnrichedOrder> enriched = ordersByCustomer.join(
users,
(order, user) -> new EnrichedOrder(
order,
user.getName(),
user.getEmail(),
user.getAddress()
)
);
enriched.to("enriched-orders");
Table-Table Join
// Joindre deux tables stateful
KTable<String, UserProfile> profiles = builder.table("user-profiles");
KTable<String, UserSettings> settings = builder.table("user-settings");
KTable<String, CompleteUserData> completeData = profiles.join(
settings,
(profile, setting) -> new CompleteUserData(profile, setting)
);
// Écrire dans un topic compacté
completeData.toStream().to("complete-user-data");
GlobalKTable Join: Éviter le Repartitioning
// GlobalKTable: pas besoin de co-partitioning
GlobalKTable<String, Product> products = builder.globalTable("products");
KStream<String, Order> orders = builder.stream("orders");
// Join avec clé différente (product ID depuis order)
KStream<String, EnrichedOrder> enriched = orders.join(
products,
(orderId, order) -> order.getProductId(), // Extrait la foreign key
(order, product) -> new EnrichedOrder(order, product)
);
State Stores Avancés
Types de State Stores
import org.apache.kafka.streams.state.*;
// 1. KeyValueStore: stockage clé-valeur
StoreBuilder<KeyValueStore<String, Long>> keyValueStoreBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-kv-store"),
Serdes.String(),
Serdes.Long()
);
// 2. WindowStore: stockage avec fenêtres temporelles
StoreBuilder<WindowStore<String, Long>> windowStoreBuilder =
Stores.windowStoreBuilder(
Stores.persistentWindowStore(
"my-window-store",
Duration.ofDays(1), // Retention
Duration.ofMinutes(5), // Window size
false // Retain duplicates
),
Serdes.String(),
Serdes.Long()
);
// 3. SessionStore: stockage de sessions
StoreBuilder<SessionStore<String, Long>> sessionStoreBuilder =
Stores.sessionStoreBuilder(
Stores.persistentSessionStore(
"my-session-store",
Duration.ofHours(1) // Retention
),
Serdes.String(),
Serdes.Long()
);
Interactive Queries: Interroger les State Stores
// Application Kafka Streams exposant un REST API
public class StateStoreQueryService {
private final KafkaStreams streams;
public StateStoreQueryService(KafkaStreams streams) {
this.streams = streams;
}
// Requête locale
public Long getCount(String key) {
ReadOnlyKeyValueStore<String, Long> store = streams.store(
StoreQueryParameters.fromNameAndType(
"customer-order-count",
QueryableStoreTypes.keyValueStore()
)
);
return store.get(key);
}
// Requête distribuée (peut être sur une autre instance)
public Long getCountDistributed(String key) {
// Trouver quelle instance possède cette clé
StreamsMetadata metadata = streams.queryMetadataForKey(
"customer-order-count",
key,
Serdes.String().serializer()
);
if (metadata.equals(StreamsMetadata.NOT_AVAILABLE)) {
return null; // Store pas encore prêt
}
// Si c'est local, requête locale
if (metadata.host().equals(getThisInstanceHost())) {
return getCount(key);
}
// Sinon, faire un appel HTTP vers l'autre instance
return fetchFromRemoteInstance(metadata.host(), metadata.port(), key);
}
// Requête range
public Map<String, Long> getCountRange(String from, String to) {
ReadOnlyKeyValueStore<String, Long> store = streams.store(
StoreQueryParameters.fromNameAndType(
"customer-order-count",
QueryableStoreTypes.keyValueStore()
)
);
Map<String, Long> results = new HashMap<>();
try (KeyValueIterator<String, Long> iterator = store.range(from, to)) {
while (iterator.hasNext()) {
KeyValue<String, Long> entry = iterator.next();
results.put(entry.key, entry.value);
}
}
return results;
}
}
// Endpoint REST avec Spring Boot
@RestController
@RequestMapping("/state")
public class StateStoreController {
@Autowired
private StateStoreQueryService queryService;
@GetMapping("/count/{key}")
public ResponseEntity<Long> getCount(@PathVariable String key) {
Long count = queryService.getCountDistributed(key);
return count != null
? ResponseEntity.ok(count)
: ResponseEntity.notFound().build();
}
}
Testing avec TopologyTestDriver
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.TestRecord;
import org.junit.jupiter.api.Test;
public class StreamsApplicationTest {
@Test
public void testOrderProcessing() {
// Créer la topologie
StreamsBuilder builder = new StreamsBuilder();
// ... définir la topologie ...
// Configuration de test
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
// Test driver
try (TopologyTestDriver testDriver = new TopologyTestDriver(
builder.build(), props)) {
// Topics de test
TestInputTopic<String, Order> inputTopic = testDriver.createInputTopic(
"orders",
Serdes.String().serializer(),
orderSerde.serializer()
);
TestOutputTopic<String, ProcessedOrder> outputTopic =
testDriver.createOutputTopic(
"processed-orders",
Serdes.String().deserializer(),
processedOrderSerde.deserializer()
);
// Envoyer des messages de test
Order order1 = new Order("1", "customer-1", 150.0);
inputTopic.pipeInput("1", order1);
Order order2 = new Order("2", "customer-1", 75.0);
inputTopic.pipeInput("2", order2);
// Vérifier les résultats
List<TestRecord<String, ProcessedOrder>> results =
outputTopic.readRecordsToList();
assertEquals(2, results.size());
assertEquals("customer-1", results.get(0).getValue().getCustomerId());
}
}
@Test
public void testWindowing() {
// Test avec fenêtres temporelles
Instant baseTime = Instant.parse("2024-01-01T00:00:00Z");
try (TopologyTestDriver testDriver = new TopologyTestDriver(
builder.build(), props)) {
TestInputTopic<String, Click> inputTopic = testDriver.createInputTopic(
"clicks",
Serdes.String().serializer(),
clickSerde.serializer()
);
// Envoyer avec timestamps spécifiques
inputTopic.pipeInput("user-1", new Click("user-1"), baseTime);
inputTopic.pipeInput("user-1", new Click("user-1"),
baseTime.plus(Duration.ofMinutes(1)));
inputTopic.pipeInput("user-1", new Click("user-1"),
baseTime.plus(Duration.ofMinutes(6))); // Nouvelle fenêtre
// Vérifier les agrégations par fenêtre
// ...
}
}
}
Déploiement en Production
Docker Compose
version: '3.8'
services:
streams-app:
build: .
image: my-streams-app:1.0.0
deploy:
replicas: 3 # Pour parallélisme sur 3 partitions
resources:
limits:
memory: 2G
cpus: '1'
reservations:
memory: 1G
environment:
KAFKA_BOOTSTRAP_SERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
STREAMS_APPLICATION_ID: order-processor
STREAMS_NUM_THREADS: 2
STREAMS_STATE_DIR: /tmp/kafka-streams
JAVA_OPTS: >
-Xmx1536m
-Xms1536m
-XX:+UseG1GC
-XX:MaxGCPauseMillis=20
-Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.port=9999
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
volumes:
- streams-state:/tmp/kafka-streams
networks:
- kafka-network
volumes:
streams-state:
networks:
kafka-network:
external: true
Kubernetes Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-streams-app
labels:
app: kafka-streams-app
spec:
replicas: 3 # Matche le nombre de partitions
selector:
matchLabels:
app: kafka-streams-app
template:
metadata:
labels:
app: kafka-streams-app
spec:
containers:
- name: streams-app
image: my-streams-app:1.0.0
ports:
- containerPort: 8080 # REST API pour interactive queries
- containerPort: 9999 # JMX metrics
env:
- name: KAFKA_BOOTSTRAP_SERVERS
value: "kafka-service:9092"
- name: STREAMS_APPLICATION_ID
value: "order-processor"
- name: STREAMS_NUM_THREADS
value: "2"
- name: JAVA_OPTS
value: "-Xmx1536m -Xms1536m -XX:+UseG1GC"
resources:
requests:
memory: "1Gi"
cpu: "500m"
limits:
memory: "2Gi"
cpu: "1000m"
volumeMounts:
- name: state-store
mountPath: /tmp/kafka-streams
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 60
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8080
initialDelaySeconds: 30
periodSeconds: 5
volumes:
- name: state-store
emptyDir: {}
---
apiVersion: v1
kind: Service
metadata:
name: kafka-streams-service
spec:
selector:
app: kafka-streams-app
ports:
- name: http
port: 8080
targetPort: 8080
- name: jmx
port: 9999
targetPort: 9999
type: ClusterIP
Health Checks et Graceful Shutdown
import org.apache.kafka.streams.KafkaStreams.State;
public class StreamsHealthCheck {
private final KafkaStreams streams;
public StreamsHealthCheck(KafkaStreams streams) {
this.streams = streams;
// Listener pour état
streams.setStateListener((newState, oldState) -> {
System.out.println("State transition: " + oldState + " -> " + newState);
if (newState == State.ERROR) {
// Alerter, redémarrer, etc.
System.err.println("Streams application in ERROR state!");
}
});
// Uncaught exception handler
streams.setUncaughtExceptionHandler((thread, throwable) -> {
System.err.println("Uncaught exception in thread " + thread.getName());
throwable.printStackTrace();
// Décider si on continue ou on arrête
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
});
}
// Health check endpoint
public boolean isHealthy() {
State state = streams.state();
return state == State.RUNNING || state == State.REBALANCING;
}
// Readiness check
public boolean isReady() {
return streams.state() == State.RUNNING;
}
// Graceful shutdown
public void shutdown() {
System.out.println("Shutting down Kafka Streams gracefully...");
streams.close(Duration.ofSeconds(30));
System.out.println("Shutdown complete.");
}
}
// Spring Boot actuator
@Component
public class StreamsHealthIndicator implements HealthIndicator {
@Autowired
private StreamsHealthCheck healthCheck;
@Override
public Health health() {
return healthCheck.isHealthy()
? Health.up().build()
: Health.down().build();
}
}
Monitoring et Métriques
Métriques JMX Essentielles
// Métriques à surveiller :
// 1. Thread-level metrics
kafka.streams:type=stream-thread-metrics,thread-id=*
- commit-latency-avg/max
- poll-latency-avg/max
- process-latency-avg/max
- task-created-rate
- task-closed-rate
// 2. Task-level metrics
kafka.streams:type=stream-task-metrics,thread-id=*,task-id=*
- process-latency-avg/max
- commit-latency-avg/max
- dropped-records-rate
// 3. State store metrics
kafka.streams:type=state-store-metrics,rocksdb-state-id=*
- put-latency-avg/max
- get-latency-avg/max
- flush-latency-avg/max
- bytes-written-rate
- bytes-read-rate
// 4. RocksDB metrics
kafka.streams:type=rocksdb-metrics,rocksdb-state-id=*
- number-open-files
- num-immutable-mem-table
- size-all-mem-tables
- background-errors
Logging et Debugging
# log4j2.xml
log4j.logger.org.apache.kafka.streams=DEBUG
log4j.logger.org.apache.kafka.streams.processor=TRACE
log4j.logger.org.apache.kafka.streams.state=DEBUG
# Activer les métriques détaillées
metrics.recording.level=DEBUG
Bonnes Pratiques
1. Sérialisation
// Utiliser Avro pour schéma evolution
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
GenericAvroSerde.class);
props.put("schema.registry.url", "http://schema-registry:8081");
// Ou JSON avec Jackson
public class JsonSerde<T> implements Serde<T> {
private final ObjectMapper objectMapper = new ObjectMapper();
private final Class<T> targetType;
public JsonSerde(Class<T> targetType) {
this.targetType = targetType;
}
@Override
public Serializer<T> serializer() {
return (topic, data) -> {
try {
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new SerializationException("Error serializing JSON", e);
}
};
}
@Override
public Deserializer<T> deserializer() {
return (topic, data) -> {
try {
return objectMapper.readValue(data, targetType);
} catch (Exception e) {
throw new SerializationException("Error deserializing JSON", e);
}
};
}
}
2. Error Handling
// Deserialization exception handler
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
// Production exception handler
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
DefaultProductionExceptionHandler.class);
// Custom error handling
KStream<String, Order> validOrders = orders
.flatMap((key, value) -> {
try {
Order order = parseOrder(value);
return Collections.singletonList(KeyValue.pair(key, order));
} catch (Exception e) {
// Log error, send to DLQ
sendToDeadLetterQueue(key, value, e);
return Collections.emptyList();
}
});
3. Exactly-Once Semantics
// Activer exactly-once (EOS)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
// Requis pour EOS :
// - Kafka >= 3.0 (pour EOS v2)
// - Topics avec replication.factor >= 3
// - min.insync.replicas >= 2
4. State Store Tuning
// Custom RocksDB configuration
public class CustomRocksDBConfigSetter implements RocksDBConfigSetter {
@Override
public void setConfig(String storeName, Options options,
Map<String, Object> configs) {
// Block cache (importante pour performances read)
BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
tableConfig.setBlockCache(new LRUCache(16 * 1024 * 1024)); // 16MB
tableConfig.setFilterPolicy(new BloomFilter(10));
options.setTableFormatConfig(tableConfig);
// Write buffer
options.setWriteBufferSize(16 * 1024 * 1024); // 16MB
options.setMaxWriteBufferNumber(3);
// Compaction
options.setCompactionStyle(CompactionStyle.UNIVERSAL);
options.setMaxBackgroundCompactions(2);
// Compression
options.setCompressionType(CompressionType.SNAPPY_COMPRESSION);
}
}
Conclusion
Kafka Streams offre une solution puissante et élégante pour le stream processing :
Points Forts :
- Simplicité opérationnelle (pas de cluster séparé)
- Intégration native avec l'écosystème Kafka
- Exactly-once semantics
- State management robuste
- API riche et expressive
Cas d'Usage Idéaux :
- Enrichissement de données en temps réel
- Agrégations et analytics streaming
- Détection d'anomalies et alerting
- ETL en temps réel
- Event-driven microservices
Limitations :
- Limité à Kafka comme source/sink
- JVM uniquement (Java/Scala/Kotlin)
- State stores limités par le disque local
Kafka Streams est le choix parfait pour construire des pipelines de traitement de flux robustes, scalables et production-ready, particulièrement dans des architectures déjà basées sur Kafka.
Cet article fait partie d'une série complète sur Apache Kafka. Consultez mes autres guides sur Kafka Connect, Schema Registry, et les patterns de déploiement en production.