Intermédiaire
⭐ Article vedette

Python pour DevOps : Automatisation et Outils d'Infrastructure

Maîtrisez Python pour l'automatisation DevOps : scripts d'infrastructure, monitoring, déploiement, et intégrations avec les outils cloud.

Publié le
16 décembre 2024
Lecture
17 min
Vues
0
Auteur
Florian Courouge
Python
DevOps
Automation
Infrastructure
Monitoring
Cloud

Table des matières

📋 Vue d'ensemble rapide des sujets traités dans cet article

Cliquez sur les sections ci-dessous pour naviguer rapidement

Python pour DevOps : Automatisation et Outils d'Infrastructure

Python est devenu l'outil de choix pour l'automatisation DevOps grâce à sa simplicité, sa richesse en bibliothèques et sa capacité d'intégration. Ce guide explore les techniques essentielles pour automatiser vos infrastructures avec Python.

💡Fondamentaux Python pour DevOps

Environnement et Bonnes Pratiques

#!/usr/bin/env python3
# setup_devops_env.py

import os
import subprocess
import sys
from pathlib import Path

class DevOpsEnvironment:
    """Scaffolds a DevOps automation project in the current directory.

    Creates a conventional directory layout plus starter configuration
    files: requirements.txt, a .env template and a config package.
    """

    def __init__(self, project_name="devops-automation"):
        self.project_name = project_name
        # The project is created under the current working directory.
        self.project_path = Path.cwd() / project_name
        self.venv_path = self.project_path / "venv"

    def create_project_structure(self):
        """Create the DevOps project directory tree and config files."""
        directories = [
            "scripts/infrastructure",
            "scripts/monitoring",
            "scripts/deployment",
            "config",
            "templates",
            "tests",
            "logs",
            "docs"
        ]

        for directory in directories:
            # exist_ok=True makes the scaffolding idempotent on re-runs.
            (self.project_path / directory).mkdir(parents=True, exist_ok=True)

        # Generate the starter configuration files.
        self._create_config_files()

    def _create_config_files(self):
        """Write requirements.txt, the .env template and config/__init__.py."""

        # requirements.txt -- python-dotenv is listed because the generated
        # config module below imports dotenv.load_dotenv (it was missing
        # from the original list, so the scaffolded project could not start).
        requirements = """
boto3>=1.26.0
paramiko>=2.11.0
requests>=2.28.0
pyyaml>=6.0
jinja2>=3.1.0
click>=8.1.0
psutil>=5.9.0
docker>=6.0.0
kubernetes>=24.2.0
prometheus-client>=0.15.0
grafana-api>=1.0.3
slack-sdk>=3.19.0
python-dotenv>=0.21.0
"""
        (self.project_path / "requirements.txt").write_text(requirements.strip())

        # .env template (shipped instead of a real .env so secrets stay local).
        env_template = """
# AWS Configuration
AWS_ACCESS_KEY_ID=your_access_key
AWS_SECRET_ACCESS_KEY=your_secret_key
AWS_DEFAULT_REGION=us-east-1

# Database
DB_HOST=localhost
DB_PORT=5432
DB_NAME=devops
DB_USER=devops_user
DB_PASSWORD=secure_password

# Monitoring
PROMETHEUS_URL=http://localhost:9090
GRAFANA_URL=http://localhost:3000
GRAFANA_API_KEY=your_api_key

# Notifications
SLACK_TOKEN=xoxb-your-slack-token
SLACK_CHANNEL=#devops-alerts

# SSH Configuration
SSH_KEY_PATH=~/.ssh/id_rsa
SSH_USER=ubuntu
"""
        (self.project_path / ".env.template").write_text(env_template.strip())

        # Central Python configuration module, written to config/__init__.py.
        config_py = '''
import os
from pathlib import Path
from dotenv import load_dotenv

# Charger les variables d'environnement
load_dotenv()

class Config:
    """Configuration centralisée"""
    
    # Paths
    PROJECT_ROOT = Path(__file__).parent.parent
    LOGS_DIR = PROJECT_ROOT / "logs"
    CONFIG_DIR = PROJECT_ROOT / "config"
    TEMPLATES_DIR = PROJECT_ROOT / "templates"
    
    # AWS
    AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
    AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
    AWS_DEFAULT_REGION = os.getenv("AWS_DEFAULT_REGION", "us-east-1")
    
    # Database
    DB_HOST = os.getenv("DB_HOST", "localhost")
    DB_PORT = int(os.getenv("DB_PORT", 5432))
    DB_NAME = os.getenv("DB_NAME", "devops")
    DB_USER = os.getenv("DB_USER")
    DB_PASSWORD = os.getenv("DB_PASSWORD")
    
    # Monitoring
    PROMETHEUS_URL = os.getenv("PROMETHEUS_URL", "http://localhost:9090")
    GRAFANA_URL = os.getenv("GRAFANA_URL", "http://localhost:3000")
    GRAFANA_API_KEY = os.getenv("GRAFANA_API_KEY")
    
    # Notifications
    SLACK_TOKEN = os.getenv("SLACK_TOKEN")
    SLACK_CHANNEL = os.getenv("SLACK_CHANNEL", "#devops-alerts")
    
    # SSH
    SSH_KEY_PATH = os.path.expanduser(os.getenv("SSH_KEY_PATH", "~/.ssh/id_rsa"))
    SSH_USER = os.getenv("SSH_USER", "ubuntu")
    
    @classmethod
    def validate(cls):
        """Valide la configuration"""
        required_vars = [
            "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY",
            "DB_PASSWORD", "SLACK_TOKEN"
        ]
        
        missing = [var for var in required_vars if not getattr(cls, var)]
        if missing:
            raise ValueError(f"Variables manquantes: {', '.join(missing)}")
'''
        (self.project_path / "config" / "__init__.py").write_text(config_py)

