CI/CD for Agentic AI: How to Ship Tool-Using Agents Without Breaking Production
You built an agent. It uses tools. It makes decisions. It works in your notebook. You deploy it. It breaks in production.
The agent calls the wrong tool. It times out. It costs too much. It escalates when it shouldn’t. Your users see errors. Your team gets paged at 3 AM.
This article shows how to ship agentic AI systems that don’t break production. We’ll build CI/CD pipelines that treat agents as software systems, not magical black boxes.
The Problem: Agents Are Different
Classic CI/CD assumes: code → tests → build → deploy.
Agentic AI adds: skills, tools, prompts, graphs, policies, memories, evals.
Traditional pipelines test code. They don’t test behavior. They don’t test tool selection. They don’t test multi-step workflows. They don’t test guardrails.
What Goes Wrong
Here’s what happens when you treat agents like regular code:
Silent regressions in tools: You update a tool’s API. The agent still calls it. The tool fails. The agent doesn’t know why. It retries. It fails again. Your pipeline breaks.
Broken workflows: You change a prompt. The agent takes a different path. It skips a step. It calls tools in the wrong order. The workflow fails. You don’t know until production.
New unsafe behaviors: You add a new tool. The agent uses it in ways you didn’t expect. It deletes data. It escalates incorrectly. It costs too much. You find out from users.
Version drift: You deploy a new model version. The agent behaves differently. Same code. Same config. Different behavior. Your tests pass. Production breaks.
The key idea: Agents are software systems with behavior that changes under the same code + config. CI/CD must reflect that.
Model vs Agent vs Workflow: What Exactly Are We Deploying?
Before we build pipelines, we need to define what we’re deploying. There are three layers:
Base Models
These are the foundation. GPT-4, Claude, your company’s internal models. They’re versioned separately. You don’t deploy them. You reference them.
# Model configuration
MODEL_CONFIG = {
"provider": "openai",
"model": "gpt-4-turbo-preview",
"version": "2024-11-20",
"temperature": 0.7,
"max_tokens": 2000
}
Models change. New versions come out. Pricing changes. You need to track which version you’re using. You need to test behavior changes.
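A cheap guard against version drift is to pin the model version in the repo and fail CI when the config no longer matches the pin. A minimal sketch, assuming MODEL_CONFIG above lives in a config.models module and PINNED_MODELS is an allowlist you maintain (both names are illustrative):
# tests/test_model_pins.py
# Minimal sketch: fail CI when the configured model drifts from the pin.
# The import path and PINNED_MODELS allowlist are illustrative, not a fixed layout.
from config.models import MODEL_CONFIG  # wherever MODEL_CONFIG above is defined

PINNED_MODELS = {
    ("openai", "gpt-4-turbo-preview"): "2024-11-20",
}

def test_model_version_is_pinned():
    """The provider/model pair must be on the allowlist with the pinned version."""
    key = (MODEL_CONFIG["provider"], MODEL_CONFIG["model"])
    assert key in PINNED_MODELS, f"Model {key} is not pinned"
    assert MODEL_CONFIG["version"] == PINNED_MODELS[key], (
        "Model version drifted; update the pin deliberately and rerun the eval suite"
    )
Bumping the pin then becomes a reviewable diff that reruns the eval suite, instead of a silent provider-side change.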
Agents
Agents are the decision-makers. Planner, worker, critic, router. They use models. They select tools. They make choices.
from typing import List, Dict, Any
from enum import Enum
class AgentRole(str, Enum):
PLANNER = "planner"
WORKER = "worker"
CRITIC = "critic"
ROUTER = "router"
class Agent:
def __init__(
self,
role: AgentRole,
model_config: Dict[str, Any],
tools: List[str],
version: str
):
self.role = role
self.model_config = model_config
self.tools = tools
self.version = version # Explicit versioning
def run(self, input: str) -> Dict[str, Any]:
# Agent logic here
pass
Agents have versions. They have configs. They have tool lists. They have behavior. You deploy agents. You test agents.
Agentic Workflows
Workflows tie agents together. They’re graphs. State machines. They define the flow. They handle errors. They manage state.
from typing import Any, Callable, Dict, List, Optional
class WorkflowNode:
def __init__(
self,
name: str,
agent: Agent,
        condition: Optional[Callable[[Dict[str, Any]], bool]] = None
):
self.name = name
self.agent = agent
self.condition = condition
class Workflow:
def __init__(
self,
name: str,
nodes: List[WorkflowNode],
edges: List[tuple],
version: str
):
self.name = name
self.nodes = nodes
self.edges = edges
self.version = version # Workflow version
def execute(self, initial_state: Dict[str, Any]) -> Dict[str, Any]:
# Workflow execution logic
current_state = initial_state
current_node = self.nodes[0]
while current_node:
if current_node.condition and not current_node.condition(current_state):
break
result = current_node.agent.run(current_state)
current_state.update(result)
            # Find next node based on edges (self._get_next_node walks self.edges; helper elided)
current_node = self._get_next_node(current_node, current_state)
return current_state
Workflows have versions. They have graphs. They have state. You deploy workflows. You test workflows.
Why Separate Versioning Matters
Version each layer separately:
- Model version: Track which model version you’re using. Test behavior changes.
- Agent version: Track agent code and config. Test agent behavior.
- Workflow version: Track workflow structure. Test workflow paths.
This reduces risk. You can update models without changing agents. You can update agents without changing workflows. You can test each layer independently.
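One way to make that explicit is a small release manifest that pins all three layers together, so a deploy is a single reviewable artifact. A sketch; the file name and fields are illustrative, not a fixed format:
# config/release_manifest.py
# Illustrative release manifest: pin model, agent, and workflow versions together.
RELEASE = {
    "release": "2025.01.15-1",
    "model": {"provider": "openai", "name": "gpt-4-turbo-preview", "version": "2024-11-20"},
    "agents": {"planner": "1.4.0", "worker": "2.1.3", "critic": "1.0.2"},
    "workflows": {"support": "1.3.0", "billing": "0.9.1"},
}

def validate_release(release: dict) -> None:
    """Fail fast in CI if any layer ships without an explicit version."""
    assert release["model"].get("version"), "Model version must be pinned"
    assert all(release["agents"].values()), "Every agent needs a version"
    assert all(release["workflows"].values()), "Every workflow needs a version"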
CI for Agentic AI: What to Test Before You Merge
CI runs on every PR. It needs to be fast. It needs to catch problems early. For agents, that means testing behavior, not just code.
Static Checks
Before you run anything, validate structure.
Lint config and workflow graphs:
# tests/test_static_validation.py
from jsonschema import validate
# load_workflow is an assumed helper that parses the workflow YAML into a dict
def test_workflow_schema():
"""Validate workflow structure matches schema"""
schema = {
"type": "object",
"properties": {
"name": {"type": "string"},
"version": {"type": "string"},
"nodes": {
"type": "array",
"items": {
"type": "object",
"required": ["name", "agent"]
}
},
"edges": {
"type": "array",
"items": {
"type": "array",
"minItems": 2,
"maxItems": 2
}
}
},
"required": ["name", "version", "nodes"]
}
workflow = load_workflow("workflows/billing_update.yaml")
validate(instance=workflow, schema=schema)
Tool contracts:
# tests/test_tool_contracts.py
def test_tool_contracts():
"""Validate tool definitions match expected contracts"""
tools = load_tools("tools/")
for tool in tools:
# Required params
assert "name" in tool
assert "description" in tool
assert "parameters" in tool
# Auth scopes
if "auth" in tool:
assert "scopes" in tool["auth"]
assert isinstance(tool["auth"]["scopes"], list)
# Timeouts
if "timeout" in tool:
assert tool["timeout"] > 0
assert tool["timeout"] <= 300 # Max 5 minutes
Config validation:
# tests/test_config_validation.py
# ALLOWED_MODELS, load_agent_configs, and tool_exists are assumed project helpers
def test_agent_configs():
"""Validate agent configs are valid"""
configs = load_agent_configs("configs/agents/")
for config in configs:
# Required fields
assert "version" in config
assert "model" in config
assert "tools" in config
# Valid model
assert config["model"] in ALLOWED_MODELS
# Valid tools
for tool in config["tools"]:
assert tool_exists(tool)
Behavioral Unit Tests
Test agent behavior with fixed inputs and mock tools.
# tests/test_agent_behavior.py
import pytest
from unittest.mock import Mock, patch
from src.agent import Agent, AgentRole
def test_planner_selects_correct_tools():
"""Test planner selects appropriate tools for task"""
# Setup
agent = Agent(
role=AgentRole.PLANNER,
model_config={"model": "gpt-4", "temperature": 0.0},
tools=["search_kb", "create_ticket", "escalate"],
version="1.0.0"
)
# Mock tool calls
mock_tools = {
"search_kb": Mock(return_value={"results": []}),
"create_ticket": Mock(return_value={"ticket_id": "123"}),
"escalate": Mock(return_value={"escalated": True})
    }
    agent.tools = mock_tools  # inject the mocks so the test never calls real tools
    # Test
result = agent.run("User wants to reset password")
# Assertions
assert "search_kb" in result["tools_called"]
assert "create_ticket" in result["tools_called"]
assert "escalate" not in result["tools_called"]
assert result["plan"]["steps"] == ["search_kb", "create_ticket"]
def test_agent_retry_strategy():
"""Test agent retries on tool failure"""
agent = Agent(
role=AgentRole.WORKER,
model_config={"model": "gpt-4"},
tools=["unreliable_tool"],
version="1.0.0"
)
call_count = 0
def unreliable_tool(*args):
nonlocal call_count
call_count += 1
if call_count < 3:
raise Exception("Tool failed")
return {"success": True}
agent.tools["unreliable_tool"] = unreliable_tool
result = agent.run("Do something")
assert call_count == 3
assert result["success"] is True
def test_agent_argument_shapes():
"""Test agent passes correct argument shapes to tools"""
agent = Agent(
role=AgentRole.WORKER,
model_config={"model": "gpt-4"},
tools=["update_billing"],
version="1.0.0"
)
captured_args = []
def capture_args(**kwargs):
captured_args.append(kwargs)
return {"updated": True}
agent.tools["update_billing"] = capture_args
agent.run("Update user 123 to plan premium")
assert len(captured_args) > 0
assert "user_id" in captured_args[0]
assert "plan" in captured_args[0]
assert captured_args[0]["user_id"] == "123"
assert captured_args[0]["plan"] == "premium"
Inline Evaluations
Test end-to-end flows with gold test cases.
# tests/test_eval_suite.py
import json
EVAL_CASES = [
{
"name": "update_user_billing_plan",
"input": "Update user 123 to premium plan",
"expected": {
"tools_called": ["get_user", "update_billing"],
"forbidden_tools": ["delete_user", "escalate"],
"max_latency_ms": 5000,
"max_tokens": 5000,
"final_state": {
"user_id": "123",
"plan": "premium"
}
}
},
{
"name": "escalate_complex_issue",
"input": "User reports critical bug in payment system",
"expected": {
"tools_called": ["search_kb", "escalate"],
"forbidden_tools": [],
"max_latency_ms": 10000,
"requires_human": True
}
}
]
def test_eval_suite():
"""Run evaluation suite against agent"""
workflow = load_workflow("workflows/support.yaml")
for case in EVAL_CASES:
result = workflow.execute({"input": case["input"]})
# Check tools called
tools_called = result.get("tools_called", [])
for tool in case["expected"]["tools_called"]:
assert tool in tools_called, f"Expected tool {tool} not called"
# Check forbidden tools
for tool in case["expected"].get("forbidden_tools", []):
assert tool not in tools_called, f"Forbidden tool {tool} was called"
# Check latency
latency = result.get("latency_ms", 0)
assert latency <= case["expected"]["max_latency_ms"], \
f"Latency {latency}ms exceeds max {case['expected']['max_latency_ms']}ms"
# Check token usage
tokens = result.get("tokens_used", 0)
assert tokens <= case["expected"]["max_tokens"], \
f"Token usage {tokens} exceeds max {case['expected']['max_tokens']}"
CI Pipeline Configuration
Here’s a complete CI pipeline:
# .github/workflows/ci.yml
name: CI
on:
pull_request:
branches: [main]
push:
branches: [main]
jobs:
static_checks:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install -r requirements-dev.txt
- name: Lint configs
run: pytest tests/test_static_validation.py -v
- name: Validate tool contracts
run: pytest tests/test_tool_contracts.py -v
- name: Validate configs
run: pytest tests/test_config_validation.py -v
unit_tests:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install -r requirements-dev.txt
- name: Run unit tests
run: pytest tests/test_agent_behavior.py -v
eval_suite:
runs-on: ubuntu-latest
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install -r requirements-dev.txt
- name: Run eval suite
run: pytest tests/test_eval_suite.py -v
- name: Upload eval results
uses: actions/upload-artifact@v3
with:
name: eval-results
path: eval-results.json
CD for Agentic AI: Staged Deployments
Deploying agents is risky. Behavior changes. Costs change. Errors happen. You need staged rollouts.
Shadow / Replay
Mirror traffic to new version. Log outputs. Compare against baseline.
# src/deployment/shadow.py
from typing import Dict, Any, List
import logging
class ShadowDeployment:
def __init__(
self,
baseline_workflow: Workflow,
candidate_workflow: Workflow,
comparison_metrics: List[str]
):
self.baseline = baseline_workflow
self.candidate = candidate_workflow
self.metrics = comparison_metrics
self.logger = logging.getLogger(__name__)
def run_shadow(self, input: Dict[str, Any]) -> Dict[str, Any]:
"""Run both workflows and compare"""
# Run baseline (production)
baseline_result = self.baseline.execute(input)
# Run candidate (shadow)
candidate_result = self.candidate.execute(input)
# Compare
comparison = self._compare_results(
baseline_result,
candidate_result
)
# Log everything
self.logger.info({
"input": input,
"baseline": baseline_result,
"candidate": candidate_result,
"comparison": comparison
})
# Return baseline result (candidate doesn't affect users)
return baseline_result
def _compare_results(
self,
baseline: Dict[str, Any],
candidate: Dict[str, Any]
) -> Dict[str, Any]:
"""Compare baseline and candidate results"""
comparison = {}
for metric in self.metrics:
baseline_val = baseline.get(metric)
candidate_val = candidate.get(metric)
if baseline_val is None or candidate_val is None:
comparison[metric] = "missing"
elif isinstance(baseline_val, (int, float)):
diff = abs(baseline_val - candidate_val)
pct_diff = (diff / baseline_val) * 100 if baseline_val != 0 else 0
comparison[metric] = {
"baseline": baseline_val,
"candidate": candidate_val,
"diff": diff,
"pct_diff": pct_diff
}
else:
comparison[metric] = {
"baseline": baseline_val,
"candidate": candidate_val,
"match": baseline_val == candidate_val
}
return comparison
Canary
Route 1-5% of traffic to new version. Monitor. Rollback if needed.
# src/deployment/canary.py
import random
from typing import Dict, Any
class CanaryDeployment:
def __init__(
self,
baseline_workflow: Workflow,
candidate_workflow: Workflow,
canary_percentage: float = 0.01, # 1%
rollback_conditions: Dict[str, Any] = None
):
self.baseline = baseline_workflow
self.candidate = candidate_workflow
self.canary_pct = canary_percentage
self.rollback_conditions = rollback_conditions or {
"error_rate_threshold": 0.05, # 5%
"latency_threshold_ms": 10000,
"cost_threshold_multiplier": 2.0
}
self.metrics = {
"canary_requests": 0,
"canary_errors": 0,
"canary_latency_sum": 0,
"canary_cost_sum": 0
}
def route(self, input: Dict[str, Any]) -> Dict[str, Any]:
"""Route request to baseline or canary"""
use_canary = random.random() < self.canary_pct
if use_canary:
return self._run_canary(input)
else:
return self.baseline.execute(input)
def _run_canary(self, input: Dict[str, Any]) -> Dict[str, Any]:
"""Run canary and check rollback conditions"""
try:
result = self.candidate.execute(input)
# Track metrics
self.metrics["canary_requests"] += 1
self.metrics["canary_latency_sum"] += result.get("latency_ms", 0)
self.metrics["canary_cost_sum"] += result.get("cost", 0)
# Check rollback conditions
if self._should_rollback():
# Rollback: route to baseline
return self.baseline.execute(input)
return result
        except Exception:
            # Count the failed request too, so the error-rate denominator is right
            self.metrics["canary_requests"] += 1
            self.metrics["canary_errors"] += 1
            if self._should_rollback():
                return self.baseline.execute(input)
            raise
def _should_rollback(self) -> bool:
"""Check if canary should rollback"""
if self.metrics["canary_requests"] < 10:
return False # Need minimum samples
error_rate = (
self.metrics["canary_errors"] /
self.metrics["canary_requests"]
)
if error_rate > self.rollback_conditions["error_rate_threshold"]:
return True
avg_latency = (
self.metrics["canary_latency_sum"] /
self.metrics["canary_requests"]
)
if avg_latency > self.rollback_conditions["latency_threshold_ms"]:
return True
return False
Full Rollout
Promote after SLOs hold for a window.
# src/deployment/full_rollout.py
from datetime import datetime, timedelta
from typing import Dict, Any
class FullRollout:
def __init__(
self,
workflow: Workflow,
slo_window_minutes: int = 60,
slo_conditions: Dict[str, Any] = None
):
self.workflow = workflow
self.slo_window = timedelta(minutes=slo_window_minutes)
self.slo_conditions = slo_conditions or {
"error_rate": 0.01, # 1%
"p95_latency_ms": 5000,
"availability": 0.99 # 99%
}
self.start_time = datetime.now()
self.metrics = []
def check_slo(self) -> bool:
"""Check if SLOs are met"""
if datetime.now() - self.start_time < self.slo_window:
return False # Not enough time
# Calculate metrics from window
window_start = datetime.now() - self.slo_window
window_metrics = [
m for m in self.metrics
if m["timestamp"] >= window_start
]
if len(window_metrics) < 100:
return False # Not enough samples
error_rate = sum(
1 for m in window_metrics if m.get("error")
) / len(window_metrics)
if error_rate > self.slo_conditions["error_rate"]:
return False
latencies = [m["latency_ms"] for m in window_metrics]
p95_latency = sorted(latencies)[int(len(latencies) * 0.95)]
if p95_latency > self.slo_conditions["p95_latency_ms"]:
return False
return True
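Promotion itself is just a loop around check_slo: keep the canary split until the window passes cleanly, then shift all traffic to the candidate. A sketch, assuming request metrics are appended to rollout.metrics elsewhere and the traffic-split hook is whatever your gateway or router exposes, passed in here as a callable:
# scripts/promote_full_rollout.py
# Sketch of the promotion loop around FullRollout.check_slo().
import time
from typing import Callable

from src.deployment.full_rollout import FullRollout

def promote_when_stable(
    rollout: FullRollout,
    set_traffic_split: Callable[[float], None],  # hook into your gateway/router
    poll_seconds: int = 300,
) -> None:
    """Hold the canary split until SLOs pass, then send 100% to the candidate."""
    while not rollout.check_slo():
        time.sleep(poll_seconds)  # wait and re-check the SLO window
    set_traffic_split(1.0)  # promote: all traffic to the new version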
CD Pipeline Configuration
# .github/workflows/cd.yml
name: CD
on:
push:
branches: [main]
jobs:
test_agents:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install -r requirements-dev.txt
- name: Run tests
run: pytest tests/ -v
run_eval_suite:
runs-on: ubuntu-latest
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install -r requirements-dev.txt
- name: Run eval suite
run: pytest tests/test_eval_suite.py -v
deploy_shadow:
needs: [test_agents, run_eval_suite]
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
      - name: Deploy shadow
        run: |
          # Baseline is the currently deployed version, not this commit; how you
          # look it up (tag, pointer file, API) is deployment-specific, so
          # $CURRENT_PRODUCTION_VERSION below is a placeholder.
          python scripts/deploy_shadow.py \
            --baseline-version "$CURRENT_PRODUCTION_VERSION" \
            --candidate-version ${{ github.sha }}
promote_canary:
needs: [deploy_shadow]
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main'
steps:
- uses: actions/checkout@v3
- name: Promote to canary
run: |
python scripts/promote_canary.py \
--version ${{ github.sha }} \
--percentage 0.01
full_rollout:
needs: [promote_canary]
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main'
steps:
- uses: actions/checkout@v3
- name: Full rollout
run: |
python scripts/full_rollout.py \
--version ${{ github.sha }}
Routing Example
Here’s how to route traffic in your API gateway:
# api/routing.py
from flask import Flask, request, jsonify
from src.deployment.canary import CanaryDeployment
from src.workflow import load_workflow
app = Flask(__name__)
# Load workflows
baseline = load_workflow("workflows/support-v1.2.0.yaml")
candidate = load_workflow("workflows/support-v1.3.0.yaml")
# Create canary
canary = CanaryDeployment(
baseline_workflow=baseline,
candidate_workflow=candidate,
canary_percentage=0.01 # 1%
)
@app.route("/api/agent", methods=["POST"])
def handle_request():
input_data = request.json
# Route through canary
result = canary.route(input_data)
return jsonify(result)
Observability Hooks as Part of the Pipeline
You don’t add observability later. You fail the pipeline if observability is missing.
What to Check
Fail the pipeline if:
- Traces are not emitted
- Tool calls aren’t logged
- Cost and latency metrics are missing
- Error rates aren’t tracked
Structured Logging
Log every step with standardized fields.
# src/observability/logging.py
import json
import logging
from datetime import datetime
from typing import Dict, Any, Optional
class AgentLogger:
def __init__(self):
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.INFO)
# JSON formatter
handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
self.logger.addHandler(handler)
def log_tool_call(
self,
trace_id: str,
agent_version: str,
tool_name: str,
input: Dict[str, Any],
output: Dict[str, Any],
latency_ms: int,
success: bool,
error: Optional[str] = None
):
"""Log tool call with standardized fields"""
log_entry = {
"timestamp": datetime.utcnow().isoformat(),
"trace_id": trace_id,
"agent_version": agent_version,
"event_type": "tool_call",
"tool_name": tool_name,
"input": input,
"output": output,
"latency_ms": latency_ms,
"success": success
}
if error:
log_entry["error"] = error
self.logger.info(json.dumps(log_entry))
def log_workflow_step(
self,
trace_id: str,
workflow_version: str,
step_name: str,
state: Dict[str, Any],
latency_ms: int
):
"""Log workflow step"""
log_entry = {
"timestamp": datetime.utcnow().isoformat(),
"trace_id": trace_id,
"workflow_version": workflow_version,
"event_type": "workflow_step",
"step_name": step_name,
"state": state,
"latency_ms": latency_ms
}
self.logger.info(json.dumps(log_entry))
class JsonFormatter(logging.Formatter):
def format(self, record):
# If already a dict, return as JSON
if isinstance(record.msg, dict):
return json.dumps(record.msg)
return super().format(record)
Metrics Collection
Track success rates, escalations, rollbacks.
# src/observability/metrics.py
from typing import Dict, Any
from collections import defaultdict
import time
class MetricsCollector:
def __init__(self):
self.counters = defaultdict(int)
self.gauges = {}
self.histograms = defaultdict(list)
def increment(self, metric: str, tags: Dict[str, str] = None):
"""Increment counter"""
key = self._make_key(metric, tags)
self.counters[key] += 1
def gauge(self, metric: str, value: float, tags: Dict[str, str] = None):
"""Set gauge value"""
key = self._make_key(metric, tags)
self.gauges[key] = value
def histogram(self, metric: str, value: float, tags: Dict[str, str] = None):
"""Record histogram value"""
key = self._make_key(metric, tags)
self.histograms[key].append(value)
def get_metrics(self) -> Dict[str, Any]:
"""Get all metrics"""
return {
"counters": dict(self.counters),
"gauges": dict(self.gauges),
"histograms": {
k: {
"count": len(v),
"min": min(v),
"max": max(v),
"avg": sum(v) / len(v),
"p95": sorted(v)[int(len(v) * 0.95)] if v else 0
}
for k, v in self.histograms.items()
}
}
def _make_key(self, metric: str, tags: Dict[str, str] = None) -> str:
"""Make metric key with tags"""
if not tags:
return metric
tag_str = ",".join(f"{k}={v}" for k, v in sorted(tags.items()))
return f"{metric}[{tag_str}]"
# Usage
metrics = MetricsCollector()
def track_agent_execution(agent_version: str, success: bool, latency_ms: int):
"""Track agent execution metrics"""
metrics.increment("agent.executions", {"version": agent_version})
metrics.increment(
"agent.executions",
{"version": agent_version, "status": "success" if success else "error"}
)
metrics.histogram("agent.latency_ms", latency_ms, {"version": agent_version})
CI Validation
Fail CI if observability is missing.
# tests/test_observability.py
# run_agent, run_agent_with_tools, and get_logs are assumed test helpers that
# execute an agent against the instrumented stack and return the captured logs.
import pytest
from src.observability.logging import AgentLogger
from src.observability.metrics import MetricsCollector
def test_traces_emitted():
"""Test that traces are emitted for all tool calls"""
    # Use the shared collector the instrumented runtime writes to
    # (created in the usage example in src/observability/metrics.py)
    from src.observability.metrics import metrics
# Run agent
result = run_agent("test input")
# Check logs
logs = get_logs()
assert any(log["event_type"] == "tool_call" for log in logs)
# Check metrics
assert "agent.executions" in metrics.get_metrics()["counters"]
def test_tool_calls_logged():
"""Test that all tool calls are logged"""
logger = AgentLogger()
# Run agent with tools
result = run_agent_with_tools("test input")
# Check logs
logs = get_logs()
tool_calls = [log for log in logs if log["event_type"] == "tool_call"]
assert len(tool_calls) > 0
for call in tool_calls:
assert "trace_id" in call
assert "tool_name" in call
assert "latency_ms" in call
assert "success" in call
def test_cost_metrics_tracked():
"""Test that cost metrics are tracked"""
    from src.observability.metrics import metrics  # shared collector, see usage example above
# Run agent
result = run_agent("test input")
# Check metrics
metrics_data = metrics.get_metrics()
assert "agent.cost" in metrics_data["gauges"] or \
"agent.tokens" in metrics_data["histograms"]
Governance Hooks: Approvals and Kill Switches
Agents need boundaries. They need approvals. They need kill switches.
RBAC on Tools
Map agent identity to allowed tools.
# config/tool_permissions.yaml
agent_roles:
support_agent:
allowed_tools:
- search_kb
- create_ticket
- get_user_info
forbidden_tools:
- delete_user
- update_billing
- escalate_critical
billing_agent:
allowed_tools:
- get_user_info
- update_billing
- get_payment_history
forbidden_tools:
- delete_user
- escalate_critical
admin_agent:
allowed_tools:
- "*" # All tools
forbidden_tools: []
environments:
production:
write_mode: false # No writes in prod without approval
max_cost_per_request: 1.0
max_latency_ms: 10000
staging:
write_mode: true
max_cost_per_request: 5.0
max_latency_ms: 30000
Environment-Level Safety Toggles
# src/governance/safety.py
import os
from typing import Dict, Any, List
class SafetyToggles:
def __init__(self):
self.env = os.getenv("ENVIRONMENT", "production")
        self.config = self._load_config()  # loads config/tool_permissions.yaml (helper elided)
def can_write(self, agent_role: str) -> bool:
"""Check if agent can write"""
if not self.config["environments"][self.env]["write_mode"]:
return False
return True
def can_call_tool(self, agent_role: str, tool_name: str) -> bool:
"""Check if agent can call tool"""
role_config = self.config["agent_roles"][agent_role]
# Check forbidden
if tool_name in role_config.get("forbidden_tools", []):
return False
# Check allowed
allowed = role_config.get("allowed_tools", [])
if "*" in allowed:
return True
if tool_name in allowed:
return True
return False
def get_max_cost(self) -> float:
"""Get max cost per request"""
return self.config["environments"][self.env]["max_cost_per_request"]
def get_max_latency(self) -> int:
"""Get max latency in ms"""
return self.config["environments"][self.env]["max_latency_ms"]
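The kill switch itself can be as simple as an environment flag (or a feature-flag lookup) checked before every run and every tool call; flipping it stops new agent activity without a redeploy. A minimal sketch, with illustrative variable names:
# src/governance/kill_switch.py
# Minimal kill switch: flip AGENT_KILL_SWITCH=1 (or a per-workflow flag) to halt
# new agent activity without a redeploy. Variable names are illustrative; a
# feature-flag service works the same way.
import os

class KillSwitchTripped(RuntimeError):
    pass

def check_kill_switch(workflow_name: str) -> None:
    """Raise before starting a run or calling a tool if a switch is on."""
    if os.getenv("AGENT_KILL_SWITCH", "0") == "1":
        raise KillSwitchTripped("Global agent kill switch is on")
    if os.getenv(f"KILL_{workflow_name.upper()}", "0") == "1":
        raise KillSwitchTripped(f"Kill switch is on for workflow {workflow_name}")
Call it at the top of workflow execution and inside your tool dispatcher so a flipped switch also interrupts in-flight multi-step runs.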
Human Approval Steps
For destructive operations, require approval.
# src/governance/approval.py
import time
from datetime import datetime
from typing import Dict, Any, Optional
from enum import Enum
class ApprovalStatus(str, Enum):
PENDING = "pending"
APPROVED = "approved"
REJECTED = "rejected"
class ApprovalWorkflow:
def __init__(self):
self.pending_approvals = {}
def requires_approval(self, action: str) -> bool:
"""Check if action requires approval"""
destructive_actions = [
"delete_user",
"update_billing",
"escalate_critical",
"terraform_apply"
]
return action in destructive_actions
def request_approval(
self,
trace_id: str,
agent_role: str,
action: str,
plan: Dict[str, Any]
) -> str:
"""Request approval for action"""
approval_id = f"approval_{trace_id}_{action}"
self.pending_approvals[approval_id] = {
"trace_id": trace_id,
"agent_role": agent_role,
"action": action,
"plan": plan,
"status": ApprovalStatus.PENDING,
"requested_at": datetime.utcnow()
}
        # Notify a human; send_approval_notification is an assumed notifier (Slack, email, pager)
        send_approval_notification(approval_id, plan)
return approval_id
def check_approval(self, approval_id: str) -> ApprovalStatus:
"""Check approval status"""
approval = self.pending_approvals.get(approval_id)
if not approval:
return ApprovalStatus.REJECTED
return approval["status"]
def approve(self, approval_id: str, approver: str):
"""Approve action"""
approval = self.pending_approvals.get(approval_id)
if approval:
approval["status"] = ApprovalStatus.APPROVED
approval["approver"] = approver
approval["approved_at"] = datetime.utcnow()
def reject(self, approval_id: str, reason: str):
"""Reject action"""
approval = self.pending_approvals.get(approval_id)
if approval:
approval["status"] = ApprovalStatus.REJECTED
approval["rejection_reason"] = reason
# Usage in workflow
def execute_with_approval(workflow, input_data):
"""Execute workflow with approval steps"""
approval_workflow = ApprovalWorkflow()
# Agent proposes plan
plan = workflow.plan(input_data)
# Check if approval needed
if approval_workflow.requires_approval(plan["action"]):
approval_id = approval_workflow.request_approval(
trace_id=plan["trace_id"],
agent_role=plan["agent_role"],
action=plan["action"],
plan=plan
)
        # Wait for a human decision (simple polling here; production systems
        # usually park the request and resume via webhook or queue)
        while approval_workflow.check_approval(approval_id) == ApprovalStatus.PENDING:
            time.sleep(5)
        if approval_workflow.check_approval(approval_id) != ApprovalStatus.APPROVED:
            return {"error": "Action not approved"}
# Execute plan
return workflow.execute(plan)
Checklist and Template
Here’s a checklist for shipping agents safely:
- All agents and workflows are versioned
- Tests cover planner + tool selection
- Eval suite exists and is part of CI
- Shadow + canary deployment paths in CD
- Observability + governance hooked in
- Tool permissions defined and enforced
- Approval workflows for destructive actions
- Cost and latency budgets set
- Rollback procedures documented
- Monitoring and alerting configured
Template Repository Structure
cicd-agentic-ai/
├── agents/
│ ├── planner.py
│ ├── worker.py
│ └── critic.py
├── workflows/
│ ├── support.yaml
│ └── billing.yaml
├── tools/
│ ├── search_kb.py
│ ├── create_ticket.py
│ └── update_billing.py
├── config/
│ ├── agents/
│ │ └── support_agent.yaml
│ ├── workflows/
│ │ └── support.yaml
│ └── tool_permissions.yaml
├── tests/
│ ├── test_static_validation.py
│ ├── test_agent_behavior.py
│ ├── test_eval_suite.py
│ └── test_observability.py
├── src/
│ ├── agent.py
│ ├── workflow.py
│ ├── deployment/
│ │ ├── shadow.py
│ │ ├── canary.py
│ │ └── full_rollout.py
│ ├── observability/
│ │ ├── logging.py
│ │ └── metrics.py
│ └── governance/
│ ├── safety.py
│ └── approval.py
├── .github/
│ └── workflows/
│ ├── ci.yml
│ └── cd.yml
└── README.md
Conclusion
Agents are software systems. They need CI/CD like any other system. But they’re different. They have behavior that changes. They have tools. They have workflows. They need special treatment.
The key is to:
- Version each layer separately
- Test behavior, not just code
- Deploy in stages
- Observe everything
- Govern with boundaries
Start with static checks. Add behavioral tests. Build eval suites. Deploy with shadow and canary. Add observability from day one. Enforce governance.
Your agents will break. But with proper CI/CD, you’ll catch problems before production. You’ll deploy safely. You’ll sleep better.
The code examples in this article are available in the repository. Use them as a starting point. Adapt them to your needs. Build pipelines you can trust.