By Abdelrahman Elborey

Failure-First AI Agents: Designing Timeouts, Fallbacks, and Human Handoffs That Don't Break Prod

ai-agentserror-handlingtimeoutsfallbackshuman-handoffpythonproductionreliability

Your agent works in development. It calls tools. It makes decisions. It completes tasks.

Then you deploy it. A tool times out. The model returns garbage. An API returns 500. The agent gets stuck. Users see errors. Data gets corrupted.

The problem isn’t that things fail. The problem is how they fail.

This article shows you how to design agents that fail safely. Not agents that never fail. Agents that handle failure as a first-class concern.

Why “Failure-First” Agents Matter

Agents are built on many moving parts. The model. Tools. APIs. Network. User input. Each part can fail.

Things will fail. The question is how they fail.

Most agents are built to work when everything goes right. Failure handling is an afterthought. You add try-catch blocks. You log errors. You hope it works.

But that’s not enough. You need to design for failure from the start.

Real Risks

When agents fail badly, you get:

Stuck tasks: The agent loops forever. It keeps trying the same thing. It never stops. Your system hangs.

Partial writes: The agent updates half your database. Then it fails. Now your data is inconsistent. You can’t tell what changed.

Duplicated actions: The agent retries a payment. It doesn’t check if it already succeeded. You charge the user twice.

Silent data corruption: The agent writes bad data. No error is thrown. The data looks fine. But it’s wrong. You find out weeks later.

These aren’t edge cases. They happen in production. Every day.

Designing for Failure

Failure-first design means:

  • Every tool call can fail. Plan for it.
  • Every model call can fail. Plan for it.
  • Every workflow can get stuck. Plan for it.
  • Every action can be retried. Make it safe.

You don’t need complex systems. You need simple patterns. Patterns that work when things go wrong.

A Simple Failure Taxonomy for AI Agents

Here’s a clean mental model for agent failures. Four categories. Each needs different handling.

Tool-Level Failures

Tools are external. They call APIs. Databases. APIs. They fail.

API timeouts: The API doesn’t respond. Your request hangs. After 30 seconds, it times out. Or after 5 minutes. Or never.

HTTP 4xx/5xx: The API returns an error. 400 means bad request. 401 means unauthorized. 500 means server error. Each means something different.

Schema / validation errors: The tool returns data. But it’s the wrong shape. Missing fields. Wrong types. Your code breaks.

How it shows up: Logs show “Connection timeout” or “HTTP 500” or “KeyError: ‘result’”. The agent stops. Or it crashes.

Why it’s dangerous: The agent might be in the middle of a workflow. It already did some steps. Now it fails. Partial state. Inconsistent data.

What good failure looks like: Clear error message. Contained to that tool call. Retryable if appropriate. Context preserved for debugging.

Model-Level Failures

The model itself can fail. Or it can return garbage.

Hallucinated tool arguments: The model decides to call a tool. But the arguments are wrong. Missing required fields. Invalid values. The tool call fails.

Misunderstood instructions: The model does something different than you asked. It calls the wrong tool. It uses wrong parameters. It goes in the wrong direction.

Empty / truncated outputs: The model returns nothing. Or it gets cut off mid-sentence. Your code expects a response. It gets empty string.

How it shows up: Logs show “Invalid tool arguments” or “Tool call failed” or “Empty response from model”. The agent might retry. Or it might continue with bad data.

Why it’s dangerous: The agent makes decisions based on bad output. It might do the wrong thing. It might call tools incorrectly. It might corrupt data.

What good failure looks like: Validation before tool calls. Retry with clearer prompt. Fallback to simpler model. Human review if confidence is low.

Workflow-Level Failures

The agent gets stuck. Or it goes in circles.

Infinite loops and “stuck” planning: The agent keeps trying the same thing. It can’t make progress. It loops forever. Or it plans forever without acting.

Conflicting goals between steps: Step 1 says do X. Step 2 says do Y. But X and Y conflict. The agent doesn’t know what to do.

How it shows up: Logs show the same step repeating. Or the agent planning for minutes. CPU usage spikes. Memory grows. Nothing completes.

Why it’s dangerous: Resources get consumed. Other requests queue. System slows down. Users wait forever.

What good failure looks like: Step limit. Timeout per step. Timeout for entire workflow. Clear exit when stuck. Human escalation.

User / Data Failures

The input is bad. Or permissions are wrong.

Invalid inputs: Bad IDs. Missing fields. Wrong format. The agent can’t proceed.

Permissions / access issues: The user doesn’t have permission. The resource doesn’t exist. Access denied.

