Python pour DevOps : Automatisation et Outils d'Infrastructure
Python est devenu l'outil de choix pour l'automatisation DevOps grâce à sa simplicité, sa richesse en bibliothèques et sa capacité d'intégration. Ce guide explore les techniques essentielles pour automatiser vos infrastructures avec Python.
💡 Fondamentaux Python pour DevOps
Environnement et Bonnes Pratiques
#!/usr/bin/env python3
# setup_devops_env.py
import os
import subprocess
import sys
from pathlib import Path
class DevOpsEnvironment:
    """Scaffold a Python DevOps project: directory tree plus starter config files."""

    def __init__(self, project_name="devops-automation"):
        # The project is created under the current working directory.
        self.project_name = project_name
        self.project_path = Path.cwd() / project_name
        self.venv_path = self.project_path / "venv"

    def create_project_structure(self):
        """Create the DevOps project directory layout, then the config files."""
        directories = [
            "scripts/infrastructure",
            "scripts/monitoring",
            "scripts/deployment",
            "config",
            "templates",
            "tests",
            "logs",
            "docs",
        ]
        for directory in directories:
            (self.project_path / directory).mkdir(parents=True, exist_ok=True)
        # Create the essential configuration files
        self._create_config_files()

    def _create_config_files(self):
        """Write requirements.txt, .env.template and config/__init__.py."""
        # requirements.txt — python-dotenv added: the generated config module
        # below imports `dotenv`, so it must be a declared dependency (bug fix).
        requirements = """
boto3>=1.26.0
paramiko>=2.11.0
requests>=2.28.0
pyyaml>=6.0
jinja2>=3.1.0
click>=8.1.0
psutil>=5.9.0
docker>=6.0.0
kubernetes>=24.2.0
prometheus-client>=0.15.0
grafana-api>=1.0.3
slack-sdk>=3.19.0
python-dotenv>=0.21.0
"""
        (self.project_path / "requirements.txt").write_text(requirements.strip())
        # .env template (placeholder secrets to be filled in by the user)
        env_template = """
# AWS Configuration
AWS_ACCESS_KEY_ID=your_access_key
AWS_SECRET_ACCESS_KEY=your_secret_key
AWS_DEFAULT_REGION=us-east-1
# Database
DB_HOST=localhost
DB_PORT=5432
DB_NAME=devops
DB_USER=devops_user
DB_PASSWORD=secure_password
# Monitoring
PROMETHEUS_URL=http://localhost:9090
GRAFANA_URL=http://localhost:3000
GRAFANA_API_KEY=your_api_key
# Notifications
SLACK_TOKEN=xoxb-your-slack-token
SLACK_CHANNEL=#devops-alerts
# SSH Configuration
SSH_KEY_PATH=~/.ssh/id_rsa
SSH_USER=ubuntu
"""
        (self.project_path / ".env.template").write_text(env_template.strip())
        # Python configuration module, written into config/__init__.py.
        # The content must be valid, properly indented Python source.
        config_py = '''
import os
from pathlib import Path
from dotenv import load_dotenv

# Charger les variables d'environnement
load_dotenv()


class Config:
    """Configuration centralisée"""

    # Paths
    PROJECT_ROOT = Path(__file__).parent.parent
    LOGS_DIR = PROJECT_ROOT / "logs"
    CONFIG_DIR = PROJECT_ROOT / "config"
    TEMPLATES_DIR = PROJECT_ROOT / "templates"

    # AWS
    AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
    AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
    AWS_DEFAULT_REGION = os.getenv("AWS_DEFAULT_REGION", "us-east-1")

    # Database
    DB_HOST = os.getenv("DB_HOST", "localhost")
    DB_PORT = int(os.getenv("DB_PORT", 5432))
    DB_NAME = os.getenv("DB_NAME", "devops")
    DB_USER = os.getenv("DB_USER")
    DB_PASSWORD = os.getenv("DB_PASSWORD")

    # Monitoring
    PROMETHEUS_URL = os.getenv("PROMETHEUS_URL", "http://localhost:9090")
    GRAFANA_URL = os.getenv("GRAFANA_URL", "http://localhost:3000")
    GRAFANA_API_KEY = os.getenv("GRAFANA_API_KEY")

    # Notifications
    SLACK_TOKEN = os.getenv("SLACK_TOKEN")
    SLACK_CHANNEL = os.getenv("SLACK_CHANNEL", "#devops-alerts")

    # SSH
    SSH_KEY_PATH = os.path.expanduser(os.getenv("SSH_KEY_PATH", "~/.ssh/id_rsa"))
    SSH_USER = os.getenv("SSH_USER", "ubuntu")

    @classmethod
    def validate(cls):
        """Valide la configuration"""
        required_vars = [
            "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY",
            "DB_PASSWORD", "SLACK_TOKEN"
        ]
        missing = [var for var in required_vars if not getattr(cls, var)]
        if missing:
            raise ValueError(f"Variables manquantes: {', '.join(missing)}")
'''
        (self.project_path / "config" / "__init__.py").write_text(config_py)
