By Ali Elborey

CI/CD for Agentic AI: How to Ship Tool-Using Agents Without Breaking Production

ai-agentscicddevopslanggraphmlopsproductiontestingdeploymentobservability

You built an agent. It uses tools. It makes decisions. It works in your notebook. You deploy it. It breaks in production.

The agent calls the wrong tool. It times out. It costs too much. It escalates when it shouldn’t. Your users see errors. Your team gets paged at 3 AM.

This article shows how to ship agentic AI systems that don’t break production. We’ll build CI/CD pipelines that treat agents as software systems, not magical black boxes.

The Problem: Agents Are Different

Classic CI/CD assumes: code → tests → build → deploy.

Agentic AI adds: skills, tools, prompts, graphs, policies, memories, evals.

Traditional pipelines test code. They don’t test behavior. They don’t test tool selection. They don’t test multi-step workflows. They don’t test guardrails.

What Goes Wrong

Here’s what happens when you treat agents like regular code:

Silent regressions in tools: You update a tool’s API. The agent still calls it. The tool fails. The agent doesn’t know why. It retries. It fails again. Your pipeline breaks.

Broken workflows: You change a prompt. The agent takes a different path. It skips a step. It calls tools in the wrong order. The workflow fails. You don’t know until production.

New unsafe behaviors: You add a new tool. The agent uses it in ways you didn’t expect. It deletes data. It escalates incorrectly. It costs too much. You find out from users.

Version drift: You deploy a new model version. The agent behaves differently. Same code. Same config. Different behavior. Your tests pass. Production breaks.

The key idea: Agents are software systems with behavior that changes under the same code + config. CI/CD must reflect that.

Model vs Agent vs Workflow: What Exactly Are We Deploying?

Before we build pipelines, we need to define what we’re deploying. There are three layers:

Base Models

These are the foundation. GPT-4, Claude, your company’s internal models. They’re versioned separately. You don’t deploy them. You reference them.

# Model configuration
MODEL_CONFIG = {
    "provider": "openai",
    "model": "gpt-4-turbo-preview",
    "version": "2024-11-20",
    "temperature": 0.7,
    "max_tokens": 2000
}

Models change. New versions come out. Pricing changes. You need to track which version you’re using. You need to test behavior changes.

Agents

Agents are the decision-makers. Planner, worker, critic, router. They use models. They select tools. They make choices.

from typing import List, Dict, Any
from enum import Enum

class AgentRole(str, Enum):
    PLANNER = "planner"
    WORKER = "worker"
    CRITIC = "critic"
    ROUTER = "router"

class Agent:
    def __init__(
        self,
        role: AgentRole,
        model_config: Dict[str, Any],
        tools: List[str],
        version: str
    ):
        self.role = role
        self.model_config = model_config
        self.tools = tools
        self.version = version  # Explicit versioning
    
    def run(self, input: str) -> Dict[str, Any]:
        # Agent logic here
        pass

Agents have versions. They have configs. They have tool lists. They have behavior. You deploy agents. You test agents.

Agentic Workflows

Workflows tie agents together. They’re graphs. State machines. They define the flow. They handle errors. They manage state.

from typing import Callable, Dict, Any

class WorkflowNode:
    def __init__(
        self,
        name: str,
        agent: Agent,
        condition: Callable[[Dict[str, Any]], bool] = None
    ):
        self.name = name
        self.agent = agent
        self.condition = condition

class Workflow:
    def __init__(
        self,
        name: str,
        nodes: List[WorkflowNode],
        edges: List[tuple],
        version: str
    ):
        self.name = name
        self.nodes = nodes
        self.edges = edges
        self.version = version  # Workflow version
    
    def execute(self, initial_state: Dict[str, Any]) -> Dict[str, Any]:
        # Workflow execution logic
        current_state = initial_state
        current_node = self.nodes[0]
        
        while current_node:
            if current_node.condition and not current_node.condition(current_state):
                break
            
            result = current_node.agent.run(current_state)
            current_state.update(result)
            
            # Find next node based on edges
            current_node = self._get_next_node(current_node, current_state)
        
        return current_state

