KAFKA
Avance

Guide Complet de la Securite Kafka

Guide Complet de la Securite Kafka

Securisez votre cluster Apache Kafka de A a Z : authentification SASL, chiffrement SSL/TLS, autorisation ACLs et RBAC, audit logging et bonnes pratiques de securite en production.

Florian Courouge
40 min de lecture
3,445 mots
0 vues
Kafka
Security
SASL
SSL
ACL
RBAC
Production
Authentication

Guide Complet de la Securite Kafka

Introduction

La securite d'Apache Kafka est souvent negligee en developpement, mais devient critique en production. Un cluster Kafka non securise expose vos donnees sensibles, permet des acces non autorises et viole potentiellement les reglementations (RGPD, PCI-DSS, HIPAA).

Ce guide couvre exhaustivement tous les aspects de la securite Kafka :

  • Authentification : SASL (PLAIN, SCRAM, GSSAPI/Kerberos, OAUTHBEARER)
  • Chiffrement : SSL/TLS pour les communications
  • Autorisation : ACLs et RBAC (Role-Based Access Control)
  • Audit : Logging des acces et operations
  • Bonnes pratiques : Configuration securisee en production

Architecture Securite Kafka

Vue d'Ensemble de la Securite Kafka

Les 4 Piliers de la Securite

┌─────────────────────────────────────────────────────────────────┐
│                    SECURITE KAFKA                                │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  1. AUTHENTIFICATION (Qui etes-vous?)                           │
│     └─ SASL: PLAIN, SCRAM-SHA-256/512, GSSAPI, OAUTHBEARER      │
│                                                                  │
│  2. CHIFFREMENT (Confidentialite des donnees)                   │
│     └─ SSL/TLS: Chiffrement en transit                          │
│                                                                  │
│  3. AUTORISATION (Que pouvez-vous faire?)                       │
│     └─ ACLs: Access Control Lists                               │
│     └─ RBAC: Role-Based Access Control (Confluent)              │
│                                                                  │
│  4. AUDIT (Tracabilite)                                         │
│     └─ Logging des connexions et operations                     │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Protocoles de Securite

Listeners disponibles:
┌──────────────────────────────────────────────────────────────┐
│ Protocole          │ Auth │ Chiffrement │ Cas d'usage        │
├────────────────────┼──────┼─────────────┼────────────────────┤
│ PLAINTEXT          │ Non  │ Non         │ Dev uniquement     │
│ SSL                │ Opt  │ Oui         │ Chiffrement seul   │
│ SASL_PLAINTEXT     │ Oui  │ Non         │ Auth sans TLS      │
│ SASL_SSL           │ Oui  │ Oui         │ Production         │
└──────────────────────────────────────────────────────────────┘

Chiffrement SSL/TLS

Concepts de Base

SSL/TLS dans Kafka:
┌─────────────────────────────────────────────────────────────┐
│                                                              │
│  Client ←───── TLS Handshake ─────→ Broker                  │
│         ←───── Encrypted Data ────→                         │
│                                                              │
│  Composants:                                                 │
│  ├─ Keystore: Contient le certificat + cle privee           │
│  ├─ Truststore: Contient les CA de confiance                │
│  └─ Certificats: Identite des parties                       │
│                                                              │
└─────────────────────────────────────────────────────────────┘

Generation des Certificats

1. Creer une Autorite de Certification (CA)

#!/bin/bash
# generate-ca.sh

# Variables
CA_DIR=/opt/kafka/ssl/ca
VALIDITY=3650  # 10 ans
CA_PASSWORD="ca-secret-password"

mkdir -p $CA_DIR

# Generer la cle privee de la CA
openssl genrsa -aes256 -out $CA_DIR/ca-key.pem -passout pass:$CA_PASSWORD 4096

# Generer le certificat CA (auto-signe)
openssl req -new -x509 -days $VALIDITY \
  -key $CA_DIR/ca-key.pem \
  -passin pass:$CA_PASSWORD \
  -out $CA_DIR/ca-cert.pem \
  -subj "/C=FR/ST=IDF/L=Paris/O=MyCompany/OU=IT/CN=Kafka-CA"

# Verifier le certificat
openssl x509 -in $CA_DIR/ca-cert.pem -text -noout

2. Generer les Certificats pour les Brokers