if __name__ == "__main__":
    # Scaffold a project with the default name in the current directory.
    environment = DevOpsEnvironment()
    environment.create_project_structure()
    print(f"Projet DevOps créé dans: {environment.project_path}")
💡 Automatisation d'Infrastructure
Gestion AWS avec Boto3
# scripts/infrastructure/aws_manager.py
import boto3
import json
import time
from typing import List, Dict, Optional
from botocore.exceptions import ClientError
from config import Config
class AWSInfrastructureManager:
    """AWS infrastructure manager (VPC, EC2, ELBv2, RDS, CloudFormation).

    NOTE(review): the original class called `_create_internet_gateway`,
    `_create_route_tables` and `_create_security_groups` without defining
    them, which raised AttributeError at runtime; they are implemented below.
    """

    def __init__(self):
        # One session shared by every service client; credentials from Config.
        self.session = boto3.Session(
            aws_access_key_id=Config.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=Config.AWS_SECRET_ACCESS_KEY,
            region_name=Config.AWS_DEFAULT_REGION
        )
        self.ec2 = self.session.client('ec2')
        self.elbv2 = self.session.client('elbv2')
        self.rds = self.session.client('rds')
        self.cloudformation = self.session.client('cloudformation')

    def create_vpc_infrastructure(self, vpc_name: str, cidr_block: str = "10.0.0.0/16") -> Dict:
        """Create a complete VPC: subnets, internet gateway, route tables,
        security groups.

        Returns a dict holding the ids of every created resource.
        Raises botocore ClientError on any AWS API failure.
        """
        try:
            vpc_response = self.ec2.create_vpc(
                CidrBlock=cidr_block,
                TagSpecifications=[{
                    'ResourceType': 'vpc',
                    'Tags': [{'Key': 'Name', 'Value': vpc_name}]
                }]
            )
            vpc_id = vpc_response['Vpc']['VpcId']
            # Wait for the VPC before creating dependent resources.
            self.ec2.get_waiter('vpc_available').wait(VpcIds=[vpc_id])
            subnets = self._create_subnets(vpc_id, cidr_block)
            igw_id = self._create_internet_gateway(vpc_id)
            route_tables = self._create_route_tables(vpc_id, igw_id, subnets)
            security_groups = self._create_security_groups(vpc_id)
            infrastructure = {
                'vpc_id': vpc_id,
                'subnets': subnets,
                'internet_gateway_id': igw_id,
                'route_tables': route_tables,
                'security_groups': security_groups
            }
            print(f"Infrastructure VPC créée: {vpc_name}")
            return infrastructure
        except ClientError as e:
            print(f"Erreur lors de la création du VPC: {e}")
            raise

    def _create_subnets(self, vpc_id: str, vpc_cidr: str) -> Dict:
        """Create one public and one private subnet in each of two AZs.

        NOTE(review): subnet CIDRs are hard-coded to 10.0.x.0/24 regardless of
        `vpc_cidr` — they are only valid when the VPC uses the 10.0.0.0/16 default.
        """
        azs = self.ec2.describe_availability_zones()['AvailabilityZones']
        subnets = {'public': [], 'private': []}
        for i, az in enumerate(azs[:2]):  # use 2 availability zones
            public_subnet = self.ec2.create_subnet(
                VpcId=vpc_id,
                CidrBlock=f"10.0.{i+1}.0/24",
                AvailabilityZone=az['ZoneName'],
                TagSpecifications=[{
                    'ResourceType': 'subnet',
                    'Tags': [
                        {'Key': 'Name', 'Value': f'public-subnet-{i+1}'},
                        {'Key': 'Type', 'Value': 'public'}
                    ]
                }]
            )
            subnets['public'].append(public_subnet['Subnet']['SubnetId'])
            private_subnet = self.ec2.create_subnet(
                VpcId=vpc_id,
                CidrBlock=f"10.0.{i+10}.0/24",
                AvailabilityZone=az['ZoneName'],
                TagSpecifications=[{
                    'ResourceType': 'subnet',
                    'Tags': [
                        {'Key': 'Name', 'Value': f'private-subnet-{i+1}'},
                        {'Key': 'Type', 'Value': 'private'}
                    ]
                }]
            )
            subnets['private'].append(private_subnet['Subnet']['SubnetId'])
        return subnets

    def _create_internet_gateway(self, vpc_id: str) -> str:
        """Create an internet gateway, attach it to the VPC, return its id."""
        igw = self.ec2.create_internet_gateway()
        igw_id = igw['InternetGateway']['InternetGatewayId']
        self.ec2.attach_internet_gateway(InternetGatewayId=igw_id, VpcId=vpc_id)
        return igw_id

    def _create_route_tables(self, vpc_id: str, igw_id: str, subnets: Dict) -> Dict:
        """Create a public route table (default route via the IGW) and
        associate it with every public subnet."""
        rt = self.ec2.create_route_table(VpcId=vpc_id)
        public_rt_id = rt['RouteTable']['RouteTableId']
        self.ec2.create_route(
            RouteTableId=public_rt_id,
            DestinationCidrBlock='0.0.0.0/0',
            GatewayId=igw_id
        )
        for subnet_id in subnets['public']:
            self.ec2.associate_route_table(RouteTableId=public_rt_id, SubnetId=subnet_id)
        # Private subnets keep the VPC main route table (local routes only).
        return {'public': public_rt_id}

    def _create_security_groups(self, vpc_id: str) -> Dict:
        """Create the load-balancer and web security groups.

        Returns {'web': [sg_id], 'lb': [sg_id]} — lists, because callers pass
        the values directly as SecurityGroupIds / SecurityGroups.
        """
        lb_sg = self.ec2.create_security_group(
            GroupName='lb-sg',
            Description='Load balancer security group',
            VpcId=vpc_id
        )
        lb_sg_id = lb_sg['GroupId']
        # LB accepts HTTP from anywhere.
        self.ec2.authorize_security_group_ingress(
            GroupId=lb_sg_id,
            IpPermissions=[{
                'IpProtocol': 'tcp', 'FromPort': 80, 'ToPort': 80,
                'IpRanges': [{'CidrIp': '0.0.0.0/0'}]
            }]
        )
        web_sg = self.ec2.create_security_group(
            GroupName='web-sg',
            Description='Web server security group',
            VpcId=vpc_id
        )
        web_sg_id = web_sg['GroupId']
        # Web servers accept HTTP only from the load balancer.
        self.ec2.authorize_security_group_ingress(
            GroupId=web_sg_id,
            IpPermissions=[{
                'IpProtocol': 'tcp', 'FromPort': 80, 'ToPort': 80,
                'UserIdGroupPairs': [{'GroupId': lb_sg_id}]
            }]
        )
        return {'web': [web_sg_id], 'lb': [lb_sg_id]}

    def launch_ec2_instances(self, instance_config: Dict) -> List[str]:
        """Launch EC2 instances described by `instance_config` and wait until
        they are running. Returns the list of instance ids."""
        try:
            response = self.ec2.run_instances(
                ImageId=instance_config['ami_id'],
                MinCount=instance_config.get('min_count', 1),
                MaxCount=instance_config.get('max_count', 1),
                InstanceType=instance_config.get('instance_type', 't3.micro'),
                KeyName=instance_config.get('key_name'),
                SecurityGroupIds=instance_config.get('security_groups', []),
                SubnetId=instance_config.get('subnet_id'),
                UserData=instance_config.get('user_data', ''),
                TagSpecifications=[{
                    'ResourceType': 'instance',
                    'Tags': instance_config.get('tags', [])
                }]
            )
            instance_ids = [instance['InstanceId'] for instance in response['Instances']]
            # Block until every instance reaches the running state.
            self.ec2.get_waiter('instance_running').wait(InstanceIds=instance_ids)
            print(f"Instances lancées: {instance_ids}")
            return instance_ids
        except ClientError as e:
            print(f"Erreur lors du lancement des instances: {e}")
            raise

    def create_load_balancer(self, lb_config: Dict) -> str:
        """Create an Application Load Balancer with an HTTP target group and
        listener; optionally register `lb_config['instance_ids']` as targets.
        Returns the load balancer ARN."""
        try:
            response = self.elbv2.create_load_balancer(
                Name=lb_config['name'],
                Subnets=lb_config['subnets'],
                SecurityGroups=lb_config.get('security_groups', []),
                Scheme=lb_config.get('scheme', 'internet-facing'),
                Type='application',
                Tags=lb_config.get('tags', [])
            )
            lb_arn = response['LoadBalancers'][0]['LoadBalancerArn']
            tg_response = self.elbv2.create_target_group(
                Name=f"{lb_config['name']}-tg",
                Protocol='HTTP',
                Port=80,
                VpcId=lb_config['vpc_id'],
                HealthCheckPath='/health',
                HealthCheckIntervalSeconds=30,
                HealthyThresholdCount=2,
                UnhealthyThresholdCount=5
            )
            tg_arn = tg_response['TargetGroups'][0]['TargetGroupArn']
            # HTTP listener forwarding everything to the target group.
            self.elbv2.create_listener(
                LoadBalancerArn=lb_arn,
                Protocol='HTTP',
                Port=80,
                DefaultActions=[{
                    'Type': 'forward',
                    'TargetGroupArn': tg_arn
                }]
            )
            if 'instance_ids' in lb_config:
                targets = [{'Id': instance_id, 'Port': 80}
                           for instance_id in lb_config['instance_ids']]
                self.elbv2.register_targets(
                    TargetGroupArn=tg_arn,
                    Targets=targets
                )
            print(f"Load Balancer créé: {lb_config['name']}")
            return lb_arn
        except ClientError as e:
            print(f"Erreur lors de la création du Load Balancer: {e}")
            raise

    def deploy_cloudformation_stack(self, stack_name: str, template_path: str, parameters: Dict = None) -> str:
        """Create or update a CloudFormation stack from a local template and
        wait for completion. Returns the stack id."""
        try:
            with open(template_path, 'r') as template_file:
                template_body = template_file.read()
            cf_parameters = []
            if parameters:
                cf_parameters = [
                    {'ParameterKey': key, 'ParameterValue': str(value)}
                    for key, value in parameters.items()
                ]
            # Try create first; fall back to update when the stack exists.
            try:
                response = self.cloudformation.create_stack(
                    StackName=stack_name,
                    TemplateBody=template_body,
                    Parameters=cf_parameters,
                    Capabilities=['CAPABILITY_IAM', 'CAPABILITY_NAMED_IAM']
                )
                stack_id = response['StackId']
                operation = 'CREATE'
            except ClientError as e:
                if 'AlreadyExistsException' in str(e):
                    response = self.cloudformation.update_stack(
                        StackName=stack_name,
                        TemplateBody=template_body,
                        Parameters=cf_parameters,
                        Capabilities=['CAPABILITY_IAM', 'CAPABILITY_NAMED_IAM']
                    )
                    stack_id = response['StackId']
                    operation = 'UPDATE'
                else:
                    raise
            # Wait for stack_create_complete / stack_update_complete.
            waiter_name = f'stack_{operation.lower()}_complete'
            waiter = self.cloudformation.get_waiter(waiter_name)
            waiter.wait(StackName=stack_name)
            print(f"Stack CloudFormation {operation.lower()}d: {stack_name}")
            return stack_id
        except Exception as e:
            print(f"Erreur lors du déploiement CloudFormation: {e}")
            raise