How it shows up: Logs show “Invalid user ID” or “Permission denied” or “Resource not found”. The agent fails immediately. Or it tries anyway and fails later.

Why it’s dangerous: The agent might try to work around it. It might use wrong data. It might expose data it shouldn’t.

What good failure looks like: Validate input early. Fail fast with clear message. Don’t retry. Alert user. Log for security review.

Timeouts and Retries That Don’t Cause Chaos

Timeouts and retries sound simple. But they’re easy to get wrong.

Choosing Timeouts

Different tools need different timeouts. A payment gateway should timeout fast. A search API can take longer.

Different timeouts for different tools:

TOOL_TIMEOUTS = {
    "payment_gateway": 5.0,  # Fast fail for payments
    "search_api": 30.0,      # Can take longer
    "database": 10.0,         # Medium timeout
    "llm_api": 60.0          # LLMs can be slow
}

Total workflow timeout vs per-step timeout: Set a timeout for each step. Also set a timeout for the entire workflow. If any step times out, fail that step. If the workflow times out, stop everything.

WORKFLOW_TIMEOUT = 300.0  # 5 minutes total
STEP_TIMEOUT = 30.0       # 30 seconds per step

Why this matters: If you only have a workflow timeout, one slow step can eat all your time. If you only have step timeouts, the workflow can run forever with many small steps.

Smart Retries

Not everything should be retried. And retries need backoff.

Exponential backoff: Wait longer between each retry. First retry after 1 second. Second after 2 seconds. Third after 4 seconds. Don’t hammer the API.

import asyncio
import random
from typing import Callable, TypeVar, Any

T = TypeVar('T')

async def retry_with_backoff(
    func: Callable[[], T],
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0
) -> T:
    """Retry a function with exponential backoff."""
    last_error = None
    
    for attempt in range(max_attempts):
        try:
            return await func()
        except Exception as e:
            last_error = e
            
            # Don't retry on certain errors
            if isinstance(e, (ValueError, KeyError, PermissionError)):
                raise
            
            # Last attempt - don't wait
            if attempt == max_attempts - 1:
                break
            
            # Calculate delay with jitter
            delay = min(base_delay * (2 ** attempt), max_delay)
            jitter = random.uniform(0, delay * 0.1)
            await asyncio.sleep(delay + jitter)
    
    raise last_error

Max attempts and when to stop: Set a max. Usually 3 attempts. After that, give up. Don’t retry forever.

When not to retry: Don’t retry 4xx errors. They mean your request is wrong. Retrying won’t help.

Don’t retry invalid input. The input is wrong. Fix the input, don’t retry.

Don’t retry hard business rule violations. If the user doesn’t have permission, retrying won’t help.

def should_retry(error: Exception) -> bool:
    """Decide if an error should be retried."""
    # Don't retry client errors (4xx)
    if isinstance(error, HTTPError):
        if 400 <= error.status_code < 500:
            return False
    
    # Don't retry validation errors
    if isinstance(error, (ValueError, KeyError, TypeError)):
        return False
    
    # Don't retry permission errors
    if isinstance(error, PermissionError):
        return False
    
    # Retry server errors (5xx) and timeouts
    return True

Retry Decision Table

Here’s a simple table to guide retry decisions:

ScenarioRetry?Backoff?Human Alert?
500 from tool APIYesYesMaybe
400 invalid payloadNoNoYes
Timeout to DBYesYesIf repeated
401 unauthorizedNoNoYes
429 rate limitYesYesIf persistent
Network errorYesYesIf repeated

Idempotency Basics

If you retry, make sure it’s safe. Use idempotency keys.

Idempotency keys for write operations: Every write operation gets a unique key. If you retry with the same key, the operation is safe. It won’t duplicate.

import hashlib
import json

def generate_idempotency_key(operation: str, params: dict) -> str:
    """Generate an idempotency key for an operation."""
    data = json.dumps({"op": operation, "params": params}, sort_keys=True)
    return hashlib.sha256(data.encode()).hexdigest()

async def safe_write_operation(
    operation: str,
    params: dict,
    idempotency_store: dict
):
    """Perform a write operation with idempotency."""
    key = generate_idempotency_key(operation, params)
    
    # Check if we already did this
    if key in idempotency_store:
        return idempotency_store[key]["result"]
    
    # Do the operation
    result = await perform_write(operation, params)
    
    # Store the result
    idempotency_store[key] = {
        "result": result,
        "timestamp": time.time()
    }
    
    return result