#!/bin/bash
# generate-broker-certs.sh

BROKER_NAME=$1  # kafka1, kafka2, kafka3
SSL_DIR=/opt/kafka/ssl
CA_DIR=$SSL_DIR/ca
BROKER_DIR=$SSL_DIR/$BROKER_NAME
VALIDITY=365
KEYSTORE_PASSWORD="broker-keystore-password"
KEY_PASSWORD="broker-key-password"
CA_PASSWORD="ca-secret-password"

mkdir -p $BROKER_DIR

# 1. Generer le keystore avec une paire de cles
keytool -genkeypair -alias $BROKER_NAME \
  -keyalg RSA -keysize 2048 \
  -validity $VALIDITY \
  -keystore $BROKER_DIR/kafka.keystore.jks \
  -storepass $KEYSTORE_PASSWORD \
  -keypass $KEY_PASSWORD \
  -dname "CN=$BROKER_NAME.example.com,OU=IT,O=MyCompany,L=Paris,ST=IDF,C=FR" \
  -ext "SAN=DNS:$BROKER_NAME.example.com,DNS:localhost,IP:127.0.0.1"

# 2. Creer une demande de signature (CSR)
keytool -certreq -alias $BROKER_NAME \
  -keystore $BROKER_DIR/kafka.keystore.jks \
  -storepass $KEYSTORE_PASSWORD \
  -keypass $KEY_PASSWORD \
  -file $BROKER_DIR/$BROKER_NAME.csr \
  -ext "SAN=DNS:$BROKER_NAME.example.com,DNS:localhost,IP:127.0.0.1"

# 3. Signer le certificat avec la CA
openssl x509 -req -days $VALIDITY \
  -in $BROKER_DIR/$BROKER_NAME.csr \
  -CA $CA_DIR/ca-cert.pem \
  -CAkey $CA_DIR/ca-key.pem \
  -CAcreateserial \
  -passin pass:$CA_PASSWORD \
  -out $BROKER_DIR/$BROKER_NAME-signed.pem \
  -extensions v3_req \
  -extfile <(cat <<EOF
[v3_req]
subjectAltName = DNS:$BROKER_NAME.example.com,DNS:localhost,IP:127.0.0.1
EOF
)

# 4. Importer le certificat CA dans le keystore
keytool -importcert -alias CARoot \
  -keystore $BROKER_DIR/kafka.keystore.jks \
  -storepass $KEYSTORE_PASSWORD \
  -file $CA_DIR/ca-cert.pem \
  -noprompt

# 5. Importer le certificat signe dans le keystore
keytool -importcert -alias $BROKER_NAME \
  -keystore $BROKER_DIR/kafka.keystore.jks \
  -storepass $KEYSTORE_PASSWORD \
  -keypass $KEY_PASSWORD \
  -file $BROKER_DIR/$BROKER_NAME-signed.pem \
  -noprompt

# 6. Creer le truststore avec le certificat CA
keytool -importcert -alias CARoot \
  -keystore $BROKER_DIR/kafka.truststore.jks \
  -storepass $KEYSTORE_PASSWORD \
  -file $CA_DIR/ca-cert.pem \
  -noprompt

# Verification
keytool -list -keystore $BROKER_DIR/kafka.keystore.jks -storepass $KEYSTORE_PASSWORD
keytool -list -keystore $BROKER_DIR/kafka.truststore.jks -storepass $KEYSTORE_PASSWORD

echo "Certificats generes dans $BROKER_DIR"

3. Generer les Certificats pour les Clients

#!/bin/bash
# generate-client-certs.sh

CLIENT_NAME=$1  # producer-app, consumer-app, admin
# ... (meme processus que pour les brokers)

Configuration SSL du Broker

# server.properties

############################# SSL Configuration #############################

# Listeners avec SSL
listeners=PLAINTEXT://:9092,SSL://:9093
advertised.listeners=PLAINTEXT://kafka1.example.com:9092,SSL://kafka1.example.com:9093

# Inter-broker communication en SSL
security.inter.broker.protocol=SSL

# Keystore (certificat + cle privee du broker)
ssl.keystore.location=/opt/kafka/ssl/kafka1/kafka.keystore.jks
ssl.keystore.password=broker-keystore-password
ssl.key.password=broker-key-password
ssl.keystore.type=JKS