# Entry point: scaffold a new DevOps project under the current directory.
if __name__ == "__main__":
    env = DevOpsEnvironment()
    env.create_project_structure()
    print(f"Projet DevOps créé dans: {env.project_path}")

💡Automatisation d'Infrastructure

Gestion AWS avec Boto3

# scripts/infrastructure/aws_manager.py

import boto3
import json
import time
from typing import List, Dict, Optional
from botocore.exceptions import ClientError
from config import Config

class AWSInfrastructureManager:
    """AWS infrastructure manager (VPC, EC2, ELBv2, CloudFormation).

    All operations share a single boto3 Session built from the central
    Config object (credentials and region).
    """

    def __init__(self):
        self.session = boto3.Session(
            aws_access_key_id=Config.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=Config.AWS_SECRET_ACCESS_KEY,
            region_name=Config.AWS_DEFAULT_REGION
        )

        self.ec2 = self.session.client('ec2')
        self.elbv2 = self.session.client('elbv2')
        self.rds = self.session.client('rds')
        self.cloudformation = self.session.client('cloudformation')

    def create_vpc_infrastructure(self, vpc_name: str, cidr_block: str = "10.0.0.0/16") -> Dict:
        """Create a complete VPC: subnets, internet gateway, routing and
        security groups.

        Returns a dict with keys vpc_id, subnets, internet_gateway_id,
        route_tables and security_groups.  Raises ClientError on failure.
        """
        try:
            vpc_response = self.ec2.create_vpc(
                CidrBlock=cidr_block,
                TagSpecifications=[{
                    'ResourceType': 'vpc',
                    'Tags': [{'Key': 'Name', 'Value': vpc_name}]
                }]
            )
            vpc_id = vpc_response['Vpc']['VpcId']

            # Block until the VPC is usable before creating dependent resources.
            self.ec2.get_waiter('vpc_available').wait(VpcIds=[vpc_id])

            subnets = self._create_subnets(vpc_id, cidr_block)
            igw_id = self._create_internet_gateway(vpc_id)
            route_tables = self._create_route_tables(vpc_id, igw_id, subnets)
            security_groups = self._create_security_groups(vpc_id)

            infrastructure = {
                'vpc_id': vpc_id,
                'subnets': subnets,
                'internet_gateway_id': igw_id,
                'route_tables': route_tables,
                'security_groups': security_groups
            }

            print(f"Infrastructure VPC créée: {vpc_name}")
            return infrastructure

        except ClientError as e:
            print(f"Erreur lors de la création du VPC: {e}")
            raise

    def _create_subnets(self, vpc_id: str, vpc_cidr: str) -> Dict:
        """Create public and private subnets across two availability zones.

        NOTE(review): subnet CIDRs are hard-coded to 10.0.x.0/24 and do not
        derive from vpc_cidr -- confirm before using a non-default VPC CIDR.
        """
        azs = self.ec2.describe_availability_zones()['AvailabilityZones']

        subnets = {'public': [], 'private': []}

        for i, az in enumerate(azs[:2]):  # Spread across 2 AZs
            # Public subnet
            public_subnet = self.ec2.create_subnet(
                VpcId=vpc_id,
                CidrBlock=f"10.0.{i+1}.0/24",
                AvailabilityZone=az['ZoneName'],
                TagSpecifications=[{
                    'ResourceType': 'subnet',
                    'Tags': [
                        {'Key': 'Name', 'Value': f'public-subnet-{i+1}'},
                        {'Key': 'Type', 'Value': 'public'}
                    ]
                }]
            )
            subnets['public'].append(public_subnet['Subnet']['SubnetId'])

            # Private subnet
            private_subnet = self.ec2.create_subnet(
                VpcId=vpc_id,
                CidrBlock=f"10.0.{i+10}.0/24",
                AvailabilityZone=az['ZoneName'],
                TagSpecifications=[{
                    'ResourceType': 'subnet',
                    'Tags': [
                        {'Key': 'Name', 'Value': f'private-subnet-{i+1}'},
                        {'Key': 'Type', 'Value': 'private'}
                    ]
                }]
            )
            subnets['private'].append(private_subnet['Subnet']['SubnetId'])

        return subnets

    def _create_internet_gateway(self, vpc_id: str) -> str:
        """Create an Internet Gateway and attach it to the VPC.

        This helper was referenced by create_vpc_infrastructure but missing
        from the original class.
        """
        igw_response = self.ec2.create_internet_gateway(
            TagSpecifications=[{
                'ResourceType': 'internet-gateway',
                'Tags': [{'Key': 'Name', 'Value': 'main-igw'}]
            }]
        )
        igw_id = igw_response['InternetGateway']['InternetGatewayId']
        self.ec2.attach_internet_gateway(InternetGatewayId=igw_id, VpcId=vpc_id)
        return igw_id

    def _create_route_tables(self, vpc_id: str, igw_id: str, subnets: Dict) -> Dict:
        """Create a public route table with a default route through the IGW
        and associate it with every public subnet.

        This helper was referenced but missing from the original class.
        Private subnets keep the VPC's main (local-only) route table.
        """
        rt_response = self.ec2.create_route_table(VpcId=vpc_id)
        public_rt_id = rt_response['RouteTable']['RouteTableId']

        # Default route to the internet for public subnets.
        self.ec2.create_route(
            RouteTableId=public_rt_id,
            DestinationCidrBlock='0.0.0.0/0',
            GatewayId=igw_id
        )

        for subnet_id in subnets['public']:
            self.ec2.associate_route_table(
                RouteTableId=public_rt_id,
                SubnetId=subnet_id
            )

        return {'public': public_rt_id}

    def _create_security_groups(self, vpc_id: str) -> Dict[str, List[str]]:
        """Create 'web' and 'lb' security groups allowing HTTP ingress.

        This helper was referenced but missing from the original class.
        Returns {'web': [sg_id], 'lb': [sg_id]}: values are lists because
        callers pass them directly as SecurityGroupIds.
        """
        groups = {}
        for name, description in (('web', 'Web servers'), ('lb', 'Load balancer')):
            sg_response = self.ec2.create_security_group(
                GroupName=f"{name}-sg",
                Description=description,
                VpcId=vpc_id
            )
            sg_id = sg_response['GroupId']

            # Allow inbound HTTP from anywhere.
            self.ec2.authorize_security_group_ingress(
                GroupId=sg_id,
                IpPermissions=[{
                    'IpProtocol': 'tcp',
                    'FromPort': 80,
                    'ToPort': 80,
                    'IpRanges': [{'CidrIp': '0.0.0.0/0'}]
                }]
            )
            groups[name] = [sg_id]
        return groups

    def launch_ec2_instances(self, instance_config: Dict) -> List[str]:
        """Launch EC2 instances described by instance_config and wait until
        they are running.  Returns the list of instance ids.
        """
        try:
            response = self.ec2.run_instances(
                ImageId=instance_config['ami_id'],
                MinCount=instance_config.get('min_count', 1),
                MaxCount=instance_config.get('max_count', 1),
                InstanceType=instance_config.get('instance_type', 't3.micro'),
                KeyName=instance_config.get('key_name'),
                SecurityGroupIds=instance_config.get('security_groups', []),
                SubnetId=instance_config.get('subnet_id'),
                UserData=instance_config.get('user_data', ''),
                TagSpecifications=[{
                    'ResourceType': 'instance',
                    'Tags': instance_config.get('tags', [])
                }]
            )

            instance_ids = [instance['InstanceId'] for instance in response['Instances']]

            # Block until all instances reach the 'running' state.
            self.ec2.get_waiter('instance_running').wait(InstanceIds=instance_ids)

            print(f"Instances lancées: {instance_ids}")
            return instance_ids

        except ClientError as e:
            print(f"Erreur lors du lancement des instances: {e}")
            raise

    def create_load_balancer(self, lb_config: Dict) -> str:
        """Create an Application Load Balancer, its target group and an HTTP
        listener; optionally register instances.  Returns the LB ARN.
        """
        try:
            response = self.elbv2.create_load_balancer(
                Name=lb_config['name'],
                Subnets=lb_config['subnets'],
                SecurityGroups=lb_config.get('security_groups', []),
                Scheme=lb_config.get('scheme', 'internet-facing'),
                Type='application',
                Tags=lb_config.get('tags', [])
            )

            lb_arn = response['LoadBalancers'][0]['LoadBalancerArn']

            # Target group with an HTTP health check on /health.
            tg_response = self.elbv2.create_target_group(
                Name=f"{lb_config['name']}-tg",
                Protocol='HTTP',
                Port=80,
                VpcId=lb_config['vpc_id'],
                HealthCheckPath='/health',
                HealthCheckIntervalSeconds=30,
                HealthyThresholdCount=2,
                UnhealthyThresholdCount=5
            )

            tg_arn = tg_response['TargetGroups'][0]['TargetGroupArn']

            # Forward all HTTP traffic on port 80 to the target group.
            self.elbv2.create_listener(
                LoadBalancerArn=lb_arn,
                Protocol='HTTP',
                Port=80,
                DefaultActions=[{
                    'Type': 'forward',
                    'TargetGroupArn': tg_arn
                }]
            )

            # Register backend instances when provided.
            if 'instance_ids' in lb_config:
                targets = [{'Id': instance_id, 'Port': 80}
                          for instance_id in lb_config['instance_ids']]

                self.elbv2.register_targets(
                    TargetGroupArn=tg_arn,
                    Targets=targets
                )

            print(f"Load Balancer créé: {lb_config['name']}")
            return lb_arn

        except ClientError as e:
            print(f"Erreur lors de la création du Load Balancer: {e}")
            raise

    def deploy_cloudformation_stack(self, stack_name: str, template_path: str, parameters: Dict = None) -> str:
        """Create or update a CloudFormation stack from a local template and
        wait for the operation to finish.  Returns the stack id (or the
        stack name when the stack is already up to date).
        """
        try:
            with open(template_path, 'r') as template_file:
                template_body = template_file.read()

            # Convert a plain dict into CloudFormation parameter records.
            cf_parameters = []
            if parameters:
                cf_parameters = [
                    {'ParameterKey': key, 'ParameterValue': str(value)}
                    for key, value in parameters.items()
                ]

            # Try create first; fall back to update when the stack exists.
            try:
                response = self.cloudformation.create_stack(
                    StackName=stack_name,
                    TemplateBody=template_body,
                    Parameters=cf_parameters,
                    Capabilities=['CAPABILITY_IAM', 'CAPABILITY_NAMED_IAM']
                )
                stack_id = response['StackId']
                operation = 'CREATE'

            except ClientError as e:
                if 'AlreadyExistsException' in str(e):
                    try:
                        response = self.cloudformation.update_stack(
                            StackName=stack_name,
                            TemplateBody=template_body,
                            Parameters=cf_parameters,
                            Capabilities=['CAPABILITY_IAM', 'CAPABILITY_NAMED_IAM']
                        )
                    except ClientError as update_error:
                        # update_stack raises when the template is unchanged;
                        # treat that as a successful no-op instead of failing.
                        if 'No updates are to be performed' in str(update_error):
                            print(f"Stack déjà à jour: {stack_name}")
                            return stack_name
                        raise
                    stack_id = response['StackId']
                    operation = 'UPDATE'
                else:
                    raise

            # Wait for stack_create_complete / stack_update_complete.
            waiter_name = f'stack_{operation.lower()}_complete'
            waiter = self.cloudformation.get_waiter(waiter_name)
            waiter.wait(StackName=stack_name)

            print(f"Stack CloudFormation {operation.lower()}d: {stack_name}")
            return stack_id

        except Exception as e:
            print(f"Erreur lors du déploiement CloudFormation: {e}")
            raise

