Aller au contenu principal
DATA ENGINEERING

Kafka Schema Registry : Gestion des Schémas et Evolution

Florian Courouge
min de lecture
Kafka
Schema Registry
Avro
Data Governance
API
Niveau:
Intermédiaire

Kafka Schema Registry : Gestion des Schémas et Evolution

Introduction

Dans un écosystème Apache Kafka en production, l'évolution des données est inévitable. Les applications changent, les modèles de données évoluent, et les formats de messages doivent s'adapter. Kafka Schema Registry est la solution de référence pour gérer cette complexité en offrant un registre centralisé de schémas de données avec un contrôle strict de compatibilité.

Dans ce guide approfondi, nous explorerons Schema Registry en détail : son architecture, les différents formats de sérialisation, les modes de compatibilité, l'intégration avec les producteurs et consommateurs, et les bonnes pratiques pour une utilisation en production.

Pourquoi Schema Registry ?

Les Problèmes sans Schema Registry

Sans gestion centralisée des schémas, plusieurs problèmes apparaissent rapidement :

1. Incompatibilité de Versions

Producer v1 → {"name": "John", "age": 30}
Producer v2 → {"name": "John", "age": 30, "email": "john@example.com"}

Consumer v1 → Impossible de lire les nouveaux champs
Consumer v2 → Incompatible avec les anciens messages

2. Duplication des Schémas

  • Chaque application maintient sa propre copie du schéma
  • Divergence progressive entre les versions
  • Difficultés de synchronisation

3. Absence de Validation

  • Messages invalides publiés dans les topics
  • Découverte tardive des erreurs (côté consommateur)
  • Corruption potentielle du pipeline de données

4. Overhead de Stockage

  • Schémas embarqués dans chaque message
  • Gaspillage de bande passante et d'espace disque

Les Avantages de Schema Registry

Schema Registry résout ces problèmes en offrant :

Registre Centralisé

Schema Registry (Single Source of Truth)
    ↓
Producers ← → Consumers
(Référencent le même schéma via ID)

Contrôle de Compatibilité

  • Validation automatique lors de l'enregistrement
  • Modes de compatibilité configurables (BACKWARD, FORWARD, FULL)
  • Prévention des changements cassants

Optimisation du Stockage

Sans Schema Registry:
Message = [Schéma complet] + [Données]
Taille: ~2KB par message

Avec Schema Registry:
Message = [Schema ID (4 bytes)] + [Données]
Taille: ~500 bytes par message
Économie: 75%

Gouvernance des Données

  • Historique complet des versions de schémas
  • Audit trail pour la conformité
  • Documentation auto-générée

Architecture de Schema Registry

Composants Principaux

┌─────────────────────────────────────────────────────────┐
│                  Schema Registry Cluster                │
│  ┌────────────┐  ┌────────────┐  ┌────────────┐        │
│  │  Instance  │  │  Instance  │  │  Instance  │        │
│  │     #1     │  │     #2     │  │     #3     │        │
│  │  (Leader)  │  │ (Follower) │  │ (Follower) │        │
│  └─────┬──────┘  └──────┬─────┘  └──────┬─────┘        │
│        │                 │                │              │
│        └─────────────────┼────────────────┘              │
│                          ↓                                │
│              ┌───────────────────────┐                   │
│              │  Kafka Topic:         │                   │
│              │  _schemas             │                   │
│              │  (Stockage durable)   │                   │
│              └───────────────────────┘                   │
└─────────────────────────────────────────────────────────┘
                          ↑
          ┌───────────────┼───────────────┐
          ↓                               ↓
   ┌─────────────┐                 ┌─────────────┐
   │  Producers  │                 │  Consumers  │
   │             │                 │             │
   │ - Fetch     │                 │ - Fetch     │
   │   Schema    │                 │   Schema    │
   │ - Validate  │                 │ - Validate  │
   │ - Serialize │                 │ - Deserial. │
   └─────────────┘                 └─────────────┘

Fonctionnement Interne

1. Enregistrement d'un Schéma

Producer
  ↓ POST /subjects/user-value/versions