Avoiding duplicate actions: Before retrying, check if the action already succeeded. If it did, return the existing result. Don’t do it again.

Fallback Strategies: Degrade Gracefully, Not Dramatically

When something fails, don’t just fail. Fall back to something simpler. But make it explicit. Don’t hide it.

Model Fallback

If the primary model is down, use a backup. But use a simpler prompt. The backup might be weaker.

async def call_model_with_fallback(prompt: str, context: list) -> str:
    """Call model with fallback to backup model."""
    try:
        # Try primary model
        return await call_primary_model(prompt, context)
    except ModelError as e:
        logger.warning(f"Primary model failed: {e}, trying backup")
        
        # Simplify prompt for backup model
        simplified_prompt = simplify_prompt(prompt)
        
        try:
            return await call_backup_model(simplified_prompt, context)
        except ModelError as e2:
            logger.error(f"Backup model also failed: {e2}")
            raise

When to use: Primary model is down. Or it’s too slow. Or it’s returning errors.

What to simplify: Remove complex instructions. Use shorter context. Ask for simpler output.

Tool Fallback

If the main tool fails, use a cached version. Or use a read-only path.

class ToolWithFallback:
    def __init__(self, primary_tool, fallback_tool, cache=None):
        self.primary_tool = primary_tool
        self.fallback_tool = fallback_tool
        self.cache = cache
    
    async def call(self, params: dict) -> dict:
        """Call tool with fallback to cache or read-only version."""
        try:
            result = await self.primary_tool.call(params)
            
            # Cache successful results
            if self.cache:
                cache_key = self._cache_key(params)
                await self.cache.set(cache_key, result, ttl=3600)
            
            return result
        except ToolError as e:
            logger.warning(f"Primary tool failed: {e}, trying fallback")
            
            # Try cache first
            if self.cache:
                cache_key = self._cache_key(params)
                cached = await self.cache.get(cache_key)
                if cached:
                    logger.info("Using cached result")
                    return cached
            
            # Try read-only fallback
            try:
                return await self.fallback_tool.call_readonly(params)
            except ToolError as e2:
                logger.error(f"Fallback tool also failed: {e2}")
                raise

When to use: Main tool is down. Or it’s slow. Or it’s returning errors.

What fallback to use: Cached data from last successful call. Or a read-only version that doesn’t modify state.

Mode Fallback

Switch the agent from “autonomous” to “advisory” mode. When uncertainty is high, ask the user.

class AgentMode(Enum):
    AUTONOMOUS = "autonomous"  # Agent acts on its own
    ADVISORY = "advisory"      # Agent suggests, user decides
    STOPPED = "stopped"        # Agent stops, escalates

class AgentWithModeFallback:
    def __init__(self):
        self.mode = AgentMode.AUTONOMOUS
        self.confidence_threshold = 0.8
    
    async def decide_action(self, plan: dict) -> dict:
        """Decide on action based on mode and confidence."""
        confidence = self._calculate_confidence(plan)
        
        # If confidence is low, switch to advisory mode
        if confidence < self.confidence_threshold:
            if self.mode == AgentMode.AUTONOMOUS:
                logger.info("Switching to advisory mode due to low confidence")
                self.mode = AgentMode.ADVISORY
        
        if self.mode == AgentMode.ADVISORY:
            # Ask user before acting
            return {
                "action": "ask_user",
                "suggestion": plan,
                "confidence": confidence,
                "reason": "Low confidence, need user approval"
            }
        
        # Autonomous mode - act directly
        return {
            "action": "execute",
            "plan": plan,
            "confidence": confidence
        }

When to use: Uncertainty is high. Multiple valid paths. Risk is high.

What to ask: Show the plan. Show confidence. Ask for approval. Or ask for clarification.

Scope Fallback

Narrow the action. Instead of updating 100 records, update 1 and ask for confirmation.

async def update_records_with_fallback(record_ids: list, updates: dict):
    """Update records with scope fallback."""
    if len(record_ids) > 10:
        # Too many - narrow scope
        logger.warning(f"Too many records ({len(record_ids)}), narrowing scope")
        
        # Update just the first one as a test
        test_id = record_ids[0]
        await update_record(test_id, updates)
        
        # Ask for confirmation
        return {
            "status": "partial",
            "updated": [test_id],
            "remaining": record_ids[1:],
            "message": "Updated 1 record as test. Confirm to update remaining."
        }
    
    # Small batch - update all
    results = []
    for record_id in record_ids:
        result = await update_record(record_id, updates)
        results.append(result)
    
    return {
        "status": "complete",
        "updated": results
    }