# Example usage: build a VPC, launch two web instances and front them with an ALB.
if __name__ == "__main__":
    aws_manager = AWSInfrastructureManager()
    
    # Create the VPC infrastructure
    infrastructure = aws_manager.create_vpc_infrastructure("devops-vpc")
    
    # Instance configuration
    instance_config = {
        'ami_id': 'ami-0c02fb55956c7d316',  # Amazon Linux 2
        'instance_type': 't3.micro',
        'min_count': 2,
        'max_count': 2,
        'key_name': 'my-key-pair',
        'security_groups': infrastructure['security_groups']['web'],
        'subnet_id': infrastructure['subnets']['public'][0],
        'tags': [
            {'Key': 'Name', 'Value': 'web-server'},
            {'Key': 'Environment', 'Value': 'production'}
        ]
    }
    
    # Launch the instances
    instance_ids = aws_manager.launch_ec2_instances(instance_config)
    
    # Create the load balancer in front of them
    lb_config = {
        'name': 'web-lb',
        'subnets': infrastructure['subnets']['public'],
        'security_groups': infrastructure['security_groups']['lb'],
        'vpc_id': infrastructure['vpc_id'],
        'instance_ids': instance_ids
    }
    
    lb_arn = aws_manager.create_load_balancer(lb_config)