# Truststore (certificats CA de confiance)
ssl.truststore.location=/opt/kafka/ssl/kafka1/kafka.truststore.jks
ssl.truststore.password=broker-keystore-password
ssl.truststore.type=JKS

# Authentification client par certificat (optionnel)
ssl.client.auth=required
# Options: required, requested, none

# Protocoles et ciphers (securite renforcee)
ssl.enabled.protocols=TLSv1.2,TLSv1.3
ssl.protocol=TLSv1.3

# Ciphers recommandes (TLS 1.2)
ssl.cipher.suites=TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256

# Endpoint identification (verifie le hostname)
ssl.endpoint.identification.algorithm=https

Configuration SSL du Client (Java)

Properties props = new Properties();

// Bootstrap servers en SSL
props.put("bootstrap.servers", "kafka1.example.com:9093,kafka2.example.com:9093");
props.put("security.protocol", "SSL");

// Truststore (pour valider le certificat du broker)
props.put("ssl.truststore.location", "/path/to/client.truststore.jks");
props.put("ssl.truststore.password", "truststore-password");

// Keystore (pour l'authentification client, si ssl.client.auth=required)
props.put("ssl.keystore.location", "/path/to/client.keystore.jks");
props.put("ssl.keystore.password", "keystore-password");
props.put("ssl.key.password", "key-password");

// Verification du hostname
props.put("ssl.endpoint.identification.algorithm", "https");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

Configuration SSL avec Docker Compose

version: '3.8'

services:
  kafka-ssl:
    image: confluentinc/cp-kafka:7.4.0
    ports:
      - "9093:9093"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

      # Listeners
      KAFKA_LISTENERS: SSL://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: SSL://kafka-ssl:9093
      KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SSL

      # SSL Configuration
      KAFKA_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/kafka.keystore.jks
      KAFKA_SSL_KEYSTORE_PASSWORD: broker-keystore-password
      KAFKA_SSL_KEY_PASSWORD: broker-key-password
      KAFKA_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/kafka.truststore.jks
      KAFKA_SSL_TRUSTSTORE_PASSWORD: broker-keystore-password

      KAFKA_SSL_CLIENT_AUTH: required
      KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: https

    volumes:
      - ./ssl/kafka1:/etc/kafka/secrets:ro

Authentification SASL

Mecanismes SASL Disponibles

┌────────────────────────────────────────────────────────────────┐
│                   MECANISMES SASL                               │
├────────────────────────────────────────────────────────────────┤
│                                                                 │
│  PLAIN                                                          │
│  ├─ Username/Password en clair                                  │
│  ├─ Simple a configurer                                         │
│  └─ TOUJOURS avec SSL (sinon credentials en clair!)             │
│                                                                 │
│  SCRAM-SHA-256 / SCRAM-SHA-512                                  │
│  ├─ Challenge-response (password jamais transmis)               │
│  ├─ Credentials stockes dans ZooKeeper                          │
│  └─ Recommande pour production                                  │
│                                                                 │
│  GSSAPI (Kerberos)                                              │
│  ├─ Authentification enterprise                                 │
│  ├─ Integration Active Directory                                │
│  └─ Complexe mais tres securise                                 │
│                                                                 │
│  OAUTHBEARER                                                    │
│  ├─ Tokens OAuth 2.0 / OpenID Connect                          │
│  ├─ Integration avec IdP (Keycloak, Okta, etc.)                 │
│  └─ Moderne et flexible                                         │
│                                                                 │
└────────────────────────────────────────────────────────────────┘

Configuration SASL/PLAIN

Configuration Broker

# server.properties

# Listeners SASL_SSL (recommande) ou SASL_PLAINTEXT (dev uniquement)
listeners=SASL_SSL://0.0.0.0:9094
advertised.listeners=SASL_SSL://kafka1.example.com:9094
security.inter.broker.protocol=SASL_SSL

# Mecanismes SASL actives
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN

# SSL (meme config que precedemment)
ssl.keystore.location=/opt/kafka/ssl/kafka1/kafka.keystore.jks
ssl.keystore.password=broker-keystore-password
# ... autres configs SSL ...

# Fichier JAAS pour le broker
listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="admin" \
  password="admin-secret" \
  user_admin="admin-secret" \
  user_producer="producer-secret" \
  user_consumer="consumer-secret";

