kafka
Intermediate

Kafka Monitoring Scripts: Copy-Paste and Ready to Use

A collection of Python and Bash scripts for monitoring your Kafka clusters. Lag alerts, health checks, JMX metrics - everything included.

Florian Courouge
15 min read
3,050 words
#Kafka
#Monitoring
#Scripts
#Python
#Prometheus
#Alerting

Kafka Monitoring Scripts: Ready to Use

Members-Only Content

This article contains complete, production-tested scripts. Copy, adapt, deploy.

These are the scripts I use on client engagements to monitor Kafka clusters of 3 to 47 brokers. They are battle-tested in environments handling several million messages per second.


1. Consumer Lag Check Script

This script checks the lag of every consumer group and sends alerts when the lag exceeds a threshold.

#!/usr/bin/env python3
"""
Kafka Consumer Lag Monitor
Sends Slack/email alerts when lag exceeds the configured threshold.

Usage:
    python kafka_lag_monitor.py --bootstrap-server localhost:9092 --threshold 10000
"""

import argparse
import json
import subprocess
import sys
from datetime import datetime
from typing import Dict, List, Optional
import requests

class KafkaLagMonitor:
    def __init__(self, bootstrap_servers: str, threshold: int = 10000):
        self.bootstrap_servers = bootstrap_servers
        self.threshold = threshold
        self.alerts: List[Dict] = []

    def get_consumer_groups(self) -> List[str]:
        """List all consumer groups."""
        result = subprocess.run([
            'kafka-consumer-groups.sh',
            '--bootstrap-server', self.bootstrap_servers,
            '--list'
        ], capture_output=True, text=True, timeout=60)

        if result.returncode != 0:
            raise RuntimeError(f"Error: {result.stderr}")

        return [g.strip() for g in result.stdout.strip().split('\n') if g.strip()]

    def get_group_lag(self, group_id: str) -> Dict:
        """Return detailed lag for a consumer group."""
        result = subprocess.run([
            'kafka-consumer-groups.sh',
            '--bootstrap-server', self.bootstrap_servers,
            '--describe',
            '--group', group_id
        ], capture_output=True, text=True, timeout=60)

        if result.returncode != 0:
            return {'group': group_id, 'error': result.stderr, 'total_lag': -1}

        lines = result.stdout.strip().split('\n')
        total_lag = 0
        partitions = []

        for line in lines[1:]:  # Skip header
            if not line.strip():
                continue
            parts = line.split()
            if len(parts) >= 6:
                try:
                    lag = int(parts[5]) if parts[5] != '-' else 0
                    total_lag += lag
                    partitions.append({
                        'topic': parts[1],
                        'partition': int(parts[2]),
                        'current_offset': parts[3],
                        'log_end_offset': parts[4],
                        'lag': lag,
                        'consumer_id': parts[6] if len(parts) > 6 else 'N/A'
                    })
                except (ValueError, IndexError):
                    continue

        return {
            'group': group_id,
            'total_lag': total_lag,
            'partitions': partitions,
            'partition_count': len(partitions)
        }

    def check_all_groups(self) -> List[Dict]:
        """Vérifie le lag de tous les consumer groups."""
        groups = self.get_consumer_groups()
        results = []

        for group in groups:
            lag_info = self.get_group_lag(group)
            results.append(lag_info)

            if lag_info['total_lag'] > self.threshold:
                self.alerts.append({
                    'group': group,
                    'lag': lag_info['total_lag'],
                    'threshold': self.threshold,
                    'timestamp': datetime.now().isoformat()
                })

        return results

    def send_slack_alert(self, webhook_url: str):
        """Envoie les alertes vers Slack."""
        if not self.alerts:
            return

        blocks = [{
            "type": "header",
            "text": {
                "type": "plain_text",
                "text": "⚠️ Kafka Consumer Lag Alert"
            }
        }]

        for alert in self.alerts:
            blocks.append({
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"*Group:* `{alert['group']}`\n*Lag:* {alert['lag']:,} messages\n*Threshold:* {alert['threshold']:,}"
                }
            })

        payload = {"blocks": blocks}
        requests.post(webhook_url, json=payload)

    def generate_report(self, results: List[Dict]) -> str:
        """Génère un rapport texte."""
        report = []
        report.append("=" * 60)
        report.append(f"KAFKA CONSUMER LAG REPORT - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report.append("=" * 60)
        report.append("")

        # Sort by descending lag
        sorted_results = sorted(results, key=lambda x: x['total_lag'], reverse=True)

        for r in sorted_results:
            status = "🔴 CRITICAL" if r['total_lag'] > self.threshold else "🟢 OK"
            report.append(f"{status} | {r['group']:<40} | Lag: {r['total_lag']:>10,}")

        report.append("")
        report.append(f"Total groups: {len(results)}")
        report.append(f"Groups in alert: {len(self.alerts)}")

        return '\n'.join(report)


def main():
    parser = argparse.ArgumentParser(description='Monitor Kafka consumer lag')
    parser.add_argument('--bootstrap-server', required=True, help='Kafka bootstrap servers')
    parser.add_argument('--threshold', type=int, default=10000, help='Lag threshold for alerts')
    parser.add_argument('--slack-webhook', help='Slack webhook URL for alerts')
    parser.add_argument('--json', action='store_true', help='Output as JSON')

    args = parser.parse_args()

    monitor = KafkaLagMonitor(args.bootstrap_server, args.threshold)

    try:
        results = monitor.check_all_groups()

        if args.json:
            print(json.dumps(results, indent=2))
        else:
            print(monitor.generate_report(results))

        if args.slack_webhook and monitor.alerts:
            monitor.send_slack_alert(args.slack_webhook)
            print(f"\n📤 {len(monitor.alerts)} alertes envoyées vers Slack")

        # Exit code based on alerts
        sys.exit(1 if monitor.alerts else 0)

    except Exception as e:
        print(f"❌ Erreur: {e}", file=sys.stderr)
        sys.exit(2)


if __name__ == "__main__":
    main()
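
The parsing in `get_group_lag` assumes the column layout printed by `kafka-consumer-groups.sh --describe` on Kafka 2.8+/3.x (GROUP, TOPIC, PARTITION, CURRENT-OFFSET, LOG-END-OFFSET, LAG, CONSUMER-ID, ...). A quick offline sanity check of that assumption, using illustrative sample output:

```python
# Standalone sanity check for the lag-parsing rules used above.
# SAMPLE is illustrative output; real column widths vary.
SAMPLE = """GROUP       TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID
orders-app  orders    0          1500            1742            242  consumer-1-abc
orders-app  orders    1          980             980             0    consumer-1-abc
orders-app  payments  0          10              -               -    -
"""

def parse_total_lag(output: str) -> int:
    """Sum the LAG column (index 5), treating '-' as zero, like get_group_lag."""
    total = 0
    for line in output.strip().split('\n')[1:]:  # skip header
        parts = line.split()
        if len(parts) >= 6:
            try:
                total += int(parts[5]) if parts[5] != '-' else 0
            except ValueError:
                continue
    return total

print(parse_total_lag(SAMPLE))  # → 242
```

If a future Kafka release reorders these columns, this check fails fast without touching a live cluster.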

2. Full Cluster Health Check

A script that checks overall cluster health: brokers, topics, replication.

#!/usr/bin/env python3
"""
Kafka Cluster Health Check
Checks brokers, topics, replication, and ISR status.

Usage:
    python kafka_health_check.py --bootstrap-server localhost:9092
"""

import argparse
import json
import subprocess
import sys
from dataclasses import dataclass, asdict
from typing import List, Dict, Optional
from datetime import datetime


@dataclass
class HealthStatus:
    component: str
    status: str  # OK, WARNING, CRITICAL
    message: str
    details: Optional[Dict] = None


class KafkaHealthChecker:
    def __init__(self, bootstrap_servers: str, zookeeper: Optional[str] = None):
        self.bootstrap_servers = bootstrap_servers
        self.zookeeper = zookeeper
        self.checks: List[HealthStatus] = []

    def run_kafka_command(self, args: List[str]) -> subprocess.CompletedProcess:
        """Exécute une commande Kafka."""
        return subprocess.run(args, capture_output=True, text=True, timeout=30)

    def check_brokers(self) -> HealthStatus:
        """Vérifie l'état des brokers."""
        result = self.run_kafka_command([
            'kafka-broker-api-versions.sh',
            '--bootstrap-server', self.bootstrap_servers
        ])

        if result.returncode != 0:
            return HealthStatus(
                component="Brokers",
                status="CRITICAL",
                message="Impossible de contacter les brokers",
                details={"error": result.stderr}
            )

        # Count brokers: each broker section mentions ApiVersions once
        broker_count = result.stdout.count("ApiVersion")

        if broker_count == 0:
            return HealthStatus(
                component="Brokers",
                status="CRITICAL",
                message="Aucun broker accessible"
            )

        return HealthStatus(
            component="Brokers",
            status="OK",
            message=f"{broker_count} broker(s) accessible(s)",
            details={"broker_count": broker_count}
        )

    def check_under_replicated_partitions(self) -> HealthStatus:
        """Vérifie les partitions sous-répliquées."""
        result = self.run_kafka_command([
            'kafka-topics.sh',
            '--bootstrap-server', self.bootstrap_servers,
            '--describe',
            '--under-replicated-partitions'
        ])

        if result.returncode != 0:
            return HealthStatus(
                component="Replication",
                status="WARNING",
                message="Impossible de vérifier la réplication",
                details={"error": result.stderr}
            )

        urp_count = len([l for l in result.stdout.split('\n') if l.strip()])

        if urp_count > 0:
            return HealthStatus(
                component="Replication",
                status="CRITICAL",
                message=f"{urp_count} partition(s) sous-répliquée(s)",
                details={"under_replicated": urp_count}
            )

        return HealthStatus(
            component="Replication",
            status="OK",
            message="Toutes les partitions sont répliquées"
        )

    def check_offline_partitions(self) -> HealthStatus:
        """Vérifie les partitions offline (sans leader)."""
        result = self.run_kafka_command([
            'kafka-topics.sh',
            '--bootstrap-server', self.bootstrap_servers,
            '--describe',
            '--unavailable-partitions'
        ])

        if result.returncode != 0:
            return HealthStatus(
                component="Partitions",
                status="WARNING",
                message="Impossible de vérifier les partitions"
            )

        offline_count = len([l for l in result.stdout.split('\n') if l.strip()])

        if offline_count > 0:
            return HealthStatus(
                component="Partitions",
                status="CRITICAL",
                message=f"{offline_count} partition(s) OFFLINE!",
                details={"offline": offline_count}
            )

        return HealthStatus(
            component="Partitions",
            status="OK",
            message="Toutes les partitions ont un leader"
        )

    def check_consumer_groups(self) -> HealthStatus:
        """Vérifie l'état des consumer groups."""
        result = self.run_kafka_command([
            'kafka-consumer-groups.sh',
            '--bootstrap-server', self.bootstrap_servers,
            '--list'
        ])

        if result.returncode != 0:
            return HealthStatus(
                component="Consumer Groups",
                status="WARNING",
                message="Impossible de lister les consumer groups"
            )

        groups = [g.strip() for g in result.stdout.split('\n') if g.strip()]

        return HealthStatus(
            component="Consumer Groups",
            status="OK",
            message=f"{len(groups)} consumer group(s) actif(s)",
            details={"group_count": len(groups)}
        )

    def run_all_checks(self) -> List[HealthStatus]:
        """Exécute tous les health checks."""
        self.checks = [
            self.check_brokers(),
            self.check_under_replicated_partitions(),
            self.check_offline_partitions(),
            self.check_consumer_groups()
        ]
        return self.checks

    def get_overall_status(self) -> str:
        """Retourne le statut global."""
        statuses = [c.status for c in self.checks]

        if "CRITICAL" in statuses:
            return "CRITICAL"
        elif "WARNING" in statuses:
            return "WARNING"
        return "OK"

    def generate_report(self) -> str:
        """Génère un rapport de santé."""
        status_icons = {
            "OK": "🟢",
            "WARNING": "🟡",
            "CRITICAL": "🔴"
        }

        report = []
        report.append("=" * 60)
        report.append(f"KAFKA HEALTH CHECK - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report.append(f"Cluster: {self.bootstrap_servers}")
        report.append("=" * 60)
        report.append("")

        for check in self.checks:
            icon = status_icons.get(check.status, "⚪")
            report.append(f"{icon} {check.component:<20} | {check.status:<10} | {check.message}")

        report.append("")
        report.append("-" * 60)
        overall = self.get_overall_status()
        report.append(f"OVERALL STATUS: {status_icons[overall]} {overall}")

        return '\n'.join(report)

    def to_json(self) -> str:
        """Exporte les résultats en JSON."""
        return json.dumps({
            "timestamp": datetime.now().isoformat(),
            "bootstrap_servers": self.bootstrap_servers,
            "overall_status": self.get_overall_status(),
            "checks": [asdict(c) for c in self.checks]
        }, indent=2)


def main():
    parser = argparse.ArgumentParser(description='Kafka cluster health check')
    parser.add_argument('--bootstrap-server', required=True, help='Kafka bootstrap servers')
    parser.add_argument('--json', action='store_true', help='Output as JSON')

    args = parser.parse_args()

    checker = KafkaHealthChecker(args.bootstrap_server)

    try:
        checker.run_all_checks()

        if args.json:
            print(checker.to_json())
        else:
            print(checker.generate_report())

        # Exit code based on overall status
        status = checker.get_overall_status()
        sys.exit(0 if status == "OK" else (1 if status == "WARNING" else 2))

    except Exception as e:
        print(f"❌ Erreur: {e}", file=sys.stderr)
        sys.exit(3)


if __name__ == "__main__":
    main()
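
The `--json` output is meant for automation. A minimal sketch of consuming it downstream (the payload here is illustrative, shaped like `to_json()` above):

```python
import json

# Illustrative payload with the same shape as KafkaHealthChecker.to_json()
payload = """{
  "timestamp": "2024-01-01T00:00:00",
  "bootstrap_servers": "kafka:9092",
  "overall_status": "CRITICAL",
  "checks": [
    {"component": "Brokers", "status": "OK", "message": "3 broker(s) reachable", "details": null},
    {"component": "Partitions", "status": "CRITICAL", "message": "2 partition(s) OFFLINE!", "details": {"offline": 2}}
  ]
}"""

report = json.loads(payload)
# Keep only the failing components, e.g. to feed a pager or a ticketing system
failing = [c["component"] for c in report["checks"] if c["status"] == "CRITICAL"]
print(report["overall_status"], failing)  # → CRITICAL ['Partitions']
```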

3. Prometheus Exporter for Custom Metrics

#!/usr/bin/env python3
"""
Exporteur Prometheus pour métriques Kafka personnalisées.
Expose les métriques sur :8000/metrics

Métriques exposées:
- kafka_consumer_lag_total
- kafka_topic_partition_count
- kafka_broker_count
- kafka_under_replicated_partitions
"""

from prometheus_client import start_http_server, Gauge
import subprocess
import time
import argparse


# Metric definitions
BROKER_COUNT = Gauge('kafka_broker_count', 'Number of active brokers')
TOPIC_COUNT = Gauge('kafka_topic_count', 'Number of topics')
PARTITION_COUNT = Gauge('kafka_partition_count', 'Total number of partitions')
UNDER_REPLICATED = Gauge('kafka_under_replicated_partitions', 'Under-replicated partitions')
CONSUMER_LAG = Gauge('kafka_consumer_lag_total', 'Consumer group lag', ['group', 'topic'])
GROUP_COUNT = Gauge('kafka_consumer_group_count', 'Number of consumer groups')


class KafkaMetricsCollector:
    def __init__(self, bootstrap_servers: str):
        self.bootstrap_servers = bootstrap_servers

    def collect_broker_metrics(self):
        """Collecte les métriques des brokers."""
        try:
            result = subprocess.run([
                'kafka-broker-api-versions.sh',
                '--bootstrap-server', self.bootstrap_servers
            ], capture_output=True, text=True, timeout=30)

            broker_count = result.stdout.count("ApiVersion") if result.returncode == 0 else 0
            BROKER_COUNT.set(broker_count)
        except Exception as e:
            print(f"Erreur collecte brokers: {e}")

    def collect_topic_metrics(self):
        """Collecte les métriques des topics."""
        try:
            result = subprocess.run([
                'kafka-topics.sh',
                '--bootstrap-server', self.bootstrap_servers,
                '--describe'
            ], capture_output=True, text=True, timeout=60)

            if result.returncode == 0:
                lines = result.stdout.strip().split('\n')
                topics = set()
                partitions = 0

                for line in lines:
                    if line.startswith('Topic:'):
                        parts = line.split('\t')
                        for part in parts:
                            if part.startswith('Topic:'):
                                topics.add(part.split(':')[1].strip())
                            elif part.startswith('PartitionCount:'):
                                partitions += int(part.split(':')[1].strip())

                TOPIC_COUNT.set(len(topics))
                PARTITION_COUNT.set(partitions)
        except Exception as e:
            print(f"Erreur collecte topics: {e}")

    def collect_replication_metrics(self):
        """Collecte les métriques de réplication."""
        try:
            result = subprocess.run([
                'kafka-topics.sh',
                '--bootstrap-server', self.bootstrap_servers,
                '--describe',
                '--under-replicated-partitions'
            ], capture_output=True, text=True, timeout=30)

            urp = len([l for l in result.stdout.split('\n') if l.strip()]) if result.returncode == 0 else -1
            UNDER_REPLICATED.set(urp)
        except Exception as e:
            print(f"Erreur collecte réplication: {e}")

    def collect_consumer_lag(self):
        """Collecte le lag des consumer groups."""
        try:
            # List consumer groups
            result = subprocess.run([
                'kafka-consumer-groups.sh',
                '--bootstrap-server', self.bootstrap_servers,
                '--list'
            ], capture_output=True, text=True, timeout=30)

            if result.returncode != 0:
                return

            groups = [g.strip() for g in result.stdout.split('\n') if g.strip()]
            GROUP_COUNT.set(len(groups))

            # Lag per group (cap at 20 groups to avoid long collection cycles)
            for group in groups[:20]:
                try:
                    result = subprocess.run([
                        'kafka-consumer-groups.sh',
                        '--bootstrap-server', self.bootstrap_servers,
                        '--describe',
                        '--group', group
                    ], capture_output=True, text=True, timeout=10)

                    if result.returncode == 0:
                        lag_by_topic = {}
                        for line in result.stdout.split('\n')[1:]:
                            parts = line.split()
                            if len(parts) >= 6:
                                topic = parts[1]
                                try:
                                    lag = int(parts[5]) if parts[5] != '-' else 0
                                    lag_by_topic[topic] = lag_by_topic.get(topic, 0) + lag
                                except ValueError:
                                    continue

                        for topic, lag in lag_by_topic.items():
                            CONSUMER_LAG.labels(group=group, topic=topic).set(lag)

                except Exception:
                    continue

        except Exception as e:
            print(f"Erreur collecte lag: {e}")

    def collect_all(self):
        """Collecte toutes les métriques."""
        self.collect_broker_metrics()
        self.collect_topic_metrics()
        self.collect_replication_metrics()
        self.collect_consumer_lag()


def main():
    parser = argparse.ArgumentParser(description='Kafka Prometheus exporter')
    parser.add_argument('--bootstrap-server', required=True, help='Kafka bootstrap servers')
    parser.add_argument('--port', type=int, default=8000, help='Prometheus metrics port')
    parser.add_argument('--interval', type=int, default=30, help='Collection interval in seconds')

    args = parser.parse_args()

    # Start the HTTP server
    start_http_server(args.port)
    print(f"📊 Prometheus exporter started on :{args.port}/metrics")

    collector = KafkaMetricsCollector(args.bootstrap_server)

    while True:
        try:
            collector.collect_all()
            print(f"✅ Métriques collectées - {time.strftime('%H:%M:%S')}")
        except Exception as e:
            print(f"❌ Erreur: {e}")

        time.sleep(args.interval)


if __name__ == "__main__":
    main()
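
Once Prometheus scrapes this exporter, threshold alerting can move out of ad-hoc scripts and into Prometheus itself. A sketch of a rules file built on the metric names defined above (thresholds, durations, and severities are examples to adapt):

```yaml
# kafka_alerts.rules.yml - example rules for the custom exporter above
groups:
  - name: kafka
    rules:
      - alert: KafkaConsumerLagHigh
        expr: kafka_consumer_lag_total > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High consumer lag for {{ $labels.group }} on {{ $labels.topic }}"
      - alert: KafkaUnderReplicatedPartitions
        expr: kafka_under_replicated_partitions > 0
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Under-replicated partitions detected"
```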

4. Quick Diagnostic Bash Script

For a quick diagnostic without Python:

#!/bin/bash
# Quick diagnostic for a Kafka cluster
# Usage: ./kafka_quick_check.sh localhost:9092

BOOTSTRAP=${1:-localhost:9092}

echo "========================================"
echo "KAFKA QUICK DIAGNOSTIC"
echo "Bootstrap: $BOOTSTRAP"
echo "Date: $(date)"
echo "========================================"
echo ""

# 1. Connectivity test
echo "🔍 Testing connectivity..."
if kafka-broker-api-versions.sh --bootstrap-server "$BOOTSTRAP" > /dev/null 2>&1; then
    echo "✅ Cluster reachable"
    BROKER_COUNT=$(kafka-broker-api-versions.sh --bootstrap-server "$BOOTSTRAP" 2>/dev/null | grep -c "ApiVersion")
    echo "   Brokers detected: $BROKER_COUNT"
else
    echo "❌ ERROR: Unable to reach the cluster"
    exit 1
fi
echo ""

# 2. Under-replicated partitions
echo "🔍 Checking replication..."
URP=$(kafka-topics.sh --bootstrap-server "$BOOTSTRAP" --describe --under-replicated-partitions 2>/dev/null | grep -c "Topic:")
if [ "$URP" -eq 0 ]; then
    echo "✅ No under-replicated partitions"
else
    echo "⚠️  $URP under-replicated partition(s)"
    kafka-topics.sh --bootstrap-server "$BOOTSTRAP" --describe --under-replicated-partitions 2>/dev/null
fi
echo ""

# 3. Offline partitions
echo "🔍 Checking for offline partitions..."
OFFLINE=$(kafka-topics.sh --bootstrap-server "$BOOTSTRAP" --describe --unavailable-partitions 2>/dev/null | grep -c "Topic:")
if [ "$OFFLINE" -eq 0 ]; then
    echo "✅ No offline partitions"
else
    echo "🔴 CRITICAL: $OFFLINE partition(s) OFFLINE!"
    kafka-topics.sh --bootstrap-server "$BOOTSTRAP" --describe --unavailable-partitions 2>/dev/null
fi
echo ""

# 4. Consumer groups with lag
echo "🔍 Consumer groups with high lag (>10000)..."
kafka-consumer-groups.sh --bootstrap-server "$BOOTSTRAP" --list 2>/dev/null | while read -r group; do
    if [ -n "$group" ]; then
        LAG=$(kafka-consumer-groups.sh --bootstrap-server "$BOOTSTRAP" --describe --group "$group" 2>/dev/null | awk 'NR>1 && $6 != "-" {sum+=$6} END {print sum+0}')
        if [ "$LAG" -gt 10000 ]; then
            echo "⚠️  $group: lag=$LAG"
        fi
    fi
done
echo ""

# 5. Summary
echo "========================================"
echo "SUMMARY"
echo "========================================"
TOPIC_COUNT=$(kafka-topics.sh --bootstrap-server "$BOOTSTRAP" --list 2>/dev/null | wc -l)
GROUP_COUNT=$(kafka-consumer-groups.sh --bootstrap-server "$BOOTSTRAP" --list 2>/dev/null | wc -l)
echo "Topics: $TOPIC_COUNT"
echo "Consumer groups: $GROUP_COUNT"
echo "Brokers: $BROKER_COUNT"
echo "Under-replicated: $URP"
echo "Offline: $OFFLINE"

if [ "$OFFLINE" -gt 0 ]; then
    echo ""
    echo "🔴 STATUS: CRITICAL"
    exit 2
elif [ "$URP" -gt 0 ]; then
    echo ""
    echo "🟡 STATUS: WARNING"
    exit 1
else
    echo ""
    echo "🟢 STATUS: OK"
    exit 0
fi

How to Use These Scripts

Installing dependencies

pip install prometheus_client requests

Usage examples

# Lag monitoring
python kafka_lag_monitor.py --bootstrap-server kafka:9092 --threshold 5000

# With Slack alerts
python kafka_lag_monitor.py --bootstrap-server kafka:9092 \
  --slack-webhook https://hooks.slack.com/services/XXX

# Health check
python kafka_health_check.py --bootstrap-server kafka:9092

# Prometheus exporter (daemon)
python kafka_prometheus_exporter.py --bootstrap-server kafka:9092 --port 8000

# Quick bash diagnostic
./kafka_quick_check.sh kafka:9092

Cron Integration

# Crontab entry: run the health check every 5 minutes
*/5 * * * * /opt/scripts/kafka_health_check.py --bootstrap-server kafka:9092 --json >> /var/log/kafka-health.log 2>&1

Grafana Dashboard (JSON)

A Grafana dashboard compatible with the Prometheus exporter is available in the Resources section of your member area.


These scripts are provided "as is" and tested on Kafka 2.8+ and 3.x. Adjust the Kafka binary paths to match your installation.


Florian Courouge

DevOps & Kafka Expert | Freelance consultant specializing in distributed architectures and data streaming.