💡Monitoring et Métriques

Collecteur de Métriques Système

# scripts/monitoring/system_monitor.py

import psutil
import time
import json
import requests
from datetime import datetime
from typing import Dict, List
from dataclasses import dataclass, asdict
from config import Config

@dataclass
class SystemMetrics:
    """Snapshot of host-level system metrics at one point in time."""
    timestamp: str        # ISO-8601 collection time
    hostname: str
    cpu_percent: float
    memory_percent: float
    memory_available: int  # bytes
    # mountpoint -> {'total', 'used', 'free', 'percent'}; the original
    # Dict[str, float] annotation did not match what the collector stores.
    disk_usage: Dict[str, Dict[str, float]]
    network_io: Dict[str, int]   # cumulative counters since boot
    load_average: List[float]    # 1m, 5m, 15m averages
    process_count: int
    uptime: float                # seconds since boot

class SystemMonitor:
    """Collects host metrics through psutil and evaluates alert thresholds."""

    def __init__(self, interval: int = 60):
        # Seconds between two collection cycles (consumed by the caller's loop).
        self.interval = interval
        # socket.gethostname() is portable; os.uname() (which the original
        # reached through psutil.os) does not exist on Windows.
        import socket
        self.hostname = socket.gethostname()

    def collect_metrics(self) -> "SystemMetrics":
        """Take a full snapshot of CPU, memory, disk, network and load."""

        # CPU: sample over 1 second for a meaningful percentage.
        cpu_percent = psutil.cpu_percent(interval=1)

        # Memory
        memory = psutil.virtual_memory()

        # Per-mountpoint disk statistics; skip mounts we cannot read.
        disk_usage = {}
        for partition in psutil.disk_partitions():
            try:
                usage = psutil.disk_usage(partition.mountpoint)
                disk_usage[partition.mountpoint] = {
                    'total': usage.total,
                    'used': usage.used,
                    'free': usage.free,
                    'percent': (usage.used / usage.total) * 100
                }
            except PermissionError:
                continue

        # Network: cumulative counters since boot.
        network = psutil.net_io_counters()
        network_io = {
            'bytes_sent': network.bytes_sent,
            'bytes_recv': network.bytes_recv,
            'packets_sent': network.packets_sent,
            'packets_recv': network.packets_recv
        }

        # Load average (1m, 5m, 15m)
        load_avg = list(psutil.getloadavg())

        # Process count
        process_count = len(psutil.pids())

        # Uptime in seconds
        uptime = time.time() - psutil.boot_time()

        return SystemMetrics(
            timestamp=datetime.now().isoformat(),
            hostname=self.hostname,
            cpu_percent=cpu_percent,
            memory_percent=memory.percent,
            memory_available=memory.available,
            disk_usage=disk_usage,
            network_io=network_io,
            load_average=load_avg,
            process_count=process_count,
            uptime=uptime
        )

    def get_process_metrics(self) -> List[Dict]:
        """Return the 10 processes with the highest CPU usage."""

        processes = []
        for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent', 'status']):
            try:
                processes.append(proc.info)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue

        # cpu_percent can be None on the first sampling; treat it as 0.
        return sorted(processes, key=lambda x: x['cpu_percent'] or 0, reverse=True)[:10]

    def check_alerts(self, metrics: "SystemMetrics", cpu_count: int = None) -> List[Dict]:
        """Evaluate metrics against warning/critical thresholds.

        cpu_count optionally overrides the detected core count (useful for
        tests); when None it is read from psutil.  Returns a list of alert
        dicts with keys 'type', 'level', 'message' and 'value'.
        """

        alerts = []

        # CPU: warning above 80%, critical from 90%.
        if metrics.cpu_percent > 80:
            alerts.append({
                'type': 'cpu',
                'level': 'warning' if metrics.cpu_percent < 90 else 'critical',
                'message': f'CPU usage: {metrics.cpu_percent:.1f}%',
                'value': metrics.cpu_percent
            })

        # Memory: warning above 85%, critical from 95%.
        if metrics.memory_percent > 85:
            alerts.append({
                'type': 'memory',
                'level': 'warning' if metrics.memory_percent < 95 else 'critical',
                'message': f'Memory usage: {metrics.memory_percent:.1f}%',
                'value': metrics.memory_percent
            })

        # Disk: same thresholds, checked per mountpoint.
        for mount, usage in metrics.disk_usage.items():
            if usage['percent'] > 85:
                alerts.append({
                    'type': 'disk',
                    'level': 'warning' if usage['percent'] < 95 else 'critical',
                    'message': f'Disk usage {mount}: {usage["percent"]:.1f}%',
                    'value': usage['percent']
                })

        # Load: warning above 80% of the core count, critical at or above it.
        if cpu_count is None:
            cpu_count = psutil.cpu_count()
        if metrics.load_average[0] > cpu_count * 0.8:
            alerts.append({
                'type': 'load',
                'level': 'warning' if metrics.load_average[0] < cpu_count else 'critical',
                'message': f'Load average: {metrics.load_average[0]:.2f}',
                'value': metrics.load_average[0]
            })

        return alerts