# Example usage
if __name__ == "__main__":
    manager = AWSInfrastructureManager()

    # Provision the VPC and its networking.
    infra = manager.create_vpc_infrastructure("devops-vpc")

    # EC2 launch settings (Amazon Linux 2 AMI).
    web_instance_settings = {
        'ami_id': 'ami-0c02fb55956c7d316',
        'instance_type': 't3.micro',
        'min_count': 2,
        'max_count': 2,
        'key_name': 'my-key-pair',
        'security_groups': infra['security_groups']['web'],
        'subnet_id': infra['subnets']['public'][0],
        'tags': [
            {'Key': 'Name', 'Value': 'web-server'},
            {'Key': 'Environment', 'Value': 'production'},
        ],
    }
    launched_ids = manager.launch_ec2_instances(web_instance_settings)

    # Front the instances with an application load balancer.
    balancer_settings = {
        'name': 'web-lb',
        'subnets': infra['subnets']['public'],
        'security_groups': infra['security_groups']['lb'],
        'vpc_id': infra['vpc_id'],
        'instance_ids': launched_ids,
    }
    lb_arn = manager.create_load_balancer(balancer_settings)
💡 Monitoring et Métriques
Collecteur de Métriques Système
# scripts/monitoring/system_monitor.py
import psutil
import time
import json
import requests
from datetime import datetime
from typing import Dict, List
from dataclasses import dataclass, asdict
from config import Config
@dataclass
class SystemMetrics:
    """One snapshot of system metrics collected by SystemMonitor."""
    timestamp: str  # ISO-8601 collection time
    hostname: str
    cpu_percent: float  # overall CPU utilisation, 0-100
    memory_percent: float  # RAM utilisation, 0-100
    memory_available: int  # available RAM in bytes
    # Per-mountpoint stats: {'total', 'used', 'free', 'percent'}
    # (collect_metrics stores a dict per mount, not a bare float)
    disk_usage: Dict[str, Dict[str, float]]
    network_io: Dict[str, int]  # cumulative byte/packet counters since boot
    load_average: List[float]  # 1-, 5- and 15-minute load averages
    process_count: int
    uptime: float  # seconds since boot