Schema Registry
  ↓ Vérifie compatibilité
  ↓ Assigne ID unique (ex: 42)
  ↓ Stocke dans topic _schemas
Kafka (_schemas topic)

2. Sérialisation d'un Message

Producer
  ↓ 1. Récupère Schema ID (cache local)
  ↓ 2. Sérialise données selon schéma
  ↓ 3. Ajoute magic byte + Schema ID
  ↓
Message: [0x00][0x00][0x00][0x00][0x2A][données binaires]
         └─┬─┘└────────┬────────┘└────────┬──────────┘
       Magic   Schema ID (42)      Payload Avro

3. Désérialisation d'un Message

Kafka Topic
  ↓
Consumer
  ↓ 1. Lit magic byte + Schema ID
  ↓ 2. Récupère schéma depuis Registry (cache)
  ↓ 3. Désérialise avec le bon schéma
  ↓
Objet Java/Python

Formats de Sérialisation

Schema Registry supporte plusieurs formats :

1. Avro (Le plus populaire)

Caractéristiques

  • Format binaire compact
  • Schéma séparé des données
  • Support natif de l'évolution de schéma
  • Riche écosystème d'outils

Exemple de Schéma Avro

{
  "type": "record",
  "name": "User",
  "namespace": "com.floriancourouge.model",
  "fields": [
    {
      "name": "id",
      "type": "long",
      "doc": "Identifiant unique de l'utilisateur"
    },
    {
      "name": "username",
      "type": "string"
    },
    {
      "name": "email",
      "type": "string"
    },
    {
      "name": "age",
      "type": ["null", "int"],
      "default": null,
      "doc": "Age optionnel"
    },
    {
      "name": "preferences",
      "type": {
        "type": "record",
        "name": "UserPreferences",
        "fields": [
          {"name": "language", "type": "string", "default": "en"},
          {"name": "notifications", "type": "boolean", "default": true}
        ]
      },
      "default": {}
    },
    {
      "name": "created_at",
      "type": "long",
      "logicalType": "timestamp-millis"
    }
  ]
}

Types Avro Supportés

Primitifs:
- null, boolean, int, long, float, double
- bytes, string

Complexes:
- record (structure)
- enum (énumération)
- array (tableau)
- map (dictionnaire)
- union (type composite, ex: ["null", "string"])
- fixed (bytes de taille fixe)

Logiques:
- decimal
- date
- time-millis, time-micros
- timestamp-millis, timestamp-micros
- uuid

Producer Java avec Avro

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class AvroProducerExample {

    public static void main(String[] args) {
        // Configuration
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  KafkaAvroSerializer.class.getName());
        props.put("schema.registry.url", "http://localhost:8081");

        // Création du producer
        KafkaProducer<String, GenericRecord> producer =
            new KafkaProducer<>(props);

        // Définition du schéma
        String schemaString = """
            {
              "type": "record",
              "name": "User",
              "fields": [
                {"name": "id", "type": "long"},
                {"name": "username", "type": "string"},
                {"name": "email", "type": "string"}
              ]
            }
        """;

        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(schemaString);

        // Création d'un enregistrement
        GenericRecord user = new GenericData.Record(schema);
        user.put("id", 123L);
        user.put("username", "florian");
        user.put("email", "florian@example.com");

        // Envoi du message
        ProducerRecord<String, GenericRecord> record =
            new ProducerRecord<>("users", "user-123", user);

        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                System.out.printf("Message envoyé: partition=%d, offset=%d%n",
                    metadata.partition(), metadata.offset());
            } else {
                exception.printStackTrace();
            }
        });

        producer.flush();
        producer.close();
    }
}

Consumer Java avec Avro

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.*;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class AvroConsumerExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "user-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  KafkaAvroDeserializer.class.getName());
        props.put("schema.registry.url", "http://localhost:8081");
        props.put("specific.avro.reader", "false"); // GenericRecord

        KafkaConsumer<String, GenericRecord> consumer =
            new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("users"));

        try {
            while (true) {
                ConsumerRecords<String, GenericRecord> records =
                    consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, GenericRecord> record : records) {
                    GenericRecord user = record.value();
                    System.out.printf("User reçu: id=%d, username=%s, email=%s%n",
                        user.get("id"),
                        user.get("username"),
                        user.get("email"));
                }
            }
        } finally {
            consumer.close();
        }
    }
}

