Python for DevOps: Automation and Infrastructure Tools
Python has become the tool of choice for DevOps automation thanks to its simplicity, its rich library ecosystem, and its integration capabilities. This guide covers the essential techniques for automating your infrastructure with Python.
Python Fundamentals for DevOps
Environment and Best Practices
#!/usr/bin/env python3
# setup_devops_env.py
from pathlib import Path

class DevOpsEnvironment:
    """Manages a Python DevOps project environment."""

    def __init__(self, project_name="devops-automation"):
        self.project_name = project_name
        self.project_path = Path.cwd() / project_name
        self.venv_path = self.project_path / "venv"

    def create_project_structure(self):
        """Creates the DevOps project layout."""
        directories = [
            "scripts/infrastructure",
            "scripts/monitoring",
            "scripts/deployment",
            "config",
            "templates",
            "tests",
            "logs",
            "docs"
        ]
        for directory in directories:
            (self.project_path / directory).mkdir(parents=True, exist_ok=True)
        # Create the configuration files
        self._create_config_files()

    def _create_config_files(self):
        """Creates the essential configuration files."""
        # requirements.txt (python-dotenv is needed by the config module
        # generated below)
        requirements = """
boto3>=1.26.0
paramiko>=2.11.0
requests>=2.28.0
pyyaml>=6.0
jinja2>=3.1.0
click>=8.1.0
psutil>=5.9.0
docker>=6.0.0
kubernetes>=24.2.0
prometheus-client>=0.15.0
grafana-api>=1.0.3
slack-sdk>=3.19.0
python-dotenv>=1.0.0
"""
        (self.project_path / "requirements.txt").write_text(requirements.strip())
        # .env template
        env_template = """
# AWS Configuration
AWS_ACCESS_KEY_ID=your_access_key
AWS_SECRET_ACCESS_KEY=your_secret_key
AWS_DEFAULT_REGION=us-east-1

# Database
DB_HOST=localhost
DB_PORT=5432
DB_NAME=devops
DB_USER=devops_user
DB_PASSWORD=secure_password

# Monitoring
PROMETHEUS_URL=http://localhost:9090
GRAFANA_URL=http://localhost:3000
GRAFANA_API_KEY=your_api_key

# Notifications
SLACK_TOKEN=xoxb-your-slack-token
SLACK_CHANNEL=#devops-alerts

# SSH Configuration
SSH_KEY_PATH=~/.ssh/id_rsa
SSH_USER=ubuntu
"""
        (self.project_path / ".env.template").write_text(env_template.strip())
        # Python configuration module
        config_py = '''
import os
from pathlib import Path
from dotenv import load_dotenv

# Load the environment variables from .env
load_dotenv()

class Config:
    """Centralized configuration."""
    # Paths
    PROJECT_ROOT = Path(__file__).parent.parent
    LOGS_DIR = PROJECT_ROOT / "logs"
    CONFIG_DIR = PROJECT_ROOT / "config"
    TEMPLATES_DIR = PROJECT_ROOT / "templates"
    # AWS
    AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
    AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
    AWS_DEFAULT_REGION = os.getenv("AWS_DEFAULT_REGION", "us-east-1")
    # Database
    DB_HOST = os.getenv("DB_HOST", "localhost")
    DB_PORT = int(os.getenv("DB_PORT", 5432))
    DB_NAME = os.getenv("DB_NAME", "devops")
    DB_USER = os.getenv("DB_USER")
    DB_PASSWORD = os.getenv("DB_PASSWORD")
    # Monitoring
    PROMETHEUS_URL = os.getenv("PROMETHEUS_URL", "http://localhost:9090")
    GRAFANA_URL = os.getenv("GRAFANA_URL", "http://localhost:3000")
    GRAFANA_API_KEY = os.getenv("GRAFANA_API_KEY")
    # Notifications
    SLACK_TOKEN = os.getenv("SLACK_TOKEN")
    SLACK_CHANNEL = os.getenv("SLACK_CHANNEL", "#devops-alerts")
    # SSH
    SSH_KEY_PATH = os.path.expanduser(os.getenv("SSH_KEY_PATH", "~/.ssh/id_rsa"))
    SSH_USER = os.getenv("SSH_USER", "ubuntu")

    @classmethod
    def validate(cls):
        """Validates the configuration."""
        required_vars = [
            "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY",
            "DB_PASSWORD", "SLACK_TOKEN"
        ]
        missing = [var for var in required_vars if not getattr(cls, var)]
        if missing:
            raise ValueError(f"Missing variables: {', '.join(missing)}")
'''
        (self.project_path / "config" / "__init__.py").write_text(config_py)

if __name__ == "__main__":
    env = DevOpsEnvironment()
    env.create_project_structure()
    print(f"DevOps project created at: {env.project_path}")
Infrastructure Automation
Managing AWS with Boto3
# scripts/infrastructure/aws_manager.py
import boto3
from typing import Dict, List
from botocore.exceptions import ClientError
from config import Config

class AWSInfrastructureManager:
    """AWS infrastructure manager."""

    def __init__(self):
        self.session = boto3.Session(
            aws_access_key_id=Config.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=Config.AWS_SECRET_ACCESS_KEY,
            region_name=Config.AWS_DEFAULT_REGION
        )
        self.ec2 = self.session.client('ec2')
        self.elbv2 = self.session.client('elbv2')
        self.rds = self.session.client('rds')
        self.cloudformation = self.session.client('cloudformation')

    def create_vpc_infrastructure(self, vpc_name: str, cidr_block: str = "10.0.0.0/16") -> Dict:
        """Creates a complete VPC infrastructure."""
        try:
            # Create the VPC
            vpc_response = self.ec2.create_vpc(
                CidrBlock=cidr_block,
                TagSpecifications=[{
                    'ResourceType': 'vpc',
                    'Tags': [{'Key': 'Name', 'Value': vpc_name}]
                }]
            )
            vpc_id = vpc_response['Vpc']['VpcId']
            # Wait until the VPC is available
            self.ec2.get_waiter('vpc_available').wait(VpcIds=[vpc_id])
            # Create the subnets
            subnets = self._create_subnets(vpc_id, cidr_block)
            # Create the Internet Gateway, route tables, and security groups.
            # These three private helpers are not shown in this article;
            # a sketch of one of them follows the listing.
            igw_id = self._create_internet_gateway(vpc_id)
            route_tables = self._create_route_tables(vpc_id, igw_id, subnets)
            security_groups = self._create_security_groups(vpc_id)
            infrastructure = {
                'vpc_id': vpc_id,
                'subnets': subnets,
                'internet_gateway_id': igw_id,
                'route_tables': route_tables,
                'security_groups': security_groups
            }
            print(f"VPC infrastructure created: {vpc_name}")
            return infrastructure
        except ClientError as e:
            print(f"Error while creating the VPC: {e}")
            raise

    def _create_subnets(self, vpc_id: str, vpc_cidr: str) -> Dict:
        """Creates the public and private subnets."""
        # Fetch the availability zones
        azs = self.ec2.describe_availability_zones()['AvailabilityZones']
        subnets = {'public': [], 'private': []}
        # Note: the hard-coded 10.0.x.0/24 blocks assume the default
        # 10.0.0.0/16 VPC CIDR.
        for i, az in enumerate(azs[:2]):  # Use two AZs
            # Public subnet
            public_subnet = self.ec2.create_subnet(
                VpcId=vpc_id,
                CidrBlock=f"10.0.{i+1}.0/24",
                AvailabilityZone=az['ZoneName'],
                TagSpecifications=[{
                    'ResourceType': 'subnet',
                    'Tags': [
                        {'Key': 'Name', 'Value': f'public-subnet-{i+1}'},
                        {'Key': 'Type', 'Value': 'public'}
                    ]
                }]
            )
            subnets['public'].append(public_subnet['Subnet']['SubnetId'])
            # Private subnet
            private_subnet = self.ec2.create_subnet(
                VpcId=vpc_id,
                CidrBlock=f"10.0.{i+10}.0/24",
                AvailabilityZone=az['ZoneName'],
                TagSpecifications=[{
                    'ResourceType': 'subnet',
                    'Tags': [
                        {'Key': 'Name', 'Value': f'private-subnet-{i+1}'},
                        {'Key': 'Type', 'Value': 'private'}
                    ]
                }]
            )
            subnets['private'].append(private_subnet['Subnet']['SubnetId'])
        return subnets

    def launch_ec2_instances(self, instance_config: Dict) -> List[str]:
        """Launches EC2 instances."""
        try:
            response = self.ec2.run_instances(
                ImageId=instance_config['ami_id'],
                MinCount=instance_config.get('min_count', 1),
                MaxCount=instance_config.get('max_count', 1),
                InstanceType=instance_config.get('instance_type', 't3.micro'),
                KeyName=instance_config.get('key_name'),
                SecurityGroupIds=instance_config.get('security_groups', []),
                SubnetId=instance_config.get('subnet_id'),
                UserData=instance_config.get('user_data', ''),
                TagSpecifications=[{
                    'ResourceType': 'instance',
                    'Tags': instance_config.get('tags', [])
                }]
            )
            instance_ids = [instance['InstanceId'] for instance in response['Instances']]
            # Wait until the instances are running
            self.ec2.get_waiter('instance_running').wait(InstanceIds=instance_ids)
            print(f"Instances launched: {instance_ids}")
            return instance_ids
        except ClientError as e:
            print(f"Error while launching the instances: {e}")
            raise

    def create_load_balancer(self, lb_config: Dict) -> str:
        """Creates an Application Load Balancer."""
        try:
            # Create the load balancer
            response = self.elbv2.create_load_balancer(
                Name=lb_config['name'],
                Subnets=lb_config['subnets'],
                SecurityGroups=lb_config.get('security_groups', []),
                Scheme=lb_config.get('scheme', 'internet-facing'),
                Type='application',
                Tags=lb_config.get('tags', [])
            )
            lb_arn = response['LoadBalancers'][0]['LoadBalancerArn']
            # Create the target group
            tg_response = self.elbv2.create_target_group(
                Name=f"{lb_config['name']}-tg",
                Protocol='HTTP',
                Port=80,
                VpcId=lb_config['vpc_id'],
                HealthCheckPath='/health',
                HealthCheckIntervalSeconds=30,
                HealthyThresholdCount=2,
                UnhealthyThresholdCount=5
            )
            tg_arn = tg_response['TargetGroups'][0]['TargetGroupArn']
            # Create the listener
            self.elbv2.create_listener(
                LoadBalancerArn=lb_arn,
                Protocol='HTTP',
                Port=80,
                DefaultActions=[{
                    'Type': 'forward',
                    'TargetGroupArn': tg_arn
                }]
            )
            # Register the instances with the target group
            if 'instance_ids' in lb_config:
                targets = [{'Id': instance_id, 'Port': 80}
                           for instance_id in lb_config['instance_ids']]
                self.elbv2.register_targets(
                    TargetGroupArn=tg_arn,
                    Targets=targets
                )
            print(f"Load balancer created: {lb_config['name']}")
            return lb_arn
        except ClientError as e:
            print(f"Error while creating the load balancer: {e}")
            raise

    def deploy_cloudformation_stack(self, stack_name: str, template_path: str, parameters: Dict = None) -> str:
        """Deploys a CloudFormation stack."""
        try:
            with open(template_path, 'r') as template_file:
                template_body = template_file.read()
            # Prepare the parameters
            cf_parameters = []
            if parameters:
                cf_parameters = [
                    {'ParameterKey': key, 'ParameterValue': str(value)}
                    for key, value in parameters.items()
                ]
            # Create or update the stack
            try:
                response = self.cloudformation.create_stack(
                    StackName=stack_name,
                    TemplateBody=template_body,
                    Parameters=cf_parameters,
                    Capabilities=['CAPABILITY_IAM', 'CAPABILITY_NAMED_IAM']
                )
                stack_id = response['StackId']
                operation = 'CREATE'
            except ClientError as e:
                if 'AlreadyExistsException' in str(e):
                    response = self.cloudformation.update_stack(
                        StackName=stack_name,
                        TemplateBody=template_body,
                        Parameters=cf_parameters,
                        Capabilities=['CAPABILITY_IAM', 'CAPABILITY_NAMED_IAM']
                    )
                    stack_id = response['StackId']
                    operation = 'UPDATE'
                else:
                    raise
            # Wait for the operation to finish
            waiter_name = f'stack_{operation.lower()}_complete'
            waiter = self.cloudformation.get_waiter(waiter_name)
            waiter.wait(StackName=stack_name)
            print(f"CloudFormation stack {operation.lower()}d: {stack_name}")
            return stack_id
        except Exception as e:
            print(f"Error during the CloudFormation deployment: {e}")
            raise

# Usage example
if __name__ == "__main__":
    aws_manager = AWSInfrastructureManager()
    # Create the VPC infrastructure
    infrastructure = aws_manager.create_vpc_infrastructure("devops-vpc")
    # Instance configuration
    instance_config = {
        'ami_id': 'ami-0c02fb55956c7d316',  # Amazon Linux 2
        'instance_type': 't3.micro',
        'min_count': 2,
        'max_count': 2,
        'key_name': 'my-key-pair',
        'security_groups': infrastructure['security_groups']['web'],
        'subnet_id': infrastructure['subnets']['public'][0],
        'tags': [
            {'Key': 'Name', 'Value': 'web-server'},
            {'Key': 'Environment', 'Value': 'production'}
        ]
    }
    # Launch the instances
    instance_ids = aws_manager.launch_ec2_instances(instance_config)
    # Create the load balancer
    lb_config = {
        'name': 'web-lb',
        'subnets': infrastructure['subnets']['public'],
        'security_groups': infrastructure['security_groups']['lb'],
        'vpc_id': infrastructure['vpc_id'],
        'instance_ids': instance_ids
    }
    lb_arn = aws_manager.create_load_balancer(lb_config)
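The three private helpers called by create_vpc_infrastructure (_create_internet_gateway, _create_route_tables, and _create_security_groups) are not shown in the listing above. As an illustration, here is a minimal, hedged sketch of the first one using standard boto3 EC2 calls; the tag value is my choice, and the two other helpers would follow the same pattern:

# Hypothetical sketch of one of the elided helpers (error handling omitted).
def _create_internet_gateway(self, vpc_id: str) -> str:
    """Creates an Internet Gateway and attaches it to the VPC."""
    igw = self.ec2.create_internet_gateway(
        TagSpecifications=[{
            'ResourceType': 'internet-gateway',
            'Tags': [{'Key': 'Name', 'Value': f'{vpc_id}-igw'}]
        }]
    )
    igw_id = igw['InternetGateway']['InternetGatewayId']
    self.ec2.attach_internet_gateway(InternetGatewayId=igw_id, VpcId=vpc_id)
    return igw_id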
Monitoring and Metrics
System Metrics Collector
# scripts/monitoring/system_monitor.py
import socket
import time
from datetime import datetime
from typing import Dict, List
from dataclasses import dataclass

import psutil

from config import Config

@dataclass
class SystemMetrics:
    """System metrics structure."""
    timestamp: str
    hostname: str
    cpu_percent: float
    memory_percent: float
    memory_available: int
    disk_usage: Dict[str, Dict[str, float]]
    network_io: Dict[str, int]
    load_average: List[float]
    process_count: int
    uptime: float

class SystemMonitor:
    """Advanced system monitor."""

    def __init__(self, interval: int = 60):
        self.interval = interval
        # Portable hostname lookup (psutil.os is a private alias)
        self.hostname = socket.gethostname()

    def collect_metrics(self) -> SystemMetrics:
        """Collects the system metrics."""
        # CPU
        cpu_percent = psutil.cpu_percent(interval=1)
        # Memory
        memory = psutil.virtual_memory()
        # Disk
        disk_usage = {}
        for partition in psutil.disk_partitions():
            try:
                usage = psutil.disk_usage(partition.mountpoint)
                disk_usage[partition.mountpoint] = {
                    'total': usage.total,
                    'used': usage.used,
                    'free': usage.free,
                    'percent': (usage.used / usage.total) * 100
                }
            except PermissionError:
                continue
        # Network
        network = psutil.net_io_counters()
        network_io = {
            'bytes_sent': network.bytes_sent,
            'bytes_recv': network.bytes_recv,
            'packets_sent': network.packets_sent,
            'packets_recv': network.packets_recv
        }
        # Load average
        load_avg = list(psutil.getloadavg())
        # Processes
        process_count = len(psutil.pids())
        # Uptime
        uptime = time.time() - psutil.boot_time()
        return SystemMetrics(
            timestamp=datetime.now().isoformat(),
            hostname=self.hostname,
            cpu_percent=cpu_percent,
            memory_percent=memory.percent,
            memory_available=memory.available,
            disk_usage=disk_usage,
            network_io=network_io,
            load_average=load_avg,
            process_count=process_count,
            uptime=uptime
        )

    def get_process_metrics(self) -> List[Dict]:
        """Returns per-process metrics."""
        processes = []
        for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent', 'status']):
            try:
                processes.append(proc.info)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue
        # Sort by CPU usage and keep the top ten
        return sorted(processes, key=lambda x: x['cpu_percent'] or 0, reverse=True)[:10]

    def check_alerts(self, metrics: SystemMetrics) -> List[Dict]:
        """Checks the alert thresholds."""
        alerts = []
        # CPU
        if metrics.cpu_percent > 80:
            alerts.append({
                'type': 'cpu',
                'level': 'warning' if metrics.cpu_percent < 90 else 'critical',
                'message': f'CPU usage: {metrics.cpu_percent:.1f}%',
                'value': metrics.cpu_percent
            })
        # Memory
        if metrics.memory_percent > 85:
            alerts.append({
                'type': 'memory',
                'level': 'warning' if metrics.memory_percent < 95 else 'critical',
                'message': f'Memory usage: {metrics.memory_percent:.1f}%',
                'value': metrics.memory_percent
            })
        # Disk
        for mount, usage in metrics.disk_usage.items():
            if usage['percent'] > 85:
                alerts.append({
                    'type': 'disk',
                    'level': 'warning' if usage['percent'] < 95 else 'critical',
                    'message': f'Disk usage {mount}: {usage["percent"]:.1f}%',
                    'value': usage['percent']
                })
        # Load average
        cpu_count = psutil.cpu_count()
        if metrics.load_average[0] > cpu_count * 0.8:
            alerts.append({
                'type': 'load',
                'level': 'warning' if metrics.load_average[0] < cpu_count else 'critical',
                'message': f'Load average: {metrics.load_average[0]:.2f}',
                'value': metrics.load_average[0]
            })
        return alerts

class PrometheusExporter:
    """Exports metrics to Prometheus."""

    def __init__(self, port: int = 8000):
        from prometheus_client import start_http_server, Gauge, Counter
        self.port = port
        # Prometheus metrics
        self.cpu_usage = Gauge('system_cpu_usage_percent', 'CPU usage percentage')
        self.memory_usage = Gauge('system_memory_usage_percent', 'Memory usage percentage')
        self.disk_usage = Gauge('system_disk_usage_percent', 'Disk usage percentage', ['mountpoint'])
        self.load_average = Gauge('system_load_average', 'Load average', ['period'])
        self.network_bytes = Counter('system_network_bytes_total', 'Network bytes', ['direction'])
        self._last_network_io = None
        # Start the HTTP server
        start_http_server(self.port)
        print(f"Prometheus exporter started on port {self.port}")

    def update_metrics(self, metrics: SystemMetrics):
        """Updates the Prometheus metrics."""
        self.cpu_usage.set(metrics.cpu_percent)
        self.memory_usage.set(metrics.memory_percent)
        for mount, usage in metrics.disk_usage.items():
            self.disk_usage.labels(mountpoint=mount).set(usage['percent'])
        for i, load in enumerate(metrics.load_average):
            period = ['1m', '5m', '15m'][i]
            self.load_average.labels(period=period).set(load)
        # psutil reports cumulative totals, so the counter must only be
        # incremented by the delta since the previous sample.
        if self._last_network_io is not None:
            self.network_bytes.labels(direction='sent').inc(
                max(0, metrics.network_io['bytes_sent'] - self._last_network_io['bytes_sent']))
            self.network_bytes.labels(direction='received').inc(
                max(0, metrics.network_io['bytes_recv'] - self._last_network_io['bytes_recv']))
        self._last_network_io = metrics.network_io

class AlertManager:
    """Alert manager."""

    def __init__(self):
        self.slack_token = Config.SLACK_TOKEN
        self.slack_channel = Config.SLACK_CHANNEL

    def send_slack_alert(self, alert: Dict):
        """Sends a Slack alert."""
        if not self.slack_token:
            print(f"Alert: {alert['message']}")
            return
        from slack_sdk import WebClient
        client = WebClient(token=self.slack_token)
        color = {
            'warning': '#ffcc00',
            'critical': '#ff0000'
        }.get(alert['level'], '#00ff00')
        attachment = {
            'color': color,
            'title': f"{alert['level'].upper()}: {alert['type'].upper()} Alert",
            'text': alert['message'],
            'fields': [
                {
                    'title': 'Hostname',
                    'value': socket.gethostname(),
                    'short': True
                },
                {
                    'title': 'Timestamp',
                    'value': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                    'short': True
                }
            ]
        }
        try:
            client.chat_postMessage(
                channel=self.slack_channel,
                text=f"System Alert: {alert['message']}",
                attachments=[attachment]
            )
        except Exception as e:
            print(f"Failed to send Slack alert: {e}")

# Main monitoring script
def main():
    monitor = SystemMonitor(interval=60)
    prometheus_exporter = PrometheusExporter()
    alert_manager = AlertManager()
    print("System monitoring started...")
    while True:
        try:
            # Collect the metrics
            metrics = monitor.collect_metrics()
            # Export them to Prometheus
            prometheus_exporter.update_metrics(metrics)
            # Check the alert thresholds
            alerts = monitor.check_alerts(metrics)
            # Send the alerts
            for alert in alerts:
                alert_manager.send_slack_alert(alert)
            # Log the metrics
            print(f"[{metrics.timestamp}] CPU: {metrics.cpu_percent:.1f}%, "
                  f"Memory: {metrics.memory_percent:.1f}%, "
                  f"Load: {metrics.load_average[0]:.2f}")
            time.sleep(monitor.interval)
        except KeyboardInterrupt:
            print("Monitoring stopped.")
            break
        except Exception as e:
            print(f"Error in monitoring loop: {e}")
            time.sleep(10)

if __name__ == "__main__":
    main()
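Once Prometheus scrapes this exporter, the same metrics can be read back over its HTTP API. Here is a short sketch against the standard /api/v1/query endpoint, reusing the PROMETHEUS_URL already defined in Config; the function name is my choice, and it assumes Prometheus is actually scraping the exporter above:

# Sketch: read the exported CPU gauge back from Prometheus.
import requests
from config import Config

def query_cpu_usage() -> float:
    """Returns the latest system_cpu_usage_percent sample, or 0.0 if absent."""
    response = requests.get(
        f"{Config.PROMETHEUS_URL}/api/v1/query",
        params={"query": "system_cpu_usage_percent"},
        timeout=10,
    )
    response.raise_for_status()
    result = response.json()["data"]["result"]
    # Each entry looks like {"metric": {...}, "value": [timestamp, "12.3"]}
    return float(result[0]["value"][1]) if result else 0.0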
Deployment and CI/CD
Deployment Manager
# scripts/deployment/deploy_manager.py
import time
from pathlib import Path
from typing import Dict, List, Optional
from dataclasses import dataclass

import docker
import paramiko
import requests

from config import Config

@dataclass
class DeploymentConfig:
    """Deployment configuration."""
    app_name: str
    version: str
    environment: str
    servers: List[str]
    docker_image: Optional[str] = None
    health_check_url: Optional[str] = None
    rollback_on_failure: bool = True

class DockerDeployment:
    """Docker deployment manager."""

    def __init__(self):
        self.client = docker.from_env()

    def build_image(self, dockerfile_path: str, image_name: str, tag: str) -> str:
        """Builds a Docker image."""
        try:
            print(f"Building Docker image: {image_name}:{tag}")
            image, logs = self.client.images.build(
                path=str(Path(dockerfile_path).parent),
                dockerfile=Path(dockerfile_path).name,
                tag=f"{image_name}:{tag}",
                rm=True,
                forcerm=True
            )
            for log in logs:
                if 'stream' in log:
                    print(log['stream'].strip())
            print(f"Image built successfully: {image.id}")
            return image.id
        except docker.errors.BuildError as e:
            print(f"Build failed: {e}")
            raise

    def push_image(self, image_name: str, tag: str, registry: Optional[str] = None) -> bool:
        """Pushes the image to a registry."""
        try:
            full_name = f"{registry}/{image_name}:{tag}" if registry else f"{image_name}:{tag}"
            print(f"Pushing image: {full_name}")
            for line in self.client.images.push(full_name, stream=True, decode=True):
                if 'status' in line:
                    print(f"{line['status']}: {line.get('progress', '')}")
            print("Image pushed successfully")
            return True
        except Exception as e:
            print(f"Push failed: {e}")
            return False

    def deploy_container(self, config: DeploymentConfig, server: str) -> bool:
        """Deploys a container to a remote server."""
        try:
            # Connect to the remote server
            ssh = paramiko.SSHClient()
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            ssh.connect(
                hostname=server,
                username=Config.SSH_USER,
                key_filename=Config.SSH_KEY_PATH
            )
            # Deployment commands
            commands = [
                f"docker pull {config.docker_image}",
                f"docker stop {config.app_name} || true",
                f"docker rm {config.app_name} || true",
                f"docker run -d --name {config.app_name} "
                f"--restart unless-stopped "
                f"-p 80:8000 "
                f"{config.docker_image}"
            ]
            for command in commands:
                print(f"Executing on {server}: {command}")
                stdin, stdout, stderr = ssh.exec_command(command)
                exit_status = stdout.channel.recv_exit_status()
                if exit_status != 0:
                    error = stderr.read().decode()
                    print(f"Command failed: {error}")
                    ssh.close()  # avoid leaking the connection on failure
                    return False
                output = stdout.read().decode()
                if output:
                    print(output)
            ssh.close()
            return True
        except Exception as e:
            print(f"Deployment failed on {server}: {e}")
            return False

class KubernetesDeployment:
    """Kubernetes deployment manager."""

    def __init__(self):
        from kubernetes import client, config as k8s_config
        try:
            k8s_config.load_incluster_config()
        except k8s_config.ConfigException:
            k8s_config.load_kube_config()
        self.apps_v1 = client.AppsV1Api()
        self.core_v1 = client.CoreV1Api()

    def create_deployment_manifest(self, config: DeploymentConfig) -> Dict:
        """Builds a Kubernetes Deployment manifest."""
        return {
            'apiVersion': 'apps/v1',
            'kind': 'Deployment',
            'metadata': {
                'name': config.app_name,
                'namespace': config.environment,
                'labels': {
                    'app': config.app_name,
                    'version': config.version,
                    'environment': config.environment
                }
            },
            'spec': {
                'replicas': 3,
                'selector': {
                    'matchLabels': {
                        'app': config.app_name
                    }
                },
                'template': {
                    'metadata': {
                        'labels': {
                            'app': config.app_name,
                            'version': config.version
                        }
                    },
                    'spec': {
                        'containers': [{
                            'name': config.app_name,
                            'image': config.docker_image,
                            'ports': [{
                                'containerPort': 8000
                            }],
                            'livenessProbe': {
                                'httpGet': {
                                    'path': '/health',
                                    'port': 8000
                                },
                                'initialDelaySeconds': 30,
                                'periodSeconds': 10
                            },
                            'readinessProbe': {
                                'httpGet': {
                                    'path': '/ready',
                                    'port': 8000
                                },
                                'initialDelaySeconds': 5,
                                'periodSeconds': 5
                            },
                            'resources': {
                                'requests': {
                                    'cpu': '100m',
                                    'memory': '128Mi'
                                },
                                'limits': {
                                    'cpu': '500m',
                                    'memory': '512Mi'
                                }
                            }
                        }]
                    }
                },
                'strategy': {
                    'type': 'RollingUpdate',
                    'rollingUpdate': {
                        'maxUnavailable': 1,
                        'maxSurge': 1
                    }
                }
            }
        }

    def deploy_to_kubernetes(self, config: DeploymentConfig) -> bool:
        """Deploys to Kubernetes."""
        from kubernetes.client.rest import ApiException
        try:
            manifest = self.create_deployment_manifest(config)
            # Update the deployment if it exists, otherwise create it
            try:
                self.apps_v1.read_namespaced_deployment(
                    name=config.app_name,
                    namespace=config.environment
                )
                self.apps_v1.patch_namespaced_deployment(
                    name=config.app_name,
                    namespace=config.environment,
                    body=manifest
                )
                print(f"Deployment updated: {config.app_name}")
            except ApiException as e:
                if e.status != 404:
                    raise
                # Create a new deployment
                self.apps_v1.create_namespaced_deployment(
                    namespace=config.environment,
                    body=manifest
                )
                print(f"Deployment created: {config.app_name}")
            # Wait until the deployment is ready
            return self._wait_for_deployment(config)
        except Exception as e:
            print(f"Kubernetes deployment failed: {e}")
            return False

    def _wait_for_deployment(self, config: DeploymentConfig, timeout: int = 300) -> bool:
        """Waits until the deployment is ready."""
        start_time = time.time()
        while time.time() - start_time < timeout:
            try:
                deployment = self.apps_v1.read_namespaced_deployment(
                    name=config.app_name,
                    namespace=config.environment
                )
                if (deployment.status.ready_replicas == deployment.spec.replicas and
                        deployment.status.updated_replicas == deployment.spec.replicas):
                    print(f"Deployment ready: {config.app_name}")
                    return True
                print(f"Waiting for deployment... "
                      f"{deployment.status.ready_replicas or 0}/{deployment.spec.replicas}")
                time.sleep(10)
            except Exception as e:
                print(f"Error checking deployment status: {e}")
                time.sleep(10)
        print(f"Deployment timeout: {config.app_name}")
        return False

class DeploymentManager:
    """Main deployment manager."""

    def __init__(self):
        self.docker_deployment = DockerDeployment()
        self.k8s_deployment = KubernetesDeployment()

    def deploy(self, config: DeploymentConfig, platform: str = "docker") -> bool:
        """Deploys the application."""
        print(f"Starting deployment: {config.app_name} v{config.version}")
        success = False
        if platform == "docker":
            success = self._deploy_docker(config)
        elif platform == "kubernetes":
            success = self._deploy_kubernetes(config)
        else:
            print(f"Unsupported platform: {platform}")
            return False
        if success:
            print(f"Deployment successful: {config.app_name}")
            self._post_deployment_checks(config)
        else:
            print(f"Deployment failed: {config.app_name}")
            if config.rollback_on_failure:
                self._rollback(config)
        return success

    def _deploy_docker(self, config: DeploymentConfig) -> bool:
        """Docker deployment."""
        failed_servers = []
        for server in config.servers:
            if not self.docker_deployment.deploy_container(config, server):
                failed_servers.append(server)
        if failed_servers:
            print(f"Deployment failed on servers: {failed_servers}")
            return False
        return True

    def _deploy_kubernetes(self, config: DeploymentConfig) -> bool:
        """Kubernetes deployment."""
        return self.k8s_deployment.deploy_to_kubernetes(config)

    def _post_deployment_checks(self, config: DeploymentConfig):
        """Post-deployment checks."""
        if not config.health_check_url:
            return
        print("Running post-deployment health checks...")
        for server in config.servers:
            url = f"http://{server}{config.health_check_url}"
            for attempt in range(5):
                try:
                    response = requests.get(url, timeout=10)
                    if response.status_code == 200:
                        print(f"Health check passed: {server}")
                        break
                    else:
                        print(f"Health check failed: {server} (HTTP {response.status_code})")
                except Exception as e:
                    print(f"Health check error: {server} - {e}")
                if attempt < 4:
                    time.sleep(10)

    def _rollback(self, config: DeploymentConfig):
        """Rolls back after a failed deployment."""
        print(f"Rolling back deployment: {config.app_name}")
        # Platform-specific rollback; a sketch follows this listing.
        pass

# Usage example
if __name__ == "__main__":
    config = DeploymentConfig(
        app_name="myapp",
        version="1.2.3",
        environment="production",
        servers=["10.0.1.10", "10.0.1.11"],
        docker_image="myregistry/myapp:1.2.3",
        health_check_url="/health",
        rollback_on_failure=True
    )
    manager = DeploymentManager()
    success = manager.deploy(config, platform="docker")
    if success:
        print("Deployment completed successfully!")
    else:
        print("Deployment failed!")
Conclusion
Python offers a rich ecosystem for DevOps automation:
Key Advantages
- Simple, readable syntax
- A large number of specialized libraries
- Native integration with cloud APIs
- An active community and extensive documentation
Application Areas
- Infrastructure automation
- Monitoring and alerting
- Deployment and CI/CD
- Configuration management
Best Practices
- A clear project structure
- Secure secrets management
- Automated tests (see the sketch below)
- Complete documentation
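As one concrete example of the testing practice, here is a minimal pytest sketch exercising the Config.validate() method generated earlier; the test module path and the use of monkeypatch are my choices, not from the original project:

# tests/test_config.py: a minimal sketch of an automated test.
import pytest
from config import Config

def test_validate_reports_missing_variables(monkeypatch):
    # Simulate an incomplete environment by clearing one required value.
    monkeypatch.setattr(Config, "SLACK_TOKEN", None)
    with pytest.raises(ValueError) as exc:
        Config.validate()
    assert "SLACK_TOKEN" in str(exc.value)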
The scripts and techniques presented in this article form a solid foundation for automating your DevOps operations with Python. The investment in these tools quickly pays off in productivity and reliability.
If you would like support automating your DevOps processes with Python, contact me for a personalized consultation.