Workflows have versions. They have graphs. They have state. You deploy workflows. You test workflows.

Why Separate Versioning Matters

Version each layer separately:

  • Model version: Track which model version you’re using. Test behavior changes.
  • Agent version: Track agent code and config. Test agent behavior.
  • Workflow version: Track workflow structure. Test workflow paths.

This reduces risk. You can update models without changing agents. You can update agents without changing workflows. You can test each layer independently.

CI for Agentic AI: What to Test Before You Merge

CI runs on every PR. It needs to be fast. It needs to catch problems early. For agents, that means testing behavior, not just code.

Static Checks

Before you run anything, validate structure.

Lint config and workflow graphs:

# tests/test_static_validation.py
import json
from jsonschema import validate, ValidationError

def test_workflow_schema():
    """Validate workflow structure matches schema"""
    schema = {
        "type": "object",
        "properties": {
            "name": {"type": "string"},
            "version": {"type": "string"},
            "nodes": {
                "type": "array",
                "items": {
                    "type": "object",
                    "required": ["name", "agent"]
                }
            },
            "edges": {
                "type": "array",
                "items": {
                    "type": "array",
                    "minItems": 2,
                    "maxItems": 2
                }
            }
        },
        "required": ["name", "version", "nodes"]
    }
    
    workflow = load_workflow("workflows/billing_update.yaml")
    validate(instance=workflow, schema=schema)

Tool contracts:

# tests/test_tool_contracts.py
def test_tool_contracts():
    """Validate tool definitions match expected contracts"""
    tools = load_tools("tools/")
    
    for tool in tools:
        # Required params
        assert "name" in tool
        assert "description" in tool
        assert "parameters" in tool
        
        # Auth scopes
        if "auth" in tool:
            assert "scopes" in tool["auth"]
            assert isinstance(tool["auth"]["scopes"], list)
        
        # Timeouts
        if "timeout" in tool:
            assert tool["timeout"] > 0
            assert tool["timeout"] <= 300  # Max 5 minutes

Config validation:

# tests/test_config_validation.py
def test_agent_configs():
    """Validate agent configs are valid"""
    configs = load_agent_configs("configs/agents/")
    
    for config in configs:
        # Required fields
        assert "version" in config
        assert "model" in config
        assert "tools" in config
        
        # Valid model
        assert config["model"] in ALLOWED_MODELS
        
        # Valid tools
        for tool in config["tools"]:
            assert tool_exists(tool)

Behavioral Unit Tests

Test agent behavior with fixed inputs and mock tools.

# tests/test_agent_behavior.py
import pytest
from unittest.mock import Mock, patch
from src.agent import Agent, AgentRole

def test_planner_selects_correct_tools():
    """Test planner selects appropriate tools for task"""
    # Setup
    agent = Agent(
        role=AgentRole.PLANNER,
        model_config={"model": "gpt-4", "temperature": 0.0},
        tools=["search_kb", "create_ticket", "escalate"],
        version="1.0.0"
    )
    
    # Mock tool calls
    mock_tools = {
        "search_kb": Mock(return_value={"results": []}),
        "create_ticket": Mock(return_value={"ticket_id": "123"}),
        "escalate": Mock(return_value={"escalated": True})
    }
    
    # Test
    result = agent.run("User wants to reset password")
    
    # Assertions
    assert "search_kb" in result["tools_called"]
    assert "create_ticket" in result["tools_called"]
    assert "escalate" not in result["tools_called"]
    assert result["plan"]["steps"] == ["search_kb", "create_ticket"]

def test_agent_retry_strategy():
    """Test agent retries on tool failure"""
    agent = Agent(
        role=AgentRole.WORKER,
        model_config={"model": "gpt-4"},
        tools=["unreliable_tool"],
        version="1.0.0"
    )
    
    call_count = 0
    def unreliable_tool(*args):
        nonlocal call_count
        call_count += 1
        if call_count < 3:
            raise Exception("Tool failed")
        return {"success": True}
    
    agent.tools["unreliable_tool"] = unreliable_tool
    
    result = agent.run("Do something")
    
    assert call_count == 3
    assert result["success"] is True