2. JSON Schema

Caractéristiques

  • Lisible par l'humain
  • Compatible avec REST APIs
  • Moins compact qu'Avro
  • Schéma plus verbeux

Exemple JSON Schema

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "User",
  "type": "object",
  "properties": {
    "id": {
      "type": "integer",
      "description": "Identifiant unique"
    },
    "username": {
      "type": "string",
      "minLength": 3,
      "maxLength": 50
    },
    "email": {
      "type": "string",
      "format": "email"
    },
    "age": {
      "type": ["integer", "null"],
      "minimum": 0,
      "maximum": 150
    },
    "roles": {
      "type": "array",
      "items": {
        "type": "string",
        "enum": ["USER", "ADMIN", "MODERATOR"]
      }
    }
  },
  "required": ["id", "username", "email"],
  "additionalProperties": false
}

Producer Python avec JSON Schema

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONSerializer
from confluent_kafka.serialization import (
    SerializationContext,
    MessageField
)
import json

# Schéma JSON
user_schema = """
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": "object",
  "properties": {
    "id": {"type": "integer"},
    "username": {"type": "string"},
    "email": {"type": "string", "format": "email"}
  },
  "required": ["id", "username", "email"]
}
"""

# Configuration Schema Registry
schema_registry_conf = {'url': 'http://localhost:8081'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

# Serializer
json_serializer = JSONSerializer(
    user_schema,
    schema_registry_client,
    to_dict=lambda user, ctx: user
)

# Configuration Producer
producer_conf = {
    'bootstrap.servers': 'localhost:9092'
}
producer = Producer(producer_conf)

# Données utilisateur
user = {
    'id': 123,
    'username': 'florian',
    'email': 'florian@example.com'
}

# Sérialisation et envoi
producer.produce(
    topic='users',
    key='user-123',
    value=json_serializer(
        user,
        SerializationContext('users', MessageField.VALUE)
    ),
    on_delivery=lambda err, msg: print(
        f'Livré: {msg.topic()}[{msg.partition()}]' if not err
        else f'Erreur: {err}'
    )
)

producer.flush()

3. Protobuf

Caractéristiques

  • Format binaire très performant
  • Largement utilisé chez Google
  • Support natif dans nombreux langages
  • Évolution de schéma native

Exemple Protobuf Schema

syntax = "proto3";

package com.floriancourouge.model;

message User {
  int64 id = 1;
  string username = 2;
  string email = 3;
  optional int32 age = 4;

  message Preferences {
    string language = 1;
    bool notifications = 2;
  }

  Preferences preferences = 5;
  int64 created_at = 6;
}

enum UserRole {
  USER = 0;
  ADMIN = 1;
  MODERATOR = 2;
}

Comparaison des Formats

| Critère | Avro | JSON Schema | Protobuf | |---------|------|-------------|----------| | Taille binaire | ⭐⭐⭐⭐⭐ Excellent | ⭐⭐ Moyen | ⭐⭐⭐⭐⭐ Excellent | | Lisibilité | ⭐⭐⭐ Bon (JSON) | ⭐⭐⭐⭐⭐ Excellent | ⭐⭐ Moyen (.proto) | | Performance | ⭐⭐⭐⭐ Très bon | ⭐⭐⭐ Bon | ⭐⭐⭐⭐⭐ Excellent | | Evolution | ⭐⭐⭐⭐⭐ Excellent | ⭐⭐⭐ Bon | ⭐⭐⭐⭐ Très bon | | Écosystème Kafka | ⭐⭐⭐⭐⭐ Standard | ⭐⭐⭐⭐ Très bon | ⭐⭐⭐⭐ Très bon | | Support langages | ⭐⭐⭐⭐ Très bon | ⭐⭐⭐⭐⭐ Excellent | ⭐⭐⭐⭐⭐ Excellent |

Modes de Compatibilité

Schema Registry impose des règles de compatibilité pour garantir l'évolution saine des schémas.

Types de Compatibilité

1. BACKWARD (Par défaut)

Les consommateurs utilisant le nouveau schéma peuvent lire les données produites avec l'ancien schéma.

Scénario:
- Déploiement: Consommateurs PUIS Producteurs
- Cas d'usage: Lecture de données historiques

Évolutions autorisées:
✅ Ajout de champs avec valeur par défaut
✅ Suppression de champs

Évolutions interdites:
❌ Ajout de champs obligatoires
❌ Changement de type

Exemple BACKWARD

// Schéma v1
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "username", "type": "string"},
    {"name": "email", "type": "string"}
  ]
}

