Kafka Connect: Code-Free Data Integration with Kafka
Kafka Connect is a framework for connecting Apache Kafka to external systems (databases, file systems, cloud storage, APIs) without writing code. It lets you build scalable, fault-tolerant pipelines to ingest data into Kafka and export data out of it.
Why Kafka Connect?
The Alternative: a Custom Producer/Consumer
// ❌ Manual approach: lots of boilerplate code
public class DatabaseToKafkaProducer {
    public void run() throws Exception {
        while (true) {
            // 1. Connect to the database
            Connection conn = getConnection();
            // 2. Query for new data
            ResultSet rs = conn.createStatement().executeQuery("SELECT ...");
            // 3. Transform into Kafka records
            while (rs.next()) {
                ProducerRecord<String, String> record = transform(rs);
                producer.send(record);
            }
            // 4. Handle errors, retries, offset tracking, etc.
            Thread.sleep(10000);
        }
    }
}
✅ With Kafka Connect: Declarative Configuration
{
"name": "postgres-source",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "user",
"database.password": "password",
"database.dbname": "mydb",
"table.include.list": "public.orders",
"topic.prefix": "db"
}
}
Advantages of Kafka Connect
| Feature | Kafka Connect | Custom code |
|----------------|--------------|-------------|
| Development | JSON configuration | Full custom code |
| Scalability | Automatic (workers) | Manual |
| Fault tolerance | Built-in | To implement |
| Offset management | Automatic | Manual |
| Dead Letter Queue | Built-in | To implement |
| Schema management | Built-in | To implement |
| Monitoring | JMX metrics | Custom metrics |
| Backpressure | Handled | To handle |
Kafka Connect Architecture
Main Components
┌─────────────────────────────────────────────────────────────┐
│ KAFKA CONNECT CLUSTER │
│ │
│ ┌────────────────┐ ┌────────────────┐ ┌───────────────┐│
│ │ Worker 1 │ │ Worker 2 │ │ Worker 3 ││
│ │ │ │ │ │ ││
│ │ ┌──────────┐ │ │ ┌──────────┐ │ │ ┌──────────┐││
│ │ │Connector1│ │ │ │Connector2│ │ │ │Connector3│││
│ │ │ (Task) │ │ │ │ (Task) │ │ │ │ (Task) │││
│ │ └──────────┘ │ │ └──────────┘ │ │ └──────────┘││
│ └────────┬───────┘ └────────┬───────┘ └────────┬──────┘│
│ │ │ │ │
└───────────┼───────────────────┼────────────────────┼────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────────────┐
│ KAFKA CLUSTER │
│ (config, offset, status topics) │
└─────────────────────────────────────────────────────────────┘
│ │ │
▼ ▼ ▼
┌────────────────┐ ┌────────────────┐ ┌───────────────────┐
│ PostgreSQL │ │ MongoDB │ │ S3 / HDFS │
│ (Source) │ │ (Source) │ │ (Sink) │
└────────────────┘ └────────────────┘ └───────────────────┘
Source vs Sink Connectors
SOURCE CONNECTOR
┌────────────┐ ┌────────────┐ ┌────────────┐
│ External │ ──────▶ │ Kafka │ ──────▶ │ Kafka │
│ System │ │ Connect │ │ Topic │
│ (DB, API) │ │ (Source) │ │ │
└────────────┘ └────────────┘ └────────────┘
SINK CONNECTOR
┌────────────┐ ┌────────────┐ ┌────────────┐
│ Kafka │ ──────▶ │ Kafka │ ──────▶ │ External │
│ Topic │ │ Connect │ │ System │
│ │ │ (Sink) │ │ (DB, S3) │
└────────────┘ └────────────┘ └────────────┘
Installation and Configuration
Docker Compose Setup
version: '3.8'
services:
# Kafka Connect in distributed mode
kafka-connect:
image: confluentinc/cp-kafka-connect:7.5.0
container_name: kafka-connect
depends_on:
- kafka
ports:
- "8083:8083"
environment:
# Connect configuration
CONNECT_BOOTSTRAP_SERVERS: kafka:9092
CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
CONNECT_REST_PORT: 8083
# Cluster group ID (for distributed mode)
CONNECT_GROUP_ID: kafka-connect-cluster
# Internal topics
CONNECT_CONFIG_STORAGE_TOPIC: connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: connect-status
# Replication factors
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
# Converters (data format)
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: false
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: true
# Internal converters
CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
# Plugin path
CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
# Logging
CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
CONNECT_LOG4J_LOGGERS: org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR
# Heap size
KAFKA_HEAP_OPTS: "-Xms1G -Xmx1G"
volumes:
# Install additional connectors
- ./connectors:/usr/share/confluent-hub-components
command:
- bash
- -c
- |
# Install connectors from Confluent Hub
confluent-hub install --no-prompt debezium/debezium-connector-postgresql:latest
confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest
confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest
#
# Start Connect
/etc/confluent/docker/run
# Kafka Connect UI (optional)
kafka-connect-ui:
image: landoop/kafka-connect-ui:latest
container_name: kafka-connect-ui
ports:
- "8000:8000"
environment:
CONNECT_URL: "http://kafka-connect:8083"
Standalone Configuration (Dev/Test)
# connect-standalone.properties
bootstrap.servers=localhost:9092
# Converters
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=true
# Offset storage (file-based for standalone mode)
offset.storage.file.filename=/tmp/connect.offsets
# Plugin path
plugin.path=/usr/local/share/kafka/plugins
# Start in standalone mode
connect-standalone connect-standalone.properties \
connector-postgres-source.properties \
connector-s3-sink.properties
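The per-connector properties files passed to the command above are not shown here; as an illustration, a minimal connector-postgres-source.properties could look like the following (the keys mirror the Debezium JSON config used later in this article, and the values are placeholders):
# connector-postgres-source.properties (illustrative sketch)
name=postgres-source
connector.class=io.debezium.connector.postgresql.PostgresConnector
tasks.max=1
database.hostname=localhost
database.port=5432
database.user=postgres
database.password=postgres
database.dbname=mydb
table.include.list=public.orders
topic.prefix=db
plugin.name=pgoutput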
REST API: Managing Connectors
Listing Connectors
# List all connectors
curl -X GET http://localhost:8083/connectors
# List the available connector plugins
curl -X GET http://localhost:8083/connector-plugins | jq .
# Status of a connector
curl -X GET http://localhost:8083/connectors/my-connector/status | jq .
Creating a Connector
# POST: create a new connector
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "postgres-source",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "mydb",
"database.server.name": "dbserver1",
"table.include.list": "public.orders,public.customers",
"plugin.name": "pgoutput"
}
}'
Updating and Deleting
# PUT: update the configuration
curl -X PUT http://localhost:8083/connectors/postgres-source/config \
-H "Content-Type: application/json" \
-d '{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "2",
...
}'
# Pause a connector
curl -X PUT http://localhost:8083/connectors/postgres-source/pause
# Resume a connector
curl -X PUT http://localhost:8083/connectors/postgres-source/resume
# Restart a connector
curl -X POST http://localhost:8083/connectors/postgres-source/restart
# Restart a specific task
curl -X POST http://localhost:8083/connectors/postgres-source/tasks/0/restart
# Delete a connector
curl -X DELETE http://localhost:8083/connectors/postgres-source
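In practice, connector configs are usually kept in version control and applied idempotently: PUT /connectors/{name}/config creates the connector if it does not exist and updates it otherwise, which makes it a natural fit for a small deployment script. A minimal sketch (the script name and file layout are illustrative):
#!/usr/bin/env bash
# deploy-connector.sh <name> <config.json>: create-or-update a connector
# <config.json> must contain only the "config" object, not the {"name": ..., "config": ...} envelope
set -euo pipefail
NAME="$1"
CONFIG_FILE="$2"
curl -sf -X PUT "http://localhost:8083/connectors/${NAME}/config" \
  -H "Content-Type: application/json" \
  -d @"${CONFIG_FILE}" | jq '.name, .tasks'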
Popular Source Connectors
1. Debezium PostgreSQL Source (CDC)
Change Data Capture (CDC) captures database changes in real time.
{
"name": "postgres-cdc-source",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
// PostgreSQL connection
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "replicator",
"database.password": "replpassword",
"database.dbname": "production",
"database.server.name": "prod-db",
// Replication slot (required for PostgreSQL CDC)
"plugin.name": "pgoutput",
"slot.name": "debezium_slot",
"publication.name": "debezium_publication",
// Tables to capture
"table.include.list": "public.orders,public.payments,public.users",
"schema.include.list": "public",
// Topic routing
"topic.prefix": "db",
"topic.creation.default.replication.factor": 3,
"topic.creation.default.partitions": 3,
// Snapshot mode
"snapshot.mode": "initial", // initial, never, always
// Transformations
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
// Heartbeat (to advance the LSN even without changes)
"heartbeat.interval.ms": "10000",
"heartbeat.topics.prefix": "__debezium-heartbeat",
// Schema history (for DDL changes)
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes.production"
}
}
PostgreSQL setup for CDC:
-- 1. Create a replication user
CREATE ROLE replicator WITH REPLICATION LOGIN PASSWORD 'replpassword';
-- 2. Create a publication (logical replication)
CREATE PUBLICATION debezium_publication FOR TABLE orders, payments, users;
-- 3. Configure postgresql.conf
wal_level = logical
max_replication_slots = 4
max_wal_senders = 4
-- 4. Permissions
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replicator;
GRANT USAGE ON SCHEMA public TO replicator;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO replicator;
-- Check the replication slots
SELECT * FROM pg_replication_slots;
CDC Message Format:
{
"schema": {...},
"payload": {
"before": null, // État avant (null pour INSERT)
"after": { // État après
"id": 123,
"customer_id": "cust-456",
"amount": 99.99,
"status": "PENDING",
"created_at": 1700000000000
},
"source": {
"version": "2.4.0.Final",
"connector": "postgresql",
"name": "prod-db",
"ts_ms": 1700000000000,
"db": "production",
"schema": "public",
"table": "orders",
"lsn": 123456789,
"txId": 987654
},
"op": "c", // Operation: c=create, u=update, d=delete, r=read (snapshot)
"ts_ms": 1700000000000
}
}
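On the consuming side, applications typically branch on the op field of this envelope. A minimal Java sketch (the topic name, string deserialization, and Jackson for JSON parsing are assumptions; with the unwrap SMT configured above you would receive the flattened after state directly instead):
// Minimal sketch: branch on the Debezium "op" field (c=create, u=update, d=delete, r=snapshot read).
// Assumes JSON string values without the unwrap SMT and Jackson on the classpath.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class CdcEnvelopeConsumer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "cdc-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        ObjectMapper mapper = new ObjectMapper();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("db.public.orders"));   // topic name from the config above
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    if (record.value() == null) continue;       // tombstone
                    JsonNode payload = mapper.readTree(record.value()).path("payload");
                    JsonNode after = payload.path("after");
                    JsonNode before = payload.path("before");
                    switch (payload.path("op").asText()) {
                        case "c", "r", "u" -> System.out.println("upsert: " + after);
                        case "d"           -> System.out.println("delete: " + before);
                        default            -> { /* ignore other operations */ }
                    }
                }
            }
        }
    }
}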
2. JDBC Source Connector
A generic connector for any SQL database.
{
"name": "jdbc-mysql-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "3",
// JDBC connection
"connection.url": "jdbc:mysql://mysql:3306/mydb?user=kafkaconnect&password=secret",
"connection.user": "kafkaconnect",
"connection.password": "secret",
// Mode: bulk, timestamp, incrementing, timestamp+incrementing
"mode": "incrementing",
"incrementing.column.name": "id",
// Or timestamp mode
// "mode": "timestamp",
// "timestamp.column.name": "updated_at",
// Query mode
"query": "SELECT * FROM orders WHERE region = 'EU'",
// Or a table whitelist (mutually exclusive with "query")
// "table.whitelist": "orders,customers",
// Topic prefix
"topic.prefix": "mysql-",
// Polling interval
"poll.interval.ms": "5000",
// Batch size
"batch.max.rows": "1000",
// Numeric mapping
"numeric.mapping": "best_fit"
}
}
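If you switch to timestamp or timestamp+incrementing mode, the source table needs a reliably maintained timestamp column, ideally indexed so that polling stays cheap. A sketch in MySQL syntax, assuming the orders table above:
-- Illustrative prerequisites for mode=timestamp+incrementing
ALTER TABLE orders
  ADD COLUMN updated_at TIMESTAMP NOT NULL
  DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;
-- Index so the connector's incremental polling does not scan the whole table
CREATE INDEX idx_orders_updated_at ON orders (updated_at);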
3. MongoDB Source Connector
{
"name": "mongodb-source",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"tasks.max": "1",
// MongoDB connection
"connection.uri": "mongodb://user:password@mongo1:27017,mongo2:27017,mongo3:27017/?replicaSet=rs0",
"database": "mydb",
"collection": "orders",
// Topic configuration
"topic.prefix": "mongo",
// Change stream options
"pipeline": "[{\"$match\": {\"operationType\": {\"$in\": [\"insert\", \"update\", \"replace\"]}}}]",
// Output format
"output.format.key": "json",
"output.format.value": "json",
"output.schema.infer.value": true
}
}
Popular Sink Connectors
1. JDBC Sink Connector
Write to any SQL database.
{
"name": "jdbc-postgres-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "3",
// Topics to consume
"topics": "db.public.orders",
// JDBC connection
"connection.url": "jdbc:postgresql://postgres:5432/analytics",
"connection.user": "postgres",
"connection.password": "postgres",
// Insert mode
"insert.mode": "upsert", // insert, update, upsert
"pk.mode": "record_key", // none, kafka, record_key, record_value
"pk.fields": "id",
// Delete enabled
"delete.enabled": "true",
// Auto-create table
"auto.create": "true",
"auto.evolve": "true",
// Batch size
"batch.size": "3000",
// Table name format
"table.name.format": "${topic}"
}
}
2. S3 Sink Connector
Export to S3 with partitioning.
{
"name": "s3-sink",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "3",
// Topics
"topics": "orders,payments",
// S3 configuration
"s3.bucket.name": "my-data-lake",
"s3.region": "eu-west-1",
"s3.part.size": "5242880", // 5MB
// AWS credentials (or an IAM role)
"aws.access.key.id": "AKIAIOSFODNN7EXAMPLE",
"aws.secret.access.key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
// Format (avro, json, parquet)
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
// Schema Registry (for Avro/Parquet)
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
// Partitioning
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd",
"partition.duration.ms": "3600000", // 1 heure
"timestamp.extractor": "Record",
// File rotation
"flush.size": "10000",
"rotate.interval.ms": "60000",
"rotate.schedule.interval.ms": "3600000",
// Compression
"parquet.codec": "snappy"
}
}
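With the TimeBasedPartitioner and path.format above, objects should land under keys roughly like the following (assuming the connector's default topics.dir of "topics"; exact file names depend on the partition and starting offset):
s3://my-data-lake/topics/orders/year=2024/month=05/day=12/orders+0+0000000000.snappy.parquet
s3://my-data-lake/topics/orders/year=2024/month=05/day=12/orders+1+0000004200.snappy.parquet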
3. Elasticsearch Sink Connector
Index into Elasticsearch for analytics and search.
{
"name": "elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "3",
// Topics
"topics": "db.public.products,db.public.orders",
// Elasticsearch connection
"connection.url": "http://elasticsearch:9200",
"connection.username": "elastic",
"connection.password": "changeme",
// Index naming
"type.name": "_doc",
"key.ignore": "false",
// Schema ignore (for dynamic JSON)
"schema.ignore": "true",
// Batch size
"batch.size": "2000",
"max.in.flight.requests": "5",
"linger.ms": "1000",
// Retry
"max.retries": "10",
"retry.backoff.ms": "100",
// Behavior on error
"behavior.on.malformed.documents": "warn",
"behavior.on.null.values": "ignore",
// Index transformation
"transforms": "routeTS",
"transforms.routeTS.type": "org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.routeTS.topic.format": "${topic}-${timestamp}",
"transforms.routeTS.timestamp.format": "yyyy-MM-dd"
}
}
Single Message Transforms (SMT)
SMTs transform messages on the fly as they pass through Connect.
Useful SMTs
{
"name": "connector-with-transforms",
"config": {
"connector.class": "...",
// Transformation chain
"transforms": "unwrap,addMetadata,route,maskPII",
// 1. Unwrap CDC envelope
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
// 2. Add metadata
"transforms.addMetadata.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addMetadata.static.field": "source",
"transforms.addMetadata.static.value": "postgres-production",
"transforms.addMetadata.timestamp.field": "ingested_at",
// 3. Route to different topics
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "db.public.(.*)",
"transforms.route.replacement": "analytics.$1",
// 4. Mask sensitive data
"transforms.maskPII.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskPII.fields": "email,phone,ssn",
"transforms.maskPII.replacement": "***REDACTED***"
}
}
Commonly Used SMTs
// TimestampRouter: partition topics by date
"transforms.TimestampRouter.type": "org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.TimestampRouter.topic.format": "${topic}-${timestamp}",
"transforms.TimestampRouter.timestamp.format": "yyyyMMdd"
// Flatten: flatten nested structures
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "_"
// Cast: change field types
"transforms.cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.cast.spec": "age:int32,salary:float64"
// ReplaceField: rename/drop fields
"transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.ReplaceField.renames": "old_name:new_name",
"transforms.ReplaceField.blacklist": "sensitive_field,temp_field"
// Filter: filter messages (Confluent Filter SMT; the condition is a JsonPath predicate)
"transforms.filter.type": "io.confluent.connect.transforms.Filter$Value",
"transforms.filter.filter.condition": "$[?(@.status == 'ACTIVE')]",
"transforms.filter.filter.type": "include"
Error Handling and the Dead Letter Queue
Error Handling Configuration
{
"name": "resilient-connector",
"config": {
"connector.class": "...",
// Error tolerance
"errors.tolerance": "all", // none, all
"errors.log.enable": "true",
"errors.log.include.messages": "true",
// Dead Letter Queue
"errors.deadletterqueue.topic.name": "connect-dlq",
"errors.deadletterqueue.topic.replication.factor": "3",
"errors.deadletterqueue.context.headers.enable": "true",
// Retry
"errors.retry.timeout": "300000", // 5 minutes
"errors.retry.delay.max.ms": "60000" // Max 60s entre retries
}
}
Consuming Messages from the DLQ
# Inspect the failed messages
kafka-console-consumer --bootstrap-server localhost:9092 \
--topic connect-dlq \
--from-beginning \
--property print.headers=true \
--property print.key=true
# Headers available in the DLQ:
# __connect.errors.topic
# __connect.errors.partition
# __connect.errors.offset
# __connect.errors.connector.name
# __connect.errors.task.id
# __connect.errors.stage
# __connect.errors.class.name
# __connect.errors.exception.class.name
# __connect.errors.exception.message
# __connect.errors.exception.stacktrace
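Beyond the console consumer, a small program can surface just the error-context headers, which is handy for building routing or replay tooling. A sketch (the group id and the single five-second poll are arbitrary choices):
// Sketch: read the DLQ and print the error-context headers listed above.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class DlqInspector {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "dlq-inspector");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("connect-dlq"));          // DLQ topic from the config above
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(5))) {
                System.out.println("--- failed record at offset " + record.offset() + " ---");
                for (Header header : record.headers()) {
                    if (header.key().startsWith("__connect.errors")) {
                        System.out.println(header.key() + " = "
                                + new String(header.value(), StandardCharsets.UTF_8));
                    }
                }
            }
        }
    }
}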
Monitoring and Metrics
Essential JMX Metrics
# Connector-level metrics
kafka.connect:type=connector-metrics,connector=*
- connector-destroyed-task-count
- connector-failed-task-count
- connector-paused-task-count
- connector-running-task-count
- connector-total-task-count
- connector-unassigned-task-count
# Task-level metrics
kafka.connect:type=task-metrics,connector=*,task=*
- batch-size-avg
- batch-size-max
- offset-commit-avg-time-ms
- offset-commit-max-time-ms
- offset-commit-failure-percentage
- offset-commit-success-percentage
# Source task metrics
kafka.connect:type=source-task-metrics,connector=*,task=*
- source-record-poll-total
- source-record-write-total
- source-record-poll-rate
- source-record-write-rate
- source-record-active-count
# Sink task metrics
kafka.connect:type=sink-task-metrics,connector=*,task=*
- sink-record-read-total
- sink-record-send-total
- sink-record-active-count
- sink-record-read-rate
- sink-record-send-rate
- partition-count
- put-batch-avg-time-ms
- put-batch-max-time-ms
# Connect worker metrics
kafka.connect:type=connect-worker-metrics
- connector-count
- connector-startup-attempts-total
- connector-startup-failure-total
- connector-startup-success-total
- task-count
- task-startup-attempts-total
- task-startup-failure-total
- task-startup-success-total
Prometheus Exporter
# prometheus-jmx-config.yml for Kafka Connect
lowercaseOutputName: true
lowercaseOutputLabelNames: true
rules:
- pattern: 'kafka.connect<type=(.+), connector=(.+), task=(.+)><>(.+): (.+)'
name: kafka_connect_$1_$4
labels:
connector: "$2"
task: "$3"
value: $5
type: GAUGE
- pattern: 'kafka.connect<type=(.+)><>(.+): (.+)'
name: kafka_connect_$1_$2
value: $3
type: GAUGE
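This rules file is read by the Prometheus JMX exporter Java agent, which must be attached to the Connect worker JVM. One common approach in the Docker setup above is to pass the agent through KAFKA_OPTS (the agent jar location, port 9404, and mount paths are assumptions):
# Additions to the kafka-connect service above (paths and port are illustrative)
environment:
  KAFKA_OPTS: "-javaagent:/opt/jmx-exporter/jmx_prometheus_javaagent.jar=9404:/etc/jmx/prometheus-jmx-config.yml"
volumes:
  - ./jmx-exporter:/opt/jmx-exporter
  - ./prometheus-jmx-config.yml:/etc/jmx/prometheus-jmx-config.yml
ports:
  - "9404:9404"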
Prometheus Alerts
groups:
- name: kafka-connect-alerts
rules:
- alert: ConnectTaskFailed
expr: kafka_connect_connector_metrics_connector_failed_task_count > 0
for: 1m
labels:
severity: critical
annotations:
summary: "Kafka Connect task failed"
description: "Connector {{ $labels.connector }} has {{ $value }} failed tasks"
- alert: ConnectHighOffsetCommitLatency
expr: kafka_connect_task_metrics_offset_commit_avg_time_ms > 1000
for: 5m
labels:
severity: warning
annotations:
summary: "High offset commit latency"
description: "Task {{ $labels.connector }}/{{ $labels.task }} avg commit time: {{ $value }}ms"
- alert: ConnectNoRecordsProcessed
expr: rate(kafka_connect_source_task_metrics_source_record_poll_total[5m]) == 0
for: 10m
labels:
severity: warning
annotations:
summary: "No records processed by source connector"
description: "Connector {{ $labels.connector }} hasn't processed records in 10min"
Production Best Practices
1. Scalability and Performance
{
"config": {
// Increase parallelism
"tasks.max": "10",
// Optimize batching (sink)
"batch.size": "3000",
"linger.ms": "100",
"max.in.flight.requests": "5",
// Consumer config (for sink connectors)
"consumer.max.poll.records": "500",
"consumer.max.poll.interval.ms": "300000",
"consumer.session.timeout.ms": "30000",
// Producer config (for source connectors)
"producer.compression.type": "snappy",
"producer.batch.size": "16384",
"producer.linger.ms": "10",
"producer.buffer.memory": "33554432"
}
}
2. Security
{
"config": {
// SSL/TLS for Kafka
"producer.security.protocol": "SSL",
"producer.ssl.truststore.location": "/etc/kafka/secrets/truststore.jks",
"producer.ssl.truststore.password": "secret",
"producer.ssl.keystore.location": "/etc/kafka/secrets/keystore.jks",
"producer.ssl.keystore.password": "secret",
"producer.ssl.key.password": "secret",
// Or SASL authentication (use SASL_SSL instead of the SSL-only protocol above)
"producer.security.protocol": "SASL_SSL",
"producer.sasl.mechanism": "PLAIN",
"producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";",
// Consumer SSL
"consumer.security.protocol": "SSL",
"consumer.ssl.truststore.location": "/etc/kafka/secrets/truststore.jks"
}
}
3. Idempotence and Exactly-Once
{
"config": {
// Producer idempotence (for source connectors)
"producer.enable.idempotence": "true",
"producer.acks": "all",
"producer.max.in.flight.requests.per.connection": "5",
// Consumer side: read only committed records (for sink connectors)
"consumer.isolation.level": "read_committed"
}
}
4. Schema Registry Integration
{
"config": {
// Avro with Schema Registry
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.auto.register.schemas": "true",
"value.converter.use.latest.version": "true",
// Schema evolution
"value.converter.schema.compatibility": "BACKWARD"
}
}
Troubleshooting
Common Problems
1. Connector stuck in RUNNING but not processing any data
# Check the logs
docker logs kafka-connect | grep ERROR
# Check the tasks
curl http://localhost:8083/connectors/my-connector/status | jq .
# Restart the connector
curl -X POST http://localhost:8083/connectors/my-connector/restart
# Restart a specific task
curl -X POST http://localhost:8083/connectors/my-connector/tasks/0/restart
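To avoid doing this by hand, a small loop over the status endpoint can restart every task reported as FAILED (a sketch; scheduling it via cron or a sidecar is left to you):
#!/usr/bin/env bash
# restart-failed-tasks.sh: restart every task reported as FAILED (illustrative script)
for connector in $(curl -s http://localhost:8083/connectors | jq -r '.[]'); do
  curl -s "http://localhost:8083/connectors/${connector}/status" \
    | jq -r '.tasks[] | select(.state == "FAILED") | .id' \
    | while read -r task_id; do
        echo "Restarting ${connector} task ${task_id}"
        curl -s -X POST "http://localhost:8083/connectors/${connector}/tasks/${task_id}/restart"
      done
done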
2. High offset lag on a Sink Connector
# Increase parallelism
curl -X PUT http://localhost:8083/connectors/my-sink/config \
-H "Content-Type: application/json" \
-d '{"tasks.max": "5", "batch.size": "5000"}'
# Check the consumer metrics
kafka-consumer-groups --bootstrap-server localhost:9092 \
--describe --group connect-my-sink
3. Schema Evolution Errors
# Check the registered schema versions
curl http://schema-registry:8081/subjects/my-topic-value/versions
# Change the compatibility level
curl -X PUT http://schema-registry:8081/config/my-topic-value \
-H "Content-Type: application/json" \
-d '{"compatibility": "BACKWARD"}'
Conclusion
Kafka Connect is a powerful tool for data integration with Kafka:
Strengths:
- Declarative configuration (no code)
- Built-in scalability and fault tolerance
- A rich ecosystem of connectors
- Robust monitoring and error handling
Ideal Use Cases:
- Change Data Capture (CDC) from databases
- Data lake ingestion (to S3, HDFS)
- Search indexing (Elasticsearch)
- Analytics pipelines (Snowflake, BigQuery)
- Real-time data replication
Best Practices:
- Use distributed mode in production
- Monitor with Prometheus/Grafana
- Use a Dead Letter Queue for error handling
- Use Schema Registry for data evolution
- Test in standalone mode first
Kafka Connect radically simplifies building robust, scalable data pipelines, letting you focus on business logic instead of integration plumbing.
This article is part of a complete series on Apache Kafka. See my other guides on Kafka Streams, Schema Registry, and production deployment patterns.