When to use: Large batch operations. High-risk operations. First time doing something.

What to narrow: Reduce batch size. Do one as test. Ask for confirmation before continuing.

Making Fallbacks Observable

Don’t hide fallbacks. Log them. Alert on them. Track them.

async def call_with_fallback(primary_func, fallback_func, operation_name: str):
    """Call with fallback and observability."""
    try:
        result = await primary_func()
        logger.info(f"{operation_name}: success (primary)")
        return result
    except Exception as e:
        logger.warning(f"{operation_name}: primary failed: {e}, using fallback")
        
        # Track fallback usage
        metrics.increment("fallback.used", tags={"operation": operation_name})
        
        try:
            result = await fallback_func()
            logger.info(f"{operation_name}: success (fallback)")
            metrics.increment("fallback.success", tags={"operation": operation_name})
            return result
        except Exception as e2:
            logger.error(f"{operation_name}: fallback also failed: {e2}")
            metrics.increment("fallback.failure", tags={"operation": operation_name})
            raise

Human-in-the-Loop and Handoff Patterns

Sometimes the agent should stop. It should ask a person. Not because it failed. Because the risk is too high. Or the decision is too important.

Approval Gateways

Some actions need approval. Always. Before they happen.

High-risk actions:

  • Payments
  • PII changes
  • Production config changes
  • Deletions
  • External API calls that cost money
class ApprovalGateway:
    """Gateway for actions that require human approval."""
    
    REQUIRES_APPROVAL = [
        "payment",
        "delete_user",
        "update_pii",
        "change_production_config",
        "external_api_call"
    ]
    
    async def check_approval(self, action: dict) -> dict:
        """Check if action requires approval."""
        action_type = action.get("type")
        
        if action_type in self.REQUIRES_APPROVAL:
            # Create approval request
            approval_id = await self._create_approval_request(action)
            
            return {
                "requires_approval": True,
                "approval_id": approval_id,
                "action": action,
                "message": f"Action '{action_type}' requires approval"
            }
        
        return {
            "requires_approval": False,
            "action": action
        }
    
    async def _create_approval_request(self, action: dict) -> str:
        """Create an approval request and notify human."""
        approval_id = str(uuid.uuid4())
        
        # Store request
        await self._store_approval_request(approval_id, action)
        
        # Notify human (Slack, email, etc.)
        await self._notify_human(approval_id, action)
        
        return approval_id

How it works: Before executing, check if approval is needed. If yes, create a request. Notify a human. Wait for approval. Then execute.

Escalation Flows

If an error repeats N times, escalate. Create a ticket. Send to Slack. Get a human involved.

class EscalationManager:
    """Manage escalation when errors repeat."""
    
    def __init__(self, max_retries: int = 3):
        self.max_retries = max_retries
        self.error_counts = {}  # error_key -> count
    
    async def handle_error(self, error: Exception, context: dict):
        """Handle error and escalate if needed."""
        error_key = self._error_key(error, context)
        
        # Increment count
        self.error_counts[error_key] = self.error_counts.get(error_key, 0) + 1
        count = self.error_counts[error_key]
        
        logger.warning(f"Error occurred: {error} (count: {count})")
        
        # If threshold reached, escalate
        if count >= self.max_retries:
            await self._escalate(error, context, count)
    
    async def _escalate(self, error: Exception, context: dict, count: int):
        """Escalate to human."""
        logger.error(f"Escalating error after {count} occurrences: {error}")
        
        # Create ticket
        ticket_id = await self._create_ticket(error, context)
        
        # Send to Slack/Teams
        await self._send_alert(error, context, ticket_id)
        
        # Reset count (or don't, to prevent spam)
        # self.error_counts[error_key] = 0
    
    def _error_key(self, error: Exception, context: dict) -> str:
        """Generate a key for grouping similar errors."""
        error_type = type(error).__name__
        operation = context.get("operation", "unknown")
        return f"{error_type}:{operation}"

When to escalate: Same error happens 3+ times. Error rate spikes. Critical operation fails.

What to include in escalation: Error details. Context. Steps that led to error. Suggested next step.

Handing Context to Humans Cleanly

When you hand off to a human, give them everything they need. But make it clear. Don’t dump raw logs.

What to include:

  • User input
  • Plan the agent had
  • Steps executed so far
  • Failing step
  • Error logs
  • Suggested next step from agent
