Kafka Schema Registry : Gestion des Schémas et Evolution
Introduction
Dans un écosystème Apache Kafka en production, l'évolution des données est inévitable. Les applications changent, les modèles de données évoluent, et les formats de messages doivent s'adapter. Kafka Schema Registry est la solution de référence pour gérer cette complexité en offrant un registre centralisé de schémas de données avec un contrôle strict de compatibilité.
Dans ce guide approfondi, nous explorerons Schema Registry en détail : son architecture, les différents formats de sérialisation, les modes de compatibilité, l'intégration avec les producteurs et consommateurs, et les bonnes pratiques pour une utilisation en production.
Pourquoi Schema Registry ?
Les Problèmes sans Schema Registry
Sans gestion centralisée des schémas, plusieurs problèmes apparaissent rapidement :
1. Incompatibilité de Versions
Producer v1 → {"name": "John", "age": 30}
Producer v2 → {"name": "John", "age": 30, "email": "john@example.com"}
Consumer v1 → Impossible de lire les nouveaux champs
Consumer v2 → Incompatible avec les anciens messages
2. Duplication des Schémas
- Chaque application maintient sa propre copie du schéma
- Divergence progressive entre les versions
- Difficultés de synchronisation
3. Absence de Validation
- Messages invalides publiés dans les topics
- Découverte tardive des erreurs (côté consommateur)
- Corruption potentielle du pipeline de données
4. Overhead de Stockage
- Schémas embarqués dans chaque message
- Gaspillage de bande passante et d'espace disque
Les Avantages de Schema Registry
Schema Registry résout ces problèmes en offrant :
Registre Centralisé
Schema Registry (Single Source of Truth)
↓
Producers ← → Consumers
(Référencent le même schéma via ID)
Contrôle de Compatibilité
- Validation automatique lors de l'enregistrement
- Modes de compatibilité configurables (BACKWARD, FORWARD, FULL)
- Prévention des changements cassants
Optimisation du Stockage
Sans Schema Registry:
Message = [Schéma complet] + [Données]
Taille: ~2KB par message
Avec Schema Registry:
Message = [Schema ID (4 bytes)] + [Données]
Taille: ~500 bytes par message
Économie: 75%
Gouvernance des Données
- Historique complet des versions de schémas
- Audit trail pour la conformité
- Documentation auto-générée
Architecture de Schema Registry
Composants Principaux
┌─────────────────────────────────────────────────────────┐
│ Schema Registry Cluster │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ │
│ │ Instance │ │ Instance │ │ Instance │ │
│ │ #1 │ │ #2 │ │ #3 │ │
│ │ (Leader) │ │ (Follower) │ │ (Follower) │ │
│ └─────┬──────┘ └──────┬─────┘ └──────┬─────┘ │
│ │ │ │ │
│ └─────────────────┼────────────────┘ │
│ ↓ │
│ ┌───────────────────────┐ │
│ │ Kafka Topic: │ │
│ │ _schemas │ │
│ │ (Stockage durable) │ │
│ └───────────────────────┘ │
└─────────────────────────────────────────────────────────┘
↑
┌───────────────┼───────────────┐
↓ ↓
┌─────────────┐ ┌─────────────┐
│ Producers │ │ Consumers │
│ │ │ │
│ - Fetch │ │ - Fetch │
│ Schema │ │ Schema │
│ - Validate │ │ - Validate │
│ - Serialize │ │ - Deserial. │
└─────────────┘ └─────────────┘
Fonctionnement Interne
1. Enregistrement d'un Schéma
Producer
↓ POST /subjects/user-value/versions
Schema Registry
↓ Vérifie compatibilité
↓ Assigne ID unique (ex: 42)
↓ Stocke dans topic _schemas
Kafka (_schemas topic)
2. Sérialisation d'un Message
Producer
↓ 1. Récupère Schema ID (cache local)
↓ 2. Sérialise données selon schéma
↓ 3. Ajoute magic byte + Schema ID
↓
Message: [0x00][0x00][0x00][0x00][0x2A][données binaires]
└─┬─┘└────────┬────────┘└────────┬──────────┘
Magic Schema ID (42) Payload Avro
3. Désérialisation d'un Message
Kafka Topic
↓
Consumer
↓ 1. Lit magic byte + Schema ID
↓ 2. Récupère schéma depuis Registry (cache)
↓ 3. Désérialise avec le bon schéma
↓
Objet Java/Python
Formats de Sérialisation
Schema Registry supporte plusieurs formats :
1. Avro (Le plus populaire)
Caractéristiques
- Format binaire compact
- Schéma séparé des données
- Support natif de l'évolution de schéma
- Riche écosystème d'outils
Exemple de Schéma Avro
{
"type": "record",
"name": "User",
"namespace": "com.floriancourouge.model",
"fields": [
{
"name": "id",
"type": "long",
"doc": "Identifiant unique de l'utilisateur"
},
{
"name": "username",
"type": "string"
},
{
"name": "email",
"type": "string"
},
{
"name": "age",
"type": ["null", "int"],
"default": null,
"doc": "Age optionnel"
},
{
"name": "preferences",
"type": {
"type": "record",
"name": "UserPreferences",
"fields": [
{"name": "language", "type": "string", "default": "en"},
{"name": "notifications", "type": "boolean", "default": true}
]
},
"default": {}
},
{
"name": "created_at",
"type": "long",
"logicalType": "timestamp-millis"
}
]
}
Types Avro Supportés
Primitifs:
- null, boolean, int, long, float, double
- bytes, string
Complexes:
- record (structure)
- enum (énumération)
- array (tableau)
- map (dictionnaire)
- union (type composite, ex: ["null", "string"])
- fixed (bytes de taille fixe)
Logiques:
- decimal
- date
- time-millis, time-micros
- timestamp-millis, timestamp-micros
- uuid
Producer Java avec Avro
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class AvroProducerExample {
public static void main(String[] args) {
// Configuration
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
KafkaAvroSerializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081");
// Création du producer
KafkaProducer<String, GenericRecord> producer =
new KafkaProducer<>(props);
// Définition du schéma
String schemaString = """
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "username", "type": "string"},
{"name": "email", "type": "string"}
]
}
""";
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(schemaString);
// Création d'un enregistrement
GenericRecord user = new GenericData.Record(schema);
user.put("id", 123L);
user.put("username", "florian");
user.put("email", "florian@example.com");
// Envoi du message
ProducerRecord<String, GenericRecord> record =
new ProducerRecord<>("users", "user-123", user);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.printf("Message envoyé: partition=%d, offset=%d%n",
metadata.partition(), metadata.offset());
} else {
exception.printStackTrace();
}
});
producer.flush();
producer.close();
}
}
Consumer Java avec Avro
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class AvroConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "user-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
KafkaAvroDeserializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081");
props.put("specific.avro.reader", "false"); // GenericRecord
KafkaConsumer<String, GenericRecord> consumer =
new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("users"));
try {
while (true) {
ConsumerRecords<String, GenericRecord> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, GenericRecord> record : records) {
GenericRecord user = record.value();
System.out.printf("User reçu: id=%d, username=%s, email=%s%n",
user.get("id"),
user.get("username"),
user.get("email"));
}
}
} finally {
consumer.close();
}
}
}
2. JSON Schema
Caractéristiques
- Lisible par l'humain
- Compatible avec REST APIs
- Moins compact qu'Avro
- Schéma plus verbeux
Exemple JSON Schema
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "User",
"type": "object",
"properties": {
"id": {
"type": "integer",
"description": "Identifiant unique"
},
"username": {
"type": "string",
"minLength": 3,
"maxLength": 50
},
"email": {
"type": "string",
"format": "email"
},
"age": {
"type": ["integer", "null"],
"minimum": 0,
"maximum": 150
},
"roles": {
"type": "array",
"items": {
"type": "string",
"enum": ["USER", "ADMIN", "MODERATOR"]
}
}
},
"required": ["id", "username", "email"],
"additionalProperties": false
}
Producer Python avec JSON Schema
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONSerializer
from confluent_kafka.serialization import (
SerializationContext,
MessageField
)
import json
# Schéma JSON
user_schema = """
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {"type": "integer"},
"username": {"type": "string"},
"email": {"type": "string", "format": "email"}
},
"required": ["id", "username", "email"]
}
"""
# Configuration Schema Registry
schema_registry_conf = {'url': 'http://localhost:8081'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
# Serializer
json_serializer = JSONSerializer(
user_schema,
schema_registry_client,
to_dict=lambda user, ctx: user
)
# Configuration Producer
producer_conf = {
'bootstrap.servers': 'localhost:9092'
}
producer = Producer(producer_conf)
# Données utilisateur
user = {
'id': 123,
'username': 'florian',
'email': 'florian@example.com'
}
# Sérialisation et envoi
producer.produce(
topic='users',
key='user-123',
value=json_serializer(
user,
SerializationContext('users', MessageField.VALUE)
),
on_delivery=lambda err, msg: print(
f'Livré: {msg.topic()}[{msg.partition()}]' if not err
else f'Erreur: {err}'
)
)
producer.flush()
3. Protobuf
Caractéristiques
- Format binaire très performant
- Largement utilisé chez Google
- Support natif dans nombreux langages
- Évolution de schéma native
Exemple Protobuf Schema
syntax = "proto3";
package com.floriancourouge.model;
message User {
int64 id = 1;
string username = 2;
string email = 3;
optional int32 age = 4;
message Preferences {
string language = 1;
bool notifications = 2;
}
Preferences preferences = 5;
int64 created_at = 6;
}
enum UserRole {
USER = 0;
ADMIN = 1;
MODERATOR = 2;
}
Comparaison des Formats
| Critère | Avro | JSON Schema | Protobuf | |---------|------|-------------|----------| | Taille binaire | ⭐⭐⭐⭐⭐ Excellent | ⭐⭐ Moyen | ⭐⭐⭐⭐⭐ Excellent | | Lisibilité | ⭐⭐⭐ Bon (JSON) | ⭐⭐⭐⭐⭐ Excellent | ⭐⭐ Moyen (.proto) | | Performance | ⭐⭐⭐⭐ Très bon | ⭐⭐⭐ Bon | ⭐⭐⭐⭐⭐ Excellent | | Evolution | ⭐⭐⭐⭐⭐ Excellent | ⭐⭐⭐ Bon | ⭐⭐⭐⭐ Très bon | | Écosystème Kafka | ⭐⭐⭐⭐⭐ Standard | ⭐⭐⭐⭐ Très bon | ⭐⭐⭐⭐ Très bon | | Support langages | ⭐⭐⭐⭐ Très bon | ⭐⭐⭐⭐⭐ Excellent | ⭐⭐⭐⭐⭐ Excellent |
Modes de Compatibilité
Schema Registry impose des règles de compatibilité pour garantir l'évolution saine des schémas.
Types de Compatibilité
1. BACKWARD (Par défaut)
Les consommateurs utilisant le nouveau schéma peuvent lire les données produites avec l'ancien schéma.
Scénario:
- Déploiement: Consommateurs PUIS Producteurs
- Cas d'usage: Lecture de données historiques
Évolutions autorisées:
✅ Ajout de champs avec valeur par défaut
✅ Suppression de champs
Évolutions interdites:
❌ Ajout de champs obligatoires
❌ Changement de type
Exemple BACKWARD
// Schéma v1
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "username", "type": "string"},
{"name": "email", "type": "string"}
]
}
// Schéma v2 (BACKWARD compatible)
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "username", "type": "string"},
{"name": "email", "type": "string"},
{"name": "phone", "type": ["null", "string"], "default": null} // ✅ OK
]
}
// Schéma v2 (NON compatible)
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "username", "type": "string"},
{"name": "email", "type": "string"},
{"name": "phone", "type": "string"} // ❌ Champ obligatoire
]
}
2. FORWARD
Les consommateurs utilisant l'ancien schéma peuvent lire les données produites avec le nouveau schéma.
Scénario:
- Déploiement: Producteurs PUIS Consommateurs
- Cas d'usage: Ajout progressif de fonctionnalités
Évolutions autorisées:
✅ Ajout de champs optionnels
✅ Suppression de champs avec valeur par défaut
Évolutions interdites:
❌ Suppression de champs obligatoires
❌ Changement de type
Exemple FORWARD
// Schéma v1
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "username", "type": "string"},
{"name": "email", "type": "string"},
{"name": "phone", "type": ["null", "string"], "default": null}
]
}
// Schéma v2 (FORWARD compatible)
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "username", "type": "string"},
{"name": "email", "type": "string"}
// Suppression de "phone" ✅ OK (avait valeur par défaut)
]
}
3. FULL (BACKWARD + FORWARD)
Compatibilité totale dans les deux sens.
Évolutions autorisées:
✅ Ajout de champs optionnels avec valeur par défaut
✅ Suppression de champs optionnels avec valeur par défaut
Évolutions interdites:
❌ Tout changement cassant
4. NONE
Aucune vérification de compatibilité (déconseillé en production).
5. BACKWARD_TRANSITIVE / FORWARD_TRANSITIVE / FULL_TRANSITIVE
Vérifie la compatibilité avec toutes les versions précédentes (pas seulement la dernière).
Configuration du Mode de Compatibilité
Au niveau global
curl -X PUT -H "Content-Type: application/json" \
--data '{"compatibility": "BACKWARD"}' \
http://localhost:8081/config
Au niveau d'un subject
curl -X PUT -H "Content-Type: application/json" \
--data '{"compatibility": "FULL"}' \
http://localhost:8081/config/users-value
Vérification de compatibilité manuelle
# Test de compatibilité avant enregistrement
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"}]}"}' \
http://localhost:8081/compatibility/subjects/users-value/versions/latest
# Réponse
{"is_compatible": true}
API REST de Schema Registry
Endpoints Principaux
1. Enregistrement d'un Schéma
POST /subjects/{subject}/versions
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{
"schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"username\",\"type\":\"string\"}]}"
}' \
http://localhost:8081/subjects/users-value/versions
# Réponse
{"id": 42}
2. Récupération d'un Schéma par ID
GET /schemas/ids/{id}
curl http://localhost:8081/schemas/ids/42
# Réponse
{
"schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[...]}"
}
3. Liste des Versions d'un Subject
GET /subjects/{subject}/versions
curl http://localhost:8081/subjects/users-value/versions
# Réponse
[1, 2, 3, 4]
4. Récupération d'une Version Spécifique
GET /subjects/{subject}/versions/{version}
curl http://localhost:8081/subjects/users-value/versions/latest
# Réponse
{
"subject": "users-value",
"version": 4,
"id": 42,
"schema": "{...}"
}
5. Suppression d'un Schéma
# Soft delete (peut être restauré)
DELETE /subjects/{subject}/versions/{version}
curl -X DELETE http://localhost:8081/subjects/users-value/versions/3
# Hard delete (permanent)
DELETE /subjects/{subject}/versions/{version}?permanent=true
curl -X DELETE http://localhost:8081/subjects/users-value/versions/3?permanent=true
6. Liste de tous les Subjects
GET /subjects
curl http://localhost:8081/subjects
# Réponse
["users-value", "orders-value", "payments-key"]
Client Python pour l'API
import requests
import json
class SchemaRegistryClient:
def __init__(self, url="http://localhost:8081"):
self.url = url
def register_schema(self, subject, schema):
"""Enregistre un nouveau schéma"""
endpoint = f"{self.url}/subjects/{subject}/versions"
payload = {"schema": json.dumps(schema)}
response = requests.post(
endpoint,
json=payload,
headers={"Content-Type": "application/vnd.schemaregistry.v1+json"}
)
response.raise_for_status()
return response.json()["id"]
def get_schema(self, schema_id):
"""Récupère un schéma par ID"""
endpoint = f"{self.url}/schemas/ids/{schema_id}"
response = requests.get(endpoint)
response.raise_for_status()
return json.loads(response.json()["schema"])
def get_latest_schema(self, subject):
"""Récupère la dernière version d'un schéma"""
endpoint = f"{self.url}/subjects/{subject}/versions/latest"
response = requests.get(endpoint)
response.raise_for_status()
return response.json()
def list_subjects(self):
"""Liste tous les subjects"""
endpoint = f"{self.url}/subjects"
response = requests.get(endpoint)
response.raise_for_status()
return response.json()
def check_compatibility(self, subject, schema):
"""Vérifie la compatibilité d'un schéma"""
endpoint = f"{self.url}/compatibility/subjects/{subject}/versions/latest"
payload = {"schema": json.dumps(schema)}
response = requests.post(
endpoint,
json=payload,
headers={"Content-Type": "application/vnd.schemaregistry.v1+json"}
)
response.raise_for_status()
return response.json()["is_compatible"]
# Utilisation
client = SchemaRegistryClient()
# Enregistrement
user_schema = {
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "username", "type": "string"}
]
}
schema_id = client.register_schema("users-value", user_schema)
print(f"Schema ID: {schema_id}")
# Récupération
schema = client.get_schema(schema_id)
print(f"Schema: {schema}")
# Vérification de compatibilité
new_schema = {
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "username", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": None}
]
}
is_compatible = client.check_compatibility("users-value", new_schema)
print(f"Compatible: {is_compatible}")
Déploiement en Production
Docker Compose Setup
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
volumes:
- zookeeper-data:/var/lib/zookeeper/data
- zookeeper-logs:/var/lib/zookeeper/log
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
volumes:
- kafka-data:/var/lib/kafka/data
schema-registry:
image: confluentinc/cp-schema-registry:7.5.0
depends_on:
- kafka
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
# Mode de compatibilité par défaut
SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL: BACKWARD
# Performance tuning
SCHEMA_REGISTRY_KAFKASTORE_TOPIC_REPLICATION_FACTOR: 1
SCHEMA_REGISTRY_KAFKASTORE_TIMEOUT_MS: 5000
# Sécurité
SCHEMA_REGISTRY_AUTHENTICATION_METHOD: BASIC
SCHEMA_REGISTRY_AUTHENTICATION_REALM: SchemaRegistry
SCHEMA_REGISTRY_AUTHENTICATION_ROLES: admin,developer
# Métriques JMX
SCHEMA_REGISTRY_JMX_PORT: 9999
SCHEMA_REGISTRY_JMX_HOSTNAME: schema-registry
volumes:
- schema-registry-data:/var/lib/schema-registry
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8081/"]
interval: 30s
timeout: 10s
retries: 5
# Interface web pour Schema Registry
schema-registry-ui:
image: landoop/schema-registry-ui:latest
depends_on:
- schema-registry
ports:
- "8000:8000"
environment:
SCHEMAREGISTRY_URL: http://schema-registry:8081
PROXY: "true"
volumes:
zookeeper-data:
zookeeper-logs:
kafka-data:
schema-registry-data:
Configuration Haute Disponibilité
Cluster Schema Registry (3 instances)
# docker-compose-ha.yaml
version: '3.8'
services:
schema-registry-1:
image: confluentinc/cp-schema-registry:7.5.0
depends_on:
- kafka-1
- kafka-2
- kafka-3
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry-1
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka-1:29092,kafka-2:29093,kafka-3:29094
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
# Leader eligibility
SCHEMA_REGISTRY_LEADER_ELIGIBILITY: "true"
# Topic pour stockage avec réplication
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
SCHEMA_REGISTRY_KAFKASTORE_TOPIC_REPLICATION_FACTOR: 3
# Configuration du groupe
SCHEMA_REGISTRY_SCHEMA_REGISTRY_GROUP_ID: schema-registry-cluster
schema-registry-2:
image: confluentinc/cp-schema-registry:7.5.0
depends_on:
- kafka-1
- kafka-2
- kafka-3
ports:
- "8082:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry-2
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka-1:29092,kafka-2:29093,kafka-3:29094
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
SCHEMA_REGISTRY_LEADER_ELIGIBILITY: "true"
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
SCHEMA_REGISTRY_KAFKASTORE_TOPIC_REPLICATION_FACTOR: 3
SCHEMA_REGISTRY_SCHEMA_REGISTRY_GROUP_ID: schema-registry-cluster
schema-registry-3:
image: confluentinc/cp-schema-registry:7.5.0
depends_on:
- kafka-1
- kafka-2
- kafka-3
ports:
- "8083:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry-3
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka-1:29092,kafka-2:29093,kafka-3:29094
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
SCHEMA_REGISTRY_LEADER_ELIGIBILITY: "true"
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
SCHEMA_REGISTRY_KAFKASTORE_TOPIC_REPLICATION_FACTOR: 3
SCHEMA_REGISTRY_SCHEMA_REGISTRY_GROUP_ID: schema-registry-cluster
# Load balancer HAProxy
haproxy:
image: haproxy:2.8
ports:
- "8080:8080"
- "8090:8090"
volumes:
- ./haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg:ro
depends_on:
- schema-registry-1
- schema-registry-2
- schema-registry-3
Configuration HAProxy
# haproxy.cfg
global
log stdout format raw local0
maxconn 4096
defaults
log global
mode http
option httplog
option dontlognull
timeout connect 5000ms
timeout client 50000ms
timeout server 50000ms
frontend schema_registry_frontend
bind *:8080
default_backend schema_registry_backend
backend schema_registry_backend
balance roundrobin
option httpchk GET /
http-check expect status 200
server sr1 schema-registry-1:8081 check
server sr2 schema-registry-2:8081 check
server sr3 schema-registry-3:8081 check
frontend stats
bind *:8090
stats enable
stats uri /
stats refresh 10s
Monitoring et Métriques
Prometheus Configuration
# prometheus.yml
scrape_configs:
- job_name: 'schema-registry'
static_configs:
- targets:
- 'schema-registry-1:9999'
- 'schema-registry-2:9999'
- 'schema-registry-3:9999'
metrics_path: /
Métriques JMX Clés
# Nombre de schémas enregistrés
kafka_schema_registry_registered_count
# Requêtes par seconde
kafka_schema_registry_api_request_rate
# Latence des requêtes
kafka_schema_registry_api_request_latency_ms
# Cache hit ratio
kafka_schema_registry_cache_hit_ratio
# Nombre d'erreurs de compatibilité
kafka_schema_registry_compatibility_check_errors_total
Grafana Dashboard Query Examples
# Taux de requêtes API
rate(kafka_schema_registry_api_request_count[5m])
# Latence P99
histogram_quantile(0.99,
rate(kafka_schema_registry_api_request_latency_ms_bucket[5m])
)
# Erreurs de compatibilité
sum(rate(kafka_schema_registry_compatibility_check_errors_total[5m]))
by (compatibility_level)
# Taille du cache
kafka_schema_registry_cache_size
Bonnes Pratiques
1. Naming Conventions
Subject Naming Strategy
Confluent recommande trois stratégies :
TopicNameStrategy (Par défaut)
Subject: <topic>-key
Subject: <topic>-value
Exemple:
- users-key
- users-value
- orders-value
RecordNameStrategy
Subject: <fully.qualified.record.name>
Exemple:
- com.floriancourouge.model.User
- com.floriancourouge.model.Order
TopicRecordNameStrategy
Subject: <topic>-<fully.qualified.record.name>
Exemple:
- users-com.floriancourouge.model.User
- events-com.floriancourouge.model.UserEvent
Configuration Java
props.put("value.subject.name.strategy",
"io.confluent.kafka.serializers.subject.RecordNameStrategy");
2. Versioning et Evolution
Règles d'Or
- Toujours utiliser des valeurs par défaut
{
"name": "new_field",
"type": ["null", "string"],
"default": null // ✅ Critique pour BACKWARD
}
- Ne jamais renommer de champs
// ❌ Interdit
{"name": "userName"} → {"name": "username"}
// ✅ À la place: Ajouter nouveau champ, déprécier l'ancien
{
"fields": [
{"name": "userName", "type": "string", "deprecated": true},
{"name": "username", "type": "string", "default": ""}
]
}
- Utiliser des types union pour optionalité
// ✅ Bon
{"name": "age", "type": ["null", "int"], "default": null}
// ❌ Éviter
{"name": "age", "type": "int"} // Obligatoire
- Documenter tous les champs
{
"name": "created_at",
"type": "long",
"logicalType": "timestamp-millis",
"doc": "Date de création de l'utilisateur en millisecondes UTC"
}
3. Sécurité
Authentification BASIC
# Schema Registry config
SCHEMA_REGISTRY_AUTHENTICATION_METHOD: BASIC
SCHEMA_REGISTRY_AUTHENTICATION_REALM: SchemaRegistry
SCHEMA_REGISTRY_AUTHENTICATION_ROLES: admin,developer,read-only
# Fichier jaas_config.properties
admin: secret123,admin
developer: dev456,developer
readonly: read789,read-only
Client avec Authentification
Properties props = new Properties();
props.put("schema.registry.url", "http://localhost:8081");
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", "developer:dev456");
HTTPS/TLS
SCHEMA_REGISTRY_LISTENERS: https://0.0.0.0:8081
SCHEMA_REGISTRY_SSL_KEYSTORE_LOCATION: /etc/schema-registry/keystore.jks
SCHEMA_REGISTRY_SSL_KEYSTORE_PASSWORD: keystorepass
SCHEMA_REGISTRY_SSL_KEY_PASSWORD: keypass
SCHEMA_REGISTRY_SSL_TRUSTSTORE_LOCATION: /etc/schema-registry/truststore.jks
SCHEMA_REGISTRY_SSL_TRUSTSTORE_PASSWORD: truststorepass
4. Performance
Caching côté Client
// Cache local des schémas (évite appels répétés)
props.put("schema.registry.cache.capacity", 1000);
// TTL du cache (en secondes)
props.put("schema.registry.cache.expiry.secs", 3600);
Batch Registration
# Enregistrer plusieurs schémas en parallèle
import concurrent.futures
def register_schema(subject, schema):
return client.register_schema(subject, schema)
schemas = [
("users-value", user_schema),
("orders-value", order_schema),
("payments-value", payment_schema)
]
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
futures = [
executor.submit(register_schema, subject, schema)
for subject, schema in schemas
]
results = [f.result() for f in concurrent.futures.as_completed(futures)]
Topic _schemas Optimization
# Augmenter réplication pour durabilité
kafka-configs --bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name _schemas \
--alter \
--add-config min.insync.replicas=2,replication.factor=3
# Compression
kafka-configs --bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name _schemas \
--alter \
--add-config compression.type=lz4
Dépannage
Problèmes Courants
1. Schema Incompatible
Erreur: Schema being registered is incompatible with an earlier schema
Diagnostic:
1. Récupérer le schéma actuel
curl http://localhost:8081/subjects/users-value/versions/latest
2. Comparer avec le nouveau schéma
3. Identifier le changement cassant
Solutions:
- Ajouter valeur par défaut au nouveau champ
- Modifier le mode de compatibilité (temporairement)
- Créer un nouveau subject (breaking change assumé)
2. Schema Not Found
Erreur: Subject 'users-value' not found
Causes possibles:
- Subject jamais enregistré
- Subject supprimé (soft delete)
- Faute de frappe dans le nom
Solution:
# Lister tous les subjects
curl http://localhost:8081/subjects
# Vérifier les subjects supprimés
curl http://localhost:8081/subjects?deleted=true
# Restaurer si besoin
curl -X POST http://localhost:8081/subjects/users-value/versions
3. Schema Registry Unavailable
Erreur: Connection refused to http://localhost:8081
Vérifications:
1. Service démarré?
docker ps | grep schema-registry
2. Logs d'erreur?
docker logs schema-registry
3. Kafka accessible?
docker exec schema-registry kafka-broker-api-versions \
--bootstrap-server kafka:29092
4. Port exposé?
netstat -an | grep 8081
4. Timeout lors de l'Enregistrement
Erreur: Request timeout
Causes:
- Kafka cluster surchargé
- Topic _schemas avec trop de partitions
- Network latency
Solution:
# Augmenter timeout
props.put("schema.registry.request.timeout.ms", 60000);
# Vérifier topic _schemas
kafka-topics --bootstrap-server localhost:9092 \
--describe --topic _schemas
Cas d'Usage Avancés
Migration de Schéma avec Downtime Zéro
Scénario: Renommer un champ (change cassant)
Étape 1: Double-Write (v1 et v2)
// Nouveau schéma v2 avec les deux champs
{
"type": "record",
"name": "User",
"fields": [
{"name": "user_name", "type": "string"}, // Ancien
{"name": "username", "type": "string"} // Nouveau
]
}
// Producer écrit les deux champs
GenericRecord user = new GenericData.Record(schema);
user.put("user_name", "florian"); // Pour anciens consumers
user.put("username", "florian"); // Pour nouveaux consumers
Étape 2: Migration des Consumers
// Consumers migrent progressivement vers "username"
String username = (String) record.get("username");
Étape 3: Arrêt du Double-Write
// Nouveau schéma v3 - suppression ancien champ
{
"type": "record",
"name": "User",
"fields": [
{"name": "username", "type": "string"}
]
}
Schema Registry Multi-Datacenter
Architecture
DC1 DC2
┌─────────────────────┐ ┌─────────────────────┐
│ Schema Registry │ │ Schema Registry │
│ (Primary) │ │ (Follower) │
│ │ │ │
│ Kafka Cluster 1 │──MirrorMaker─→│ Kafka Cluster 2 │
│ Topic: _schemas │ │ Topic: _schemas │
└─────────────────────┘ └─────────────────────┘
Configuration MirrorMaker 2
# mm2.properties
clusters = primary, secondary
primary.bootstrap.servers = kafka-dc1:9092
secondary.bootstrap.servers = kafka-dc2:9092
# Réplication du topic _schemas
primary->secondary.enabled = true
primary->secondary.topics = _schemas
primary->secondary.replication.factor = 3
Conclusion
Kafka Schema Registry est un composant essentiel pour tout déploiement Kafka en production. Il apporte :
Gouvernance : Contrôle strict de l'évolution des schémas Fiabilité : Détection précoce des incompatibilités Performance : Réduction de 70%+ de la taille des messages Maintenabilité : Documentation centralisée et versioning
Checklist de Mise en Production
- [ ] Déployer Schema Registry en cluster (min 3 instances)
- [ ] Configurer HAProxy/Load Balancer
- [ ] Activer authentification BASIC ou mTLS
- [ ] Configurer mode de compatibilité (BACKWARD recommandé)
- [ ] Définir naming strategy cohérente
- [ ] Mettre en place monitoring (Prometheus + Grafana)
- [ ] Configurer alertes (latence, erreurs, disponibilité)
- [ ] Créer backup topic _schemas
- [ ] Documenter processus d'évolution de schémas
- [ ] Former les équipes aux bonnes pratiques
Ressources
Schema Registry transforme Kafka d'un simple bus de messages en une plateforme de données robuste et évolutive. Son adoption est un investissement qui rapporte dès les premiers mois de production.