Kafka Monitoring Scripts: Ready to Use
This article contains complete, production-tested scripts. Copy, adapt, deploy.
These are the scripts I use on client engagements to monitor Kafka clusters ranging from 3 to 47 brokers. They are battle-tested on environments handling several million messages per second.
1. Consumer Lag Check Script
This script checks the lag of every consumer group and sends alerts when the lag exceeds a configurable threshold.
#!/usr/bin/env python3
"""
Kafka Consumer Lag Monitor
Sends Slack/email alerts when lag exceeds the configured threshold.

Usage:
    python kafka_lag_monitor.py --bootstrap-server localhost:9092 --threshold 10000
"""
import argparse
import json
import subprocess
import sys
from datetime import datetime
from typing import Dict, List

import requests


class KafkaLagMonitor:
    def __init__(self, bootstrap_servers: str, threshold: int = 10000):
        self.bootstrap_servers = bootstrap_servers
        self.threshold = threshold
        self.alerts: List[Dict] = []

    def get_consumer_groups(self) -> List[str]:
        """List all consumer groups."""
        result = subprocess.run([
            'kafka-consumer-groups.sh',
            '--bootstrap-server', self.bootstrap_servers,
            '--list'
        ], capture_output=True, text=True)
        if result.returncode != 0:
            raise RuntimeError(f"Error: {result.stderr}")
        return [g.strip() for g in result.stdout.strip().split('\n') if g.strip()]

    def get_group_lag(self, group_id: str) -> Dict:
        """Fetch the detailed lag of a single consumer group."""
        result = subprocess.run([
            'kafka-consumer-groups.sh',
            '--bootstrap-server', self.bootstrap_servers,
            '--describe',
            '--group', group_id
        ], capture_output=True, text=True)
        if result.returncode != 0:
            return {'group': group_id, 'error': result.stderr, 'total_lag': -1}
        lines = result.stdout.strip().split('\n')
        total_lag = 0
        partitions = []
        for line in lines[1:]:  # Skip header
            if not line.strip():
                continue
            parts = line.split()
            if len(parts) >= 6:
                try:
                    lag = int(parts[5]) if parts[5] != '-' else 0
                    total_lag += lag
                    partitions.append({
                        'topic': parts[1],
                        'partition': int(parts[2]),
                        'current_offset': parts[3],
                        'log_end_offset': parts[4],
                        'lag': lag,
                        'consumer_id': parts[6] if len(parts) > 6 else 'N/A'
                    })
                except (ValueError, IndexError):
                    continue
        return {
            'group': group_id,
            'total_lag': total_lag,
            'partitions': partitions,
            'partition_count': len(partitions)
        }

    def check_all_groups(self) -> List[Dict]:
        """Check the lag of every consumer group."""
        groups = self.get_consumer_groups()
        results = []
        for group in groups:
            lag_info = self.get_group_lag(group)
            results.append(lag_info)
            if lag_info['total_lag'] > self.threshold:
                self.alerts.append({
                    'group': group,
                    'lag': lag_info['total_lag'],
                    'threshold': self.threshold,
                    'timestamp': datetime.now().isoformat()
                })
        return results

    def send_slack_alert(self, webhook_url: str):
        """Send the collected alerts to Slack."""
        if not self.alerts:
            return
        blocks = [{
            "type": "header",
            "text": {
                "type": "plain_text",
                "text": "⚠️ Kafka Consumer Lag Alert"
            }
        }]
        for alert in self.alerts:
            blocks.append({
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"*Group:* `{alert['group']}`\n*Lag:* {alert['lag']:,} messages\n*Threshold:* {alert['threshold']:,}"
                }
            })
        payload = {"blocks": blocks}
        requests.post(webhook_url, json=payload, timeout=10)

    def generate_report(self, results: List[Dict]) -> str:
        """Generate a plain-text report."""
        report = []
        report.append("=" * 60)
        report.append(f"KAFKA CONSUMER LAG REPORT - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report.append("=" * 60)
        report.append("")
        # Sort by lag, highest first
        sorted_results = sorted(results, key=lambda x: x['total_lag'], reverse=True)
        for r in sorted_results:
            status = "🔴 CRITICAL" if r['total_lag'] > self.threshold else "🟢 OK"
            report.append(f"{status} | {r['group']:<40} | Lag: {r['total_lag']:>10,}")
        report.append("")
        report.append(f"Total groups: {len(results)}")
        report.append(f"Groups in alert: {len(self.alerts)}")
        return '\n'.join(report)


def main():
    parser = argparse.ArgumentParser(description='Monitor Kafka consumer lag')
    parser.add_argument('--bootstrap-server', required=True, help='Kafka bootstrap servers')
    parser.add_argument('--threshold', type=int, default=10000, help='Lag threshold for alerts')
    parser.add_argument('--slack-webhook', help='Slack webhook URL for alerts')
    parser.add_argument('--json', action='store_true', help='Output as JSON')
    args = parser.parse_args()

    monitor = KafkaLagMonitor(args.bootstrap_server, args.threshold)
    try:
        results = monitor.check_all_groups()
        if args.json:
            print(json.dumps(results, indent=2))
        else:
            print(monitor.generate_report(results))
        if args.slack_webhook and monitor.alerts:
            monitor.send_slack_alert(args.slack_webhook)
            print(f"\n📤 {len(monitor.alerts)} alert(s) sent to Slack")
        # Exit code reflects whether any alerts fired
        sys.exit(1 if monitor.alerts else 0)
    except Exception as e:
        print(f"❌ Error: {e}", file=sys.stderr)
        sys.exit(2)


if __name__ == "__main__":
    main()
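The --json output pairs nicely with jq for ad-hoc triage. A quick sketch, assuming jq is installed, that prints the five worst groups:
python kafka_lag_monitor.py --bootstrap-server kafka:9092 --json \
    | jq 'sort_by(-.total_lag) | .[:5] | map({group, total_lag})'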
2. Full Cluster Health Check
This script checks the overall health of the cluster: brokers, topics, replication.
#!/usr/bin/env python3
"""
Kafka Cluster Health Check
Checks brokers, topics, replication, and ISR.

Usage:
    python kafka_health_check.py --bootstrap-server localhost:9092
"""
import argparse
import json
import subprocess
import sys
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Dict, List, Optional


@dataclass
class HealthStatus:
    component: str
    status: str  # OK, WARNING, CRITICAL
    message: str
    details: Optional[Dict] = None


class KafkaHealthChecker:
    def __init__(self, bootstrap_servers: str, zookeeper: Optional[str] = None):
        self.bootstrap_servers = bootstrap_servers
        self.zookeeper = zookeeper
        self.checks: List[HealthStatus] = []

    def run_kafka_command(self, args: List[str]) -> subprocess.CompletedProcess:
        """Run a Kafka CLI command."""
        return subprocess.run(args, capture_output=True, text=True, timeout=30)

    def check_brokers(self) -> HealthStatus:
        """Check broker reachability."""
        result = self.run_kafka_command([
            'kafka-broker-api-versions.sh',
            '--bootstrap-server', self.bootstrap_servers
        ])
        if result.returncode != 0:
            return HealthStatus(
                component="Brokers",
                status="CRITICAL",
                message="Unable to reach the brokers",
                details={"error": result.stderr}
            )
        # Count the brokers
        broker_count = result.stdout.count("ApiVersion")
        if broker_count == 0:
            return HealthStatus(
                component="Brokers",
                status="CRITICAL",
                message="No broker reachable"
            )
        return HealthStatus(
            component="Brokers",
            status="OK",
            message=f"{broker_count} broker(s) reachable",
            details={"broker_count": broker_count}
        )

    def check_under_replicated_partitions(self) -> HealthStatus:
        """Check for under-replicated partitions."""
        result = self.run_kafka_command([
            'kafka-topics.sh',
            '--bootstrap-server', self.bootstrap_servers,
            '--describe',
            '--under-replicated-partitions'
        ])
        if result.returncode != 0:
            return HealthStatus(
                component="Replication",
                status="WARNING",
                message="Unable to check replication",
                details={"error": result.stderr}
            )
        urp_count = len([l for l in result.stdout.split('\n') if l.strip()])
        if urp_count > 0:
            return HealthStatus(
                component="Replication",
                status="CRITICAL",
                message=f"{urp_count} under-replicated partition(s)",
                details={"under_replicated": urp_count}
            )
        return HealthStatus(
            component="Replication",
            status="OK",
            message="All partitions are fully replicated"
        )

    def check_offline_partitions(self) -> HealthStatus:
        """Check for offline partitions (no leader)."""
        result = self.run_kafka_command([
            'kafka-topics.sh',
            '--bootstrap-server', self.bootstrap_servers,
            '--describe',
            '--unavailable-partitions'
        ])
        if result.returncode != 0:
            return HealthStatus(
                component="Partitions",
                status="WARNING",
                message="Unable to check partitions"
            )
        offline_count = len([l for l in result.stdout.split('\n') if l.strip()])
        if offline_count > 0:
            return HealthStatus(
                component="Partitions",
                status="CRITICAL",
                message=f"{offline_count} partition(s) OFFLINE!",
                details={"offline": offline_count}
            )
        return HealthStatus(
            component="Partitions",
            status="OK",
            message="Every partition has a leader"
        )

    def check_consumer_groups(self) -> HealthStatus:
        """Check the consumer groups."""
        result = self.run_kafka_command([
            'kafka-consumer-groups.sh',
            '--bootstrap-server', self.bootstrap_servers,
            '--list'
        ])
        if result.returncode != 0:
            return HealthStatus(
                component="Consumer Groups",
                status="WARNING",
                message="Unable to list consumer groups"
            )
        groups = [g.strip() for g in result.stdout.split('\n') if g.strip()]
        return HealthStatus(
            component="Consumer Groups",
            status="OK",
            message=f"{len(groups)} active consumer group(s)",
            details={"group_count": len(groups)}
        )

    def run_all_checks(self) -> List[HealthStatus]:
        """Run every health check."""
        self.checks = [
            self.check_brokers(),
            self.check_under_replicated_partitions(),
            self.check_offline_partitions(),
            self.check_consumer_groups()
        ]
        return self.checks

    def get_overall_status(self) -> str:
        """Return the aggregated status."""
        statuses = [c.status for c in self.checks]
        if "CRITICAL" in statuses:
            return "CRITICAL"
        elif "WARNING" in statuses:
            return "WARNING"
        return "OK"

    def generate_report(self) -> str:
        """Generate a health report."""
        status_icons = {
            "OK": "🟢",
            "WARNING": "🟡",
            "CRITICAL": "🔴"
        }
        report = []
        report.append("=" * 60)
        report.append(f"KAFKA HEALTH CHECK - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report.append(f"Cluster: {self.bootstrap_servers}")
        report.append("=" * 60)
        report.append("")
        for check in self.checks:
            icon = status_icons.get(check.status, "⚪")
            report.append(f"{icon} {check.component:<20} | {check.status:<10} | {check.message}")
        report.append("")
        report.append("-" * 60)
        overall = self.get_overall_status()
        report.append(f"OVERALL STATUS: {status_icons[overall]} {overall}")
        return '\n'.join(report)

    def to_json(self) -> str:
        """Export the results as JSON."""
        return json.dumps({
            "timestamp": datetime.now().isoformat(),
            "bootstrap_servers": self.bootstrap_servers,
            "overall_status": self.get_overall_status(),
            "checks": [asdict(c) for c in self.checks]
        }, indent=2)


def main():
    parser = argparse.ArgumentParser(description='Kafka cluster health check')
    parser.add_argument('--bootstrap-server', required=True, help='Kafka bootstrap servers')
    parser.add_argument('--json', action='store_true', help='Output as JSON')
    args = parser.parse_args()

    checker = KafkaHealthChecker(args.bootstrap_server)
    try:
        checker.run_all_checks()
        if args.json:
            print(checker.to_json())
        else:
            print(checker.generate_report())
        # Exit code reflects the overall status
        status = checker.get_overall_status()
        sys.exit(0 if status == "OK" else (1 if status == "WARNING" else 2))
    except Exception as e:
        print(f"❌ Error: {e}", file=sys.stderr)
        sys.exit(3)


if __name__ == "__main__":
    main()
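The exit codes (0 = OK, 1 = WARNING, 2 = CRITICAL, 3 on unexpected errors, which Nagios reads as UNKNOWN) follow the Nagios plugin convention, so the script plugs into Nagios or Icinga with very little glue. A minimal command definition sketch, assuming the script is deployed to /opt/scripts:
define command {
    command_name    check_kafka_health
    command_line    /usr/bin/python3 /opt/scripts/kafka_health_check.py --bootstrap-server $ARG1$
}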
3. Prometheus Exporter for Custom Metrics
#!/usr/bin/env python3
"""
Prometheus exporter for custom Kafka metrics.
Exposes metrics on :8000/metrics (configurable with --port).

Exported metrics:
- kafka_broker_count
- kafka_topic_count
- kafka_partition_count
- kafka_under_replicated_partitions
- kafka_consumer_lag_total
- kafka_consumer_group_count
"""
import argparse
import subprocess
import time

from prometheus_client import start_http_server, Gauge

# Metric definitions
BROKER_COUNT = Gauge('kafka_broker_count', 'Number of active brokers')
TOPIC_COUNT = Gauge('kafka_topic_count', 'Number of topics')
PARTITION_COUNT = Gauge('kafka_partition_count', 'Total number of partitions')
UNDER_REPLICATED = Gauge('kafka_under_replicated_partitions', 'Under-replicated partitions')
CONSUMER_LAG = Gauge('kafka_consumer_lag_total', 'Consumer group lag', ['group', 'topic'])
GROUP_COUNT = Gauge('kafka_consumer_group_count', 'Number of consumer groups')


class KafkaMetricsCollector:
    def __init__(self, bootstrap_servers: str):
        self.bootstrap_servers = bootstrap_servers

    def collect_broker_metrics(self):
        """Collect broker metrics."""
        try:
            result = subprocess.run([
                'kafka-broker-api-versions.sh',
                '--bootstrap-server', self.bootstrap_servers
            ], capture_output=True, text=True, timeout=30)
            broker_count = result.stdout.count("ApiVersion") if result.returncode == 0 else 0
            BROKER_COUNT.set(broker_count)
        except Exception as e:
            print(f"Broker metrics collection failed: {e}")

    def collect_topic_metrics(self):
        """Collect topic metrics."""
        try:
            result = subprocess.run([
                'kafka-topics.sh',
                '--bootstrap-server', self.bootstrap_servers,
                '--describe'
            ], capture_output=True, text=True, timeout=60)
            if result.returncode == 0:
                lines = result.stdout.strip().split('\n')
                topics = set()
                partitions = 0
                for line in lines:
                    if line.startswith('Topic:'):
                        parts = line.split('\t')
                        for part in parts:
                            if part.startswith('Topic:'):
                                topics.add(part.split(':')[1].strip())
                            elif part.startswith('PartitionCount:'):
                                partitions += int(part.split(':')[1].strip())
                TOPIC_COUNT.set(len(topics))
                PARTITION_COUNT.set(partitions)
        except Exception as e:
            print(f"Topic metrics collection failed: {e}")

    def collect_replication_metrics(self):
        """Collect replication metrics."""
        try:
            result = subprocess.run([
                'kafka-topics.sh',
                '--bootstrap-server', self.bootstrap_servers,
                '--describe',
                '--under-replicated-partitions'
            ], capture_output=True, text=True, timeout=30)
            # -1 signals that the check itself failed
            urp = len([l for l in result.stdout.split('\n') if l.strip()]) if result.returncode == 0 else -1
            UNDER_REPLICATED.set(urp)
        except Exception as e:
            print(f"Replication metrics collection failed: {e}")

    def collect_consumer_lag(self):
        """Collect consumer group lag."""
        try:
            # List the groups
            result = subprocess.run([
                'kafka-consumer-groups.sh',
                '--bootstrap-server', self.bootstrap_servers,
                '--list'
            ], capture_output=True, text=True, timeout=30)
            if result.returncode != 0:
                return
            groups = [g.strip() for g in result.stdout.split('\n') if g.strip()]
            GROUP_COUNT.set(len(groups))
            # Lag per group
            for group in groups[:20]:  # Cap the group count to keep collection cycles short
                try:
                    result = subprocess.run([
                        'kafka-consumer-groups.sh',
                        '--bootstrap-server', self.bootstrap_servers,
                        '--describe',
                        '--group', group
                    ], capture_output=True, text=True, timeout=10)
                    if result.returncode == 0:
                        lag_by_topic = {}
                        for line in result.stdout.split('\n')[1:]:
                            parts = line.split()
                            if len(parts) >= 6:
                                topic = parts[1]
                                try:
                                    lag = int(parts[5]) if parts[5] != '-' else 0
                                    lag_by_topic[topic] = lag_by_topic.get(topic, 0) + lag
                                except ValueError:
                                    continue
                        for topic, lag in lag_by_topic.items():
                            CONSUMER_LAG.labels(group=group, topic=topic).set(lag)
                except Exception:
                    continue
        except Exception as e:
            print(f"Lag collection failed: {e}")

    def collect_all(self):
        """Collect all metrics."""
        self.collect_broker_metrics()
        self.collect_topic_metrics()
        self.collect_replication_metrics()
        self.collect_consumer_lag()


def main():
    parser = argparse.ArgumentParser(description='Kafka Prometheus exporter')
    parser.add_argument('--bootstrap-server', required=True, help='Kafka bootstrap servers')
    parser.add_argument('--port', type=int, default=8000, help='Prometheus metrics port')
    parser.add_argument('--interval', type=int, default=30, help='Collection interval in seconds')
    args = parser.parse_args()

    # Start the HTTP server
    start_http_server(args.port)
    print(f"📊 Prometheus exporter started on :{args.port}/metrics")
    collector = KafkaMetricsCollector(args.bootstrap_server)
    while True:
        try:
            collector.collect_all()
            print(f"✅ Metrics collected - {time.strftime('%H:%M:%S')}")
        except Exception as e:
            print(f"❌ Error: {e}")
        time.sleep(args.interval)


if __name__ == "__main__":
    main()
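On the Prometheus side, a minimal scrape job pointing at the exporter could look like this; the target kafka-exporter:8000 is an assumption, adjust it to wherever the exporter actually runs:
scrape_configs:
  - job_name: 'kafka-custom'
    scrape_interval: 30s
    static_configs:
      - targets: ['kafka-exporter:8000']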
4. Quick Diagnostic Bash Script
For a quick diagnosis without Python:
#!/bin/bash
# Quick diagnostic for a Kafka cluster
# Usage: ./kafka_quick_check.sh localhost:9092

BOOTSTRAP=${1:-localhost:9092}

echo "========================================"
echo "KAFKA QUICK DIAGNOSTIC"
echo "Bootstrap: $BOOTSTRAP"
echo "Date: $(date)"
echo "========================================"
echo ""

# 1. Connectivity test
echo "🔍 Testing connectivity..."
if kafka-broker-api-versions.sh --bootstrap-server "$BOOTSTRAP" > /dev/null 2>&1; then
    echo "✅ Cluster reachable"
    BROKER_COUNT=$(kafka-broker-api-versions.sh --bootstrap-server "$BOOTSTRAP" 2>/dev/null | grep -c "ApiVersion")
    echo "   Brokers detected: $BROKER_COUNT"
else
    echo "❌ ERROR: Unable to reach the cluster"
    exit 1
fi
echo ""

# 2. Under-replicated partitions
echo "🔍 Checking replication..."
URP=$(kafka-topics.sh --bootstrap-server "$BOOTSTRAP" --describe --under-replicated-partitions 2>/dev/null | grep -c "Topic:")
if [ "$URP" -eq 0 ]; then
    echo "✅ No under-replicated partitions"
else
    echo "⚠️ $URP under-replicated partition(s)"
    kafka-topics.sh --bootstrap-server "$BOOTSTRAP" --describe --under-replicated-partitions 2>/dev/null
fi
echo ""

# 3. Offline partitions
echo "🔍 Checking for offline partitions..."
OFFLINE=$(kafka-topics.sh --bootstrap-server "$BOOTSTRAP" --describe --unavailable-partitions 2>/dev/null | grep -c "Topic:")
if [ "$OFFLINE" -eq 0 ]; then
    echo "✅ No offline partitions"
else
    echo "🔴 CRITICAL: $OFFLINE partition(s) OFFLINE!"
    kafka-topics.sh --bootstrap-server "$BOOTSTRAP" --describe --unavailable-partitions 2>/dev/null
fi
echo ""

# 4. Consumer groups with high lag
echo "🔍 Consumer groups with high lag (>10000)..."
kafka-consumer-groups.sh --bootstrap-server "$BOOTSTRAP" --list 2>/dev/null | while read -r group; do
    if [ -n "$group" ]; then
        LAG=$(kafka-consumer-groups.sh --bootstrap-server "$BOOTSTRAP" --describe --group "$group" 2>/dev/null | awk 'NR>1 && $6 != "-" {sum+=$6} END {print sum+0}')
        if [ "$LAG" -gt 10000 ]; then
            echo "⚠️ $group: lag=$LAG"
        fi
    fi
done
echo ""

# 5. Summary
echo "========================================"
echo "SUMMARY"
echo "========================================"
TOPIC_COUNT=$(kafka-topics.sh --bootstrap-server "$BOOTSTRAP" --list 2>/dev/null | wc -l)
GROUP_COUNT=$(kafka-consumer-groups.sh --bootstrap-server "$BOOTSTRAP" --list 2>/dev/null | wc -l)
echo "Topics: $TOPIC_COUNT"
echo "Consumer groups: $GROUP_COUNT"
echo "Brokers: $BROKER_COUNT"
echo "Under-replicated: $URP"
echo "Offline: $OFFLINE"

if [ "$OFFLINE" -gt 0 ]; then
    echo ""
    echo "🔴 STATUS: CRITICAL"
    exit 2
elif [ "$URP" -gt 0 ]; then
    echo ""
    echo "🟡 STATUS: WARNING"
    exit 1
else
    echo ""
    echo "🟢 STATUS: OK"
    exit 0
fi
How to Use These Scripts
Installing the dependencies
pip install prometheus_client requests
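For reproducible deployments you may prefer to pin these in a requirements.txt; the version floors below are assumptions, not tested minimums:
# requirements.txt (indicative version floors)
prometheus_client>=0.16
requests>=2.28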
Usage examples
# Lag monitoring
python kafka_lag_monitor.py --bootstrap-server kafka:9092 --threshold 5000
# With Slack alerts
python kafka_lag_monitor.py --bootstrap-server kafka:9092 \
    --slack-webhook https://hooks.slack.com/services/XXX
# Health check
python kafka_health_check.py --bootstrap-server kafka:9092
# Prometheus exporter (daemon)
python kafka_prometheus_exporter.py --bootstrap-server kafka:9092 --port 8000
# Quick bash diagnostic
./kafka_quick_check.sh kafka:9092
Cron Integration
# Crontab entry: run the health check every 5 minutes
*/5 * * * * /opt/scripts/kafka_health_check.py --bootstrap-server kafka:9092 --json >> /var/log/kafka-health.log
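The Prometheus exporter, by contrast, is a long-running daemon and belongs under a process supervisor rather than cron. A minimal systemd unit sketch, assuming the script lives in /opt/scripts and python3 is on the default path:
# /etc/systemd/system/kafka-exporter.service
[Unit]
Description=Kafka custom metrics Prometheus exporter
After=network-online.target

[Service]
ExecStart=/usr/bin/python3 /opt/scripts/kafka_prometheus_exporter.py --bootstrap-server kafka:9092 --port 8000
Restart=on-failure
RestartSec=10

[Install]
WantedBy=multi-user.target
Enable it with systemctl enable --now kafka-exporter.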
Grafana Dashboard (JSON)
A Grafana dashboard compatible with the Prometheus exporter is available in the Resources section of your member area.
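If you prefer building your own panels, the exporter's metric names translate directly into PromQL. Two starter queries, assuming the metric names defined in script 3:
# Total lag per consumer group
sum by (group) (kafka_consumer_lag_total)
# Fires whenever any partition is under-replicated
kafka_under_replicated_partitions > 0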
These scripts are provided "as is" and tested on Kafka 2.8+ and 3.x. Adjust the Kafka binary paths to match your installation.
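For instance, if your distribution lives in /opt/kafka (an assumed path; substitute your own), putting its bin directory on the PATH lets the scripts invoke the CLI tools by name:
export PATH="$PATH:/opt/kafka/bin"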