def package_context_for_human(
    user_input: str,
    plan: dict,
    executed_steps: list,
    failing_step: dict,
    error: Exception,
    agent_suggestion: str = None
) -> dict:
    """Package context for human review."""
    return {
        "summary": {
            "user_input": user_input,
            "status": "failed",
            "error": str(error),
            "error_type": type(error).__name__
        },
        "execution": {
            "plan": plan,
            "steps_executed": executed_steps,
            "failing_step": failing_step,
            "steps_remaining": len(plan.get("steps", [])) - len(executed_steps)
        },
        "error_details": {
            "message": str(error),
            "type": type(error).__name__,
            "traceback": format_traceback(error)
        },
        "agent_suggestion": agent_suggestion or "Please review and provide guidance",
        "timestamp": time.time(),
        "suggested_actions": [
            "Review the error and context",
            "Decide on next step",
            "Approve retry or provide alternative"
        ]
    }

Format it well: Use clear structure. Use plain language. Highlight important parts. Make it scannable.

Patterns to Cover

“Ask-before-act”: Agent proposes action. Human approves. Then agent acts.

async def ask_before_act(action: dict, context: dict) -> dict:
    """Ask human before acting."""
    approval_request = package_context_for_human(
        user_input=context.get("user_input"),
        plan={"proposed_action": action},
        executed_steps=[],
        failing_step=None,
        error=None,
        agent_suggestion=f"Proposed action: {action['type']}"
    )
    
    # Send to human
    approval_id = await send_approval_request(approval_request)
    
    # Wait for approval
    approved = await wait_for_approval(approval_id, timeout=300)
    
    if approved:
        return await execute_action(action)
    else:
        return {"status": "cancelled", "reason": "Not approved"}

“Review-after-act”: Agent acts. Then human reviews. Can undo if needed.

async def review_after_act(action: dict, result: dict, context: dict) -> dict:
    """Act first, then ask human to review."""
    # Execute action
    execution_result = await execute_action(action)
    
    # Package for review
    review_request = {
        "action_taken": action,
        "result": execution_result,
        "context": context,
        "can_undo": execution_result.get("can_undo", False)
    }
    
    # Send for review
    review_id = await send_review_request(review_request)
    
    # If human says undo, undo it
    if await should_undo(review_id):
        return await undo_action(action, execution_result)
    
    return execution_result

“Stop-and-escalate”: Agent stops. Creates ticket. Waits for human. Doesn’t continue until human responds.

async def stop_and_escalate(error: Exception, context: dict) -> dict:
    """Stop execution and escalate to human."""
    # Package context
    escalation_context = package_context_for_human(
        user_input=context.get("user_input"),
        plan=context.get("plan"),
        executed_steps=context.get("executed_steps", []),
        failing_step=context.get("current_step"),
        error=error
    )
    
    # Create ticket
    ticket_id = await create_ticket(escalation_context)
    
    # Notify human
    await notify_human(ticket_id, escalation_context)
    
    # Wait for human response
    human_guidance = await wait_for_human_guidance(ticket_id, timeout=3600)
    
    return {
        "status": "escalated",
        "ticket_id": ticket_id,
        "human_guidance": human_guidance,
        "next_step": human_guidance.get("next_step")
    }

Observability for Failure-First Agents

You can’t fix what you can’t see. You need to log failures. Track them. Alert on them.

What to Log and How

Log structure matters. Make it consistent. Make it searchable.

Log structure:

{
    "trace_id": "abc123",
    "user_id": "user_456",
    "workflow_id": "workflow_789",
    "step_name": "call_payment_api",
    "tool_name": "payment_gateway",
    "error_type": "TimeoutError",
    "error_message": "Request timed out after 5.0 seconds",
    "retries": 2,
    "duration_ms": 5000,
    "timestamp": "2025-11-16T10:30:00Z",
    "context": {
        "operation": "process_payment",
        "amount": 100.00,
        "currency": "USD"
    }
}

Key fields:

  • trace_id: Track a request across services
  • user_id: Who triggered it
  • workflow_id: Which workflow
  • step_name: Which step failed
  • tool_name: Which tool
  • error_type: What kind of error
  • retries: How many retries
  • duration_ms: How long it took

Basic Dashboards

Build simple dashboards. Don’t overcomplicate.

Error rate by tool: Show how often each tool fails. Which tools are unreliable.

Average retries per workflow: Show how often workflows need retries. High retries mean problems.

Top error types per day: Show what’s failing. Timeouts? 500s? Validation errors?

# Example metrics to track
metrics = {
    "errors.by_tool": {
        "payment_gateway": 5,
        "search_api": 2,
        "database": 1
    },
    "retries.per_workflow": 1.2,  # Average
    "errors.by_type": {
        "TimeoutError": 10,
        "HTTPError": 5,
        "ValidationError": 2
    }
}