class SystemMonitor:
    """Collects system metrics via psutil and evaluates alert thresholds."""

    def __init__(self, interval: int = 60):
        # Hostname via the stdlib: the original `psutil.os.uname().nodename`
        # relied on psutil's private re-export of `os`, and os.uname() does
        # not exist on Windows — socket.gethostname() is portable (bug fix).
        import socket
        self.interval = interval  # seconds between collection cycles
        self.hostname = socket.gethostname()

    def collect_metrics(self) -> SystemMetrics:
        """Collect one SystemMetrics snapshot (blocks ~1s for the CPU sample)."""
        # CPU: 1-second sampled utilisation
        cpu_percent = psutil.cpu_percent(interval=1)
        # Memory
        memory = psutil.virtual_memory()
        # Disk: per-mountpoint usage; skip mounts we cannot read
        disk_usage = {}
        for partition in psutil.disk_partitions():
            try:
                usage = psutil.disk_usage(partition.mountpoint)
                disk_usage[partition.mountpoint] = {
                    'total': usage.total,
                    'used': usage.used,
                    'free': usage.free,
                    'percent': (usage.used / usage.total) * 100
                }
            except PermissionError:
                continue
        # Network: cumulative counters since boot
        network = psutil.net_io_counters()
        network_io = {
            'bytes_sent': network.bytes_sent,
            'bytes_recv': network.bytes_recv,
            'packets_sent': network.packets_sent,
            'packets_recv': network.packets_recv
        }
        # Load average (1m, 5m, 15m)
        load_avg = list(psutil.getloadavg())
        process_count = len(psutil.pids())
        uptime = time.time() - psutil.boot_time()
        return SystemMetrics(
            timestamp=datetime.now().isoformat(),
            hostname=self.hostname,
            cpu_percent=cpu_percent,
            memory_percent=memory.percent,
            memory_available=memory.available,
            disk_usage=disk_usage,
            network_io=network_io,
            load_average=load_avg,
            process_count=process_count,
            uptime=uptime
        )

    def get_process_metrics(self) -> List[Dict]:
        """Return the top-10 processes by CPU usage."""
        processes = []
        for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent', 'status']):
            try:
                processes.append(proc.info)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue
        # cpu_percent may be None on the first sample — treat it as 0.
        return sorted(processes, key=lambda x: x['cpu_percent'] or 0, reverse=True)[:10]

    def check_alerts(self, metrics: SystemMetrics) -> List[Dict]:
        """Evaluate alert thresholds against a snapshot.

        Returns a list of {'type', 'level', 'message', 'value'} dicts;
        level is 'warning' or 'critical'.
        """
        alerts = []
        # CPU: warn above 80%, critical from 90%
        if metrics.cpu_percent > 80:
            alerts.append({
                'type': 'cpu',
                'level': 'warning' if metrics.cpu_percent < 90 else 'critical',
                'message': f'CPU usage: {metrics.cpu_percent:.1f}%',
                'value': metrics.cpu_percent
            })
        # Memory: warn above 85%, critical from 95%
        if metrics.memory_percent > 85:
            alerts.append({
                'type': 'memory',
                'level': 'warning' if metrics.memory_percent < 95 else 'critical',
                'message': f'Memory usage: {metrics.memory_percent:.1f}%',
                'value': metrics.memory_percent
            })
        # Disk: per mountpoint, warn above 85%, critical from 95%
        for mount, usage in metrics.disk_usage.items():
            if usage['percent'] > 85:
                alerts.append({
                    'type': 'disk',
                    'level': 'warning' if usage['percent'] < 95 else 'critical',
                    'message': f'Disk usage {mount}: {usage["percent"]:.1f}%',
                    'value': usage['percent']
                })
        # Load: relative to core count; cpu_count() can return None (bug fix).
        cpu_count = psutil.cpu_count() or 1
        if metrics.load_average[0] > cpu_count * 0.8:
            alerts.append({
                'type': 'load',
                'level': 'warning' if metrics.load_average[0] < cpu_count else 'critical',
                'message': f'Load average: {metrics.load_average[0]:.2f}',
                'value': metrics.load_average[0]
            })
        return alerts