class PrometheusExporter:
    """Exports SystemMetrics through a prometheus_client HTTP endpoint."""

    def __init__(self, port: int = 8000):
        from prometheus_client import start_http_server, Gauge, Counter

        self.port = port
        # Last cumulative network counters seen; needed to report deltas.
        self._last_network_io = None

        # Prometheus metric objects (registered globally by the client lib).
        self.cpu_usage = Gauge('system_cpu_usage_percent', 'CPU usage percentage')
        self.memory_usage = Gauge('system_memory_usage_percent', 'Memory usage percentage')
        self.disk_usage = Gauge('system_disk_usage_percent', 'Disk usage percentage', ['mountpoint'])
        self.load_average = Gauge('system_load_average', 'Load average', ['period'])
        self.network_bytes = Counter('system_network_bytes_total', 'Network bytes', ['direction'])

        # Expose /metrics on the configured port.
        start_http_server(self.port)
        print(f"Prometheus exporter started on port {self.port}")

    def update_metrics(self, metrics: "SystemMetrics"):
        """Push one SystemMetrics snapshot into the Prometheus metrics."""

        self.cpu_usage.set(metrics.cpu_percent)
        self.memory_usage.set(metrics.memory_percent)

        for mount, usage in metrics.disk_usage.items():
            self.disk_usage.labels(mountpoint=mount).set(usage['percent'])

        for i, load in enumerate(metrics.load_average):
            period = ['1m', '5m', '15m'][i]
            self.load_average.labels(period=period).set(load)

        # psutil network counters are cumulative since boot: a Counter must
        # be incremented by the delta between snapshots, not by the running
        # total as the original did (which double-counted every cycle).
        if self._last_network_io is not None:
            sent_delta = metrics.network_io['bytes_sent'] - self._last_network_io['bytes_sent']
            recv_delta = metrics.network_io['bytes_recv'] - self._last_network_io['bytes_recv']
            # Guard against counter resets (reboot / rollover).
            if sent_delta >= 0:
                self.network_bytes.labels(direction='sent').inc(sent_delta)
            if recv_delta >= 0:
                self.network_bytes.labels(direction='received').inc(recv_delta)
        self._last_network_io = metrics.network_io