Simple Alerts

Set up alerts. But don’t alert on everything.

Spike in a specific error type: If timeouts spike, alert. If 500s spike, alert.

Sudden increase in average retries: If retries double, something is wrong.

Critical operation failures: If payments fail, alert immediately.

class AlertManager:
    """Simple alert manager."""
    
    def __init__(self):
        self.error_counts = {}
        self.retry_counts = {}
    
    def record_error(self, error_type: str, tool_name: str):
        """Record an error and check for alerts."""
        key = f"{error_type}:{tool_name}"
        self.error_counts[key] = self.error_counts.get(key, 0) + 1
        
        # Alert if spike
        if self.error_counts[key] > 10:  # Threshold
            self._send_alert(f"Spike in {error_type} for {tool_name}")
    
    def record_retry(self, workflow_id: str):
        """Record a retry and check for alerts."""
        self.retry_counts[workflow_id] = self.retry_counts.get(workflow_id, 0) + 1
        
        # Alert if too many retries
        if self.retry_counts[workflow_id] > 3:
            self._send_alert(f"Workflow {workflow_id} retried {self.retry_counts[workflow_id]} times")

Keep it simple. No need for a specific vendor. Use what you have. Logs. Metrics. Alerts.

Testing Failure Scenarios (Without Breaking Prod)

You need to test failures. But you can’t break production. Test in staging. Inject failures. See what happens.

Injecting Fake Tool Errors

In staging, inject random errors. See how the agent handles them.

class FailureInjector:
    """Inject failures for testing."""
    
    def __init__(self, failure_rate: float = 0.1):
        self.failure_rate = failure_rate
        self.injected_errors = []
    
    async def call_tool_with_injection(self, tool_name: str, func, *args, **kwargs):
        """Call tool with potential failure injection."""
        # Randomly inject failure
        if random.random() < self.failure_rate:
            error_type = random.choice([
                "TimeoutError",
                "HTTPError",
                "ConnectionError"
            ])
            
            self.injected_errors.append({
                "tool": tool_name,
                "error": error_type,
                "timestamp": time.time()
            })
            
            # Raise the error
            if error_type == "TimeoutError":
                raise asyncio.TimeoutError(f"Injected timeout for {tool_name}")
            elif error_type == "HTTPError":
                raise HTTPError(f"Injected 500 for {tool_name}", status_code=500)
            else:
                raise ConnectionError(f"Injected connection error for {tool_name}")
        
        # Normal call
        return await func(*args, **kwargs)

What to inject:

  • Random 500s
  • Timeouts
  • Slow responses
  • Connection errors

What to test:

  • Retry logic works
  • Fallbacks trigger
  • Timeouts are respected
  • Handoffs happen when needed

Testing Retry Limits

Make sure retries stop. Don’t retry forever.

async def test_retry_limits():
    """Test that retries stop at limit."""
    max_attempts = 3
    attempt_count = 0
    
    async def failing_func():
        nonlocal attempt_count
        attempt_count += 1
        raise Exception("Always fails")
    
    try:
        await retry_with_backoff(failing_func, max_attempts=max_attempts)
        assert False, "Should have raised exception"
    except Exception:
        # Should have tried max_attempts times
        assert attempt_count == max_attempts

Ensuring Handoffs Work

Simulate repeated failure. Make sure tickets are created. Notifications are sent.

async def test_handoff_on_repeated_failure():
    """Test that handoff happens on repeated failure."""
    escalation_manager = EscalationManager(max_retries=3)
    tickets_created = []
    
    # Mock ticket creation
    async def mock_create_ticket(error, context, count):
        tickets_created.append({"error": str(error), "count": count})
    
    escalation_manager._create_ticket = mock_create_ticket
    
    # Simulate 3 failures
    for i in range(3):
        await escalation_manager.handle_error(
            Exception("Test error"),
            {"operation": "test"}
        )
    
    # Should have created a ticket
    assert len(tickets_created) == 1
    assert tickets_created[0]["count"] == 3

Practical Checklist

Here’s a checklist you can use:

  • Timeouts set per tool?
  • Retry rules documented and implemented?
  • Fallbacks defined for critical paths?
  • Human approvals defined for risky actions?
  • Error logging + dashboards in place?
  • Failure test suite at least for happy vs failure paths?

Putting It All Together: A Complete Example

Let’s build a simple agent that handles failures well. It calls tools. It has timeouts. It retries. It falls back. It escalates.