// Schéma v2 (BACKWARD compatible)
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "username", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "phone", "type": ["null", "string"], "default": null} // ✅ OK
  ]
}

// Schéma v2 (NON compatible)
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "username", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "phone", "type": "string"} // ❌ Champ obligatoire
  ]
}

2. FORWARD

Les consommateurs utilisant l'ancien schéma peuvent lire les données produites avec le nouveau schéma.

Scénario:
- Déploiement: Producteurs PUIS Consommateurs
- Cas d'usage: Ajout progressif de fonctionnalités

Évolutions autorisées:
✅ Ajout de champs optionnels
✅ Suppression de champs avec valeur par défaut

Évolutions interdites:
❌ Suppression de champs obligatoires
❌ Changement de type

Exemple FORWARD

// Schéma v1
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "username", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "phone", "type": ["null", "string"], "default": null}
  ]
}

// Schéma v2 (FORWARD compatible)
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "username", "type": "string"},
    {"name": "email", "type": "string"}
    // Suppression de "phone" ✅ OK (avait valeur par défaut)
  ]
}

3. FULL (BACKWARD + FORWARD)

Compatibilité totale dans les deux sens.

Évolutions autorisées:
✅ Ajout de champs optionnels avec valeur par défaut
✅ Suppression de champs optionnels avec valeur par défaut

Évolutions interdites:
❌ Tout changement cassant

4. NONE

Aucune vérification de compatibilité (déconseillé en production).

5. BACKWARD_TRANSITIVE / FORWARD_TRANSITIVE / FULL_TRANSITIVE

Vérifie la compatibilité avec toutes les versions précédentes (pas seulement la dernière).

Configuration du Mode de Compatibilité

Au niveau global

curl -X PUT -H "Content-Type: application/json" \
  --data '{"compatibility": "BACKWARD"}' \
  http://localhost:8081/config

Au niveau d'un subject

curl -X PUT -H "Content-Type: application/json" \
  --data '{"compatibility": "FULL"}' \
  http://localhost:8081/config/users-value

Vérification de compatibilité manuelle

# Test de compatibilité avant enregistrement
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"}]}"}' \
  http://localhost:8081/compatibility/subjects/users-value/versions/latest

# Réponse
{"is_compatible": true}

API REST de Schema Registry

Endpoints Principaux

1. Enregistrement d'un Schéma

POST /subjects/{subject}/versions

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{
    "schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"username\",\"type\":\"string\"}]}"
  }' \
  http://localhost:8081/subjects/users-value/versions

# Réponse
{"id": 42}

2. Récupération d'un Schéma par ID

GET /schemas/ids/{id}

curl http://localhost:8081/schemas/ids/42

# Réponse
{
  "schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[...]}"
}

3. Liste des Versions d'un Subject

GET /subjects/{subject}/versions

curl http://localhost:8081/subjects/users-value/versions

# Réponse
[1, 2, 3, 4]

4. Récupération d'une Version Spécifique

GET /subjects/{subject}/versions/{version}

curl http://localhost:8081/subjects/users-value/versions/latest

# Réponse
{
  "subject": "users-value",
  "version": 4,
  "id": 42,
  "schema": "{...}"
}

5. Suppression d'un Schéma

# Soft delete (peut être restauré)
DELETE /subjects/{subject}/versions/{version}

curl -X DELETE http://localhost:8081/subjects/users-value/versions/3

