Intermediate
⭐ Featured article

Kafka Cheat Sheet: Essential Commands and Multi-Cluster Configuration

A complete guide to Kafka commands with advanced configurations: multi-cluster management, SASL/SSL security, automation scripts, and best practices.

Published on
December 16, 2024
Reading time
12 min
Author
Florian Courouge
Kafka
CLI
Configuration
SASL
SSL
Multi-Cluster
Cheat Sheet


Apache Kafka is a powerful distributed streaming platform, but managing it becomes complex once several clusters each have their own configuration. This guide presents a structured approach that simplifies day-to-day Kafka work through centralized configuration files and automation scripts.

Kafka Multi-Cluster Architecture

💡Introduction: Challenges of Multi-Cluster Management

To simplify working with Kafka across several clusters that have different configurations (for example, to handle security with SASL, SSL, and different JAAS settings), the recommended approach is to centralize settings in configuration files and use environment variables for the dynamic parameters; a sketch of how those variables can be injected follows the list below.

Common Pain Points

  • Multiple configurations: development, staging, production
  • Varied security: SASL, SSL, mTLS depending on the environment
  • Certificate management: different keystores and truststores
  • Repetitive scripts: long, error-prone commands
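
One caveat before diving in: the Kafka CLI tools do not expand ${VAR} placeholders inside .properties files on their own. A minimal rendering step, assuming envsubst (from GNU gettext) is available and that the templates use ${VAR} syntax, could look like this (Kafka's config providers can serve a similar purpose in newer versions):

#!/bin/bash
# Hypothetical helper: render a properties template with environment variables.
# Kafka tools read .properties files literally, so we produce a temporary copy
# with the secrets injected (assumes envsubst from GNU gettext is installed).
set -euo pipefail

TEMPLATE="$1"          # e.g. configs/producers/producer-prod.properties
RENDERED="$(mktemp)"

envsubst < "$TEMPLATE" > "$RENDERED"
chmod 600 "$RENDERED"  # the rendered copy contains secrets: restrict access
echo "$RENDERED"

The configuration files below are shown with their ${VAR} placeholders for readability; rendering them this way first is assumed wherever they are passed to a Kafka tool.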

💡Centralized Configuration Architecture

Recommended Project Layout

# Full directory layout for multi-cluster Kafka management
kafka-management/
├── configs/
│   ├── clusters/
│   │   ├── dev.properties
│   │   ├── staging.properties
│   │   └── prod.properties
│   ├── producers/
│   │   ├── producer-dev.properties
│   │   ├── producer-staging.properties
│   │   └── producer-prod.properties
│   ├── consumers/
│   │   ├── consumer-dev.properties
│   │   ├── consumer-staging.properties
│   │   └── consumer-prod.properties
│   └── security/
│       ├── keystores/
│       ├── truststores/
│       └── jaas/
├── scripts/
│   ├── kafka-admin.sh
│   ├── kafka-producer.sh
│   ├── kafka-consumer.sh
│   └── kafka-monitor.sh
├── templates/
│   ├── docker-compose.yml
│   └── k8s/
└── docs/
    └── cluster-configs.md

💡Per-Environment Configuration Files

1. Producer Configuration

# configs/producers/producer-dev.properties
bootstrap.servers=kafka-dev-1:9092,kafka-dev-2:9092,kafka-dev-3:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

# Performance
acks=all
retries=2147483647
max.in.flight.requests.per.connection=5
enable.idempotence=true
compression.type=gzip
batch.size=16384
linger.ms=5

# Monitoring
client.id=producer-dev-${HOSTNAME}

# configs/producers/producer-prod.properties
bootstrap.servers=kafka-prod-1:9093,kafka-prod-2:9093,kafka-prod-3:9093
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

# Performance tuning for production
acks=all
retries=2147483647
max.in.flight.requests.per.connection=5
enable.idempotence=true
compression.type=lz4
batch.size=32768
linger.ms=10
buffer.memory=67108864

# SASL/SSL security
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
ssl.truststore.location=/opt/kafka/security/kafka.client.truststore.jks
ssl.truststore.password=${SSL_TRUSTSTORE_PASSWORD}
# An empty value disables broker hostname verification; keep the default
# (https) unless your certificates lack matching SANs
ssl.endpoint.identification.algorithm=

# JAAS configuration
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="${KAFKA_USERNAME}" \
    password="${KAFKA_PASSWORD}";

# Monitoring
client.id=producer-prod-${HOSTNAME}
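
Before wiring this file into the scripts below, a quick smoke test with the stock console producer can confirm the security settings work (the hostname and topic are examples, and the prior rendering of ${VAR} placeholders is assumed):

# Manual smoke test of the rendered producer config
kafka-console-producer.sh \
    --bootstrap-server kafka-prod-1:9093 \
    --producer.config configs/producers/producer-prod.properties \
    --topic test-topic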

2. Consumer Configuration

# configs/consumers/consumer-prod.properties
bootstrap.servers=kafka-prod-1:9093,kafka-prod-2:9093,kafka-prod-3:9093
group.id=${CONSUMER_GROUP_ID}
enable.auto.commit=false
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
auto.offset.reset=earliest

# Performance
fetch.min.bytes=1024
fetch.max.wait.ms=500
max.partition.fetch.bytes=1048576
session.timeout.ms=30000
heartbeat.interval.ms=10000

# SASL/SSL security
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
ssl.truststore.location=/opt/kafka/security/kafka.client.truststore.jks
ssl.truststore.password=${SSL_TRUSTSTORE_PASSWORD}

# JAAS configuration
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="${KAFKA_USERNAME}" \
    password="${KAFKA_PASSWORD}";

# Monitoring
client.id=consumer-prod-${HOSTNAME}
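
The same kind of bounded check works on the consumer side; --max-messages keeps it from running forever (topic and group names are examples):

# Bounded consumption test of the rendered consumer config
export CONSUMER_GROUP_ID=debug-group
kafka-console-consumer.sh \
    --bootstrap-server kafka-prod-1:9093 \
    --consumer.config configs/consumers/consumer-prod.properties \
    --topic test-topic --from-beginning --max-messages 10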

3. Cluster-Specific Configuration

# configs/clusters/cluster-aws-prod.properties
bootstrap.servers=kafka-aws-1.example.com:9093,kafka-aws-2.example.com:9093
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
ssl.truststore.location=/opt/kafka/security/aws/kafka.truststore.jks
ssl.truststore.password=${AWS_SSL_TRUSTSTORE_PASSWORD}

sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="${AWS_KAFKA_USERNAME}" \
    password="${AWS_KAFKA_PASSWORD}";
# configs/clusters/cluster-onprem-prod.properties
bootstrap.servers=kafka-onprem-1:9093,kafka-onprem-2:9093
security.protocol=SSL
ssl.truststore.location=/opt/kafka/security/onprem/truststore.jks
ssl.truststore.password=${ONPREM_SSL_TRUSTSTORE_PASSWORD}
ssl.keystore.location=/opt/kafka/security/onprem/keystore.jks
ssl.keystore.password=${ONPREM_SSL_KEYSTORE_PASSWORD}
ssl.key.password=${ONPREM_SSL_KEY_PASSWORD}
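
These cluster files plug straight into any of the admin tools via --command-config, for example:

# List topics on the AWS cluster using its dedicated config file
kafka-topics.sh \
    --bootstrap-server kafka-aws-1.example.com:9093 \
    --command-config configs/clusters/cluster-aws-prod.properties \
    --list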

💡Automation Scripts

Main Administration Script

#!/bin/bash
# scripts/kafka-admin.sh

set -e

# Default settings
DEFAULT_ENV="dev"
DEFAULT_CONFIG_DIR="$(dirname "$0")/../configs"

# Log colors
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m'

usage() {
    cat << EOF
Usage: $0 [OPTIONS] COMMAND [ARGS...]

Options:
    -e, --env ENV           Environment (dev|staging|prod) [default: $DEFAULT_ENV]
    -c, --config DIR        Configuration directory [default: $DEFAULT_CONFIG_DIR]
    -h, --help              Show this help message

Commands:
    topics                  Topic management
        list                List all topics
        create TOPIC        Create topic
        delete TOPIC        Delete topic
        describe TOPIC      Describe topic
        
    consumers               Consumer management
        groups              List consumer groups
        describe GROUP      Describe consumer group
        reset GROUP TOPIC   Reset consumer group offset
        
    cluster                 Cluster management
        info                Show cluster information
        health              Check cluster health
        
    produce                 Produce messages
        TOPIC [FILE]        Produce to topic (from file or stdin)
        
    consume                 Consume messages
        TOPIC [GROUP]       Consume from topic

Examples:
    $0 -e prod topics list
    $0 -e staging topics create my-topic
    $0 -e prod consume my-topic my-consumer-group
    $0 -e dev produce my-topic < messages.txt

EOF
}

log() {
    echo -e "${GREEN}[$(date +'%H:%M:%S')] $1${NC}"
}

warn() {
    echo -e "${YELLOW}[$(date +'%H:%M:%S')] WARNING: $1${NC}"
}

error() {
    echo -e "${RED}[$(date +'%H:%M:%S')] ERROR: $1${NC}"
    exit 1
}

# Parse command line arguments
ENV="$DEFAULT_ENV"
CONFIG_DIR="$DEFAULT_CONFIG_DIR"

while [[ $# -gt 0 ]]; do
    case $1 in
        -e|--env)
            ENV="$2"
            shift 2
            ;;
        -c|--config)
            CONFIG_DIR="$2"
            shift 2
            ;;
        -h|--help)
            usage
            exit 0
            ;;
        *)
            break
            ;;
    esac
done

# Validate the environment
if [[ ! "$ENV" =~ ^(dev|staging|prod)$ ]]; then
    error "Invalid environment: $ENV. Must be dev, staging, or prod"
fi

# Configuration file paths
CLUSTER_CONFIG="$CONFIG_DIR/clusters/$ENV.properties"
PRODUCER_CONFIG="$CONFIG_DIR/producers/producer-$ENV.properties"
CONSUMER_CONFIG="$CONFIG_DIR/consumers/consumer-$ENV.properties"

# Ensure all configuration files exist
for config in "$CLUSTER_CONFIG" "$PRODUCER_CONFIG" "$CONSUMER_CONFIG"; do
    if [[ ! -f "$config" ]]; then
        error "Configuration file not found: $config"
    fi
done

log "Using environment: $ENV"

# Wrapper that runs the Kafka CLI tools with the right config files
BOOTSTRAP_SERVERS=$(grep '^bootstrap.servers' "$CLUSTER_CONFIG" | cut -d'=' -f2)

kafka_cmd() {
    local cmd="$1"
    shift
    
    case "$cmd" in
        "kafka-topics")
            kafka-topics.sh --bootstrap-server "$BOOTSTRAP_SERVERS" \
                --command-config "$CLUSTER_CONFIG" "$@"
            ;;
        "kafka-consumer-groups")
            kafka-consumer-groups.sh --bootstrap-server "$BOOTSTRAP_SERVERS" \
                --command-config "$CLUSTER_CONFIG" "$@"
            ;;
        "kafka-console-producer")
            kafka-console-producer.sh --bootstrap-server "$BOOTSTRAP_SERVERS" \
                --producer.config "$PRODUCER_CONFIG" "$@"
            ;;
        "kafka-console-consumer")
            kafka-console-consumer.sh --bootstrap-server "$BOOTSTRAP_SERVERS" \
                --consumer.config "$CONSUMER_CONFIG" "$@"
            ;;
        *)
            error "Unknown Kafka command: $cmd"
            ;;
    esac
}

# Command dispatch
case "${1:-help}" in
    "topics")
        case "${2:-list}" in
            "list")
                log "Listing topics for environment: $ENV"
                kafka_cmd kafka-topics --list
                ;;
            "create")
                [[ -z "$3" ]] && error "Topic name required"
                TOPIC="$3"
                PARTITIONS="${4:-3}"
                REPLICATION="${5:-3}"
                log "Creating topic: $TOPIC (partitions: $PARTITIONS, replication: $REPLICATION)"
                kafka_cmd kafka-topics --create --topic "$TOPIC" \
                    --partitions "$PARTITIONS" --replication-factor "$REPLICATION"
                ;;
            "delete")
                [[ -z "$3" ]] && error "Topic name required"
                TOPIC="$3"
                warn "Deleting topic: $TOPIC"
                read -p "Are you sure? (y/N): " -n 1 -r
                echo
                if [[ $REPLY =~ ^[Yy]$ ]]; then
                    kafka_cmd kafka-topics --delete --topic "$TOPIC"
                else
                    log "Operation cancelled"
                fi
                ;;
            "describe")
                [[ -z "$3" ]] && error "Topic name required"
                TOPIC="$3"
                log "Describing topic: $TOPIC"
                kafka_cmd kafka-topics --describe --topic "$TOPIC"
                ;;
            *)
                error "Unknown topics command: $2"
                ;;
        esac
        ;;
    
    "consumers")
        case "${2:-groups}" in
            "groups")
                log "Listing consumer groups for environment: $ENV"
                kafka_cmd kafka-consumer-groups --list
                ;;
            "describe")
                [[ -z "$3" ]] && error "Consumer group required"
                GROUP="$3"
                log "Describing consumer group: $GROUP"
                kafka_cmd kafka-consumer-groups --describe --group "$GROUP"
                ;;
            "reset")
                [[ -z "$3" ]] && error "Consumer group required"
                [[ -z "$4" ]] && error "Topic required"
                GROUP="$3"
                TOPIC="$4"
                warn "Resetting consumer group: $GROUP for topic: $TOPIC"
                kafka_cmd kafka-consumer-groups --reset-offsets --group "$GROUP" \
                    --topic "$TOPIC" --to-earliest --execute
                ;;
            *)
                error "Unknown consumers command: $2"
                ;;
        esac
        ;;
    
    "cluster")
        case "${2:-info}" in
            "info")
                log "Cluster information for environment: $ENV"
                kafka_cmd kafka-topics --describe --topics-with-overrides
                ;;
            "health")
                log "Checking cluster health for environment: $ENV"
                # Simple connectivity test
                if kafka_cmd kafka-topics --list > /dev/null 2>&1; then
                    log "✓ Cluster is healthy"
                else
                    error "✗ Cluster is not accessible"
                fi
                ;;
            *)
                error "Unknown cluster command: $2"
                ;;
        esac
        ;;
    
    "produce")
        [[ -z "$2" ]] && error "Topic name required"
        TOPIC="$2"
        INPUT_FILE="$3"
        
        log "Producing messages to topic: $TOPIC"
        if [[ -n "$INPUT_FILE" && -f "$INPUT_FILE" ]]; then
            log "Reading from file: $INPUT_FILE"
            kafka_cmd kafka-console-producer --topic "$TOPIC" < "$INPUT_FILE"
        else
            log "Reading from stdin (Ctrl+C to stop)"
            kafka_cmd kafka-console-producer --topic "$TOPIC"
        fi
        ;;
    
    "consume")
        [[ -z "$2" ]] && error "Topic name required"
        TOPIC="$2"
        GROUP="${3:-kafka-admin-consumer-$(date +%s)}"
        
        log "Consuming messages from topic: $TOPIC with group: $GROUP"
        kafka_cmd kafka-console-consumer --topic "$TOPIC" --group "$GROUP" --from-beginning
        ;;
    
    "help"|*)
        usage
        ;;
esac
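
A one-time chmod is the only setup the wrapper needs; after that, every operation goes through it:

# Make the script executable, then drive Kafka through the wrapper
chmod +x scripts/kafka-admin.sh
./scripts/kafka-admin.sh -e dev topics list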

Monitoring Script

#!/bin/bash
# scripts/kafka-monitor.sh

set -e

ENV="${1:-dev}"
CONFIG_DIR="$(dirname "$0")/../configs"
CLUSTER_CONFIG="$CONFIG_DIR/clusters/$ENV.properties"

# Gather basic cluster metrics
get_cluster_metrics() {
    local bootstrap_servers
    bootstrap_servers=$(grep '^bootstrap.servers' "$CLUSTER_CONFIG" | cut -d'=' -f2)
    
    echo "=== Cluster Health Check ==="
    echo "Environment: $ENV"
    echo "Bootstrap Servers: $bootstrap_servers"
    echo
    
    # Connectivity test
    echo "Testing connectivity..."
    if kafka-topics.sh --bootstrap-server "$bootstrap_servers" \
        --command-config "$CLUSTER_CONFIG" --list > /dev/null 2>&1; then
        echo "✓ Cluster is accessible"
    else
        echo "✗ Cluster is not accessible"
        exit 1
    fi
    
    # Topic count
    TOPIC_COUNT=$(kafka-topics.sh --bootstrap-server "$bootstrap_servers" \
        --command-config "$CLUSTER_CONFIG" --list | wc -l)
    echo "Topics: $TOPIC_COUNT"
    
    # Consumer groups
    GROUP_COUNT=$(kafka-consumer-groups.sh --bootstrap-server "$bootstrap_servers" \
        --command-config "$CLUSTER_CONFIG" --list | wc -l)
    echo "Consumer Groups: $GROUP_COUNT"
    
    echo
    echo "=== Topics with Issues ==="
    
    # Topics with under-replicated partitions
    kafka-topics.sh --bootstrap-server "$bootstrap_servers" \
        --command-config "$CLUSTER_CONFIG" --describe --under-replicated-partitions
    
    echo
    echo "=== Consumer Group Lag ==="
    
    # Consumer group lag
    kafka-consumer-groups.sh --bootstrap-server "$bootstrap_servers" \
        --command-config "$CLUSTER_CONFIG" --describe --all-groups | \
        grep -E "(GROUP|TOPIC|LAG)" | head -20
}

# Continuous monitoring loop
monitor_continuous() {
    while true; do
        clear
        echo "Kafka Cluster Monitoring - $(date)"
        echo "========================================="
        get_cluster_metrics
        echo
        echo "Refreshing in 30 seconds... (Ctrl+C to stop)"
        sleep 30
    done
}

case "${2:-once}" in
    "once")
        get_cluster_metrics
        ;;
    "continuous")
        monitor_continuous
        ;;
    *)
        echo "Usage: $0 ENV [once|continuous]"
        echo "Example: $0 prod continuous"
        ;;
esac
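
For unattended checks, the one-shot mode drops neatly into cron (the installation path below is an assumption; adjust it to where the project lives):

# Run the health check every 5 minutes and keep a log
*/5 * * * * /opt/kafka-management/scripts/kafka-monitor.sh prod once >> /var/log/kafka-monitor.log 2>&1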

💡Essential Commands by Category

Topic Management

# List all topics
./kafka-admin.sh -e prod topics list

# Create a topic with custom partitions and replication
./kafka-admin.sh -e prod topics create user-events 6 3

# Describe a topic in detail
./kafka-admin.sh -e prod topics describe user-events

# Alter a topic's configuration
kafka-configs.sh --bootstrap-server localhost:9092 \
    --entity-type topics --entity-name user-events \
    --alter --add-config retention.ms=604800000

# Delete a topic (be careful in production!)
./kafka-admin.sh -e staging topics delete test-topic
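
After an --alter, a quick --describe confirms the override actually landed:

# Verify the retention override on the topic
kafka-configs.sh --bootstrap-server localhost:9092 \
    --entity-type topics --entity-name user-events \
    --describe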

Consumer Group Management

# List all consumer groups
./kafka-admin.sh -e prod consumers groups

# Describe a specific consumer group
./kafka-admin.sh -e prod consumers describe user-service-group

# Reset a consumer group's offsets
./kafka-admin.sh -e prod consumers reset user-service-group user-events

# Show lag for all consumer groups
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --command-config configs/clusters/prod.properties \
    --describe --all-groups
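
Offset resets are destructive, and kafka-consumer-groups.sh supports --dry-run; previewing the reset before the --execute that the script performs is a sensible habit:

# Preview an offset reset without applying it
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --command-config configs/clusters/prod.properties \
    --reset-offsets --group user-service-group \
    --topic user-events --to-earliest --dry-run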

Producing and Consuming

# Produce messages from a file
./kafka-admin.sh -e prod produce user-events messages.json

# Produce messages interactively
./kafka-admin.sh -e prod produce user-events

# Consume messages from the beginning
./kafka-admin.sh -e prod consume user-events user-debug-group

# Consume with a filter (jq example)
./kafka-admin.sh -e prod consume user-events | jq '.userId'
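
When debugging, the console consumer can also print record keys and timestamps through its standard formatter properties:

# Show keys and timestamps alongside values (bounded to 20 records)
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --consumer.config configs/consumers/consumer-prod.properties \
    --topic user-events \
    --property print.key=true --property print.timestamp=true \
    --from-beginning --max-messages 20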

💡Advanced Setup with Docker

Multi-Environment Docker Compose

# templates/docker-compose.yml
version: '3.8'

services:
  kafka-client:
    image: confluentinc/cp-kafka:7.4.0
    container_name: kafka-client-${ENV:-dev}
    environment:
      - KAFKA_ENV=${ENV:-dev}
      - KAFKA_USERNAME=${KAFKA_USERNAME}
      - KAFKA_PASSWORD=${KAFKA_PASSWORD}
      - SSL_TRUSTSTORE_PASSWORD=${SSL_TRUSTSTORE_PASSWORD}
    volumes:
      - ./configs:/opt/kafka/configs:ro
      - ./security:/opt/kafka/security:ro
      - ./scripts:/opt/kafka/scripts:ro
    working_dir: /opt/kafka
    command: tail -f /dev/null
    networks:
      - kafka-network

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui-${ENV:-dev}
    environment:
      KAFKA_CLUSTERS_0_NAME: ${ENV:-dev}
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: ${KAFKA_BOOTSTRAP_SERVERS}
      KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL: ${KAFKA_SECURITY_PROTOCOL:-PLAINTEXT}
      KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM: ${KAFKA_SASL_MECHANISM:-}
      KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG: ${KAFKA_SASL_JAAS_CONFIG:-}
    ports:
      - "${KAFKA_UI_PORT:-8080}:8080"
    networks:
      - kafka-network

networks:
  kafka-network:
    driver: bridge

Per-Cluster Environment Variables

# .env.dev
ENV=dev
KAFKA_BOOTSTRAP_SERVERS=kafka-dev-1:9092,kafka-dev-2:9092
KAFKA_SECURITY_PROTOCOL=PLAINTEXT
KAFKA_UI_PORT=8080

# .env.prod
ENV=prod
KAFKA_BOOTSTRAP_SERVERS=kafka-prod-1:9093,kafka-prod-2:9093
KAFKA_SECURITY_PROTOCOL=SASL_SSL
KAFKA_SASL_MECHANISM=SCRAM-SHA-512
KAFKA_USERNAME=prod-user
# Placeholder values; in practice, inject real secrets from a vault or CI variables
KAFKA_PASSWORD=secure-password
SSL_TRUSTSTORE_PASSWORD=truststore-password
KAFKA_UI_PORT=8443
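
Switching environments is then just a matter of pointing Compose at the right file (Compose v2 syntax assumed):

# Bring up the client stack with the production settings
docker compose --env-file .env.prod -f templates/docker-compose.yml up -d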

💡Security and Best Practices

Certificate Management

#!/bin/bash
# scripts/setup-ssl.sh

SECURITY_DIR="configs/security"
CLUSTER_NAME="$1"

# Create the directories
mkdir -p "$SECURITY_DIR/keystores"
mkdir -p "$SECURITY_DIR/truststores"

# Import the CA certificate into the client truststore
# ("changeit" is a demo password; use proper secrets outside of dev)
keytool -keystore "$SECURITY_DIR/truststores/kafka.client.truststore.jks" \
    -alias CARoot -import -file ca-cert -storepass changeit -keypass changeit -noprompt

# Generate the client keystore
keytool -keystore "$SECURITY_DIR/keystores/kafka.client.keystore.jks" \
    -alias localhost -validity 365 -genkey -keyalg RSA \
    -storepass changeit -keypass changeit \
    -dname "CN=localhost, OU=Dev, O=Company, L=City, S=State, C=US"

echo "SSL certificates generated for cluster: $CLUSTER_NAME"

Audit and Monitoring

#!/bin/bash
# scripts/kafka-audit.sh

ENV="$1"
CONFIG_DIR="configs"
AUDIT_LOG="/var/log/kafka-audit.log"

audit_log() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') [$ENV] $1" >> "$AUDIT_LOG"
}

# Configuration audit
audit_log "Starting configuration audit"

# Check configuration file permissions
find "$CONFIG_DIR" -name "*.properties" -exec ls -la {} \; | \
    while read -r line; do
        if [[ "$line" =~ "rw-rw-rw-" ]]; then
            audit_log "WARNING: Configuration file with world-writable permissions: $line"
        fi
    done

# Flag plain-text passwords (anything not using a ${VAR} placeholder)
if grep -r "password=" "$CONFIG_DIR" | grep -v "password=\${"; then
    audit_log "WARNING: Plain text passwords found in configuration files"
fi

audit_log "Configuration audit completed"

💡Automation and CI/CD

GitLab CI Pipeline

# .gitlab-ci.yml
stages:
  - validate
  - test
  - deploy

variables:
  KAFKA_CONFIG_DIR: configs

validate-configs:
  stage: validate
  image: confluentinc/cp-kafka:7.4.0
  script:
    - echo "Validating Kafka configurations..."
    # A literal block scalar (|) keeps the loop valid shell after YAML parsing
    - |
      for env in dev staging prod; do
        echo "Validating $env environment"
        if [[ ! -f "configs/clusters/$env.properties" ]]; then
          echo "Missing cluster config for $env"
          exit 1
        fi
      done
    - echo "All configurations are valid"

test-connectivity:
  stage: test
  image: confluentinc/cp-kafka:7.4.0
  script:
    - echo "Testing Kafka connectivity..."
    - ./scripts/kafka-admin.sh -e dev cluster health
  only:
    - main
    - develop

deploy-configs:
  stage: deploy
  script:
    - echo "Deploying Kafka configurations..."
    - rsync -av configs/ kafka-server:/opt/kafka/configs/
    - ssh kafka-server "systemctl reload kafka-client-configs"
  only:
    - main
  when: manual

💡Troubleshooting and Debugging

Diagnostic Script

#!/bin/bash
# scripts/kafka-debug.sh

ENV="$1"
ISSUE_TYPE="$2"

debug_connection() {
    echo "=== Connection Debug ==="
    
    # Resolve the bootstrap servers, then test DNS and port reachability
    BOOTSTRAP_SERVERS=$(grep '^bootstrap.servers' "configs/clusters/$ENV.properties" | cut -d'=' -f2)
    
    for server in $(echo "$BOOTSTRAP_SERVERS" | tr ',' ' '); do
        HOST=$(echo "$server" | cut -d':' -f1)
        PORT=$(echo "$server" | cut -d':' -f2)
        
        echo "Testing $HOST:$PORT..."
        
        if nslookup "$HOST" > /dev/null 2>&1; then
            echo "  ✓ DNS resolution OK"
        else
            echo "  ✗ DNS resolution failed"
        fi
        
        if nc -zv "$HOST" "$PORT" > /dev/null 2>&1; then
            echo "  ✓ Port $PORT is open"
        else
            echo "  ✗ Port $PORT is closed or filtered"
        fi
    done
}

debug_ssl() {
    echo "=== SSL Debug ==="
    
    TRUSTSTORE=$(grep '^ssl.truststore.location' "configs/clusters/$ENV.properties" | cut -d'=' -f2)
    
    if [[ -f "$TRUSTSTORE" ]]; then
        echo "✓ Truststore file exists: $TRUSTSTORE"
        # Assumes the demo password from setup-ssl.sh; adjust for real stores
        keytool -list -keystore "$TRUSTSTORE" -storepass changeit
    else
        echo "✗ Truststore file not found: $TRUSTSTORE"
    fi
}

debug_consumer_lag() {
    echo "=== Consumer Lag Debug ==="
    
    # Note: kafka-admin.sh also prints log lines on stdout; describe calls for
    # those lines will fail harmlessly in this quick-and-dirty check
    ./kafka-admin.sh -e "$ENV" consumers groups | while read -r group; do
        echo "Group: $group"
        ./kafka-admin.sh -e "$ENV" consumers describe "$group" | \
            grep -E "(TOPIC|LAG)" | head -10
        echo "---"
    done
}

case "$ISSUE_TYPE" in
    "connection")
        debug_connection
        ;;
    "ssl")
        debug_ssl
        ;;
    "lag")
        debug_consumer_lag
        ;;
    "all")
        debug_connection
        debug_ssl
        debug_consumer_lag
        ;;
    *)
        echo "Usage: $0 ENV [connection|ssl|lag|all]"
        echo "Example: $0 prod connection"
        ;;
esac
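
When SSL problems persist beyond the truststore check, an openssl handshake test against a broker (host and port are examples) shows the certificate chain the server actually presents:

# Inspect the TLS handshake and server certificate chain
openssl s_client -connect kafka-prod-1:9093 -servername kafka-prod-1 \
    < /dev/null 2>/dev/null | head -30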

💡Conclusion

This structured approach to managing Kafka commands offers several advantages:

Key Benefits

  • Centralized, per-environment configuration instead of long ad-hoc command lines
  • Consistent SASL/SSL security handling across clusters
  • Shorter, less error-prone commands through the wrapper scripts
  • Reproducible monitoring, auditing, and debugging workflows

Next Steps

  1. Adapt the configurations to your environments
  2. Customize the scripts to your needs
  3. Integrate the monitoring into your stack
  4. Train your teams on using the scripts

This approach turns the complexity of multi-cluster Kafka management into a simple, reproducible workflow, which is essential for production environments.

This article is part of my Apache Kafka series. Check out my other guides to deepen your knowledge of streaming and distributed architecture.

About the Author

Florian Courouge - DevOps and Apache Kafka expert with over five years of experience in distributed systems architecture and infrastructure automation.

Was this article helpful?

Check out my other technical articles, or contact me to discuss your DevOps and Kafka projects.