class PrometheusExporter:
    """Exposes system metrics on a Prometheus scrape endpoint."""

    def __init__(self, port: int = 8000):
        from prometheus_client import start_http_server, Gauge, Counter
        self.port = port
        # Prometheus metric objects
        self.cpu_usage = Gauge('system_cpu_usage_percent', 'CPU usage percentage')
        self.memory_usage = Gauge('system_memory_usage_percent', 'Memory usage percentage')
        self.disk_usage = Gauge('system_disk_usage_percent', 'Disk usage percentage', ['mountpoint'])
        self.load_average = Gauge('system_load_average', 'Load average', ['period'])
        self.network_bytes = Counter('system_network_bytes_total', 'Network bytes', ['direction'])
        # Last cumulative network counters seen, so the Counter can be fed deltas.
        self._last_network = None
        # Start the scrape endpoint
        start_http_server(self.port)
        print(f"Prometheus exporter started on port {self.port}")

    def update_metrics(self, metrics: SystemMetrics):
        """Update Prometheus gauges/counters from a metrics snapshot."""
        self.cpu_usage.set(metrics.cpu_percent)
        self.memory_usage.set(metrics.memory_percent)
        for mount, usage in metrics.disk_usage.items():
            self.disk_usage.labels(mountpoint=mount).set(usage['percent'])
        for period, load in zip(('1m', '5m', '15m'), metrics.load_average):
            self.load_average.labels(period=period).set(load)
        # psutil network counters are cumulative since boot. A Prometheus
        # Counter must be incremented by the per-cycle delta; the original
        # inc()'d the running total every cycle, massively over-counting (bug fix).
        if self._last_network is not None:
            sent_delta = max(0, metrics.network_io['bytes_sent'] - self._last_network['bytes_sent'])
            recv_delta = max(0, metrics.network_io['bytes_recv'] - self._last_network['bytes_recv'])
            self.network_bytes.labels(direction='sent').inc(sent_delta)
            self.network_bytes.labels(direction='received').inc(recv_delta)
        self._last_network = dict(metrics.network_io)