# Hard delete (permanent)
DELETE /subjects/{subject}/versions/{version}?permanent=true

curl -X DELETE http://localhost:8081/subjects/users-value/versions/3?permanent=true

6. Liste de tous les Subjects

GET /subjects

curl http://localhost:8081/subjects

# Réponse
["users-value", "orders-value", "payments-key"]

Client Python pour l'API

import requests
import json

class SchemaRegistryClient:
    def __init__(self, url="http://localhost:8081"):
        self.url = url

    def register_schema(self, subject, schema):
        """Enregistre un nouveau schéma"""
        endpoint = f"{self.url}/subjects/{subject}/versions"
        payload = {"schema": json.dumps(schema)}
        response = requests.post(
            endpoint,
            json=payload,
            headers={"Content-Type": "application/vnd.schemaregistry.v1+json"}
        )
        response.raise_for_status()
        return response.json()["id"]

    def get_schema(self, schema_id):
        """Récupère un schéma par ID"""
        endpoint = f"{self.url}/schemas/ids/{schema_id}"
        response = requests.get(endpoint)
        response.raise_for_status()
        return json.loads(response.json()["schema"])

    def get_latest_schema(self, subject):
        """Récupère la dernière version d'un schéma"""
        endpoint = f"{self.url}/subjects/{subject}/versions/latest"
        response = requests.get(endpoint)
        response.raise_for_status()
        return response.json()

    def list_subjects(self):
        """Liste tous les subjects"""
        endpoint = f"{self.url}/subjects"
        response = requests.get(endpoint)
        response.raise_for_status()
        return response.json()

    def check_compatibility(self, subject, schema):
        """Vérifie la compatibilité d'un schéma"""
        endpoint = f"{self.url}/compatibility/subjects/{subject}/versions/latest"
        payload = {"schema": json.dumps(schema)}
        response = requests.post(
            endpoint,
            json=payload,
            headers={"Content-Type": "application/vnd.schemaregistry.v1+json"}
        )
        response.raise_for_status()
        return response.json()["is_compatible"]

# Utilisation
client = SchemaRegistryClient()

# Enregistrement
user_schema = {
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "username", "type": "string"}
    ]
}
schema_id = client.register_schema("users-value", user_schema)
print(f"Schema ID: {schema_id}")

# Récupération
schema = client.get_schema(schema_id)
print(f"Schema: {schema}")

# Vérification de compatibilité
new_schema = {
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "username", "type": "string"},
        {"name": "email", "type": ["null", "string"], "default": None}
    ]
}
is_compatible = client.check_compatibility("users-value", new_schema)
print(f"Compatible: {is_compatible}")

Déploiement en Production

Docker Compose Setup

version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    volumes:
      - zookeeper-data:/var/lib/zookeeper/data
      - zookeeper-logs:/var/lib/zookeeper/log

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - kafka-data:/var/lib/kafka/data

  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.0
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

      # Mode de compatibilité par défaut
      SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL: BACKWARD

      # Performance tuning
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC_REPLICATION_FACTOR: 1
      SCHEMA_REGISTRY_KAFKASTORE_TIMEOUT_MS: 5000

      # Sécurité
      SCHEMA_REGISTRY_AUTHENTICATION_METHOD: BASIC
      SCHEMA_REGISTRY_AUTHENTICATION_REALM: SchemaRegistry
      SCHEMA_REGISTRY_AUTHENTICATION_ROLES: admin,developer

      # Métriques JMX
      SCHEMA_REGISTRY_JMX_PORT: 9999
      SCHEMA_REGISTRY_JMX_HOSTNAME: schema-registry
    volumes:
      - schema-registry-data:/var/lib/schema-registry
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8081/"]
      interval: 30s
      timeout: 10s
      retries: 5

  # Interface web pour Schema Registry
  schema-registry-ui:
    image: landoop/schema-registry-ui:latest
    depends_on:
      - schema-registry
    ports:
      - "8000:8000"
    environment:
      SCHEMAREGISTRY_URL: http://schema-registry:8081
      PROXY: "true"