Fichier JAAS (Alternative)

# /opt/kafka/config/kafka_server_jaas.conf

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_producer="producer-secret"
    user_consumer="consumer-secret";
};
# Demarrer Kafka avec JAAS
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
kafka-server-start.sh config/server.properties

Configuration Client SASL/PLAIN

Properties props = new Properties();
props.put("bootstrap.servers", "kafka1.example.com:9094");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");

// JAAS config inline
props.put("sasl.jaas.config",
    "org.apache.kafka.common.security.plain.PlainLoginModule required " +
    "username=\"producer\" " +
    "password=\"producer-secret\";");

// SSL truststore
props.put("ssl.truststore.location", "/path/to/truststore.jks");
props.put("ssl.truststore.password", "truststore-password");

Configuration SASL/SCRAM (Recommande)

SCRAM est plus securise que PLAIN car le mot de passe n'est jamais transmis.

1. Creer les Utilisateurs SCRAM dans ZooKeeper

# Creer un utilisateur admin
kafka-configs --zookeeper localhost:2181 --alter \
  --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' \
  --entity-type users --entity-name admin

# Creer un utilisateur producer
kafka-configs --zookeeper localhost:2181 --alter \
  --add-config 'SCRAM-SHA-256=[password=producer-secret]' \
  --entity-type users --entity-name producer

# Creer un utilisateur consumer
kafka-configs --zookeeper localhost:2181 --alter \
  --add-config 'SCRAM-SHA-256=[password=consumer-secret]' \
  --entity-type users --entity-name consumer

# Lister les utilisateurs
kafka-configs --zookeeper localhost:2181 --describe \
  --entity-type users

2. Configuration Broker pour SCRAM

# server.properties

listeners=SASL_SSL://0.0.0.0:9094
advertised.listeners=SASL_SSL://kafka1.example.com:9094
security.inter.broker.protocol=SASL_SSL

# SCRAM mechanism
sasl.enabled.mechanisms=SCRAM-SHA-256,SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256

# JAAS config pour le broker
listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="admin" \
  password="admin-secret";

3. Configuration Client SCRAM

Properties props = new Properties();
props.put("bootstrap.servers", "kafka1.example.com:9094");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "SCRAM-SHA-256");

props.put("sasl.jaas.config",
    "org.apache.kafka.common.security.scram.ScramLoginModule required " +
    "username=\"producer\" " +
    "password=\"producer-secret\";");

// SSL config...

Configuration SASL/GSSAPI (Kerberos)

1. Prerequis Kerberos

# Creer les principals Kafka dans le KDC
kadmin.local -q "addprinc -randkey kafka/kafka1.example.com@EXAMPLE.COM"
kadmin.local -q "addprinc -randkey kafka/kafka2.example.com@EXAMPLE.COM"
kadmin.local -q "addprinc -randkey kafka/kafka3.example.com@EXAMPLE.COM"

# Exporter les keytabs
kadmin.local -q "xst -k /opt/kafka/kafka1.keytab kafka/kafka1.example.com@EXAMPLE.COM"

2. Configuration Broker Kerberos

# server.properties

listeners=SASL_SSL://0.0.0.0:9094
security.inter.broker.protocol=SASL_SSL

sasl.enabled.mechanisms=GSSAPI
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.kerberos.service.name=kafka

# JAAS
listener.name.sasl_ssl.gssapi.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
  useKeyTab=true \
  storeKey=true \
  keyTab="/opt/kafka/kafka1.keytab" \
  principal="kafka/kafka1.example.com@EXAMPLE.COM";

3. Configuration Client Kerberos

Properties props = new Properties();
props.put("bootstrap.servers", "kafka1.example.com:9094");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "GSSAPI");
props.put("sasl.kerberos.service.name", "kafka");

props.put("sasl.jaas.config",
    "com.sun.security.auth.module.Krb5LoginModule required " +
    "useKeyTab=true " +
    "storeKey=true " +
    "keyTab=\"/path/to/client.keytab\" " +
    "principal=\"client@EXAMPLE.COM\";");

Configuration SASL/OAUTHBEARER

1. Configuration Broker OAuth

# server.properties