class AlertManager:
    """Dispatches alerts to Slack, falling back to stdout when no token
    is configured."""

    def __init__(self):
        # Slack credentials come from the central configuration.
        self.slack_token = Config.SLACK_TOKEN
        self.slack_channel = Config.SLACK_CHANNEL

    def send_slack_alert(self, alert: Dict):
        """Post a formatted alert message to the configured Slack channel."""

        # Without a token we only log locally.
        if not self.slack_token:
            print(f"Alert: {alert['message']}")
            return

        from slack_sdk import WebClient

        level_colors = {
            'warning': '#ffcc00',
            'critical': '#ff0000'
        }

        attachment = {
            'color': level_colors.get(alert['level'], '#00ff00'),
            'title': f"{alert['level'].upper()}: {alert['type'].upper()} Alert",
            'text': alert['message'],
            'fields': [
                {
                    'title': 'Hostname',
                    'value': psutil.os.uname().nodename,
                    'short': True
                },
                {
                    'title': 'Timestamp',
                    'value': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                    'short': True
                }
            ]
        }

        try:
            WebClient(token=self.slack_token).chat_postMessage(
                channel=self.slack_channel,
                text=f"System Alert: {alert['message']}",
                attachments=[attachment]
            )
        except Exception as exc:
            print(f"Failed to send Slack alert: {exc}")

# Main monitoring entry point
def main():
    """Run the collect -> export -> alert loop until interrupted."""
    monitor = SystemMonitor(interval=60)
    prometheus_exporter = PrometheusExporter()
    alert_manager = AlertManager()

    print("System monitoring started...")

    while True:
        try:
            # One full cycle: collect, export, alert, log.
            metrics = monitor.collect_metrics()
            prometheus_exporter.update_metrics(metrics)

            for alert in monitor.check_alerts(metrics):
                alert_manager.send_slack_alert(alert)

            summary = (
                f"[{metrics.timestamp}] CPU: {metrics.cpu_percent:.1f}%, "
                f"Memory: {metrics.memory_percent:.1f}%, "
                f"Load: {metrics.load_average[0]:.2f}"
            )
            print(summary)

            time.sleep(monitor.interval)

        except KeyboardInterrupt:
            print("Monitoring stopped.")
            break
        except Exception as e:
            # Keep the daemon alive on transient failures; retry shortly.
            print(f"Error in monitoring loop: {e}")
            time.sleep(10)

if __name__ == "__main__":
    main()

💡Déploiement et CI/CD

Gestionnaire de Déploiement

# scripts/deployment/deploy_manager.py

import os
import subprocess
import yaml
import docker
import paramiko
from pathlib import Path
from typing import Dict, List, Optional
from dataclasses import dataclass
from config import Config

@dataclass
class DeploymentConfig:
    """Deployment configuration for one application release."""
    app_name: str
    version: str
    environment: str                       # also used as the k8s namespace
    servers: List[str]                     # target hosts for SSH/Docker deploys
    docker_image: Optional[str] = None     # fully qualified image reference
    health_check_url: Optional[str] = None # probed after deploy when set
    rollback_on_failure: bool = True       # trigger rollback on failed deploy

class DockerDeployment:
    """Builds, pushes and remotely deploys Docker containers over SSH."""

    def __init__(self):
        # Talks to the local Docker daemon (DOCKER_HOST / default socket).
        self.client = docker.from_env()

    def build_image(self, dockerfile_path: str, image_name: str, tag: str) -> str:
        """Build a Docker image and return its id.

        Raises docker.errors.BuildError when the build fails.
        """
        try:
            print(f"Building Docker image: {image_name}:{tag}")

            image, logs = self.client.images.build(
                path=str(Path(dockerfile_path).parent),
                dockerfile=Path(dockerfile_path).name,
                tag=f"{image_name}:{tag}",
                rm=True,
                forcerm=True
            )

            # Stream build output for visibility.
            for log in logs:
                if 'stream' in log:
                    print(log['stream'].strip())

            print(f"Image built successfully: {image.id}")
            return image.id

        except docker.errors.BuildError as e:
            print(f"Build failed: {e}")
            raise

    def push_image(self, image_name: str, tag: str, registry: str = None) -> bool:
        """Push an image (optionally prefixed with a registry host).

        Returns True on success, False on any failure.
        """
        try:
            full_name = f"{registry}/{image_name}:{tag}" if registry else f"{image_name}:{tag}"

            print(f"Pushing image: {full_name}")

            # Stream push progress line by line.
            for line in self.client.images.push(full_name, stream=True, decode=True):
                if 'status' in line:
                    print(f"{line['status']}: {line.get('progress', '')}")

            print("Image pushed successfully")
            return True

        except Exception as e:
            print(f"Push failed: {e}")
            return False

    def deploy_container(self, config: "DeploymentConfig", server: str) -> bool:
        """Deploy the configured container on a remote server over SSH.

        Pulls the image, replaces any running container of the same name and
        starts a new one.  Returns True on success.
        """
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            ssh.connect(
                hostname=server,
                username=Config.SSH_USER,
                key_filename=Config.SSH_KEY_PATH
            )

            # Pull, stop/remove the old container, start the new one.
            commands = [
                f"docker pull {config.docker_image}",
                f"docker stop {config.app_name} || true",
                f"docker rm {config.app_name} || true",
                f"docker run -d --name {config.app_name} "
                f"--restart unless-stopped "
                f"-p 80:8000 "
                f"{config.docker_image}"
            ]

            for command in commands:
                print(f"Executing on {server}: {command}")
                stdin, stdout, stderr = ssh.exec_command(command)

                exit_status = stdout.channel.recv_exit_status()
                if exit_status != 0:
                    error = stderr.read().decode()
                    print(f"Command failed: {error}")
                    return False

                output = stdout.read().decode()
                if output:
                    print(output)

            return True

        except Exception as e:
            print(f"Deployment failed on {server}: {e}")
            return False
        finally:
            # Always release the SSH connection -- the original leaked it
            # whenever a command failed or an exception was raised.
            ssh.close()