class AlertManager:
    """Dispatches alerts to Slack, with a stdout fallback when unconfigured."""

    def __init__(self):
        self.slack_token = Config.SLACK_TOKEN
        self.slack_channel = Config.SLACK_CHANNEL

    def send_slack_alert(self, alert: Dict):
        """Post an alert to Slack.

        `alert` is a dict with 'type', 'level' and 'message' keys (as produced
        by SystemMonitor.check_alerts). Without a token, prints to stdout.
        Slack API failures are logged, never raised.
        """
        if not self.slack_token:
            print(f"Alert: {alert['message']}")
            return
        from slack_sdk import WebClient
        import socket
        client = WebClient(token=self.slack_token)
        # Attachment colour by severity; green for any unknown level.
        color = {
            'warning': '#ffcc00',
            'critical': '#ff0000'
        }.get(alert['level'], '#00ff00')
        attachment = {
            'color': color,
            'title': f"{alert['level'].upper()}: {alert['type'].upper()} Alert",
            'text': alert['message'],
            'fields': [
                {
                    'title': 'Hostname',
                    # socket.gethostname() replaces psutil.os.uname().nodename:
                    # psutil.os is a private re-export and uname() is POSIX-only (bug fix).
                    'value': socket.gethostname(),
                    'short': True
                },
                {
                    'title': 'Timestamp',
                    'value': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                    'short': True
                }
            ]
        }
        try:
            client.chat_postMessage(
                channel=self.slack_channel,
                text=f"System Alert: {alert['message']}",
                attachments=[attachment]
            )
        except Exception as e:
            print(f"Failed to send Slack alert: {e}")
# Main monitoring entry point
def main():
    """Run the monitoring loop: collect, export, alert, log, sleep, repeat."""
    monitor = SystemMonitor(interval=60)
    exporter = PrometheusExporter()
    notifier = AlertManager()
    print("System monitoring started...")
    while True:
        try:
            # One full cycle: snapshot -> Prometheus -> alerting -> log line.
            snapshot = monitor.collect_metrics()
            exporter.update_metrics(snapshot)
            for alert in monitor.check_alerts(snapshot):
                notifier.send_slack_alert(alert)
            print(f"[{snapshot.timestamp}] CPU: {snapshot.cpu_percent:.1f}%, "
                  f"Memory: {snapshot.memory_percent:.1f}%, "
                  f"Load: {snapshot.load_average[0]:.2f}")
            time.sleep(monitor.interval)
        except KeyboardInterrupt:
            print("Monitoring stopped.")
            break
        except Exception as e:
            # Keep the daemon alive on transient failures; back off briefly.
            print(f"Error in monitoring loop: {e}")
            time.sleep(10)


if __name__ == "__main__":
    main()