listeners=SASL_SSL://0.0.0.0:9094
sasl.enabled.mechanisms=OAUTHBEARER
sasl.mechanism.inter.broker.protocol=OAUTHBEARER

# Custom callback handler pour validation des tokens
listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallbackHandler

# Configuration OAuth
listener.name.sasl_ssl.oauthbearer.sasl.oauthbearer.token.endpoint.url=https://idp.example.com/oauth/token
listener.name.sasl_ssl.oauthbearer.sasl.oauthbearer.jwks.endpoint.url=https://idp.example.com/.well-known/jwks.json

2. Configuration Client OAuth

Properties props = new Properties();
props.put("bootstrap.servers", "kafka1.example.com:9094");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "OAUTHBEARER");

props.put("sasl.login.callback.handler.class",
    "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler");

props.put("sasl.oauthbearer.token.endpoint.url",
    "https://idp.example.com/oauth/token");
props.put("sasl.jaas.config",
    "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required " +
    "clientId=\"kafka-client\" " +
    "clientSecret=\"client-secret\" " +
    "scope=\"kafka\";");

Autorisation avec ACLs

Concepts ACL

Structure d'une ACL:
┌─────────────────────────────────────────────────────────────┐
│                                                              │
│  Principal: User:producer                                   │
│  Permission: ALLOW                                           │
│  Operation: WRITE                                            │
│  Resource: Topic:orders                                      │
│  Pattern: LITERAL                                            │
│                                                              │
│  = L'utilisateur "producer" peut ecrire sur le topic        │
│    "orders"                                                  │
│                                                              │
└─────────────────────────────────────────────────────────────┘

Operations disponibles:
├─ READ: Lire depuis un topic/consumer group
├─ WRITE: Ecrire dans un topic
├─ CREATE: Creer des topics
├─ DELETE: Supprimer des topics/records
├─ ALTER: Modifier la configuration
├─ DESCRIBE: Voir les metadata
├─ CLUSTER_ACTION: Operations cluster
├─ DESCRIBE_CONFIGS: Voir les configurations
├─ ALTER_CONFIGS: Modifier les configurations
├─ IDEMPOTENT_WRITE: Ecriture idempotente
└─ ALL: Toutes les operations

Activer l'Authorizer

# server.properties

# Activer l'authorizer ACL
authorizer.class.name=kafka.security.authorizer.AclAuthorizer

# Super users (bypass ACLs)
super.users=User:admin;User:kafka

# Autoriser si pas d'ACL definie (false en production!)
allow.everyone.if.no.acl.found=false

Gestion des ACLs

Commandes de Base

# Lister toutes les ACLs
kafka-acls --bootstrap-server localhost:9094 \
  --command-config /path/to/admin.properties \
  --list

# Lister les ACLs pour un topic specifique
kafka-acls --bootstrap-server localhost:9094 \
  --command-config /path/to/admin.properties \
  --list --topic orders

ACLs pour Producer

# Autoriser un producer a ecrire sur un topic
kafka-acls --bootstrap-server localhost:9094 \
  --command-config /path/to/admin.properties \
  --add \
  --allow-principal User:producer \
  --operation WRITE \
  --operation DESCRIBE \
  --topic orders

# Autoriser l'ecriture idempotente (pour exactly-once)
kafka-acls --bootstrap-server localhost:9094 \
  --command-config /path/to/admin.properties \
  --add \
  --allow-principal User:producer \
  --operation IDEMPOTENT_WRITE \
  --cluster

# Autoriser les transactions
kafka-acls --bootstrap-server localhost:9094 \
  --command-config /path/to/admin.properties \
  --add \
  --allow-principal User:producer \
  --operation WRITE \
  --operation DESCRIBE \
  --transactional-id order-producer-*

ACLs pour Consumer

# Autoriser un consumer a lire un topic
kafka-acls --bootstrap-server localhost:9094 \
  --command-config /path/to/admin.properties \
  --add \
  --allow-principal User:consumer \
  --operation READ \
  --operation DESCRIBE \
  --topic orders

# Autoriser l'acces au consumer group
kafka-acls --bootstrap-server localhost:9094 \
  --command-config /path/to/admin.properties \
  --add \
  --allow-principal User:consumer \
  --operation READ \
  --group order-processors

ACLs avec Patterns (Prefixed)