The Agent Loop

import asyncio
import time
import logging
from typing import Dict, List, Optional, Callable, Any
from enum import Enum
from dataclasses import dataclass

logger = logging.getLogger(__name__)

class AgentState(Enum):
    RUNNING = "running"
    WAITING_APPROVAL = "waiting_approval"
    ESCALATED = "escalated"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class ToolConfig:
    name: str
    timeout: float
    max_retries: int
    requires_approval: bool = False

class FailureFirstAgent:
    """An agent designed to handle failures gracefully."""
    
    def __init__(
        self,
        tool_configs: Dict[str, ToolConfig],
        workflow_timeout: float = 300.0,
        escalation_threshold: int = 3
    ):
        self.tool_configs = tool_configs
        self.workflow_timeout = workflow_timeout
        self.escalation_threshold = escalation_threshold
        self.state = AgentState.RUNNING
        self.start_time = None
        self.error_counts = {}
        self.executed_steps = []
    
    async def run(self, user_input: str, plan: List[Dict]) -> Dict:
        """Run the agent workflow."""
        self.start_time = time.time()
        self.state = AgentState.RUNNING
        
        try:
            for step in plan:
                # Check workflow timeout
                if self._workflow_timed_out():
                    return await self._handle_timeout(user_input, plan)
                
                # Execute step
                result = await self._execute_step(step, user_input, plan)
                
                if result.get("requires_approval"):
                    self.state = AgentState.WAITING_APPROVAL
                    return result
                
                if result.get("escalated"):
                    self.state = AgentState.ESCALATED
                    return result
                
                self.executed_steps.append({
                    "step": step,
                    "result": result,
                    "timestamp": time.time()
                })
            
            self.state = AgentState.COMPLETED
            return {
                "status": "completed",
                "result": "Workflow completed successfully",
                "steps_executed": len(self.executed_steps)
            }
        
        except Exception as e:
            self.state = AgentState.FAILED
            return await self._handle_error(e, user_input, plan)
    
    async def _execute_step(self, step: Dict, user_input: str, plan: List[Dict]) -> Dict:
        """Execute a single step with failure handling."""
        tool_name = step.get("tool")
        tool_config = self.tool_configs.get(tool_name)
        
        if not tool_config:
            raise ValueError(f"Unknown tool: {tool_name}")
        
        # Check if approval needed
        if tool_config.requires_approval:
            return await self._request_approval(step, user_input, plan)
        
        # Execute with retry and timeout
        return await self._call_tool_with_retry(
            tool_name,
            tool_config,
            step.get("params", {})
        )
    
    async def _call_tool_with_retry(
        self,
        tool_name: str,
        config: ToolConfig,
        params: Dict
    ) -> Dict:
        """Call tool with retry and timeout."""
        last_error = None
        
        for attempt in range(config.max_retries):
            try:
                # Call with timeout
                result = await asyncio.wait_for(
                    self._call_tool(tool_name, params),
                    timeout=config.timeout
                )
                
                # Success - reset error count
                error_key = f"{tool_name}:{type(last_error).__name__ if last_error else 'success'}"
                self.error_counts[error_key] = 0
                
                return {
                    "status": "success",
                    "result": result,
                    "attempts": attempt + 1
                }
            
            except asyncio.TimeoutError as e:
                last_error = e
                logger.warning(f"Tool {tool_name} timed out (attempt {attempt + 1})")
                
                # Check if should escalate
                if await self._should_escalate(tool_name, e):
                    return await self._escalate(e, tool_name, params)
            
            except Exception as e:
                last_error = e
                logger.warning(f"Tool {tool_name} failed: {e} (attempt {attempt + 1})")
                
                # Don't retry certain errors
                if not self._should_retry(e):
                    raise
                
                # Check if should escalate
                if await self._should_escalate(tool_name, e):
                    return await self._escalate(e, tool_name, params)
            
            # Wait before retry
            if attempt < config.max_retries - 1:
                delay = min(1.0 * (2 ** attempt), 60.0)
                await asyncio.sleep(delay)
        
        # All retries exhausted
        raise last_error
    
    async def _call_tool(self, tool_name: str, params: Dict) -> Any:
        """Call a tool (implement based on your tools)."""
        # This is a placeholder - implement based on your actual tools
        if tool_name == "search":
            return await self._search_tool(params)
        elif tool_name == "update":
            return await self._update_tool(params)
        else:
            raise ValueError(f"Unknown tool: {tool_name}")
    
    async def _search_tool(self, params: Dict) -> Dict:
        """Example search tool."""
        query = params.get("query")
        # Simulate tool call
        await asyncio.sleep(0.1)
        return {"results": [f"Result for {query}"]}
    
    async def _update_tool(self, params: Dict) -> Dict:
        """Example update tool."""
        record_id = params.get("record_id")
        # Simulate tool call
        await asyncio.sleep(0.1)
        return {"updated": record_id, "status": "success"}
    
    def _should_retry(self, error: Exception) -> bool:
        """Decide if error should be retried."""
        # Don't retry client errors
        if isinstance(error, ValueError):
            return False
        if isinstance(error, KeyError):
            return False
        # Retry timeouts and server errors
        return True
    
    async def _should_escalate(self, tool_name: str, error: Exception) -> bool:
        """Check if error should be escalated."""
        error_key = f"{tool_name}:{type(error).__name__}"
        self.error_counts[error_key] = self.error_counts.get(error_key, 0) + 1
        
        return self.error_counts[error_key] >= self.escalation_threshold
    
    async def _escalate(self, error: Exception, tool_name: str, params: Dict) -> Dict:
        """Escalate to human."""
        logger.error(f"Escalating error: {error} for tool {tool_name}")
        
        # Package context
        context = {
            "error": str(error),
            "error_type": type(error).__name__,
            "tool": tool_name,
            "params": params,
            "executed_steps": self.executed_steps
        }
        
        # Create ticket (in real implementation)
        ticket_id = f"ticket_{int(time.time())}"
        
        return {
            "status": "escalated",
            "escalated": True,
            "ticket_id": ticket_id,
            "context": context,
            "message": "Error escalated to human for review"
        }
    
    async def _request_approval(self, step: Dict, user_input: str, plan: List[Dict]) -> Dict:
        """Request approval for a step."""
        approval_context = {
            "step": step,
            "user_input": user_input,
            "plan": plan,
            "executed_steps": self.executed_steps
        }
        
        # In real implementation, send to approval system
        approval_id = f"approval_{int(time.time())}"
        
        return {
            "status": "waiting_approval",
            "requires_approval": True,
            "approval_id": approval_id,
            "context": approval_context
        }
    
    def _workflow_timed_out(self) -> bool:
        """Check if workflow has timed out."""
        if not self.start_time:
            return False
        elapsed = time.time() - self.start_time
        return elapsed >= self.workflow_timeout
    
    async def _handle_timeout(self, user_input: str, plan: List[Dict]) -> Dict:
        """Handle workflow timeout."""
        logger.warning("Workflow timed out")
        return {
            "status": "timeout",
            "message": "Workflow exceeded time limit",
            "steps_executed": len(self.executed_steps),
            "steps_remaining": len(plan) - len(self.executed_steps)
        }
    
    async def _handle_error(self, error: Exception, user_input: str, plan: List[Dict]) -> Dict:
        """Handle workflow error."""
        logger.error(f"Workflow failed: {error}")
        return {
            "status": "failed",
            "error": str(error),
            "error_type": type(error).__name__,
            "steps_executed": len(self.executed_steps),
            "context": {
                "user_input": user_input,
                "plan": plan,
                "executed_steps": self.executed_steps
            }
        }