class KubernetesDeployment:
    """Manages apps/v1 Deployments on a Kubernetes cluster."""

    def __init__(self):
        from kubernetes import client, config as k8s_config

        # Prefer in-cluster configuration; fall back to the local kubeconfig
        # when running outside a pod.  (The original used a bare `except:`,
        # which also swallowed KeyboardInterrupt/SystemExit.)
        try:
            k8s_config.load_incluster_config()
        except Exception:
            k8s_config.load_kube_config()

        self.apps_v1 = client.AppsV1Api()
        self.core_v1 = client.CoreV1Api()

    def create_deployment_manifest(self, config: "DeploymentConfig") -> Dict:
        """Build an apps/v1 Deployment manifest (3 replicas, rolling update,
        liveness/readiness probes and resource requests/limits)."""

        return {
            'apiVersion': 'apps/v1',
            'kind': 'Deployment',
            'metadata': {
                'name': config.app_name,
                'namespace': config.environment,
                'labels': {
                    'app': config.app_name,
                    'version': config.version,
                    'environment': config.environment
                }
            },
            'spec': {
                'replicas': 3,
                'selector': {
                    'matchLabels': {
                        'app': config.app_name
                    }
                },
                'template': {
                    'metadata': {
                        'labels': {
                            'app': config.app_name,
                            'version': config.version
                        }
                    },
                    'spec': {
                        'containers': [{
                            'name': config.app_name,
                            'image': config.docker_image,
                            'ports': [{
                                'containerPort': 8000
                            }],
                            'livenessProbe': {
                                'httpGet': {
                                    'path': '/health',
                                    'port': 8000
                                },
                                'initialDelaySeconds': 30,
                                'periodSeconds': 10
                            },
                            'readinessProbe': {
                                'httpGet': {
                                    'path': '/ready',
                                    'port': 8000
                                },
                                'initialDelaySeconds': 5,
                                'periodSeconds': 5
                            },
                            'resources': {
                                'requests': {
                                    'cpu': '100m',
                                    'memory': '128Mi'
                                },
                                'limits': {
                                    'cpu': '500m',
                                    'memory': '512Mi'
                                }
                            }
                        }]
                    }
                },
                'strategy': {
                    'type': 'RollingUpdate',
                    'rollingUpdate': {
                        'maxUnavailable': 1,
                        'maxSurge': 1
                    }
                }
            }
        }

    def deploy_to_kubernetes(self, config: "DeploymentConfig") -> bool:
        """Create or patch the Deployment, then wait until it is ready.

        Returns True when the rollout completes within the timeout.
        """
        try:
            manifest = self.create_deployment_manifest(config)

            # Probe for an existing Deployment: read raises when it is absent.
            try:
                self.apps_v1.read_namespaced_deployment(
                    name=config.app_name,
                    namespace=config.environment
                )

                # Exists: patch in place.
                self.apps_v1.patch_namespaced_deployment(
                    name=config.app_name,
                    namespace=config.environment,
                    body=manifest
                )
                print(f"Deployment updated: {config.app_name}")

            except Exception:
                # NOTE(review): ideally narrow this to the kubernetes
                # ApiException with status 404 -- the original bare `except:`
                # treated *any* API error as "not found" and tried to create.
                self.apps_v1.create_namespaced_deployment(
                    namespace=config.environment,
                    body=manifest
                )
                print(f"Deployment created: {config.app_name}")

            # Block until all replicas are ready (or timeout).
            return self._wait_for_deployment(config)

        except Exception as e:
            print(f"Kubernetes deployment failed: {e}")
            return False

    def _wait_for_deployment(self, config: "DeploymentConfig", timeout: int = 300) -> bool:
        """Poll the Deployment until ready/updated replica counts match the
        spec, or the timeout (seconds) elapses."""

        import time

        start_time = time.time()

        while time.time() - start_time < timeout:
            try:
                deployment = self.apps_v1.read_namespaced_deployment(
                    name=config.app_name,
                    namespace=config.environment
                )

                if (deployment.status.ready_replicas == deployment.spec.replicas and
                        deployment.status.updated_replicas == deployment.spec.replicas):
                    print(f"Deployment ready: {config.app_name}")
                    return True

                print(f"Waiting for deployment... "
                      f"{deployment.status.ready_replicas or 0}/{deployment.spec.replicas}")

                time.sleep(10)

            except Exception as e:
                print(f"Error checking deployment status: {e}")
                time.sleep(10)

        print(f"Deployment timeout: {config.app_name}")
        return False