# Autoriser acces a tous les topics commencant par "orders-"
kafka-acls --bootstrap-server localhost:9094 \
  --command-config /path/to/admin.properties \
  --add \
  --allow-principal User:order-service \
  --operation ALL \
  --topic orders- \
  --resource-pattern-type PREFIXED

# Autoriser acces a tous les consumer groups commencant par "order-"
kafka-acls --bootstrap-server localhost:9094 \
  --command-config /path/to/admin.properties \
  --add \
  --allow-principal User:order-service \
  --operation ALL \
  --group order- \
  --resource-pattern-type PREFIXED

ACLs d'Administration

# Autoriser la creation de topics
kafka-acls --bootstrap-server localhost:9094 \
  --command-config /path/to/admin.properties \
  --add \
  --allow-principal User:admin \
  --operation CREATE \
  --cluster

# Autoriser la modification des configs
kafka-acls --bootstrap-server localhost:9094 \
  --command-config /path/to/admin.properties \
  --add \
  --allow-principal User:admin \
  --operation ALTER_CONFIGS \
  --operation DESCRIBE_CONFIGS \
  --cluster

# Autoriser la gestion des ACLs
kafka-acls --bootstrap-server localhost:9094 \
  --command-config /path/to/admin.properties \
  --add \
  --allow-principal User:admin \
  --operation ALTER \
  --cluster

Supprimer des ACLs

# Supprimer une ACL specifique
kafka-acls --bootstrap-server localhost:9094 \
  --command-config /path/to/admin.properties \
  --remove \
  --allow-principal User:producer \
  --operation WRITE \
  --topic orders

Script de Configuration ACL Complet

#!/bin/bash
# setup-acls.sh

BOOTSTRAP="kafka1:9094"
ADMIN_CONFIG="/opt/kafka/config/admin.properties"

# Fonction helper
add_acl() {
    kafka-acls --bootstrap-server $BOOTSTRAP \
      --command-config $ADMIN_CONFIG \
      --add "$@"
}

# === Service Orders ===
# Producer
add_acl --allow-principal User:orders-producer \
  --operation WRITE --operation DESCRIBE \
  --topic orders-events
add_acl --allow-principal User:orders-producer \
  --operation IDEMPOTENT_WRITE --cluster

# Consumer
add_acl --allow-principal User:orders-consumer \
  --operation READ --operation DESCRIBE \
  --topic orders-events
add_acl --allow-principal User:orders-consumer \
  --operation READ --group orders-service

# === Service Inventory ===
# Consumer (lit orders-events)
add_acl --allow-principal User:inventory-service \
  --operation READ --operation DESCRIBE \
  --topic orders-events
add_acl --allow-principal User:inventory-service \
  --operation READ --group inventory-service

# Producer (publie inventory-events)
add_acl --allow-principal User:inventory-service \
  --operation WRITE --operation DESCRIBE \
  --topic inventory-events

# === Kafka Streams Application ===
add_acl --allow-principal User:streams-app \
  --operation ALL \
  --topic streams- --resource-pattern-type PREFIXED
add_acl --allow-principal User:streams-app \
  --operation ALL \
  --group streams-app- --resource-pattern-type PREFIXED

# === Monitoring (read-only) ===
add_acl --allow-principal User:monitoring \
  --operation DESCRIBE \
  --topic '*' --resource-pattern-type LITERAL
add_acl --allow-principal User:monitoring \
  --operation DESCRIBE \
  --cluster

echo "ACLs configured successfully"

# Lister toutes les ACLs
kafka-acls --bootstrap-server $BOOTSTRAP \
  --command-config $ADMIN_CONFIG --list

RBAC (Confluent Platform)

Concepts RBAC

RBAC vs ACLs:
┌─────────────────────────────────────────────────────────────┐
│                                                              │
│  ACLs:                                                       │
│  ├─ Granularite fine (par topic/operation)                  │
│  ├─ Gestion manuelle par commandes                          │
│  └─ Difficile a maintenir a grande echelle                  │
│                                                              │
│  RBAC (Confluent):                                          │
│  ├─ Roles predefinies (DeveloperRead, DeveloperWrite, etc.) │
│  ├─ Assignation par groupe LDAP/AD                          │
│  ├─ Interface UI pour gestion                               │
│  └─ Plus adapte aux grandes organisations                   │
│                                                              │
└─────────────────────────────────────────────────────────────┘