volumes:
  zookeeper-data:
  zookeeper-logs:
  kafka-data:
  schema-registry-data:

Configuration Haute Disponibilité

Cluster Schema Registry (3 instances)

# docker-compose-ha.yaml
version: '3.8'

services:
  schema-registry-1:
    image: confluentinc/cp-schema-registry:7.5.0
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry-1
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka-1:29092,kafka-2:29093,kafka-3:29094
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

      # Leader eligibility
      SCHEMA_REGISTRY_LEADER_ELIGIBILITY: "true"

      # Topic pour stockage avec réplication
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC_REPLICATION_FACTOR: 3

      # Configuration du groupe
      SCHEMA_REGISTRY_SCHEMA_REGISTRY_GROUP_ID: schema-registry-cluster

  schema-registry-2:
    image: confluentinc/cp-schema-registry:7.5.0
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3
    ports:
      - "8082:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry-2
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka-1:29092,kafka-2:29093,kafka-3:29094
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_LEADER_ELIGIBILITY: "true"
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC_REPLICATION_FACTOR: 3
      SCHEMA_REGISTRY_SCHEMA_REGISTRY_GROUP_ID: schema-registry-cluster

  schema-registry-3:
    image: confluentinc/cp-schema-registry:7.5.0
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3
    ports:
      - "8083:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry-3
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka-1:29092,kafka-2:29093,kafka-3:29094
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_LEADER_ELIGIBILITY: "true"
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC_REPLICATION_FACTOR: 3
      SCHEMA_REGISTRY_SCHEMA_REGISTRY_GROUP_ID: schema-registry-cluster

  # Load balancer HAProxy
  haproxy:
    image: haproxy:2.8
    ports:
      - "8080:8080"
      - "8090:8090"
    volumes:
      - ./haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg:ro
    depends_on:
      - schema-registry-1
      - schema-registry-2
      - schema-registry-3

Configuration HAProxy

# haproxy.cfg
global
    log stdout format raw local0
    maxconn 4096

defaults
    log     global
    mode    http
    option  httplog
    option  dontlognull
    timeout connect 5000ms
    timeout client  50000ms
    timeout server  50000ms

frontend schema_registry_frontend
    bind *:8080
    default_backend schema_registry_backend

backend schema_registry_backend
    balance roundrobin
    option httpchk GET /
    http-check expect status 200
    server sr1 schema-registry-1:8081 check
    server sr2 schema-registry-2:8081 check
    server sr3 schema-registry-3:8081 check

frontend stats
    bind *:8090
    stats enable
    stats uri /
    stats refresh 10s

Monitoring et Métriques

Prometheus Configuration

# prometheus.yml
scrape_configs:
  - job_name: 'schema-registry'
    static_configs:
      - targets:
        - 'schema-registry-1:9999'
        - 'schema-registry-2:9999'
        - 'schema-registry-3:9999'
    metrics_path: /

Métriques JMX Clés

# Nombre de schémas enregistrés
kafka_schema_registry_registered_count

# Requêtes par seconde
kafka_schema_registry_api_request_rate

# Latence des requêtes
kafka_schema_registry_api_request_latency_ms

# Cache hit ratio
kafka_schema_registry_cache_hit_ratio

# Nombre d'erreurs de compatibilité
kafka_schema_registry_compatibility_check_errors_total

Grafana Dashboard Query Examples

# Taux de requêtes API
rate(kafka_schema_registry_api_request_count[5m])

# Latence P99
histogram_quantile(0.99,
  rate(kafka_schema_registry_api_request_latency_ms_bucket[5m])
)

# Erreurs de compatibilité
sum(rate(kafka_schema_registry_compatibility_check_errors_total[5m]))
  by (compatibility_level)

# Taille du cache
kafka_schema_registry_cache_size

Bonnes Pratiques

1. Naming Conventions

Subject Naming Strategy

Confluent recommande trois stratégies :

TopicNameStrategy (Par défaut)

Subject: <topic>-key
Subject: <topic>-value

Exemple:
- users-key
- users-value
- orders-value

RecordNameStrategy

Subject: <fully.qualified.record.name>