def test_agent_argument_shapes():
    """Test agent passes correct argument shapes to tools"""
    agent = Agent(
        role=AgentRole.WORKER,
        model_config={"model": "gpt-4"},
        tools=["update_billing"],
        version="1.0.0"
    )
    
    captured_args = []
    def capture_args(**kwargs):
        captured_args.append(kwargs)
        return {"updated": True}
    
    agent.tools["update_billing"] = capture_args
    
    agent.run("Update user 123 to plan premium")
    
    assert len(captured_args) > 0
    assert "user_id" in captured_args[0]
    assert "plan" in captured_args[0]
    assert captured_args[0]["user_id"] == "123"
    assert captured_args[0]["plan"] == "premium"

Inline Evaluations

Test end-to-end flows with gold test cases.

# tests/test_eval_suite.py
import json

EVAL_CASES = [
    {
        "name": "update_user_billing_plan",
        "input": "Update user 123 to premium plan",
        "expected": {
            "tools_called": ["get_user", "update_billing"],
            "forbidden_tools": ["delete_user", "escalate"],
            "max_latency_ms": 5000,
            "max_tokens": 5000,
            "final_state": {
                "user_id": "123",
                "plan": "premium"
            }
        }
    },
    {
        "name": "escalate_complex_issue",
        "input": "User reports critical bug in payment system",
        "expected": {
            "tools_called": ["search_kb", "escalate"],
            "forbidden_tools": [],
            "max_latency_ms": 10000,
            "requires_human": True
        }
    }
]

def test_eval_suite():
    """Run evaluation suite against agent"""
    workflow = load_workflow("workflows/support.yaml")
    
    for case in EVAL_CASES:
        result = workflow.execute({"input": case["input"]})
        
        # Check tools called
        tools_called = result.get("tools_called", [])
        for tool in case["expected"]["tools_called"]:
            assert tool in tools_called, f"Expected tool {tool} not called"
        
        # Check forbidden tools
        for tool in case["expected"].get("forbidden_tools", []):
            assert tool not in tools_called, f"Forbidden tool {tool} was called"
        
        # Check latency
        latency = result.get("latency_ms", 0)
        assert latency <= case["expected"]["max_latency_ms"], \
            f"Latency {latency}ms exceeds max {case['expected']['max_latency_ms']}ms"
        
        # Check token usage
        tokens = result.get("tokens_used", 0)
        assert tokens <= case["expected"]["max_tokens"], \
            f"Token usage {tokens} exceeds max {case['expected']['max_tokens']}"

CI Pipeline Configuration

Here’s a complete CI pipeline:

# .github/workflows/ci.yml
name: CI

on:
  pull_request:
    branches: [main]
  push:
    branches: [main]

jobs:
  static_checks:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install -r requirements-dev.txt
      - name: Lint configs
        run: pytest tests/test_static_validation.py -v
      - name: Validate tool contracts
        run: pytest tests/test_tool_contracts.py -v
      - name: Validate configs
        run: pytest tests/test_config_validation.py -v

  unit_tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install -r requirements-dev.txt
      - name: Run unit tests
        run: pytest tests/test_agent_behavior.py -v

  eval_suite:
    runs-on: ubuntu-latest
    env:
      OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install -r requirements-dev.txt
      - name: Run eval suite
        run: pytest tests/test_eval_suite.py -v
      - name: Upload eval results
        uses: actions/upload-artifact@v3
        with:
          name: eval-results
          path: eval-results.json

CD for Agentic AI: Staged Deployments

Deploying agents is risky. Behavior changes. Costs change. Errors happen. You need staged rollouts.

Shadow / Replay

Mirror traffic to new version. Log outputs. Compare against baseline.

# src/deployment/shadow.py
from typing import Dict, Any, List
import logging

