Practical Guide to Self-Healing Infrastructure with Kubernetes and AI Ops
Introduction
In today’s hyper-scale cloud environments, traditional manual DevOps practices are no longer sustainable. As organizations scale to handle millions of users and complex microservices architectures, the need for intelligent, self-healing infrastructure has become paramount. Kubernetes has already revolutionized container orchestration with its native self-healing capabilities, but the integration of AI/ML (AIOps) is taking this to the next level.
The convergence of Kubernetes and AIOps represents a fundamental shift from reactive to proactive infrastructure management. While Kubernetes provides the foundation with pod restarts, replica sets, and health probes, AIOps adds the intelligence layer that can predict issues before they occur, automatically scale resources based on demand patterns, and implement sophisticated remediation strategies.
This guide explores how to build a comprehensive self-healing infrastructure that combines Kubernetes’ native capabilities with AI-powered monitoring, anomaly detection, and automated remediation. We’ll cover practical implementations, real-world architectures, and the challenges organizations face when adopting these technologies.
Why Manual DevOps is Not Enough
Traditional DevOps practices rely heavily on manual intervention and reactive responses to infrastructure issues. This approach becomes increasingly untenable as systems scale:
The Scale Problem
Modern applications often consist of hundreds or thousands of microservices, each with its own scaling requirements, dependencies, and failure modes. Manual monitoring and intervention cannot keep pace with the complexity and velocity of these systems.
The Velocity Problem
In high-velocity development environments, deployments happen multiple times per day, sometimes per hour. Manual oversight of each deployment and the subsequent monitoring is not scalable.
The Complexity Problem
Modern infrastructure spans multiple cloud providers, regions, and technologies. The interdependencies between services create complex failure scenarios that are difficult to predict and resolve manually.
The Cost Problem
Manual infrastructure management requires significant human resources, and downtime caused by human error or a delayed response is costly in both lost revenue and engineering time.
The Rise of Self-Healing + AIOps
Self-healing infrastructure represents the evolution from manual, reactive operations to automated, proactive systems. This evolution has several stages:
Level 1: Basic Automation
- Automated deployments
- Basic health checks
- Simple scaling rules
Level 2: Intelligent Monitoring
- Advanced metrics collection
- Pattern recognition
- Predictive analytics
Level 3: Autonomous Operations
- Self-healing systems
- Predictive scaling
- Automated remediation
Level 4: AI-Powered Intelligence
- Machine learning for anomaly detection
- Natural language processing for incident analysis
- Autonomous decision-making
Kubernetes Native Self-Healing
Kubernetes provides several built-in mechanisms for self-healing that form the foundation of any intelligent infrastructure:
Pod Restarts and Replica Sets
Kubernetes automatically restarts failed pods and maintains the desired number of replicas:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: web-application
spec:
  replicas: 3
  selector:
    matchLabels:
      app: web-application
  template:
    metadata:
      labels:
        app: web-application
    spec:
      containers:
        - name: web-app
          image: nginx:latest
          ports:
            - containerPort: 80
          resources:
            requests:
              memory: "64Mi"
              cpu: "250m"
            limits:
              memory: "128Mi"
              cpu: "500m"
Liveness and Readiness Probes
Health checks ensure applications are truly healthy, not just running:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: api-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: api-service
  template:
    metadata:
      labels:
        app: api-service
    spec:
      containers:
        - name: api
          image: my-api:latest
          ports:
            - containerPort: 8080
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
            timeoutSeconds: 5
            failureThreshold: 3
          readinessProbe:
            httpGet:
              path: /ready
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 5
            timeoutSeconds: 3
            failureThreshold: 3
          startupProbe:
            httpGet:
              path: /startup
              port: 8080
            initialDelaySeconds: 10
            periodSeconds: 10
            timeoutSeconds: 5
            failureThreshold: 30
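The manifest above assumes the application serves `/health`, `/ready`, and `/startup`. As a minimal sketch of what those endpoints might look like on the application side, the following uses only the Python standard library. The `STATE` flags and the `probe_status` helper are hypothetical names chosen for illustration; a real service would set the flags from its own startup and dependency checks.

```python
from http.server import BaseHTTPRequestHandler, HTTPServer

# Hypothetical process state; a real service would update these flags itself.
STATE = {"started": True, "dependencies_ok": True}


def probe_status(path: str, state: dict) -> int:
    """Return the HTTP status code a probe endpoint should answer with."""
    if path == "/startup":
        # Startup: succeed only once initialization has finished.
        return 200 if state["started"] else 503
    if path == "/health":
        # Liveness: only report that the process itself can still respond.
        return 200
    if path == "/ready":
        # Readiness: fail while dependencies are down so traffic is withheld.
        return 200 if state["started"] and state["dependencies_ok"] else 503
    return 404


class ProbeHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        status = probe_status(self.path, STATE)
        self.send_response(status)
        self.end_headers()
        self.wfile.write(b"ok" if status == 200 else b"unavailable")


def serve(port: int = 8080) -> None:
    """Serve the probe endpoints until the process is stopped."""
    HTTPServer(("", port), ProbeHandler).serve_forever()
```

Calling `serve(8080)` starts the server on the port the probes target. Keeping liveness independent of downstream dependencies is a deliberate choice in this sketch: a failing database should take the pod out of rotation through the readiness probe, not trigger a restart through the liveness probe.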
Horizontal Pod Autoscaler (HPA)
Automatic scaling based on CPU and memory usage:
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: api-service-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: api-service
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
        - type: Percent
          value: 100
          periodSeconds: 15
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Percent
          value: 10
          periodSeconds: 60
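To make the scaling decision concrete, here is a small sketch of the replica calculation the HPA documents: desired = ceil(current replicas × current metric / target metric), skipped when the ratio is within a tolerance of 1.0 (0.1 by default), with the largest result winning when several metrics are configured. The function names are illustrative, and the sketch deliberately ignores the `behavior` policies and stabilization windows from the manifest above.

```python
import math


def desired_replicas(current_replicas: int, current_utilization: float,
                     target_utilization: float, min_replicas: int = 2,
                     max_replicas: int = 10, tolerance: float = 0.1) -> int:
    """Replica count for one metric, clamped to the min/max bounds."""
    ratio = current_utilization / target_utilization
    # Within tolerance of the target: leave the replica count unchanged.
    if abs(ratio - 1.0) <= tolerance:
        return current_replicas
    desired = math.ceil(current_replicas * ratio)
    return max(min_replicas, min(max_replicas, desired))


def desired_for_metrics(current_replicas: int, metrics: list) -> int:
    """With several metrics, take the largest proposed replica count."""
    return max(desired_replicas(current_replicas, current, target)
               for current, target in metrics)


# 4 replicas at 90% CPU (target 70%) and 60% memory (target 80%)
print(desired_for_metrics(4, [(90, 70), (60, 80)]))
```

In this example CPU proposes 6 replicas and memory proposes 3, so the result is 6. The scale-up policy in the manifest would then limit how quickly that target is reached.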
Pod Disruption Budgets
Ensure high availability during maintenance:
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: api-service-pdb
spec:
  minAvailable: 2
  selector:
    matchLabels:
      app: api-service
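The effect of `minAvailable` can be illustrated with a little arithmetic: the number of voluntary disruptions allowed at any moment is the count of healthy pods minus `minAvailable`, never below zero. The helper names below are made up for this sketch; the real bookkeeping is done by the Kubernetes disruption controller and the eviction API.

```python
def allowed_disruptions(healthy_pods: int, min_available: int) -> int:
    """Voluntary evictions the budget permits right now."""
    return max(0, healthy_pods - min_available)


def can_evict(healthy_pods: int, min_available: int,
              evictions_requested: int = 1) -> bool:
    """True if the requested evictions keep the budget satisfied."""
    return evictions_requested <= allowed_disruptions(healthy_pods, min_available)


# With 3 healthy api-service pods and minAvailable: 2, a node drain
# may evict one pod at a time; a second eviction has to wait.
print(allowed_disruptions(3, 2), can_evict(3, 2, 2))
```

Note that with `replicas: 2` and `minAvailable: 2` no voluntary eviction is ever allowed, which can block node drains.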
Going Beyond with AIOps
While Kubernetes provides excellent foundation-level self-healing, AIOps adds the intelligence layer that enables predictive and proactive operations:
AI-Powered Anomaly Detection
Traditional monitoring relies on static thresholds, but AI can detect complex patterns and anomalies:
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import prometheus_client
from prometheus_client import start_http_server, Gauge
import logging
import json
from datetime import datetime, timedelta
import requests


class KubernetesAnomalyDetector:
    def __init__(self, prometheus_url: str, model_path: str = None):
        self.prometheus_url = prometheus_url
        self.scaler = StandardScaler()
        self.model = IsolationForest(
            contamination=0.1,
            random_state=42,
            n_estimators=100
        )
        self.logger = logging.getLogger(__name__)

        # Prometheus metrics for anomaly detection.
        # The 'pod' label must be declared here because detect_anomalies()
        # sets values with .labels(pod=...).
        self.anomaly_score = Gauge('k8s_anomaly_score', 'Anomaly score for Kubernetes metrics', ['pod'])
        self.anomaly_detected = Gauge('k8s_anomaly_detected', 'Whether anomaly is detected (1) or not (0)', ['pod'])

        # Start Prometheus metrics server
        start_http_server(8000)
    def collect_metrics(self, namespace: str = "default") -> pd.DataFrame:
        """
        Collect metrics from Prometheus for anomaly detection.
        """
        metrics = {}

        # CPU usage
        cpu_query = f'avg(rate(container_cpu_usage_seconds_total{{namespace="{namespace}"}}[5m])) by (pod)'
        cpu_response = requests.get(f"{self.prometheus_url}/api/v1/query", params={'query': cpu_query})
        if cpu_response.status_code == 200:
            cpu_data = cpu_response.json()['data']['result']
            for result in cpu_data:
                pod_name = result['metric']['pod']
                cpu_value = float(result['value'][1])
                if pod_name not in metrics:
                    metrics[pod_name] = {}
                metrics[pod_name]['cpu_usage'] = cpu_value

        # Memory usage
        memory_query = f'avg(container_memory_usage_bytes{{namespace="{namespace}"}}) by (pod)'
        memory_response = requests.get(f"{self.prometheus_url}/api/v1/query", params={'query': memory_query})
        if memory_response.status_code == 200:
            memory_data = memory_response.json()['data']['result']
            for result in memory_data:
                pod_name = result['metric']['pod']
                memory_value = float(result['value'][1]) / (1024 * 1024)  # Convert to MB
                if pod_name in metrics:
                    metrics[pod_name]['memory_usage_mb'] = memory_value

        # Network I/O
        network_query = f'sum(rate(container_network_receive_bytes_total{{namespace="{namespace}"}}[5m])) by (pod)'
        network_response = requests.get(f"{self.prometheus_url}/api/v1/query", params={'query': network_query})
        if network_response.status_code == 200:
            network_data = network_response.json()['data']['result']
            for result in network_data:
                pod_name = result['metric']['pod']
                network_value = float(result['value'][1])
                if pod_name in metrics:
                    metrics[pod_name]['network_io'] = network_value

        # Error rates
        error_query = f'sum(rate(http_requests_total{{namespace="{namespace}", status=~"5.."}}[5m])) by (pod)'
        error_response = requests.get(f"{self.prometheus_url}/api/v1/query", params={'query': error_query})
        if error_response.status_code == 200:
            error_data = error_response.json()['data']['result']
            for result in error_data:
                pod_name = result['metric']['pod']
                error_value = float(result['value'][1])
                if pod_name in metrics:
                    metrics[pod_name]['error_rate'] = error_value

        # Convert to DataFrame
        df = pd.DataFrame.from_dict(metrics, orient='index')
        # Guarantee all four feature columns exist even if a query returned nothing,
        # so later column selection cannot raise KeyError
        df = df.reindex(columns=['cpu_usage', 'memory_usage_mb', 'network_io', 'error_rate'])
        df = df.fillna(0)  # Fill missing values with 0
        return df
    def train_model(self, training_days: int = 7):
        """
        Train the anomaly detection model on historical data.
        """
        self.logger.info(f"Training anomaly detection model on {training_days} days of data")

        # Collect historical data
        end_time = datetime.now()
        start_time = end_time - timedelta(days=training_days)
        all_data = []
        current_time = start_time
        while current_time < end_time:
            # Simulate collecting data at regular intervals
            # In production, you'd query Prometheus for historical data
            sample_data = self.collect_metrics()
            if not sample_data.empty:
                all_data.append(sample_data)
            current_time += timedelta(minutes=5)

        if all_data:
            # Combine all data
            combined_data = pd.concat(all_data, ignore_index=True)
            # Prepare features
            features = combined_data[['cpu_usage', 'memory_usage_mb', 'network_io', 'error_rate']]
            # Scale features
            features_scaled = self.scaler.fit_transform(features)
            # Train model
            self.model.fit(features_scaled)
            self.logger.info("Anomaly detection model trained successfully")
        else:
            self.logger.warning("No training data available")
    def detect_anomalies(self) -> dict:
        """
        Detect anomalies in current Kubernetes metrics.
        """
        try:
            # Collect current metrics
            current_data = self.collect_metrics()
            if current_data.empty:
                return {"anomalies": [], "message": "No data available"}

            # Prepare features
            features = current_data[['cpu_usage', 'memory_usage_mb', 'network_io', 'error_rate']]
            features_scaled = self.scaler.transform(features)

            # Predict anomalies
            predictions = self.model.predict(features_scaled)
            scores = self.model.decision_function(features_scaled)

            # Identify anomalous pods
            anomalies = []
            for i, (pod_name, prediction, score) in enumerate(zip(current_data.index, predictions, scores)):
                is_anomaly = prediction == -1

                # Update Prometheus metrics
                self.anomaly_score.labels(pod=pod_name).set(score)
                self.anomaly_detected.labels(pod=pod_name).set(1 if is_anomaly else 0)

                if is_anomaly:
                    anomaly_info = {
                        "pod_name": pod_name,
                        "anomaly_score": float(score),
                        "metrics": {
                            "cpu_usage": float(features.iloc[i]['cpu_usage']),
                            "memory_usage_mb": float(features.iloc[i]['memory_usage_mb']),
                            "network_io": float(features.iloc[i]['network_io']),
                            "error_rate": float(features.iloc[i]['error_rate'])
                        },
                        "timestamp": datetime.now().isoformat()
                    }
                    anomalies.append(anomaly_info)

            return {
                "anomalies": anomalies,
                "total_pods": len(current_data),
                "anomalous_pods": len(anomalies),
                "timestamp": datetime.now().isoformat()
            }
        except Exception as e:
            self.logger.error(f"Error detecting anomalies: {e}")
            return {"anomalies": [], "error": str(e)}
# Example usage
def main():
    # Initialize detector
    detector = KubernetesAnomalyDetector(
        prometheus_url="http://prometheus:9090"
    )

    # Train model (in production, this would be done periodically)
    detector.train_model(training_days=7)

    # Detect anomalies
    results = detector.detect_anomalies()
    print("Anomaly Detection Results:")
    print(json.dumps(results, indent=2))

    # Trigger remediation for anomalies
    for anomaly in results['anomalies']:
        print(f"Anomaly detected in pod: {anomaly['pod_name']}")
        # Here you would trigger remediation actions


if __name__ == "__main__":
    main()
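The training loop above only simulates history by re-reading current metrics; its comment notes that production code would query Prometheus for historical data. One possible sketch uses the Prometheus HTTP API's `/api/v1/query_range` endpoint, which takes `query`, `start`, `end`, and `step` parameters and returns a matrix of `[timestamp, "value"]` pairs per series. The helper names `build_range_query` and `parse_matrix` are assumptions for this example, not part of the detector class.

```python
from datetime import datetime, timedelta, timezone


def build_range_query(query: str, end: datetime, days: int = 7,
                      step_seconds: int = 300) -> dict:
    """Build the parameters for a Prometheus /api/v1/query_range request."""
    start = end - timedelta(days=days)
    return {
        "query": query,
        "start": start.timestamp(),
        "end": end.timestamp(),
        "step": f"{step_seconds}s",
    }


def parse_matrix(payload: dict) -> list:
    """Flatten a range-query (matrix) response into one row per sample."""
    rows = []
    for series in payload["data"]["result"]:
        pod = series["metric"].get("pod", "unknown")
        for timestamp, value in series["values"]:
            # Prometheus encodes sample values as strings
            rows.append({"pod": pod, "timestamp": float(timestamp), "value": float(value)})
    return rows


# Sending the request (needs the `requests` package and a reachable server):
#   params = build_range_query('avg(rate(container_cpu_usage_seconds_total[5m])) by (pod)',
#                              datetime.now(timezone.utc))
#   payload = requests.get(f"{prometheus_url}/api/v1/query_range", params=params).json()
#   rows = parse_matrix(payload)
```

Seven days at a five-minute step is 2,016 points per series, which keeps a single request reasonably small; longer windows may need to be split into several requests.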
Challenges and Considerations
Implementing self-healing infrastructure with AIOps presents several challenges that organizations must address:
False Positives in Anomaly Detection
AI models can generate false positives, leading to unnecessary remediation actions:
Problem: Anomaly detection systems may flag normal behavior as anomalous, causing unnecessary pod restarts or scaling actions.
Solutions:
- Implement confidence thresholds for AI decisions
- Use ensemble methods combining multiple AI models
- Maintain human oversight for critical systems
- Implement gradual rollout of AI decisions
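import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM


class ConfidenceBasedAnomalyDetector:
    def __init__(self, confidence_threshold: float = 0.8):
        self.confidence_threshold = confidence_threshold
        self.models = [
            IsolationForest(contamination=0.1),
            LocalOutlierFactor(contamination=0.1),
            OneClassSVM()
        ]

    def detect_anomalies_with_confidence(self, data: pd.DataFrame) -> dict:
        """
        Detect anomalies with confidence scoring.
        """
        predictions = []
        scores = []
        for model in self.models:
            # Each model votes per sample: -1 = anomaly, 1 = normal
            predictions.append(model.fit_predict(data))
            # LocalOutlierFactor has no decision_function in outlier-detection mode
            score = model.decision_function(data) if hasattr(model, 'decision_function') else None
            scores.append(score)
        predictions = np.array(predictions)

        # Ensemble prediction: a negative mean means most models voted "anomaly"
        ensemble_pred = predictions.mean(axis=0)
        ensemble_score = np.mean(scores, axis=0) if all(s is not None for s in scores) else None

        # Calculate confidence
        confidence = self._calculate_confidence(predictions)

        # Only flag a sample as an anomaly if confidence is high enough
        # (element-wise &, because both operands are per-sample arrays)
        anomalies = (ensemble_pred < 0) & (confidence > self.confidence_threshold)

        return {
            "anomalies": anomalies,
            "confidence": confidence,
            "ensemble_score": ensemble_score,
            "should_act": bool(anomalies.any())
        }

    def _calculate_confidence(self, predictions: np.ndarray) -> np.ndarray:
        """
        Assumed definition: the fraction of models that voted "anomaly" per sample.
        """
        return (predictions == -1).mean(axis=0)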
Balancing Automation vs Manual Overrides
Finding the right balance between automation and human control:
Problem: Too much automation can lead to unexpected behaviors, while too little reduces the benefits of self-healing.
Solutions:
- Implement different automation levels based on system criticality
- Provide manual override capabilities
- Use progressive automation (start with recommendations, move to actions)
- Maintain audit trails of all automated actions
# automation-levels.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: automation-config
  namespace: aiops
data:
  automation-levels.yaml: |
    automation_levels:
      critical_systems:
        level: "recommendation_only"
        requires_approval: true
        max_automated_actions: 0
      production_systems:
        level: "limited_automation"
        requires_approval: false
        max_automated_actions: 5
        allowed_actions:
          - "pod_restart"
          - "scale_up"
      development_systems:
        level: "full_automation"
        requires_approval: false
        max_automated_actions: 50
        allowed_actions:
          - "pod_restart"
          - "scale_up"
          - "scale_down"
          - "rollback"
          - "image_update"
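A remediation component could consult these levels before acting. The sketch below mirrors the ConfigMap values in a plain dictionary and returns a decision string for a proposed action. The `decide` function, its return values, and the idea that `max_automated_actions` counts actions within some caller-defined window are all assumptions for illustration; the configuration itself does not say how the limit is measured.

```python
# Values copied from the automation-levels ConfigMap above
AUTOMATION_LEVELS = {
    "critical_systems": {
        "requires_approval": True,
        "max_automated_actions": 0,
        "allowed_actions": [],
    },
    "production_systems": {
        "requires_approval": False,
        "max_automated_actions": 5,
        "allowed_actions": ["pod_restart", "scale_up"],
    },
    "development_systems": {
        "requires_approval": False,
        "max_automated_actions": 50,
        "allowed_actions": ["pod_restart", "scale_up", "scale_down",
                            "rollback", "image_update"],
    },
}


def decide(tier: str, action: str, actions_taken: int) -> str:
    """Decide whether an automated action may run for a system tier.

    actions_taken is the number of automated actions already executed in
    the current window (the window length is left to the caller).
    """
    policy = AUTOMATION_LEVELS[tier]
    if policy["requires_approval"]:
        return "needs_approval"      # recommendation only; a human confirms
    if action not in policy["allowed_actions"]:
        return "denied"              # action not permitted at this tier
    if actions_taken >= policy["max_automated_actions"]:
        return "budget_exhausted"    # stop acting and escalate instead
    return "allowed"


print(decide("production_systems", "pod_restart", actions_taken=2))
```

Returning a reason rather than a bare boolean makes it easier to record why an action was or was not taken, which supports the audit-trail recommendation above.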
Cost Implications of Predictive Scaling
Predictive scaling can lead to increased costs if not properly managed:
Problem: AI systems might over-predict resource needs, leading to unnecessary costs.
Solutions:
- Implement cost-aware scaling algorithms
- Set maximum scaling limits
- Use spot instances for non-critical workloads
- Monitor and optimize scaling predictions
class CostAwareScaler:
    def __init__(self, cost_per_replica: float = 10.0, max_cost: float = 1000.0):
        self.cost_per_replica = cost_per_replica
        self.max_cost = max_cost

    def predict_scaling_with_cost_constraints(self, predictions: list) -> dict:
        """
        Predict scaling needs while respecting cost constraints.
        """
        # Calculate cost for each prediction
        for pred in predictions:
            pred['estimated_cost'] = pred['predicted_replicas'] * self.cost_per_replica

        # Find optimal scaling that fits within budget
        affordable_predictions = []
        total_cost = 0
        for pred in predictions:
            if total_cost + pred['estimated_cost'] <= self.max_cost:
                affordable_predictions.append(pred)
                total_cost += pred['estimated_cost']
            else:
                # Scale down to fit budget
                max_affordable_replicas = int((self.max_cost - total_cost) / self.cost_per_replica)
                if max_affordable_replicas > 0:
                    pred['predicted_replicas'] = max_affordable_replicas
                    pred['estimated_cost'] = max_affordable_replicas * self.cost_per_replica
                    affordable_predictions.append(pred)
                    # Count the trimmed prediction so the reported total is accurate
                    total_cost += pred['estimated_cost']
                break

        return {
            "predictions": affordable_predictions,
            "total_estimated_cost": total_cost,
            "budget_utilization": (total_cost / self.max_cost) * 100
        }
Future of AIOps
The future of AIOps in Kubernetes environments is moving toward increasingly autonomous and intelligent systems:
Integration with Observability Stacks
Modern observability tools are integrating AI capabilities:
- Distributed Tracing with AI: AI can analyze trace data to identify performance bottlenecks and suggest optimizations
- Log Analysis with NLP: Natural language processing can understand log messages and automatically categorize issues
- Metrics Correlation: AI can correlate metrics from multiple sources to identify root causes
Toward Autonomous Infrastructure
The ultimate goal is fully autonomous infrastructure that can:
- Self-Diagnose: Automatically identify issues without human intervention
- Self-Heal: Implement fixes without manual approval
- Self-Optimize: Continuously improve performance and efficiency
- Self-Secure: Proactively identify and mitigate security threats
Emerging Technologies
Several technologies are accelerating AIOps adoption:
- Edge AI: Running AI models closer to where data is generated
- Federated Learning: Training AI models across distributed environments
- Explainable AI: Making AI decisions transparent and understandable
- Quantum Computing: Potential for more sophisticated AI models
Conclusion
Self-healing infrastructure with Kubernetes and AIOps represents the future of DevOps. By combining Kubernetes’ native capabilities with AI-powered intelligence, organizations can create systems that are not just resilient, but truly intelligent and autonomous.
The key to success is starting small and building incrementally. Begin with basic Kubernetes self-healing features, then gradually add AI capabilities for monitoring, prediction, and automated remediation. Focus on solving real problems rather than implementing technology for its own sake.
As we move toward 2027 and beyond, the organizations that successfully implement AI-powered self-healing infrastructure will gain significant competitive advantages. They’ll be able to deploy faster, more reliably, and more efficiently than their competitors. They’ll have the agility to respond quickly to market changes and the resilience to handle unexpected challenges.
The future of infrastructure is autonomous, intelligent, and human-centered. The question is not whether AI will transform infrastructure management—it’s how quickly and effectively your organization can adapt to this transformation.
Remember, the goal is not to replace humans with AI, but to create powerful partnerships where AI handles routine tasks and humans focus on strategic decisions and innovation. The most successful organizations will be those that can effectively integrate AI into their infrastructure while maintaining human expertise and judgment where it matters most.
Start your journey toward self-healing infrastructure today. The future is waiting.
Predictive Scaling with Machine Learning
AI can predict scaling needs before they occur:
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.preprocessing import StandardScaler
import joblib
from datetime import datetime, timedelta
import requests
import logging


class PredictiveScaler:
    def __init__(self, k8s_api_url: str, namespace: str = "default"):
        self.k8s_api_url = k8s_api_url
        self.namespace = namespace
        self.model = RandomForestRegressor(n_estimators=100, random_state=42)
        self.scaler = StandardScaler()
        self.logger = logging.getLogger(__name__)

    def collect_historical_data(self, days: int = 30) -> pd.DataFrame:
        """
        Collect historical load and scaling data.
        """
        # In production, this would query your monitoring system
        # For this example, we'll generate synthetic data
        dates = pd.date_range(start=datetime.now() - timedelta(days=days),
                              end=datetime.now(), freq='H')
        data = []
        for date in dates:
            # Simulate realistic load patterns
            hour = date.hour
            day_of_week = date.weekday()

            # Base load with daily and weekly patterns
            base_load = 50 + 30 * np.sin(2 * np.pi * hour / 24)
            weekly_pattern = 20 if day_of_week < 5 else 10  # Weekday vs weekend

            # Add some randomness
            noise = np.random.normal(0, 10)

            # Calculate predicted load
            predicted_load = max(0, base_load + weekly_pattern + noise)

            # Calculate required replicas (simplified)
            required_replicas = max(1, int(predicted_load / 25))

            data.append({
                'timestamp': date,
                'hour': hour,
                'day_of_week': day_of_week,
                'is_weekend': 1 if day_of_week >= 5 else 0,
                'is_business_hours': 1 if 9 <= hour <= 17 else 0,
                'predicted_load': predicted_load,
                'required_replicas': required_replicas
            })
        return pd.DataFrame(data)
    def train_model(self, training_days: int = 30):
        """
        Train the predictive scaling model.
        """
        self.logger.info(f"Training predictive scaling model on {training_days} days of data")

        # Collect historical data
        data = self.collect_historical_data(training_days)

        # Prepare features
        features = data[['hour', 'day_of_week', 'is_weekend', 'is_business_hours']]
        target = data['required_replicas']

        # Scale features
        features_scaled = self.scaler.fit_transform(features)

        # Train model
        self.model.fit(features_scaled, target)
        self.logger.info("Predictive scaling model trained successfully")

    def predict_scaling_needs(self, hours_ahead: int = 24) -> dict:
        """
        Predict scaling needs for the next N hours.
        """
        predictions = []
        for i in range(hours_ahead):
            future_time = datetime.now() + timedelta(hours=i)
            features = pd.DataFrame([{
                'hour': future_time.hour,
                'day_of_week': future_time.weekday(),
                'is_weekend': 1 if future_time.weekday() >= 5 else 0,
                'is_business_hours': 1 if 9 <= future_time.hour <= 17 else 0
            }])
            features_scaled = self.scaler.transform(features)
            predicted_replicas = self.model.predict(features_scaled)[0]
            predictions.append({
                'timestamp': future_time.isoformat(),
                'predicted_replicas': int(predicted_replicas),
                'confidence': 0.85  # In production, calculate actual confidence
            })

        return {
            'predictions': predictions,
            'next_scale_up_time': self._find_next_scale_up(predictions),
            'next_scale_down_time': self._find_next_scale_down(predictions),
            'max_replicas_needed': max(p['predicted_replicas'] for p in predictions)
        }
    def _find_next_scale_up(self, predictions: list) -> str:
        """Find the next time when scaling up will be needed."""
        current_replicas = 3  # Get current replica count from K8s
        for pred in predictions:
            if pred['predicted_replicas'] > current_replicas:
                return pred['timestamp']
        return None

    def _find_next_scale_down(self, predictions: list) -> str:
        """Find the next time when scaling down will be safe."""
        current_replicas = 3  # Get current replica count from K8s
        for pred in predictions:
            if pred['predicted_replicas'] < current_replicas:
                return pred['timestamp']
        return None

    def apply_predictive_scaling(self, deployment_name: str):
        """
        Apply predictive scaling to a Kubernetes deployment.
        """
        try:
            # Get predictions
            predictions = self.predict_scaling_needs(hours_ahead=24)

            # Find optimal scaling time
            next_scale_up = predictions['next_scale_up_time']
            max_replicas = predictions['max_replicas_needed']

            if next_scale_up:
                # Schedule scaling up
                self._schedule_scale_up(deployment_name, max_replicas, next_scale_up)
                self.logger.info(f"Scheduled scale up to {max_replicas} replicas at {next_scale_up}")

            return predictions
        except Exception as e:
            self.logger.error(f"Error applying predictive scaling: {e}")
            return None
    def _schedule_scale_up(self, deployment_name: str, replicas: int, schedule_time: str):
        """
        Schedule a scale up operation using Kubernetes CronJob.
        """
        # Create a CronJob to scale up at the predicted time
        cronjob_manifest = {
            "apiVersion": "batch/v1",
            "kind": "CronJob",
            "metadata": {
                "name": f"{deployment_name}-scale-up-{schedule_time[:10]}",
                "namespace": self.namespace
            },
            "spec": {
                "schedule": self._convert_to_cron(schedule_time),
                "jobTemplate": {
                    "spec": {
                        "template": {
                            "spec": {
                                "restartPolicy": "OnFailure",
                                "containers": [{
                                    "name": "kubectl",
                                    "image": "bitnami/kubectl:latest",
                                    "command": [
                                        "kubectl", "scale", "deployment", deployment_name,
                                        f"--replicas={replicas}", f"-n={self.namespace}"
                                    ]
                                }]
                            }
                        }
                    }
                }
            }
        }
        # Apply the CronJob
        # In production, use the Kubernetes API client
        print(f"Would create CronJob: {cronjob_manifest}")

    def _convert_to_cron(self, iso_time: str) -> str:
        """Convert ISO time to cron format."""
        dt = datetime.fromisoformat(iso_time.replace('Z', '+00:00'))
        return f"{dt.minute} {dt.hour} {dt.day} {dt.month} *"
# Example usage
def main():
    scaler = PredictiveScaler(
        k8s_api_url="https://kubernetes.default.svc",
        namespace="production"
    )

    # Train the model
    scaler.train_model(training_days=30)

    # Get predictions
    predictions = scaler.predict_scaling_needs(hours_ahead=24)
    print("Predictive Scaling Results:")
    print(f"Max replicas needed: {predictions['max_replicas_needed']}")
    print(f"Next scale up time: {predictions['next_scale_up_time']}")
    print(f"Next scale down time: {predictions['next_scale_down_time']}")

    # Apply predictive scaling
    scaler.apply_predictive_scaling("web-application")


if __name__ == "__main__":
    main()
Architecture: Building the Complete System
A comprehensive self-healing infrastructure requires multiple components working together:
System Architecture Overview
# Architecture components
components:
  - name: "Kubernetes Cluster"
    description: "Core orchestration platform"
  - name: "Prometheus + Grafana"
    description: "Metrics collection and visualization"
  - name: "AI/ML Anomaly Detection"
    description: "Intelligent monitoring and prediction"
  - name: "Automated Remediation"
    description: "Self-healing actions and workflows"
  - name: "Argo Workflows"
    description: "Orchestration of complex remediation tasks"
  - name: "Custom Controllers"
    description: "Kubernetes operators for domain-specific logic"
Prometheus Configuration for AIOps
# prometheus-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: prometheus-config
  namespace: monitoring
data:
  prometheus.yml: |
    global:
      scrape_interval: 15s
      evaluation_interval: 15s
    rule_files:
      - "alert_rules.yml"
    scrape_configs:
      - job_name: 'kubernetes-pods'
        kubernetes_sd_configs:
          - role: pod
        relabel_configs:
          - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
            action: keep
            regex: true
          - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
            action: replace
            target_label: __metrics_path__
            regex: (.+)
          - source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port]
            action: replace
            regex: ([^:]+)(?::\d+)?;(\d+)
            replacement: $1:$2
            target_label: __address__
          - action: labelmap
            regex: __meta_kubernetes_pod_label_(.+)
          - source_labels: [__meta_kubernetes_namespace]
            action: replace
            target_label: kubernetes_namespace
          - source_labels: [__meta_kubernetes_pod_name]
            action: replace
            target_label: kubernetes_pod_name
      - job_name: 'kubernetes-service-endpoints'
        kubernetes_sd_configs:
          - role: endpoints
        relabel_configs:
          - source_labels: [__meta_kubernetes_service_annotation_prometheus_io_scrape]
            action: keep
            regex: true
          - source_labels: [__meta_kubernetes_service_annotation_prometheus_io_scheme]
            action: replace
            target_label: __scheme__
            regex: (https?)
          - source_labels: [__meta_kubernetes_service_annotation_prometheus_io_path]
            action: replace
            target_label: __metrics_path__
            regex: (.+)
          - source_labels: [__address__, __meta_kubernetes_service_annotation_prometheus_io_port]
            action: replace
            regex: ([^:]+)(?::\d+)?;(\d+)
            replacement: $1:$2
            target_label: __address__
          - action: labelmap
            regex: __meta_kubernetes_service_label_(.+)
          - source_labels: [__meta_kubernetes_namespace]
            action: replace
            target_label: kubernetes_namespace
          - source_labels: [__meta_kubernetes_service_name]
            action: replace
            target_label: kubernetes_name
Custom Kubernetes Controller for Self-Healing
from kubernetes import client, config, watch
from kubernetes.client.rest import ApiException
import logging
import time
import json
from typing import Dict, List, Optional
import requests


class SelfHealingController:
    def __init__(self, namespace: str = "default"):
        # Load Kubernetes configuration
        try:
            config.load_incluster_config()
        except config.ConfigException:
            config.load_kube_config()

        self.namespace = namespace
        self.v1 = client.CoreV1Api()
        self.apps_v1 = client.AppsV1Api()
        self.logger = logging.getLogger(__name__)

        # AI service endpoint
        self.ai_service_url = "http://ai-anomaly-detector:8000"

    def watch_pods(self):
        """
        Watch for pod events and trigger healing actions.
        """
        w = watch.Watch()
        for event in w.stream(self.v1.list_namespaced_pod, namespace=self.namespace):
            pod = event['object']
            event_type = event['type']
            self.logger.info(f"Pod event: {event_type} - {pod.metadata.name}")

            if event_type == 'MODIFIED':
                self._handle_pod_modified(pod)
            elif event_type == 'DELETED':
                self._handle_pod_deleted(pod)
    def _handle_pod_modified(self, pod):
        """
        Handle pod modification events.
        """
        # Check if pod is in a failed state
        if pod.status.phase == 'Failed':
            self.logger.warning(f"Pod {pod.metadata.name} is in Failed state")
            self._trigger_healing_action(pod, "pod_failed")

        # Check container status
        for container in pod.status.container_statuses or []:
            if container.state.waiting:
                if container.state.waiting.reason in ['CrashLoopBackOff', 'ImagePullBackOff']:
                    self.logger.warning(f"Container {container.name} in {pod.metadata.name} has issue: {container.state.waiting.reason}")
                    self._trigger_healing_action(pod, f"container_{container.state.waiting.reason.lower()}")

    def _handle_pod_deleted(self, pod):
        """
        Handle pod deletion events.
        """
        self.logger.info(f"Pod {pod.metadata.name} was deleted")
        # Could trigger scaling actions if needed

    def _trigger_healing_action(self, pod, issue_type: str):
        """
        Trigger appropriate healing action based on the issue.
        """
        try:
            # Get AI analysis
            ai_analysis = self._get_ai_analysis(pod, issue_type)
            # Determine healing action
            healing_action = self._determine_healing_action(ai_analysis, issue_type)
            # Execute healing action
            self._execute_healing_action(pod, healing_action)
        except Exception as e:
            self.logger.error(f"Error triggering healing action: {e}")

    def _get_ai_analysis(self, pod, issue_type: str) -> dict:
        """
        Get AI analysis for the pod issue.
        """
        try:
            response = requests.post(f"{self.ai_service_url}/analyze", json={
                "pod_name": pod.metadata.name,
                "namespace": pod.metadata.namespace,
                "issue_type": issue_type,
                "pod_status": pod.status.to_dict(),
                "labels": pod.metadata.labels
            })
            if response.status_code == 200:
                return response.json()
            else:
                self.logger.error(f"AI analysis failed: {response.status_code}")
                return {}
        except Exception as e:
            self.logger.error(f"Error getting AI analysis: {e}")
            return {}
    def _determine_healing_action(self, ai_analysis: dict, issue_type: str) -> dict:
        """
        Determine the appropriate healing action based on AI analysis and issue type.
        """
        # Default actions based on issue type
        default_actions = {
            "pod_failed": {
                "action": "restart_pod",
                "priority": "high"
            },
            "container_crashloopbackoff": {
                "action": "restart_pod",
                "priority": "high"
            },
            "container_imagepullbackoff": {
                "action": "check_image_registry",
                "priority": "medium"
            }
        }

        # Override with AI recommendations if available
        if ai_analysis.get("recommended_action"):
            return {
                "action": ai_analysis["recommended_action"],
                "priority": ai_analysis.get("priority", "medium"),
                "confidence": ai_analysis.get("confidence", 0.5)
            }

        return default_actions.get(issue_type, {"action": "no_action", "priority": "low"})

    def _execute_healing_action(self, pod, healing_action: dict):
        """
        Execute the determined healing action.
        """
        action = healing_action["action"]
        priority = healing_action["priority"]
        self.logger.info(f"Executing healing action: {action} for pod {pod.metadata.name}")

        if action == "restart_pod":
            self._restart_pod(pod)
        elif action == "check_image_registry":
            self._check_image_registry(pod)
        elif action == "scale_up":
            self._scale_up_deployment(pod)
        elif action == "rollback_deployment":
            self._rollback_deployment(pod)
        else:
            self.logger.warning(f"Unknown healing action: {action}")
    def _restart_pod(self, pod):
        """
        Restart a pod by deleting it (Kubernetes will recreate it).
        """
        try:
            self.v1.delete_namespaced_pod(
                name=pod.metadata.name,
                namespace=pod.metadata.namespace
            )
            self.logger.info(f"Restarted pod {pod.metadata.name}")
        except ApiException as e:
            self.logger.error(f"Error restarting pod: {e}")

    def _check_image_registry(self, pod):
        """
        Check if the image registry is accessible.
        """
        # This would implement registry connectivity checks
        self.logger.info(f"Checking image registry for pod {pod.metadata.name}")

    def _scale_up_deployment(self, pod):
        """
        Scale up the deployment if the pod is part of one.
        """
        try:
            # Find the deployment for this pod
            deployment_name = self._get_deployment_name(pod)
            if deployment_name:
                # Get current replicas
                deployment = self.apps_v1.read_namespaced_deployment(
                    name=deployment_name,
                    namespace=pod.metadata.namespace
                )
                current_replicas = deployment.spec.replicas
                new_replicas = current_replicas + 1

                # Scale up
                deployment.spec.replicas = new_replicas
                self.apps_v1.patch_namespaced_deployment(
                    name=deployment_name,
                    namespace=pod.metadata.namespace,
                    body=deployment
                )
                self.logger.info(f"Scaled up deployment {deployment_name} to {new_replicas} replicas")
        except ApiException as e:
            self.logger.error(f"Error scaling up deployment: {e}")
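    def _rollback_deployment(self, pod):
        """
        Recycle the deployment by scaling it to zero and back up.

        Note: this does not restore a previous revision. It restarts every pod
        on the current spec and causes downtime while replicas are at 0. A true
        rollback has to re-apply the pod template of an earlier ReplicaSet,
        which is what `kubectl rollout undo` does.
        """
        try:
            deployment_name = self._get_deployment_name(pod)
            if deployment_name:
                # Remember the current replica count so it can be restored
                scale = self.apps_v1.read_namespaced_deployment_scale(
                    name=deployment_name,
                    namespace=pod.metadata.namespace
                )
                original_replicas = scale.spec.replicas

                # Scale down to zero
                self.apps_v1.patch_namespaced_deployment_scale(
                    name=deployment_name,
                    namespace=pod.metadata.namespace,
                    body={"spec": {"replicas": 0}}
                )
                # Wait a moment, then scale back up to the original count
                time.sleep(5)
                self.apps_v1.patch_namespaced_deployment_scale(
                    name=deployment_name,
                    namespace=pod.metadata.namespace,
                    body={"spec": {"replicas": original_replicas}}
                )
                self.logger.info(f"Recycled deployment {deployment_name} at {original_replicas} replicas")
        except ApiException as e:
            self.logger.error(f"Error recycling deployment: {e}")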
    def _get_deployment_name(self, pod) -> Optional[str]:
        """
        Get the deployment name for a pod.
        """
        if pod.metadata.labels:
            for label, value in pod.metadata.labels.items():
                if label.startswith('app.kubernetes.io/name') or label == 'app':
                    return value
        return None


# Example usage
def main():
    controller = SelfHealingController(namespace="production")
    print("Starting self-healing controller...")
    print("Watching for pod events...")
    try:
        controller.watch_pods()
    except KeyboardInterrupt:
        print("Stopping self-healing controller...")


if __name__ == "__main__":
    main()
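A controller like the one above reacts to every `MODIFIED` event, so a pod stuck in `CrashLoopBackOff` could be deleted repeatedly in quick succession. One way to dampen that is a per-workload cooldown checked before a healing action runs. The `HealingCooldown` class below is a hypothetical helper, not part of the controller as written; it is a sketch of the idea with an injectable clock so the behavior can be verified without waiting.

```python
import time


class HealingCooldown:
    """Allow at most one healing action per key within a cooldown window."""

    def __init__(self, cooldown_seconds: float = 300, clock=time.monotonic):
        self.cooldown_seconds = cooldown_seconds
        self.clock = clock
        self._last_action = {}  # key -> time of the last permitted action

    def should_act(self, key: str) -> bool:
        """Return True and record the action if the key is outside its cooldown."""
        now = self.clock()
        last = self._last_action.get(key)
        if last is not None and now - last < self.cooldown_seconds:
            return False
        self._last_action[key] = now
        return True


# Demonstration with a fake clock
fake_now = [0.0]
cooldown = HealingCooldown(cooldown_seconds=300, clock=lambda: fake_now[0])
print(cooldown.should_act("production/api-service"))  # first action is allowed
fake_now[0] = 100.0
print(cooldown.should_act("production/api-service"))  # still cooling down
```

Keying on the namespace and workload name rather than the pod name is intentional here: a recreated pod gets a new name, so a pod-name key would never hit the cooldown.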