Exemple:
- com.floriancourouge.model.User
- com.floriancourouge.model.Order

TopicRecordNameStrategy

Subject: <topic>-<fully.qualified.record.name>

Exemple:
- users-com.floriancourouge.model.User
- events-com.floriancourouge.model.UserEvent

Configuration Java

props.put("value.subject.name.strategy",
          "io.confluent.kafka.serializers.subject.RecordNameStrategy");

2. Versioning et Evolution

Règles d'Or

  1. Toujours utiliser des valeurs par défaut
{
  "name": "new_field",
  "type": ["null", "string"],
  "default": null  // ✅ Critique pour BACKWARD
}
  1. Ne jamais renommer de champs
// ❌ Interdit
{"name": "userName"} → {"name": "username"}

// ✅ À la place: Ajouter nouveau champ, déprécier l'ancien
{
  "fields": [
    {"name": "userName", "type": "string", "deprecated": true},
    {"name": "username", "type": "string", "default": ""}
  ]
}
  1. Utiliser des types union pour optionalité
// ✅ Bon
{"name": "age", "type": ["null", "int"], "default": null}

// ❌ Éviter
{"name": "age", "type": "int"}  // Obligatoire
  1. Documenter tous les champs
{
  "name": "created_at",
  "type": "long",
  "logicalType": "timestamp-millis",
  "doc": "Date de création de l'utilisateur en millisecondes UTC"
}

3. Sécurité

Authentification BASIC

# Schema Registry config
SCHEMA_REGISTRY_AUTHENTICATION_METHOD: BASIC
SCHEMA_REGISTRY_AUTHENTICATION_REALM: SchemaRegistry
SCHEMA_REGISTRY_AUTHENTICATION_ROLES: admin,developer,read-only

# Fichier jaas_config.properties
admin: secret123,admin
developer: dev456,developer
readonly: read789,read-only

Client avec Authentification

Properties props = new Properties();
props.put("schema.registry.url", "http://localhost:8081");
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", "developer:dev456");

HTTPS/TLS

SCHEMA_REGISTRY_LISTENERS: https://0.0.0.0:8081
SCHEMA_REGISTRY_SSL_KEYSTORE_LOCATION: /etc/schema-registry/keystore.jks
SCHEMA_REGISTRY_SSL_KEYSTORE_PASSWORD: keystorepass
SCHEMA_REGISTRY_SSL_KEY_PASSWORD: keypass
SCHEMA_REGISTRY_SSL_TRUSTSTORE_LOCATION: /etc/schema-registry/truststore.jks
SCHEMA_REGISTRY_SSL_TRUSTSTORE_PASSWORD: truststorepass

4. Performance

Caching côté Client

// Cache local des schémas (évite appels répétés)
props.put("schema.registry.cache.capacity", 1000);

// TTL du cache (en secondes)
props.put("schema.registry.cache.expiry.secs", 3600);

Batch Registration

# Enregistrer plusieurs schémas en parallèle
import concurrent.futures

def register_schema(subject, schema):
    return client.register_schema(subject, schema)

schemas = [
    ("users-value", user_schema),
    ("orders-value", order_schema),
    ("payments-value", payment_schema)
]

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    futures = [
        executor.submit(register_schema, subject, schema)
        for subject, schema in schemas
    ]
    results = [f.result() for f in concurrent.futures.as_completed(futures)]

Topic _schemas Optimization

# Augmenter réplication pour durabilité
kafka-configs --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name _schemas \
  --alter \
  --add-config min.insync.replicas=2,replication.factor=3

# Compression
kafka-configs --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name _schemas \
  --alter \
  --add-config compression.type=lz4

Dépannage

Problèmes Courants

1. Schema Incompatible

Erreur: Schema being registered is incompatible with an earlier schema

Diagnostic:
1. Récupérer le schéma actuel
curl http://localhost:8081/subjects/users-value/versions/latest

2. Comparer avec le nouveau schéma
3. Identifier le changement cassant

Solutions:
- Ajouter valeur par défaut au nouveau champ
- Modifier le mode de compatibilité (temporairement)
- Créer un nouveau subject (breaking change assumé)

