Kafka Cheat Sheet : Commandes Essentielles et Configuration Multi-Clusters
Guide complet des commandes Kafka avec configurations avancées : gestion multi-clusters, sécurité SASL/SSL, scripts d'automatisation et bonnes pratiques.
Publié le
16 décembre 2024
Lecture
12 min
Vues
0
Auteur
Florian Courouge
Kafka
CLI
Configuration
SASL
SSL
Multi-Cluster
Cheat Sheet
Table des matières
📋 Vue d'ensemble rapide des sujets traités dans cet article
Cliquez sur les sections ci-dessous pour naviguer rapidement
Kafka Cheat Sheet : Commandes Essentielles et Configuration Multi-Clusters
Apache Kafka est un système de streaming distribué puissant, mais sa gestion peut devenir complexe avec plusieurs clusters ayant des configurations différentes. Ce guide présente une approche structurée pour simplifier l'utilisation de Kafka avec des fichiers de configuration centralisés et des scripts d'automatisation.
💡Introduction : Défis de la Gestion Multi-Clusters
Pour simplifier l'utilisation de Kafka avec plusieurs clusters ayant des configurations différentes (par exemple, pour gérer la sécurité avec SASL, SSL, et différentes configurations JAAS), il est recommandé de centraliser les configurations dans des fichiers de configuration, et d'utiliser des variables d'environnement pour gérer les paramètres dynamiques.
Problématiques Courantes
•Configurations multiples : Développement, staging, production
•Sécurité variée : SASL, SSL, mTLS selon l'environnement
•Gestion des certificats : Différents keystores et truststores
•Scripts répétitifs : Commandes longues et sujettes aux erreurs
#!/bin/bash
# scripts/kafka-admin.sh
set -e
# Configuration par défaut
DEFAULT_ENV="dev"
DEFAULT_CONFIG_DIR="$(dirname "$0")/../configs"
# Couleurs pour les logs
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m'
usage() {
cat << EOF
Usage: $0 [OPTIONS] COMMAND [ARGS...]
Options:
-e, --env ENV Environment (dev|staging|prod) [default: $DEFAULT_ENV]
-c, --config DIR Configuration directory [default: $DEFAULT_CONFIG_DIR]
-h, --help Show this help message
Commands:
topics Topic management
list List all topics
create TOPIC Create topic
delete TOPIC Delete topic
describe TOPIC Describe topic
consumers Consumer management
groups List consumer groups
describe GROUP Describe consumer group
reset GROUP TOPIC Reset consumer group offset
cluster Cluster management
info Show cluster information
health Check cluster health
produce Produce messages
TOPIC [FILE] Produce to topic (from file or stdin)
consume Consume messages
TOPIC [GROUP] Consume from topic
Examples:
$0 -e prod topics list
$0 -e staging topics create my-topic
$0 -e prod consume my-topic my-consumer-group
$0 -e dev produce my-topic < messages.txt
EOF
}
log() {
echo -e "${GREEN}[$(date +'%H:%M:%S')] $1${NC}"
}
warn() {
echo -e "${YELLOW}[$(date +'%H:%M:%S')] WARNING: $1${NC}"
}
error() {
echo -e "${RED}[$(date +'%H:%M:%S')] ERROR: $1${NC}"
exit 1
}
# Parse command line arguments
ENV="$DEFAULT_ENV"
CONFIG_DIR="$DEFAULT_CONFIG_DIR"
while [[ $# -gt 0 ]]; do
case $1 in
-e|--env)
ENV="$2"
shift 2
;;
-c|--config)
CONFIG_DIR="$2"
shift 2
;;
-h|--help)
usage
exit 0
;;
*)
break
;;
esac
done
# Validation de l'environnement
if [[ ! "$ENV" =~ ^(dev|staging|prod)$ ]]; then
error "Invalid environment: $ENV. Must be dev, staging, or prod"
fi
# Chemins des fichiers de configuration
CLUSTER_CONFIG="$CONFIG_DIR/clusters/$ENV.properties"
PRODUCER_CONFIG="$CONFIG_DIR/producers/producer-$ENV.properties"
CONSUMER_CONFIG="$CONFIG_DIR/consumers/consumer-$ENV.properties"
# Vérification des fichiers de configuration
for config in "$CLUSTER_CONFIG" "$PRODUCER_CONFIG" "$CONSUMER_CONFIG"; do
if [[ ! -f "$config" ]]; then
error "Configuration file not found: $config"
fi
done
log "Using environment: $ENV"
# Fonction pour exécuter les commandes Kafka
kafka_cmd() {
local cmd="$1"
shift
case "$cmd" in
"kafka-topics")
kafka-topics.sh --bootstrap-server $(grep bootstrap.servers "$CLUSTER_CONFIG" | cut -d'=' -f2) \
--command-config "$CLUSTER_CONFIG" "$@"
;;
"kafka-consumer-groups")
kafka-consumer-groups.sh --bootstrap-server $(grep bootstrap.servers "$CLUSTER_CONFIG" | cut -d'=' -f2) \
--command-config "$CLUSTER_CONFIG" "$@"
;;
"kafka-console-producer")
kafka-console-producer.sh --bootstrap-server $(grep bootstrap.servers "$CLUSTER_CONFIG" | cut -d'=' -f2) \
--producer.config "$PRODUCER_CONFIG" "$@"
;;
"kafka-console-consumer")
kafka-console-consumer.sh --bootstrap-server $(grep bootstrap.servers "$CLUSTER_CONFIG" | cut -d'=' -f2) \
--consumer.config "$CONSUMER_CONFIG" "$@"
;;
*)
error "Unknown Kafka command: $cmd"
;;
esac
}
# Commandes principales
case "${1:-help}" in
"topics")
case "${2:-list}" in
"list")
log "Listing topics for environment: $ENV"
kafka_cmd kafka-topics --list
;;
"create")
[[ -z "$3" ]] && error "Topic name required"
TOPIC="$3"
PARTITIONS="${4:-3}"
REPLICATION="${5:-3}"
log "Creating topic: $TOPIC (partitions: $PARTITIONS, replication: $REPLICATION)"
kafka_cmd kafka-topics --create --topic "$TOPIC" \
--partitions "$PARTITIONS" --replication-factor "$REPLICATION"
;;
"delete")
[[ -z "$3" ]] && error "Topic name required"
TOPIC="$3"
warn "Deleting topic: $TOPIC"
read -p "Are you sure? (y/N): " -n 1 -r
echo
if [[ $REPLY =~ ^[Yy]$ ]]; then
kafka_cmd kafka-topics --delete --topic "$TOPIC"
else
log "Operation cancelled"
fi
;;
"describe")
[[ -z "$3" ]] && error "Topic name required"
TOPIC="$3"
log "Describing topic: $TOPIC"
kafka_cmd kafka-topics --describe --topic "$TOPIC"
;;
*)
error "Unknown topics command: $2"
;;
esac
;;
"consumers")
case "${2:-groups}" in
"groups")
log "Listing consumer groups for environment: $ENV"
kafka_cmd kafka-consumer-groups --list
;;
"describe")
[[ -z "$3" ]] && error "Consumer group required"
GROUP="$3"
log "Describing consumer group: $GROUP"
kafka_cmd kafka-consumer-groups --describe --group "$GROUP"
;;
"reset")
[[ -z "$3" ]] && error "Consumer group required"
[[ -z "$4" ]] && error "Topic required"
GROUP="$3"
TOPIC="$4"
warn "Resetting consumer group: $GROUP for topic: $TOPIC"
kafka_cmd kafka-consumer-groups --reset-offsets --group "$GROUP" \
--topic "$TOPIC" --to-earliest --execute
;;
*)
error "Unknown consumers command: $2"
;;
esac
;;
"cluster")
case "${2:-info}" in
"info")
log "Cluster information for environment: $ENV"
kafka_cmd kafka-topics --describe --topics-with-overrides
;;
"health")
log "Checking cluster health for environment: $ENV"
# Test de connectivité simple
if kafka_cmd kafka-topics --list > /dev/null 2>&1; then
log "✓ Cluster is healthy"
else
error "✗ Cluster is not accessible"
fi
;;
*)
error "Unknown cluster command: $2"
;;
esac
;;
"produce")
[[ -z "$2" ]] && error "Topic name required"
TOPIC="$2"
INPUT_FILE="$3"
log "Producing messages to topic: $TOPIC"
if [[ -n "$INPUT_FILE" && -f "$INPUT_FILE" ]]; then
log "Reading from file: $INPUT_FILE"
kafka_cmd kafka-console-producer --topic "$TOPIC" < "$INPUT_FILE"
else
log "Reading from stdin (Ctrl+C to stop)"
kafka_cmd kafka-console-producer --topic "$TOPIC"
fi
;;
"consume")
[[ -z "$2" ]] && error "Topic name required"
TOPIC="$2"
GROUP="${3:-kafka-admin-consumer-$(date +%s)}"
log "Consuming messages from topic: $TOPIC with group: $GROUP"
kafka_cmd kafka-console-consumer --topic "$TOPIC" --group "$GROUP" --from-beginning
;;
"help"|*)
usage
;;
esac
Script de Monitoring
#!/bin/bash
# scripts/kafka-monitor.sh
set -e
ENV="${1:-dev}"
CONFIG_DIR="$(dirname "$0")/../configs"
CLUSTER_CONFIG="$CONFIG_DIR/clusters/$ENV.properties"
# Fonction pour extraire les métriques
get_cluster_metrics() {
local bootstrap_servers=$(grep bootstrap.servers "$CLUSTER_CONFIG" | cut -d'=' -f2)
echo "=== Cluster Health Check ==="
echo "Environment: $ENV"
echo "Bootstrap Servers: $bootstrap_servers"
echo
# Test de connectivité
echo "Testing connectivity..."
if kafka-topics.sh --bootstrap-server "$bootstrap_servers" \
--command-config "$CLUSTER_CONFIG" --list > /dev/null 2>&1; then
echo "✓ Cluster is accessible"
else
echo "✗ Cluster is not accessible"
exit 1
fi
# Nombre de topics
TOPIC_COUNT=$(kafka-topics.sh --bootstrap-server "$bootstrap_servers" \
--command-config "$CLUSTER_CONFIG" --list | wc -l)
echo "Topics: $TOPIC_COUNT"
# Consumer groups
GROUP_COUNT=$(kafka-consumer-groups.sh --bootstrap-server "$bootstrap_servers" \
--command-config "$CLUSTER_CONFIG" --list | wc -l)
echo "Consumer Groups: $GROUP_COUNT"
echo
echo "=== Topics with Issues ==="
# Topics avec des partitions sous-répliquées
kafka-topics.sh --bootstrap-server "$bootstrap_servers" \
--command-config "$CLUSTER_CONFIG" --describe --under-replicated-partitions
echo
echo "=== Consumer Group Lag ==="
# Lag des consumer groups
kafka-consumer-groups.sh --bootstrap-server "$bootstrap_servers" \
--command-config "$CLUSTER_CONFIG" --describe --all-groups | \
grep -E "(GROUP|TOPIC|LAG)" | head -20
}
# Script de monitoring continu
monitor_continuous() {
while true; do
clear
echo "Kafka Cluster Monitoring - $(date)"
echo "========================================="
get_cluster_metrics
echo
echo "Refreshing in 30 seconds... (Ctrl+C to stop)"
sleep 30
done
}
case "${2:-once}" in
"once")
get_cluster_metrics
;;
"continuous")
monitor_continuous
;;
*)
echo "Usage: $0 ENV [once|continuous]"
echo "Example: $0 prod continuous"
;;
esac
💡Commandes Essentielles par Catégorie
Gestion des Topics
# Lister tous les topics
./kafka-admin.sh -e prod topics list
# Créer un topic avec configuration personnalisée
./kafka-admin.sh -e prod topics create user-events 6 3
# Décrire un topic en détail
./kafka-admin.sh -e prod topics describe user-events
# Modifier la configuration d'un topic
kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type topics --entity-name user-events \
--alter --add-config retention.ms=604800000
# Supprimer un topic (attention en production!)
./kafka-admin.sh -e staging topics delete test-topic
Gestion des Consumer Groups
# Lister tous les consumer groups
./kafka-admin.sh -e prod consumers groups
# Décrire un consumer group spécifique
./kafka-admin.sh -e prod consumers describe user-service-group
# Réinitialiser les offsets d'un consumer group
./kafka-admin.sh -e prod consumers reset user-service-group user-events
# Voir le lag de tous les consumer groups
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--command-config configs/clusters/prod.properties \
--describe --all-groups
Production et Consommation
# Produire des messages depuis un fichier
./kafka-admin.sh -e prod produce user-events messages.json
# Produire des messages interactivement
./kafka-admin.sh -e prod produce user-events
# Consommer des messages depuis le début
./kafka-admin.sh -e prod consume user-events user-debug-group
# Consommer avec un filtre (exemple avec jq)
./kafka-admin.sh -e prod consume user-events | jq '.userId'
#!/bin/bash
# scripts/kafka-audit.sh
ENV="$1"
CONFIG_DIR="configs"
AUDIT_LOG="/var/log/kafka-audit.log"
audit_log() {
echo "$(date '+%Y-%m-%d %H:%M:%S') [$ENV] $1" >> "$AUDIT_LOG"
}
# Audit des configurations
audit_log "Starting configuration audit"
# Vérifier les permissions des fichiers de configuration
find "$CONFIG_DIR" -name "*.properties" -exec ls -la {} \; | \
while read -r line; do
if [[ "$line" =~ "rw-rw-rw-" ]]; then
audit_log "WARNING: Configuration file with world-writable permissions: $line"
fi
done
# Vérifier les mots de passe en clair
if grep -r "password=" "$CONFIG_DIR" | grep -v "password=\${"; then
audit_log "WARNING: Plain text passwords found in configuration files"
fi
audit_log "Configuration audit completed"
💡Automatisation et CI/CD
Pipeline GitLab CI
# .gitlab-ci.yml
stages:
- validate
- test
- deploy
variables:
KAFKA_CONFIG_DIR: configs
validate-configs:
stage: validate
image: confluentinc/cp-kafka:7.4.0
script:
- echo "Validating Kafka configurations..."
- for env in dev staging prod; do
echo "Validating $env environment"
if [[ ! -f "configs/clusters/$env.properties" ]]; then
echo "Missing cluster config for $env"
exit 1
fi
done
- echo "All configurations are valid"
test-connectivity:
stage: test
image: confluentinc/cp-kafka:7.4.0
script:
- echo "Testing Kafka connectivity..."
- ./scripts/kafka-admin.sh -e dev cluster health
only:
- main
- develop
deploy-configs:
stage: deploy
script:
- echo "Deploying Kafka configurations..."
- rsync -av configs/ kafka-server:/opt/kafka/configs/
- ssh kafka-server "systemctl reload kafka-client-configs"
only:
- main
when: manual
💡Troubleshooting et Debugging
Script de Diagnostic
#!/bin/bash
# scripts/kafka-debug.sh
ENV="$1"
ISSUE_TYPE="$2"
debug_connection() {
echo "=== Connection Debug ==="
# Test de résolution DNS
BOOTSTRAP_SERVERS=$(grep bootstrap.servers "configs/clusters/$ENV.properties" | cut -d'=' -f2)
for server in $(echo "$BOOTSTRAP_SERVERS" | tr ',' ' '); do
HOST=$(echo "$server" | cut -d':' -f1)
PORT=$(echo "$server" | cut -d':' -f2)
echo "Testing $HOST:$PORT..."
if nslookup "$HOST" > /dev/null 2>&1; then
echo " ✓ DNS resolution OK"
else
echo " ✗ DNS resolution failed"
fi
if nc -zv "$HOST" "$PORT" > /dev/null 2>&1; then
echo " ✓ Port $PORT is open"
else
echo " ✗ Port $PORT is closed or filtered"
fi
done
}
debug_ssl() {
echo "=== SSL Debug ==="
TRUSTSTORE=$(grep ssl.truststore.location "configs/clusters/$ENV.properties" | cut -d'=' -f2)
if [[ -f "$TRUSTSTORE" ]]; then
echo "✓ Truststore file exists: $TRUSTSTORE"
keytool -list -keystore "$TRUSTSTORE" -storepass changeit
else
echo "✗ Truststore file not found: $TRUSTSTORE"
fi
}
debug_consumer_lag() {
echo "=== Consumer Lag Debug ==="
./kafka-admin.sh -e "$ENV" consumers groups | while read -r group; do
echo "Group: $group"
./kafka-admin.sh -e "$ENV" consumers describe "$group" | \
grep -E "(TOPIC|LAG)" | head -10
echo "---"
done
}
case "$ISSUE_TYPE" in
"connection")
debug_connection
;;
"ssl")
debug_ssl
;;
"lag")
debug_consumer_lag
;;
"all")
debug_connection
debug_ssl
debug_consumer_lag
;;
*)
echo "Usage: $0 ENV [connection|ssl|lag|all]"
echo "Example: $0 prod connection"
;;
esac
💡Conclusion
Cette approche structurée pour la gestion des commandes Kafka offre plusieurs avantages :
Bénéfices Clés
•🔧 Simplicité : Une seule commande pour tous les environnements
•🔒 Sécurité : Configurations centralisées et sécurisées
•📊 Monitoring : Scripts de surveillance intégrés
•🚀 Automatisation : Intégration CI/CD simplifiée
Bonnes Pratiques
•Versioning des configurations : Git pour tracer les changements
•Variables d'environnement : Secrets sécurisés
•Audit et logging : Traçabilité des opérations
•Tests automatisés : Validation des configurations
Prochaines Étapes
•Adapter les configurations à vos environnements
•Personnaliser les scripts selon vos besoins
•Intégrer le monitoring dans votre stack
•Former les équipes à l'utilisation des scripts
Cette approche transforme la complexité de la gestion multi-clusters Kafka en un workflow simple et reproductible, essentiel pour les environnements de production.
Cet article fait partie de ma série sur Apache Kafka. Consultez mes autres guides pour approfondir vos connaissances en streaming et architecture distribuée.
À propos de l'auteur
Florian Courouge - Expert DevOps et Apache Kafka avec plus de 5 ans d'expérience dans l'architecture de systèmes distribués et l'automatisation d'infrastructures.
Cet article vous a été utile ?
Découvrez mes autres articles techniques ou contactez-moi pour discuter de vos projets DevOps et Kafka.