Roles Predefinies Confluent

Roles disponibles:
├─ SystemAdmin: Administration complete
├─ ClusterAdmin: Administration du cluster
├─ Operator: Operations (restart, rebalance)
├─ SecurityAdmin: Gestion ACLs et RBAC
├─ ResourceOwner: Proprietaire d'une ressource
├─ DeveloperManage: Gerer les ressources
├─ DeveloperRead: Lecture seule
└─ DeveloperWrite: Lecture et ecriture

Configuration RBAC

# server.properties (Confluent Platform)

# MDS (Metadata Service) pour RBAC
confluent.metadata.topic.replication.factor=3
confluent.metadata.server.listeners=http://0.0.0.0:8090
confluent.metadata.server.advertised.listeners=http://kafka1:8090

# Authorizer RBAC
authorizer.class.name=io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer

# LDAP configuration
ldap.java.naming.factory.initial=com.sun.jndi.ldap.LdapCtxFactory
ldap.java.naming.provider.url=ldap://ldap.example.com:389
ldap.java.naming.security.principal=cn=admin,dc=example,dc=com
ldap.java.naming.security.credentials=admin-password
ldap.java.naming.security.authentication=simple
ldap.user.search.base=ou=users,dc=example,dc=com
ldap.group.search.base=ou=groups,dc=example,dc=com

Assignation de Roles via CLI

# Assigner le role DeveloperWrite a un utilisateur sur un topic
confluent iam rbac role-binding create \
  --principal User:producer-app \
  --role DeveloperWrite \
  --resource Topic:orders \
  --kafka-cluster-id <cluster-id>

# Assigner un role a un groupe LDAP
confluent iam rbac role-binding create \
  --principal Group:kafka-developers \
  --role DeveloperRead \
  --resource Topic:* \
  --prefix \
  --kafka-cluster-id <cluster-id>

Audit Logging

Activer l'Audit Log (Confluent)

# server.properties

# Activer l'audit log
confluent.security.event.logger.enable=true
confluent.security.event.logger.destination=topic

# Topic pour les audit logs
confluent.security.event.logger.topic.name=confluent-audit-log-events
confluent.security.event.logger.topic.replicas=3
confluent.security.event.logger.topic.partitions=12

# Categories d'evenements a logger
confluent.security.event.logger.authentication.enable=true
confluent.security.event.logger.authorization.enable=true

Format des Audit Logs

{
  "timestamp": "2025-01-05T10:30:00.000Z",
  "eventType": "AUTHORIZATION",
  "principal": "User:producer-app",
  "clientAddress": "192.168.1.100",
  "resourceType": "TOPIC",
  "resourceName": "orders",
  "operation": "WRITE",
  "result": "ALLOWED",
  "requestType": "Produce",
  "correlationId": "abc123"
}

Consumer pour Audit Logs

// Consumer pour lire les audit logs
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9094");
props.put("group.id", "audit-log-processor");
props.put("security.protocol", "SASL_SSL");
// ... config SASL ...

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("confluent-audit-log-events"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        AuditEvent event = parseAuditEvent(record.value());

        // Alerter sur les echecs d'autorisation
        if ("DENIED".equals(event.getResult())) {
            alertSecurityTeam(event);
        }

        // Stocker pour analyse
        storeInSIEM(event);
    }
}

Bonnes Pratiques de Securite

Checklist de Securite Production

[ ] Chiffrement
    [ ] SSL/TLS active sur tous les listeners
    [ ] Inter-broker communication en SSL
    [ ] Certificats signes par une CA interne
    [ ] Rotation des certificats planifiee
    [ ] TLS 1.2 ou 1.3 uniquement

[ ] Authentification
    [ ] SASL active (SCRAM ou Kerberos)
    [ ] Jamais PLAINTEXT en production
    [ ] Mots de passe forts et uniques
    [ ] Rotation des credentials planifiee

[ ] Autorisation
    [ ] ACLs definies pour tous les principals
    [ ] allow.everyone.if.no.acl.found=false
    [ ] Principe du moindre privilege
    [ ] Super users limites et documentes

[ ] Audit
    [ ] Audit logging active
    [ ] Logs centralises (SIEM)
    [ ] Alertes sur anomalies
    [ ] Retention suffisante