class DeploymentManager:
    """Top-level orchestrator that routes a deployment to a platform backend.

    Supports "docker" (per-server container rollout) and "kubernetes"
    (cluster-wide rollout), with optional post-deploy health checks and a
    rollback hook on failure.
    """

    def __init__(self):
        # One backend per supported platform, created eagerly.
        self.docker_deployment = DockerDeployment()
        self.k8s_deployment = KubernetesDeployment()

    def deploy(self, config: DeploymentConfig, platform: str = "docker") -> bool:
        """Deploy *config* on *platform* and return True on success.

        On failure, triggers a rollback when `config.rollback_on_failure`
        is set; on success, runs post-deployment health checks.
        """

        print(f"Starting deployment: {config.app_name} v{config.version}")

        handlers = {
            "docker": self._deploy_docker,
            "kubernetes": self._deploy_kubernetes,
        }
        handler = handlers.get(platform)
        if handler is None:
            print(f"Unsupported platform: {platform}")
            return False

        success = handler(config)

        if success:
            print(f"Deployment successful: {config.app_name}")
            self._post_deployment_checks(config)
            return success

        print(f"Deployment failed: {config.app_name}")
        if config.rollback_on_failure:
            self._rollback(config)
        return success

    def _deploy_docker(self, config: DeploymentConfig) -> bool:
        """Roll out the container to every server; True only if all succeed."""

        failed_servers = [
            server for server in config.servers
            if not self.docker_deployment.deploy_container(config, server)
        ]

        if failed_servers:
            print(f"Deployment failed on servers: {failed_servers}")
            return False
        return True

    def _deploy_kubernetes(self, config: DeploymentConfig) -> bool:
        """Delegate the rollout to the Kubernetes backend."""

        return self.k8s_deployment.deploy_to_kubernetes(config)

    def _post_deployment_checks(self, config: DeploymentConfig):
        """Probe each server's health endpoint, retrying up to 5 times.

        Best-effort only: failures are logged, never raised, and do not
        affect the deployment's success status.
        """

        if not config.health_check_url:
            return

        import requests
        import time

        print("Running post-deployment health checks...")

        max_attempts = 5
        for server in config.servers:
            url = f"http://{server}{config.health_check_url}"

            for attempt in range(max_attempts):
                try:
                    response = requests.get(url, timeout=10)
                except Exception as exc:
                    print(f"Health check error: {server} - {exc}")
                else:
                    if response.status_code == 200:
                        print(f"Health check passed: {server}")
                        break
                    print(f"Health check failed: {server} (HTTP {response.status_code})")

                if attempt < max_attempts - 1:
                    time.sleep(10)

    def _rollback(self, config: DeploymentConfig):
        """Roll back a failed deployment (platform-specific; not yet implemented)."""

        print(f"Rolling back deployment: {config.app_name}")
        # Rollback implementation depends on the target platform.
        pass

# Example usage: deploy "myapp" v1.2.3 to two production hosts via Docker.
if __name__ == "__main__":
    deploy_config = DeploymentConfig(
        app_name="myapp",
        version="1.2.3",
        environment="production",
        servers=["10.0.1.10", "10.0.1.11"],
        docker_image="myregistry/myapp:1.2.3",
        health_check_url="/health",
        rollback_on_failure=True,
    )

    deployment_manager = DeploymentManager()
    ok = deployment_manager.deploy(deploy_config, platform="docker")

    print("Deployment completed successfully!" if ok else "Deployment failed!")

💡Conclusion

Python offre un écosystème riche pour l'automatisation DevOps :

Avantages Clés

  • Syntaxe simple et lisible
  • Bibliothèques spécialisées nombreuses
  • Intégration native avec les APIs cloud
  • Communauté active et documentation riche

Domaines d'Application

  • Automatisation d'infrastructure
  • Monitoring et alerting
  • Déploiement et CI/CD
  • Gestion de configuration

Bonnes Pratiques

  • Structure de projet claire
  • Gestion des secrets sécurisée
  • Tests automatisés
  • Documentation complète

Les scripts et techniques présentés dans cet article constituent une base solide pour automatiser vos opérations DevOps avec Python. L'investissement dans ces outils se traduit rapidement par des gains de productivité et de fiabilité.

Pour un accompagnement dans l'automatisation de vos processus DevOps avec Python, contactez-moi pour une consultation personnalisée.

À propos de l'auteur

Florian Courouge - Expert DevOps et Apache Kafka avec plus de 5 ans d'expérience dans l'architecture de systèmes distribués et l'automatisation d'infrastructures.

Cet article vous a été utile ?

Découvrez mes autres articles techniques ou contactez-moi pour discuter de vos projets DevOps et Kafka.