class ShadowDeployment:
    def __init__(
        self,
        baseline_workflow: Workflow,
        candidate_workflow: Workflow,
        comparison_metrics: List[str]
    ):
        self.baseline = baseline_workflow
        self.candidate = candidate_workflow
        self.metrics = comparison_metrics
        self.logger = logging.getLogger(__name__)
    
    def run_shadow(self, input: Dict[str, Any]) -> Dict[str, Any]:
        """Run both workflows and compare"""
        # Run baseline (production)
        baseline_result = self.baseline.execute(input)
        
        # Run candidate (shadow)
        candidate_result = self.candidate.execute(input)
        
        # Compare
        comparison = self._compare_results(
            baseline_result,
            candidate_result
        )
        
        # Log everything
        self.logger.info({
            "input": input,
            "baseline": baseline_result,
            "candidate": candidate_result,
            "comparison": comparison
        })
        
        # Return baseline result (candidate doesn't affect users)
        return baseline_result
    
    def _compare_results(
        self,
        baseline: Dict[str, Any],
        candidate: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Compare baseline and candidate results"""
        comparison = {}
        
        for metric in self.metrics:
            baseline_val = baseline.get(metric)
            candidate_val = candidate.get(metric)
            
            if baseline_val is None or candidate_val is None:
                comparison[metric] = "missing"
            elif isinstance(baseline_val, (int, float)):
                diff = abs(baseline_val - candidate_val)
                pct_diff = (diff / baseline_val) * 100 if baseline_val != 0 else 0
                comparison[metric] = {
                    "baseline": baseline_val,
                    "candidate": candidate_val,
                    "diff": diff,
                    "pct_diff": pct_diff
                }
            else:
                comparison[metric] = {
                    "baseline": baseline_val,
                    "candidate": candidate_val,
                    "match": baseline_val == candidate_val
                }
        
        return comparison

Canary

Route 1-5% of traffic to new version. Monitor. Rollback if needed.

# src/deployment/canary.py
import random
from typing import Dict, Any

class CanaryDeployment:
    def __init__(
        self,
        baseline_workflow: Workflow,
        candidate_workflow: Workflow,
        canary_percentage: float = 0.01,  # 1%
        rollback_conditions: Dict[str, Any] = None
    ):
        self.baseline = baseline_workflow
        self.candidate = candidate_workflow
        self.canary_pct = canary_percentage
        self.rollback_conditions = rollback_conditions or {
            "error_rate_threshold": 0.05,  # 5%
            "latency_threshold_ms": 10000,
            "cost_threshold_multiplier": 2.0
        }
        self.metrics = {
            "canary_requests": 0,
            "canary_errors": 0,
            "canary_latency_sum": 0,
            "canary_cost_sum": 0
        }
    
    def route(self, input: Dict[str, Any]) -> Dict[str, Any]:
        """Route request to baseline or canary"""
        use_canary = random.random() < self.canary_pct
        
        if use_canary:
            return self._run_canary(input)
        else:
            return self.baseline.execute(input)
    
    def _run_canary(self, input: Dict[str, Any]) -> Dict[str, Any]:
        """Run canary and check rollback conditions"""
        try:
            result = self.candidate.execute(input)
            
            # Track metrics
            self.metrics["canary_requests"] += 1
            self.metrics["canary_latency_sum"] += result.get("latency_ms", 0)
            self.metrics["canary_cost_sum"] += result.get("cost", 0)
            
            # Check rollback conditions
            if self._should_rollback():
                # Rollback: route to baseline
                return self.baseline.execute(input)
            
            return result
        except Exception as e:
            self.metrics["canary_errors"] += 1
            
            if self._should_rollback():
                return self.baseline.execute(input)
            
            raise
    
    def _should_rollback(self) -> bool:
        """Check if canary should rollback"""
        if self.metrics["canary_requests"] < 10:
            return False  # Need minimum samples
        
        error_rate = (
            self.metrics["canary_errors"] / 
            self.metrics["canary_requests"]
        )
        if error_rate > self.rollback_conditions["error_rate_threshold"]:
            return True
        
        avg_latency = (
            self.metrics["canary_latency_sum"] / 
            self.metrics["canary_requests"]
        )
        if avg_latency > self.rollback_conditions["latency_threshold_ms"]:
            return True
        
        return False

Full Rollout

Promote after SLOs hold for a window.

# src/deployment/full_rollout.py
from datetime import datetime, timedelta
from typing import Dict, Any

class FullRollout:
    def __init__(
        self,
        workflow: Workflow,
        slo_window_minutes: int = 60,
        slo_conditions: Dict[str, Any] = None
    ):
        self.workflow = workflow
        self.slo_window = timedelta(minutes=slo_window_minutes)
        self.slo_conditions = slo_conditions or {
            "error_rate": 0.01,  # 1%
            "p95_latency_ms": 5000,
            "availability": 0.99  # 99%
        }
        self.start_time = datetime.now()
        self.metrics = []
    
    def check_slo(self) -> bool:
        """Check if SLOs are met"""
        if datetime.now() - self.start_time < self.slo_window:
            return False  # Not enough time
        
        # Calculate metrics from window
        window_start = datetime.now() - self.slo_window
        window_metrics = [
            m for m in self.metrics 
            if m["timestamp"] >= window_start
        ]
        
        if len(window_metrics) < 100:
            return False  # Not enough samples
        
        error_rate = sum(
            1 for m in window_metrics if m.get("error")
        ) / len(window_metrics)
        
        if error_rate > self.slo_conditions["error_rate"]:
            return False
        
        latencies = [m["latency_ms"] for m in window_metrics]
        p95_latency = sorted(latencies)[int(len(latencies) * 0.95)]
        
        if p95_latency > self.slo_conditions["p95_latency_ms"]:
            return False
        
        return True

CD Pipeline Configuration

# .github/workflows/cd.yml
name: CD

on:
  push:
    branches: [main]

jobs:
  test_agents:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install -r requirements-dev.txt
      - name: Run tests
        run: pytest tests/ -v

  run_eval_suite:
    runs-on: ubuntu-latest
    env:
      OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install -r requirements-dev.txt
      - name: Run eval suite
        run: pytest tests/test_eval_suite.py -v

  deploy_shadow:
    needs: [test_agents, run_eval_suite]
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Deploy shadow
        run: |
          python scripts/deploy_shadow.py \
            --baseline-version ${{ github.sha }} \
            --candidate-version ${{ github.sha }}

  promote_canary:
    needs: [deploy_shadow]
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v3
      - name: Promote to canary
        run: |
          python scripts/promote_canary.py \
            --version ${{ github.sha }} \
            --percentage 0.01

  full_rollout:
    needs: [promote_canary]
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v3
      - name: Full rollout
        run: |
          python scripts/full_rollout.py \
            --version ${{ github.sha }}

Routing Example

Here’s how to route traffic in your API gateway:

# api/routing.py
from flask import Flask, request, jsonify
from src.deployment.canary import CanaryDeployment
from src.workflow import load_workflow

app = Flask(__name__)

# Load workflows
baseline = load_workflow("workflows/support-v1.2.0.yaml")
candidate = load_workflow("workflows/support-v1.3.0.yaml")

# Create canary
canary = CanaryDeployment(
    baseline_workflow=baseline,
    candidate_workflow=candidate,
    canary_percentage=0.01  # 1%
)

@app.route("/api/agent", methods=["POST"])
def handle_request():
    input_data = request.json
    
    # Route through canary
    result = canary.route(input_data)
    
    return jsonify(result)

Observability Hooks as Part of the Pipeline

You don’t add observability later. You fail the pipeline if observability is missing.

What to Check

Fail the pipeline if:

  • Traces are not emitted
  • Tool calls aren’t logged
  • Cost and latency metrics are missing
  • Error rates aren’t tracked

Structured Logging

Log every step with standardized fields.

# src/observability/logging.py
import json
import logging
from datetime import datetime
from typing import Dict, Any, Optional

class AgentLogger:
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)
        
        # JSON formatter
        handler = logging.StreamHandler()
        handler.setFormatter(JsonFormatter())
        self.logger.addHandler(handler)
    
    def log_tool_call(
        self,
        trace_id: str,
        agent_version: str,
        tool_name: str,
        input: Dict[str, Any],
        output: Dict[str, Any],
        latency_ms: int,
        success: bool,
        error: Optional[str] = None
    ):
        """Log tool call with standardized fields"""
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "trace_id": trace_id,
            "agent_version": agent_version,
            "event_type": "tool_call",
            "tool_name": tool_name,
            "input": input,
            "output": output,
            "latency_ms": latency_ms,
            "success": success
        }
        
        if error:
            log_entry["error"] = error
        
        self.logger.info(json.dumps(log_entry))
    
    def log_workflow_step(
        self,
        trace_id: str,
        workflow_version: str,
        step_name: str,
        state: Dict[str, Any],
        latency_ms: int
    ):
        """Log workflow step"""
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "trace_id": trace_id,
            "workflow_version": workflow_version,
            "event_type": "workflow_step",
            "step_name": step_name,
            "state": state,
            "latency_ms": latency_ms
        }
        
        self.logger.info(json.dumps(log_entry))

class JsonFormatter(logging.Formatter):
    def format(self, record):
        # If already a dict, return as JSON
        if isinstance(record.msg, dict):
            return json.dumps(record.msg)
        return super().format(record)

Metrics Collection

Track success rates, escalations, rollbacks.

# src/observability/metrics.py
from typing import Dict, Any
from collections import defaultdict
import time

class MetricsCollector:
    def __init__(self):
        self.counters = defaultdict(int)
        self.gauges = {}
        self.histograms = defaultdict(list)
    
    def increment(self, metric: str, tags: Dict[str, str] = None):
        """Increment counter"""
        key = self._make_key(metric, tags)
        self.counters[key] += 1
    
    def gauge(self, metric: str, value: float, tags: Dict[str, str] = None):
        """Set gauge value"""
        key = self._make_key(metric, tags)
        self.gauges[key] = value
    
    def histogram(self, metric: str, value: float, tags: Dict[str, str] = None):
        """Record histogram value"""
        key = self._make_key(metric, tags)
        self.histograms[key].append(value)
    
    def get_metrics(self) -> Dict[str, Any]:
        """Get all metrics"""
        return {
            "counters": dict(self.counters),
            "gauges": dict(self.gauges),
            "histograms": {
                k: {
                    "count": len(v),
                    "min": min(v),
                    "max": max(v),
                    "avg": sum(v) / len(v),
                    "p95": sorted(v)[int(len(v) * 0.95)] if v else 0
                }
                for k, v in self.histograms.items()
            }
        }
    
    def _make_key(self, metric: str, tags: Dict[str, str] = None) -> str:
        """Make metric key with tags"""
        if not tags:
            return metric
        tag_str = ",".join(f"{k}={v}" for k, v in sorted(tags.items()))
        return f"{metric}[{tag_str}]"

# Usage
metrics = MetricsCollector()

def track_agent_execution(agent_version: str, success: bool, latency_ms: int):
    """Track agent execution metrics"""
    metrics.increment("agent.executions", {"version": agent_version})
    metrics.increment(
        "agent.executions",
        {"version": agent_version, "status": "success" if success else "error"}
    )
    metrics.histogram("agent.latency_ms", latency_ms, {"version": agent_version})

CI Validation

Fail CI if observability is missing.

# tests/test_observability.py
import pytest
from src.observability.logging import AgentLogger
from src.observability.metrics import MetricsCollector

def test_traces_emitted():
    """Test that traces are emitted for all tool calls"""
    logger = AgentLogger()
    metrics = MetricsCollector()
    
    # Run agent
    result = run_agent("test input")
    
    # Check logs
    logs = get_logs()
    assert any(log["event_type"] == "tool_call" for log in logs)
    
    # Check metrics
    assert "agent.executions" in metrics.get_metrics()["counters"]

def test_tool_calls_logged():
    """Test that all tool calls are logged"""
    logger = AgentLogger()
    
    # Run agent with tools
    result = run_agent_with_tools("test input")
    
    # Check logs
    logs = get_logs()
    tool_calls = [log for log in logs if log["event_type"] == "tool_call"]
    
    assert len(tool_calls) > 0
    for call in tool_calls:
        assert "trace_id" in call
        assert "tool_name" in call
        assert "latency_ms" in call
        assert "success" in call

def test_cost_metrics_tracked():
    """Test that cost metrics are tracked"""
    metrics = MetricsCollector()
    
    # Run agent
    result = run_agent("test input")
    
    # Check metrics
    metrics_data = metrics.get_metrics()
    assert "agent.cost" in metrics_data["gauges"] or \
           "agent.tokens" in metrics_data["histograms"]

Governance Hooks: Approvals and Kill Switches

Agents need boundaries. They need approvals. They need kill switches.

RBAC on Tools

Map agent identity to allowed tools.

# config/tool_permissions.yaml
agent_roles:
  support_agent:
    allowed_tools:
      - search_kb
      - create_ticket
      - get_user_info
    forbidden_tools:
      - delete_user
      - update_billing
      - escalate_critical
  
  billing_agent:
    allowed_tools:
      - get_user_info
      - update_billing
      - get_payment_history
    forbidden_tools:
      - delete_user
      - escalate_critical
  
  admin_agent:
    allowed_tools:
      - "*"  # All tools
    forbidden_tools: []

environments:
  production:
    write_mode: false  # No writes in prod without approval
    max_cost_per_request: 1.0
    max_latency_ms: 10000
  
  staging:
    write_mode: true
    max_cost_per_request: 5.0
    max_latency_ms: 30000

Environment-Level Safety Toggles

# src/governance/safety.py
import os
from typing import Dict, Any, List

class SafetyToggles:
    def __init__(self):
        self.env = os.getenv("ENVIRONMENT", "production")
        self.config = self._load_config()
    
    def can_write(self, agent_role: str) -> bool:
        """Check if agent can write"""
        if not self.config["environments"][self.env]["write_mode"]:
            return False
        return True
    
    def can_call_tool(self, agent_role: str, tool_name: str) -> bool:
        """Check if agent can call tool"""
        role_config = self.config["agent_roles"][agent_role]
        
        # Check forbidden
        if tool_name in role_config.get("forbidden_tools", []):
            return False
        
        # Check allowed
        allowed = role_config.get("allowed_tools", [])
        if "*" in allowed:
            return True
        if tool_name in allowed:
            return True
        
        return False
    
    def get_max_cost(self) -> float:
        """Get max cost per request"""
        return self.config["environments"][self.env]["max_cost_per_request"]
    
    def get_max_latency(self) -> int:
        """Get max latency in ms"""
        return self.config["environments"][self.env]["max_latency_ms"]

Human Approval Steps

For destructive operations, require approval.

# src/governance/approval.py
from typing import Dict, Any, Optional
from enum import Enum

class ApprovalStatus(str, Enum):
    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"

class ApprovalWorkflow:
    def __init__(self):
        self.pending_approvals = {}
    
    def requires_approval(self, action: str) -> bool:
        """Check if action requires approval"""
        destructive_actions = [
            "delete_user",
            "update_billing",
            "escalate_critical",
            "terraform_apply"
        ]
        return action in destructive_actions
    
    def request_approval(
        self,
        trace_id: str,
        agent_role: str,
        action: str,
        plan: Dict[str, Any]
    ) -> str:
        """Request approval for action"""
        approval_id = f"approval_{trace_id}_{action}"
        
        self.pending_approvals[approval_id] = {
            "trace_id": trace_id,
            "agent_role": agent_role,
            "action": action,
            "plan": plan,
            "status": ApprovalStatus.PENDING,
            "requested_at": datetime.utcnow()
        }
        
        # Send notification to human
        send_approval_notification(approval_id, plan)
        
        return approval_id
    
    def check_approval(self, approval_id: str) -> ApprovalStatus:
        """Check approval status"""
        approval = self.pending_approvals.get(approval_id)
        if not approval:
            return ApprovalStatus.REJECTED
        
        return approval["status"]
    
    def approve(self, approval_id: str, approver: str):
        """Approve action"""
        approval = self.pending_approvals.get(approval_id)
        if approval:
            approval["status"] = ApprovalStatus.APPROVED
            approval["approver"] = approver
            approval["approved_at"] = datetime.utcnow()
    
    def reject(self, approval_id: str, reason: str):
        """Reject action"""
        approval = self.pending_approvals.get(approval_id)
        if approval:
            approval["status"] = ApprovalStatus.REJECTED
            approval["rejection_reason"] = reason

# Usage in workflow
def execute_with_approval(workflow, input_data):
    """Execute workflow with approval steps"""
    approval_workflow = ApprovalWorkflow()
    
    # Agent proposes plan
    plan = workflow.plan(input_data)
    
    # Check if approval needed
    if approval_workflow.requires_approval(plan["action"]):
        approval_id = approval_workflow.request_approval(
            trace_id=plan["trace_id"],
            agent_role=plan["agent_role"],
            action=plan["action"],
            plan=plan
        )
        
        # Wait for approval
        status = approval_workflow.check_approval(approval_id)
        if status != ApprovalStatus.APPROVED:
            return {"error": "Action not approved"}
    
    # Execute plan
    return workflow.execute(plan)

Checklist and Template

Here’s a checklist for shipping agents safely:

  • All agents and workflows are versioned
  • Tests cover planner + tool selection
  • Eval suite exists and is part of CI
  • Shadow + canary deployment paths in CD
  • Observability + governance hooked in
  • Tool permissions defined and enforced
  • Approval workflows for destructive actions
  • Cost and latency budgets set
  • Rollback procedures documented
  • Monitoring and alerting configured

Template Repository Structure

cicd-agentic-ai/
├── agents/
│   ├── planner.py
│   ├── worker.py
│   └── critic.py
├── workflows/
│   ├── support.yaml
│   └── billing.yaml
├── tools/
│   ├── search_kb.py
│   ├── create_ticket.py
│   └── update_billing.py
├── config/
│   ├── agents/
│   │   └── support_agent.yaml
│   ├── workflows/
│   │   └── support.yaml
│   └── tool_permissions.yaml
├── tests/
│   ├── test_static_validation.py
│   ├── test_agent_behavior.py
│   ├── test_eval_suite.py
│   └── test_observability.py
├── src/
│   ├── agent.py
│   ├── workflow.py
│   ├── deployment/
│   │   ├── shadow.py
│   │   ├── canary.py
│   │   └── full_rollout.py
│   ├── observability/
│   │   ├── logging.py
│   │   └── metrics.py
│   └── governance/
│       ├── safety.py
│       └── approval.py
├── .github/
│   └── workflows/
│       ├── ci.yml
│       └── cd.yml
└── README.md

Conclusion

Agents are software systems. They need CI/CD like any other system. But they’re different. They have behavior that changes. They have tools. They have workflows. They need special treatment.

The key is to:

  1. Version each layer separately
  2. Test behavior, not just code
  3. Deploy in stages
  4. Observe everything
  5. Govern with boundaries

Start with static checks. Add behavioral tests. Build eval suites. Deploy with shadow and canary. Add observability from day one. Enforce governance.

Your agents will break. But with proper CI/CD, you’ll catch problems before production. You’ll deploy safely. You’ll sleep better.

The code examples in this article are available in the repository. Use them as a starting point. Adapt them to your needs. Build pipelines you can trust.

Discussion

Join the conversation and share your thoughts

Discussion

0 / 5000