[ ] Reseau
    [ ] Kafka dans un reseau prive/VPC
    [ ] Firewall configuré (ports 9092-9094)
    [ ] Pas d'acces direct depuis Internet
    [ ] Load balancer si necessaire

[ ] Operations
    [ ] Secrets geres via Vault/KMS
    [ ] Backup des keystores
    [ ] Monitoring des connexions
    [ ] Plan de reponse aux incidents

Erreurs Courantes a Eviter

1. SASL/PLAIN sans SSL
   ├─ Risque: Credentials transmis en clair
   └─ Solution: Toujours SASL_SSL, jamais SASL_PLAINTEXT

2. allow.everyone.if.no.acl.found=true
   ├─ Risque: Acces non autorise si ACL oubliee
   └─ Solution: Toujours false en production

3. Super user partage
   ├─ Risque: Pas de tracabilite
   └─ Solution: Un super user par administrateur

4. Certificats auto-signes en production
   ├─ Risque: Man-in-the-middle possible
   └─ Solution: CA interne ou commerciale

5. Mots de passe dans les fichiers de config
   ├─ Risque: Exposition des credentials
   └─ Solution: Secrets manager (Vault, AWS Secrets)

6. Pas de rotation des credentials
   ├─ Risque: Compromission long terme
   └─ Solution: Rotation automatisee (90 jours max)

Gestion des Secrets

Avec HashiCorp Vault

# Stocker les credentials Kafka dans Vault
vault kv put secret/kafka/producer \
  username=producer \
  password=producer-secret

# Recuperer dans l'application
vault kv get -field=password secret/kafka/producer

Avec Kubernetes Secrets

apiVersion: v1
kind: Secret
metadata:
  name: kafka-credentials
type: Opaque
stringData:
  sasl-username: producer
  sasl-password: producer-secret
---
apiVersion: v1
kind: Pod
spec:
  containers:
  - name: kafka-client
    env:
    - name: KAFKA_SASL_USERNAME
      valueFrom:
        secretKeyRef:
          name: kafka-credentials
          key: sasl-username
    - name: KAFKA_SASL_PASSWORD
      valueFrom:
        secretKeyRef:
          name: kafka-credentials
          key: sasl-password

Troubleshooting Securite

Erreurs Courantes

SSL Handshake Failed

# Verifier le certificat
openssl s_client -connect kafka1:9093 -showcerts

# Verifier le keystore
keytool -list -v -keystore kafka.keystore.jks

# Cause courante: Hostname mismatch
# Solution: Verifier ssl.endpoint.identification.algorithm

SASL Authentication Failed

# Verifier les logs broker
grep "authentication" /var/log/kafka/server.log

# Causes courantes:
# - Mauvais username/password
# - Mecanisme SASL non active sur le broker
# - JAAS config mal formatee

Authorization Failed

# Lister les ACLs pour le principal
kafka-acls --bootstrap-server kafka1:9094 \
  --command-config admin.properties \
  --list --principal User:producer

# Verifier que allow.everyone.if.no.acl.found n'est pas true
# Verifier les super.users

Logs de Debug

# log4j.properties
log4j.logger.kafka.authorizer.logger=DEBUG
log4j.logger.org.apache.kafka.common.security=DEBUG
log4j.logger.org.apache.kafka.common.network.SslTransportLayer=DEBUG

A Retenir

  1. Defense en profondeur : Combinez SSL + SASL + ACLs pour une securite complete
  2. SCRAM > PLAIN : Preferez SCRAM-SHA-256/512 a PLAIN pour l'authentification
  3. Moindre privilege : Chaque application ne doit avoir que les droits necessaires
  4. Audit systematique : Tracez toutes les connexions et operations
  5. Rotation des credentials : Automatisez la rotation des mots de passe et certificats
  6. Secrets management : Ne stockez jamais les credentials dans le code ou les configs
  7. Tests reguliers : Validez votre configuration securite regulierement

La securite de votre cluster Kafka est critique pour proteger vos donnees. Contactez-moi pour un audit de securite ou une assistance dans la mise en place d'une architecture Kafka securisee.

F

Florian Courouge

Expert DevOps & Kafka | Consultant freelance specialise dans les architectures distribuees et le streaming de donnees.

Articles similaires