2. Schema Not Found

Erreur: Subject 'users-value' not found

Causes possibles:
- Subject jamais enregistré
- Subject supprimé (soft delete)
- Faute de frappe dans le nom

Solution:
# Lister tous les subjects
curl http://localhost:8081/subjects

# Vérifier les subjects supprimés
curl http://localhost:8081/subjects?deleted=true

# Restaurer si besoin
curl -X POST http://localhost:8081/subjects/users-value/versions

3. Schema Registry Unavailable

Erreur: Connection refused to http://localhost:8081

Vérifications:
1. Service démarré?
docker ps | grep schema-registry

2. Logs d'erreur?
docker logs schema-registry

3. Kafka accessible?
docker exec schema-registry kafka-broker-api-versions \
  --bootstrap-server kafka:29092

4. Port exposé?
netstat -an | grep 8081

4. Timeout lors de l'Enregistrement

Erreur: Request timeout

Causes:
- Kafka cluster surchargé
- Topic _schemas avec trop de partitions
- Network latency

Solution:
# Augmenter timeout
props.put("schema.registry.request.timeout.ms", 60000);

# Vérifier topic _schemas
kafka-topics --bootstrap-server localhost:9092 \
  --describe --topic _schemas

Cas d'Usage Avancés

Migration de Schéma avec Downtime Zéro

Scénario: Renommer un champ (change cassant)

Étape 1: Double-Write (v1 et v2)

// Nouveau schéma v2 avec les deux champs
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "user_name", "type": "string"},  // Ancien
    {"name": "username", "type": "string"}    // Nouveau
  ]
}

// Producer écrit les deux champs
GenericRecord user = new GenericData.Record(schema);
user.put("user_name", "florian");  // Pour anciens consumers
user.put("username", "florian");    // Pour nouveaux consumers

Étape 2: Migration des Consumers

// Consumers migrent progressivement vers "username"
String username = (String) record.get("username");

Étape 3: Arrêt du Double-Write

// Nouveau schéma v3 - suppression ancien champ
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "username", "type": "string"}
  ]
}

Schema Registry Multi-Datacenter

Architecture

DC1                                    DC2
┌─────────────────────┐               ┌─────────────────────┐
│ Schema Registry     │               │ Schema Registry     │
│ (Primary)           │               │ (Follower)          │
│                     │               │                     │
│ Kafka Cluster 1     │──MirrorMaker─→│ Kafka Cluster 2     │
│ Topic: _schemas     │               │ Topic: _schemas     │
└─────────────────────┘               └─────────────────────┘

Configuration MirrorMaker 2

# mm2.properties
clusters = primary, secondary
primary.bootstrap.servers = kafka-dc1:9092
secondary.bootstrap.servers = kafka-dc2:9092

# Réplication du topic _schemas
primary->secondary.enabled = true
primary->secondary.topics = _schemas
primary->secondary.replication.factor = 3

Conclusion

Kafka Schema Registry est un composant essentiel pour tout déploiement Kafka en production. Il apporte :

Gouvernance : Contrôle strict de l'évolution des schémas Fiabilité : Détection précoce des incompatibilités Performance : Réduction de 70%+ de la taille des messages Maintenabilité : Documentation centralisée et versioning

Checklist de Mise en Production

  • [ ] Déployer Schema Registry en cluster (min 3 instances)
  • [ ] Configurer HAProxy/Load Balancer
  • [ ] Activer authentification BASIC ou mTLS
  • [ ] Configurer mode de compatibilité (BACKWARD recommandé)
  • [ ] Définir naming strategy cohérente
  • [ ] Mettre en place monitoring (Prometheus + Grafana)
  • [ ] Configurer alertes (latence, erreurs, disponibilité)
  • [ ] Créer backup topic _schemas
  • [ ] Documenter processus d'évolution de schémas
  • [ ] Former les équipes aux bonnes pratiques

Ressources

Schema Registry transforme Kafka d'un simple bus de messages en une plateforme de données robuste et évolutive. Son adoption est un investissement qui rapporte dès les premiers mois de production.