Usage Example

# Configure tools
tool_configs = {
    "search": ToolConfig(
        name="search",
        timeout=10.0,
        max_retries=3,
        requires_approval=False
    ),
    "update": ToolConfig(
        name="update",
        timeout=5.0,
        max_retries=2,
        requires_approval=True  # Updates need approval
    )
}

# Create agent
agent = FailureFirstAgent(
    tool_configs=tool_configs,
    workflow_timeout=300.0,
    escalation_threshold=3
)

# Define plan
plan = [
    {"tool": "search", "params": {"query": "test"}},
    {"tool": "update", "params": {"record_id": "123", "data": {"status": "active"}}}
]

# Run agent
result = await agent.run("Process this record", plan)
print(result)

Summary

Failure-first design isn’t about preventing failures. It’s about handling them well.

Start with simple patterns:

  • Timeouts per tool and per workflow
  • Smart retries with backoff
  • Fallbacks for critical paths
  • Human handoffs for risky actions
  • Good logging and observability

Test failures in staging. Inject errors. See what happens. Make sure your agent fails safely.

The code examples above give you a foundation. Adapt them to your needs. Start simple. Add complexity only when you need it.

Remember: Things will fail. The question is how they fail. Make sure they fail safely.

Discussion

Join the conversation and share your thoughts

Discussion

0 / 5000