Failure-First AI Agents: Designing Timeouts, Fallbacks, and Human Handoffs That Don't Break Prod
Your agent works in development. It calls tools. It makes decisions. It completes tasks.
Then you deploy it. A tool times out. The model returns garbage. An API returns 500. The agent gets stuck. Users see errors. Data gets corrupted.
The problem isn’t that things fail. The problem is how they fail.
This article shows you how to design agents that fail safely. Not agents that never fail. Agents that handle failure as a first-class concern.
Why “Failure-First” Agents Matter
Agents are built on many moving parts. The model. Tools. APIs. Network. User input. Each part can fail.
Things will fail. The question is how they fail.
Most agents are built to work when everything goes right. Failure handling is an afterthought. You add try-catch blocks. You log errors. You hope it works.
But that’s not enough. You need to design for failure from the start.
Real Risks
When agents fail badly, you get:
Stuck tasks: The agent loops forever. It keeps trying the same thing. It never stops. Your system hangs.
Partial writes: The agent updates half your database. Then it fails. Now your data is inconsistent. You can’t tell what changed.
Duplicated actions: The agent retries a payment. It doesn’t check if it already succeeded. You charge the user twice.
Silent data corruption: The agent writes bad data. No error is thrown. The data looks fine. But it’s wrong. You find out weeks later.
These aren’t edge cases. They happen in production. Every day.
Designing for Failure
Failure-first design means:
- Every tool call can fail. Plan for it.
- Every model call can fail. Plan for it.
- Every workflow can get stuck. Plan for it.
- Every action can be retried. Make it safe.
You don’t need complex systems. You need simple patterns. Patterns that work when things go wrong.
A Simple Failure Taxonomy for AI Agents
Here’s a clean mental model for agent failures. Four categories. Each needs different handling.
Tool-Level Failures
Tools are external. They call APIs. Databases. Other services. They fail.
API timeouts: The API doesn’t respond. Your request hangs. After 30 seconds, it times out. Or after 5 minutes. Or never.
HTTP 4xx/5xx: The API returns an error. 400 means bad request. 401 means unauthorized. 500 means server error. Each means something different.
Schema / validation errors: The tool returns data. But it’s the wrong shape. Missing fields. Wrong types. Your code breaks.
How it shows up: Logs show “Connection timeout” or “HTTP 500” or “KeyError: ‘result’”. The agent stops. Or it crashes.
Why it’s dangerous: The agent might be in the middle of a workflow. It already did some steps. Now it fails. Partial state. Inconsistent data.
What good failure looks like: Clear error message. Contained to that tool call. Retryable if appropriate. Context preserved for debugging.
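As a sketch of what "contained" can look like: wrap each tool call so a timeout or exception comes back as a structured result instead of crashing the whole loop. This wrapper is illustrative, not tied to any framework; the timeout value and result shape are assumptions.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

async def contained_tool_call(tool_name: str, coro, timeout: float = 10.0) -> dict:
    """Run one tool call so that a failure stays inside this call."""
    try:
        result = await asyncio.wait_for(coro, timeout=timeout)
        return {"ok": True, "tool": tool_name, "result": result}
    except asyncio.TimeoutError:
        logger.warning("Tool %s timed out after %.1fs", tool_name, timeout)
        return {"ok": False, "tool": tool_name, "error": "timeout", "retryable": True}
    except Exception as e:
        logger.warning("Tool %s failed: %s", tool_name, e)
        return {"ok": False, "tool": tool_name, "error": str(e), "retryable": False}
```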
Model-Level Failures
The model itself can fail. Or it can return garbage.
Hallucinated tool arguments: The model decides to call a tool. But the arguments are wrong. Missing required fields. Invalid values. The tool call fails.
Misunderstood instructions: The model does something different than you asked. It calls the wrong tool. It uses wrong parameters. It goes in the wrong direction.
Empty / truncated outputs: The model returns nothing. Or it gets cut off mid-sentence. Your code expects a response. It gets an empty string.
How it shows up: Logs show “Invalid tool arguments” or “Tool call failed” or “Empty response from model”. The agent might retry. Or it might continue with bad data.
Why it’s dangerous: The agent makes decisions based on bad output. It might do the wrong thing. It might call tools incorrectly. It might corrupt data.
What good failure looks like: Validation before tool calls. Retry with clearer prompt. Fallback to simpler model. Human review if confidence is low.
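For example, a small schema check can catch hallucinated arguments before the tool ever runs. A minimal sketch; the required-fields dict and the refund tool are hypothetical.

```python
def validate_tool_args(args: dict, required: dict) -> list:
    """Return a list of problems with model-proposed tool arguments."""
    problems = []
    for field, expected_type in required.items():
        if field not in args:
            problems.append(f"missing field: {field}")
        elif not isinstance(args[field], expected_type):
            problems.append(f"wrong type for {field}: expected {expected_type.__name__}")
    return problems

# Example: check arguments before calling a hypothetical refund tool
problems = validate_tool_args(
    {"order_id": "ord_42"},                  # model forgot the amount
    {"order_id": str, "amount_cents": int},
)
if problems:
    # Don't call the tool - retry the model with the problems listed in the prompt
    print(problems)  # ['missing field: amount_cents']
```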
Workflow-Level Failures
The agent gets stuck. Or it goes in circles.
Infinite loops and “stuck” planning: The agent keeps trying the same thing. It can’t make progress. It loops forever. Or it plans forever without acting.
Conflicting goals between steps: Step 1 says do X. Step 2 says do Y. But X and Y conflict. The agent doesn’t know what to do.
How it shows up: Logs show the same step repeating. Or the agent planning for minutes. CPU usage spikes. Memory grows. Nothing completes.
Why it’s dangerous: Resources get consumed. Other requests queue. System slows down. Users wait forever.
What good failure looks like: Step limit. Timeout per step. Timeout for entire workflow. Clear exit when stuck. Human escalation.
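A loop guard gets you most of this. A minimal sketch, assuming `plan_next_step` returns the next action (or `None` when done) and `execute_step` runs it; both are stand-ins for your own planner and executor.

```python
MAX_STEPS = 20  # hard cap on planner/executor iterations

async def run_with_step_limit(plan_next_step, execute_step) -> dict:
    """Drive the agent loop with a step cap and a simple stuck check."""
    last_action = None
    for step_num in range(MAX_STEPS):
        action = await plan_next_step()
        if action is None:
            return {"status": "completed", "steps": step_num}
        # Same action proposed twice in a row: treat as stuck and exit cleanly
        if action == last_action:
            return {"status": "stuck", "steps": step_num, "action": action}
        last_action = action
        await execute_step(action)
    # Step cap reached - stop instead of looping forever
    return {"status": "step_limit_reached", "steps": MAX_STEPS}
```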
User / Data Failures
The input is bad. Or permissions are wrong.
Invalid inputs: Bad IDs. Missing fields. Wrong format. The agent can’t proceed.
Permissions / access issues: The user doesn’t have permission. The resource doesn’t exist. Access denied.
How it shows up: Logs show “Invalid user ID” or “Permission denied” or “Resource not found”. The agent fails immediately. Or it tries anyway and fails later.
Why it’s dangerous: The agent might try to work around it. It might use wrong data. It might expose data it shouldn’t.
What good failure looks like: Validate input early. Fail fast with clear message. Don’t retry. Alert user. Log for security review.
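A minimal sketch of failing fast on input. The `user_<number>` ID format is just an example; swap in your own validation and permission checks.

```python
import re

def validate_request(user_id: str, record_id: str) -> None:
    """Fail fast on obviously bad input before the agent does any work."""
    if not re.fullmatch(r"user_\d+", user_id):
        raise ValueError(f"Invalid user ID format: {user_id!r}")
    if not record_id:
        raise ValueError("Missing record ID")
    # Permission checks belong here too, before any tool runs
```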
Timeouts and Retries That Don’t Cause Chaos
Timeouts and retries sound simple. But they’re easy to get wrong.
Choosing Timeouts
Different tools need different timeouts. A payment gateway should time out fast. A search API can take longer.
Different timeouts for different tools:
```python
TOOL_TIMEOUTS = {
    "payment_gateway": 5.0,  # Fast fail for payments
    "search_api": 30.0,      # Can take longer
    "database": 10.0,        # Medium timeout
    "llm_api": 60.0          # LLMs can be slow
}
```
Total workflow timeout vs per-step timeout: Set a timeout for each step. Also set a timeout for the entire workflow. If any step times out, fail that step. If the workflow times out, stop everything.
```python
WORKFLOW_TIMEOUT = 300.0  # 5 minutes total
STEP_TIMEOUT = 30.0       # 30 seconds per step
```
Why this matters: If you only have a workflow timeout, one slow step can eat all your time. If you only have step timeouts, the workflow can run forever with many small steps.
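One way to enforce both budgets with `asyncio`, assuming each step is an async function that takes no arguments and using the two constants defined above:

```python
import asyncio

async def run_workflow(steps):
    """Enforce a per-step timeout inside an overall workflow timeout."""
    async def run_steps():
        for step in steps:
            # Each step gets its own budget...
            await asyncio.wait_for(step(), timeout=STEP_TIMEOUT)

    # ...and the whole workflow gets a hard ceiling
    await asyncio.wait_for(run_steps(), timeout=WORKFLOW_TIMEOUT)
```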
Smart Retries
Not everything should be retried. And retries need backoff.
Exponential backoff: Wait longer between each retry. First retry after 1 second. Second after 2 seconds. Third after 4 seconds. Don’t hammer the API.
```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar('T')

async def retry_with_backoff(
    func: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0
) -> T:
    """Retry an async function with exponential backoff."""
    last_error = None
    for attempt in range(max_attempts):
        try:
            return await func()
        except Exception as e:
            last_error = e
            # Don't retry on certain errors
            if isinstance(e, (ValueError, KeyError, PermissionError)):
                raise
            # Last attempt - don't wait
            if attempt == max_attempts - 1:
                break
            # Calculate delay with jitter
            delay = min(base_delay * (2 ** attempt), max_delay)
            jitter = random.uniform(0, delay * 0.1)
            await asyncio.sleep(delay + jitter)
    raise last_error
```
Max attempts and when to stop: Set a max. Usually 3 attempts. After that, give up. Don’t retry forever.
When not to retry: Don’t retry 4xx errors. They mean your request is wrong. Retrying won’t help.
Don’t retry invalid input. The input is wrong. Fix the input, don’t retry.
Don’t retry hard business rule violations. If the user doesn’t have permission, retrying won’t help.
```python
def should_retry(error: Exception) -> bool:
    """Decide if an error should be retried."""
    # HTTPError is assumed to be your HTTP client's error type carrying a status_code
    if isinstance(error, HTTPError):
        # 429 (rate limit) is the one 4xx worth retrying - with backoff
        if error.status_code == 429:
            return True
        # Other client errors (4xx) mean the request itself is wrong
        if 400 <= error.status_code < 500:
            return False
    # Don't retry validation errors
    if isinstance(error, (ValueError, KeyError, TypeError)):
        return False
    # Don't retry permission errors
    if isinstance(error, PermissionError):
        return False
    # Retry server errors (5xx) and timeouts
    return True
```
Retry Decision Table
Here’s a simple table to guide retry decisions:
| Scenario | Retry? | Backoff? | Human Alert? |
|---|---|---|---|
| 500 from tool API | Yes | Yes | Maybe |
| 400 invalid payload | No | No | Yes |
| Timeout to DB | Yes | Yes | If repeated |
| 401 unauthorized | No | No | Yes |
| 429 rate limit | Yes | Yes | If persistent |
| Network error | Yes | Yes | If repeated |
Idempotency Basics
If you retry, make sure it’s safe. Use idempotency keys.
Idempotency keys for write operations: Every write operation gets a unique key. If you retry with the same key, the operation is safe. It won’t duplicate.
```python
import hashlib
import json
import time

def generate_idempotency_key(operation: str, params: dict) -> str:
    """Generate an idempotency key for an operation."""
    data = json.dumps({"op": operation, "params": params}, sort_keys=True)
    return hashlib.sha256(data.encode()).hexdigest()

async def safe_write_operation(
    operation: str,
    params: dict,
    idempotency_store: dict
):
    """Perform a write operation with idempotency."""
    key = generate_idempotency_key(operation, params)
    # Check if we already did this
    if key in idempotency_store:
        return idempotency_store[key]["result"]
    # Do the operation (perform_write is your actual write call)
    result = await perform_write(operation, params)
    # Store the result
    idempotency_store[key] = {
        "result": result,
        "timestamp": time.time()
    }
    return result
```
Avoiding duplicate actions: Before retrying, check if the action already succeeded. If it did, return the existing result. Don’t do it again.
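A sketch of that check for a payment retry. `payments_api` and its `get_payment` / `create_payment` methods are stand-ins for your own payment client.

```python
async def retry_payment_safely(payment_id: str, params: dict, payments_api):
    """Before retrying a payment, check whether the first attempt already landed."""
    existing = await payments_api.get_payment(payment_id)
    if existing and existing.get("status") == "succeeded":
        # The earlier attempt worked - return it instead of charging again
        return existing
    return await payments_api.create_payment(payment_id, params)
```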
Fallback Strategies: Degrade Gracefully, Not Dramatically
When something fails, don’t just fail. Fall back to something simpler. But make it explicit. Don’t hide it.
Model Fallback
If the primary model is down, use a backup. But use a simpler prompt. The backup might be weaker.
```python
async def call_model_with_fallback(prompt: str, context: list) -> str:
    """Call model with fallback to backup model."""
    try:
        # Try primary model
        return await call_primary_model(prompt, context)
    except ModelError as e:
        logger.warning(f"Primary model failed: {e}, trying backup")
        # Simplify prompt for backup model
        simplified_prompt = simplify_prompt(prompt)
        try:
            return await call_backup_model(simplified_prompt, context)
        except ModelError as e2:
            logger.error(f"Backup model also failed: {e2}")
            raise
```
When to use: Primary model is down. Or it’s too slow. Or it’s returning errors.
What to simplify: Remove complex instructions. Use shorter context. Ask for simpler output.
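The `simplify_prompt` helper referenced above might look like this. It's a sketch; the `### EXAMPLES` delimiter assumes your prompts mark off few-shot examples that way.

```python
def simplify_prompt(prompt: str, max_chars: int = 2000) -> str:
    """Strip the prompt down for a weaker backup model."""
    # Keep only the core instruction, drop few-shot examples and long context
    core = prompt.split("### EXAMPLES")[0]
    simplified = core.strip()[:max_chars]
    return simplified + "\n\nRespond briefly and in plain text."
```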
Tool Fallback
If the main tool fails, use a cached version. Or use a read-only path.
```python
class ToolWithFallback:
    def __init__(self, primary_tool, fallback_tool, cache=None):
        self.primary_tool = primary_tool
        self.fallback_tool = fallback_tool
        self.cache = cache

    async def call(self, params: dict) -> dict:
        """Call tool with fallback to cache or read-only version."""
        try:
            result = await self.primary_tool.call(params)
            # Cache successful results
            if self.cache:
                cache_key = self._cache_key(params)
                await self.cache.set(cache_key, result, ttl=3600)
            return result
        except ToolError as e:
            logger.warning(f"Primary tool failed: {e}, trying fallback")
            # Try cache first
            if self.cache:
                cache_key = self._cache_key(params)
                cached = await self.cache.get(cache_key)
                if cached:
                    logger.info("Using cached result")
                    return cached
            # Try read-only fallback
            try:
                return await self.fallback_tool.call_readonly(params)
            except ToolError as e2:
                logger.error(f"Fallback tool also failed: {e2}")
                raise
```
When to use: Main tool is down. Or it’s slow. Or it’s returning errors.
What fallback to use: Cached data from last successful call. Or a read-only version that doesn’t modify state.
Mode Fallback
Switch the agent from “autonomous” to “advisory” mode. When uncertainty is high, ask the user.
```python
from enum import Enum

class AgentMode(Enum):
    AUTONOMOUS = "autonomous"  # Agent acts on its own
    ADVISORY = "advisory"      # Agent suggests, user decides
    STOPPED = "stopped"        # Agent stops, escalates

class AgentWithModeFallback:
    def __init__(self):
        self.mode = AgentMode.AUTONOMOUS
        self.confidence_threshold = 0.8

    async def decide_action(self, plan: dict) -> dict:
        """Decide on action based on mode and confidence."""
        confidence = self._calculate_confidence(plan)
        # If confidence is low, switch to advisory mode
        if confidence < self.confidence_threshold:
            if self.mode == AgentMode.AUTONOMOUS:
                logger.info("Switching to advisory mode due to low confidence")
                self.mode = AgentMode.ADVISORY
        if self.mode == AgentMode.ADVISORY:
            # Ask user before acting
            return {
                "action": "ask_user",
                "suggestion": plan,
                "confidence": confidence,
                "reason": "Low confidence, need user approval"
            }
        # Autonomous mode - act directly
        return {
            "action": "execute",
            "plan": plan,
            "confidence": confidence
        }
```
When to use: Uncertainty is high. Multiple valid paths. Risk is high.
What to ask: Show the plan. Show confidence. Ask for approval. Or ask for clarification.
Scope Fallback
Narrow the action. Instead of updating 100 records, update 1 and ask for confirmation.
```python
async def update_records_with_fallback(record_ids: list, updates: dict):
    """Update records with scope fallback."""
    if len(record_ids) > 10:
        # Too many - narrow scope
        logger.warning(f"Too many records ({len(record_ids)}), narrowing scope")
        # Update just the first one as a test
        test_id = record_ids[0]
        await update_record(test_id, updates)
        # Ask for confirmation
        return {
            "status": "partial",
            "updated": [test_id],
            "remaining": record_ids[1:],
            "message": "Updated 1 record as test. Confirm to update remaining."
        }
    # Small batch - update all
    results = []
    for record_id in record_ids:
        result = await update_record(record_id, updates)
        results.append(result)
    return {
        "status": "complete",
        "updated": results
    }
```
When to use: Large batch operations. High-risk operations. First time doing something.
What to narrow: Reduce batch size. Do one as test. Ask for confirmation before continuing.
Making Fallbacks Observable
Don’t hide fallbacks. Log them. Alert on them. Track them.
```python
async def call_with_fallback(primary_func, fallback_func, operation_name: str):
    """Call with fallback and observability."""
    try:
        result = await primary_func()
        logger.info(f"{operation_name}: success (primary)")
        return result
    except Exception as e:
        logger.warning(f"{operation_name}: primary failed: {e}, using fallback")
        # Track fallback usage
        metrics.increment("fallback.used", tags={"operation": operation_name})
        try:
            result = await fallback_func()
            logger.info(f"{operation_name}: success (fallback)")
            metrics.increment("fallback.success", tags={"operation": operation_name})
            return result
        except Exception as e2:
            logger.error(f"{operation_name}: fallback also failed: {e2}")
            metrics.increment("fallback.failure", tags={"operation": operation_name})
            raise
```
Human-in-the-Loop and Handoff Patterns
Sometimes the agent should stop. It should ask a person. Not because it failed. Because the risk is too high. Or the decision is too important.
Approval Gateways
Some actions need approval. Always. Before they happen.
High-risk actions:
- Payments
- PII changes
- Production config changes
- Deletions
- External API calls that cost money
```python
import uuid

class ApprovalGateway:
    """Gateway for actions that require human approval."""

    REQUIRES_APPROVAL = [
        "payment",
        "delete_user",
        "update_pii",
        "change_production_config",
        "external_api_call"
    ]

    async def check_approval(self, action: dict) -> dict:
        """Check if action requires approval."""
        action_type = action.get("type")
        if action_type in self.REQUIRES_APPROVAL:
            # Create approval request
            approval_id = await self._create_approval_request(action)
            return {
                "requires_approval": True,
                "approval_id": approval_id,
                "action": action,
                "message": f"Action '{action_type}' requires approval"
            }
        return {
            "requires_approval": False,
            "action": action
        }

    async def _create_approval_request(self, action: dict) -> str:
        """Create an approval request and notify human."""
        approval_id = str(uuid.uuid4())
        # Store request
        await self._store_approval_request(approval_id, action)
        # Notify human (Slack, email, etc.)
        await self._notify_human(approval_id, action)
        return approval_id
```
How it works: Before executing, check if approval is needed. If yes, create a request. Notify a human. Wait for approval. Then execute.
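Wiring that flow together might look like the sketch below. `wait_for_approval` and `execute_action` are the same kind of placeholders used in the handoff patterns later in this article.

```python
async def execute_with_approval(gateway: ApprovalGateway, action: dict) -> dict:
    """Check the gateway, wait for a human decision if needed, then execute."""
    check = await gateway.check_approval(action)
    if check["requires_approval"]:
        # Block until a human approves or the request times out
        approved = await wait_for_approval(check["approval_id"], timeout=600)
        if not approved:
            return {"status": "rejected", "action": action}
    return await execute_action(action)
```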
Escalation Flows
If an error repeats N times, escalate. Create a ticket. Send to Slack. Get a human involved.
```python
class EscalationManager:
    """Manage escalation when errors repeat."""

    def __init__(self, max_retries: int = 3):
        self.max_retries = max_retries
        self.error_counts = {}  # error_key -> count

    async def handle_error(self, error: Exception, context: dict):
        """Handle error and escalate if needed."""
        error_key = self._error_key(error, context)
        # Increment count
        self.error_counts[error_key] = self.error_counts.get(error_key, 0) + 1
        count = self.error_counts[error_key]
        logger.warning(f"Error occurred: {error} (count: {count})")
        # If threshold reached, escalate
        if count >= self.max_retries:
            await self._escalate(error, context, count)

    async def _escalate(self, error: Exception, context: dict, count: int):
        """Escalate to human."""
        logger.error(f"Escalating error after {count} occurrences: {error}")
        # Create ticket
        ticket_id = await self._create_ticket(error, context)
        # Send to Slack/Teams
        await self._send_alert(error, context, ticket_id)
        # Reset count (or don't, to prevent alert spam)
        # self.error_counts[self._error_key(error, context)] = 0

    def _error_key(self, error: Exception, context: dict) -> str:
        """Generate a key for grouping similar errors."""
        error_type = type(error).__name__
        operation = context.get("operation", "unknown")
        return f"{error_type}:{operation}"
```
When to escalate: Same error happens 3+ times. Error rate spikes. Critical operation fails.
What to include in escalation: Error details. Context. Steps that led to error. Suggested next step.
Handing Context to Humans Cleanly
When you hand off to a human, give them everything they need. But make it clear. Don’t dump raw logs.
What to include:
- User input
- Plan the agent had
- Steps executed so far
- Failing step
- Error logs
- Suggested next step from agent
```python
import time
import traceback
from typing import Optional

def package_context_for_human(
    user_input: str,
    plan: dict,
    executed_steps: list,
    failing_step: Optional[dict],
    error: Optional[Exception],
    agent_suggestion: str = None
) -> dict:
    """Package context for human review."""
    return {
        "summary": {
            "user_input": user_input,
            "status": "failed",
            "error": str(error) if error else None,
            "error_type": type(error).__name__ if error else None
        },
        "execution": {
            "plan": plan,
            "steps_executed": executed_steps,
            "failing_step": failing_step,
            "steps_remaining": len(plan.get("steps", [])) - len(executed_steps)
        },
        "error_details": {
            "message": str(error) if error else None,
            "type": type(error).__name__ if error else None,
            "traceback": "".join(
                traceback.format_exception(type(error), error, error.__traceback__)
            ) if error else None
        },
        "agent_suggestion": agent_suggestion or "Please review and provide guidance",
        "timestamp": time.time(),
        "suggested_actions": [
            "Review the error and context",
            "Decide on next step",
            "Approve retry or provide alternative"
        ]
    }
```
Format it well: Use clear structure. Use plain language. Highlight important parts. Make it scannable.
Three Handoff Patterns
“Ask-before-act”: Agent proposes action. Human approves. Then agent acts.
```python
async def ask_before_act(action: dict, context: dict) -> dict:
    """Ask human before acting."""
    approval_request = package_context_for_human(
        user_input=context.get("user_input"),
        plan={"proposed_action": action},
        executed_steps=[],
        failing_step=None,
        error=None,
        agent_suggestion=f"Proposed action: {action['type']}"
    )
    # Send to human
    approval_id = await send_approval_request(approval_request)
    # Wait for approval
    approved = await wait_for_approval(approval_id, timeout=300)
    if approved:
        return await execute_action(action)
    else:
        return {"status": "cancelled", "reason": "Not approved"}
```
“Review-after-act”: Agent acts. Then human reviews. Can undo if needed.
```python
async def review_after_act(action: dict, context: dict) -> dict:
    """Act first, then ask human to review."""
    # Execute action
    execution_result = await execute_action(action)
    # Package for review
    review_request = {
        "action_taken": action,
        "result": execution_result,
        "context": context,
        "can_undo": execution_result.get("can_undo", False)
    }
    # Send for review
    review_id = await send_review_request(review_request)
    # If human says undo, undo it
    if await should_undo(review_id):
        return await undo_action(action, execution_result)
    return execution_result
```
“Stop-and-escalate”: Agent stops. Creates ticket. Waits for human. Doesn’t continue until human responds.
```python
async def stop_and_escalate(error: Exception, context: dict) -> dict:
    """Stop execution and escalate to human."""
    # Package context
    escalation_context = package_context_for_human(
        user_input=context.get("user_input"),
        plan=context.get("plan"),
        executed_steps=context.get("executed_steps", []),
        failing_step=context.get("current_step"),
        error=error
    )
    # Create ticket
    ticket_id = await create_ticket(escalation_context)
    # Notify human
    await notify_human(ticket_id, escalation_context)
    # Wait for human response
    human_guidance = await wait_for_human_guidance(ticket_id, timeout=3600)
    return {
        "status": "escalated",
        "ticket_id": ticket_id,
        "human_guidance": human_guidance,
        "next_step": human_guidance.get("next_step")
    }
```
Observability for Failure-First Agents
You can’t fix what you can’t see. You need to log failures. Track them. Alert on them.
What to Log and How
Log structure matters. Make it consistent. Make it searchable.
Log structure:
```json
{
  "trace_id": "abc123",
  "user_id": "user_456",
  "workflow_id": "workflow_789",
  "step_name": "call_payment_api",
  "tool_name": "payment_gateway",
  "error_type": "TimeoutError",
  "error_message": "Request timed out after 5.0 seconds",
  "retries": 2,
  "duration_ms": 5000,
  "timestamp": "2025-11-16T10:30:00Z",
  "context": {
    "operation": "process_payment",
    "amount": 100.00,
    "currency": "USD"
  }
}
```
Key fields:
- trace_id: Track a request across services
- user_id: Who triggered it
- workflow_id: Which workflow
- step_name: Which step failed
- tool_name: Which tool
- error_type: What kind of error
- retries: How many retries
- duration_ms: How long it took
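A small helper can keep this structure consistent across the codebase. A sketch using only the standard library; swap the JSON-over-logging approach for whatever structured logging you already have.

```python
import json
import logging
import time

logger = logging.getLogger("agent.failures")

def log_failure(trace_id: str, workflow_id: str, step_name: str,
                tool_name: str, error: Exception, retries: int,
                duration_ms: int, **context) -> None:
    """Emit one structured, searchable log line per failure."""
    logger.error(json.dumps({
        "trace_id": trace_id,
        "workflow_id": workflow_id,
        "step_name": step_name,
        "tool_name": tool_name,
        "error_type": type(error).__name__,
        "error_message": str(error),
        "retries": retries,
        "duration_ms": duration_ms,
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "context": context,
    }))
```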
Basic Dashboards
Build simple dashboards. Don’t overcomplicate.
Error rate by tool: Show how often each tool fails. Which tools are unreliable.
Average retries per workflow: Show how often workflows need retries. High retries mean problems.
Top error types per day: Show what’s failing. Timeouts? 500s? Validation errors?
```python
# Example metrics to track
metrics = {
    "errors.by_tool": {
        "payment_gateway": 5,
        "search_api": 2,
        "database": 1
    },
    "retries.per_workflow": 1.2,  # Average
    "errors.by_type": {
        "TimeoutError": 10,
        "HTTPError": 5,
        "ValidationError": 2
    }
}
```
Simple Alerts
Set up alerts. But don’t alert on everything.
Spike in a specific error type: If timeouts spike, alert. If 500s spike, alert.
Sudden increase in average retries: If retries double, something is wrong.
Critical operation failures: If payments fail, alert immediately.
```python
class AlertManager:
    """Simple alert manager."""

    def __init__(self):
        self.error_counts = {}
        self.retry_counts = {}

    def record_error(self, error_type: str, tool_name: str):
        """Record an error and check for alerts."""
        key = f"{error_type}:{tool_name}"
        self.error_counts[key] = self.error_counts.get(key, 0) + 1
        # Alert if spike
        if self.error_counts[key] > 10:  # Threshold
            self._send_alert(f"Spike in {error_type} for {tool_name}")

    def record_retry(self, workflow_id: str):
        """Record a retry and check for alerts."""
        self.retry_counts[workflow_id] = self.retry_counts.get(workflow_id, 0) + 1
        # Alert if too many retries
        if self.retry_counts[workflow_id] > 3:
            self._send_alert(f"Workflow {workflow_id} retried {self.retry_counts[workflow_id]} times")
```
Keep it simple. No need for a specific vendor. Use what you have. Logs. Metrics. Alerts.
Testing Failure Scenarios (Without Breaking Prod)
You need to test failures. But you can’t break production. Test in staging. Inject failures. See what happens.
Injecting Fake Tool Errors
In staging, inject random errors. See how the agent handles them.
```python
import asyncio
import random
import time

class FailureInjector:
    """Inject failures for testing."""

    def __init__(self, failure_rate: float = 0.1):
        self.failure_rate = failure_rate
        self.injected_errors = []

    async def call_tool_with_injection(self, tool_name: str, func, *args, **kwargs):
        """Call tool with potential failure injection."""
        # Randomly inject failure
        if random.random() < self.failure_rate:
            error_type = random.choice([
                "TimeoutError",
                "HTTPError",
                "ConnectionError"
            ])
            self.injected_errors.append({
                "tool": tool_name,
                "error": error_type,
                "timestamp": time.time()
            })
            # Raise the error
            if error_type == "TimeoutError":
                raise asyncio.TimeoutError(f"Injected timeout for {tool_name}")
            elif error_type == "HTTPError":
                raise HTTPError(f"Injected 500 for {tool_name}", status_code=500)
            else:
                raise ConnectionError(f"Injected connection error for {tool_name}")
        # Normal call
        return await func(*args, **kwargs)
```
What to inject:
- Random 500s
- Timeouts
- Slow responses
- Connection errors
What to test:
- Retry logic works
- Fallbacks trigger (see the sketch after this list)
- Timeouts are respected
- Handoffs happen when needed
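For the fallback case, a test can stub the primary and fallback and assert the order of calls. This sketch reuses `call_with_fallback` from earlier and assumes its `logger` and `metrics` placeholders are wired up or stubbed.

```python
async def test_fallback_triggers_on_primary_failure():
    """Test that the fallback path is used when the primary tool fails."""
    calls = []

    async def failing_primary():
        calls.append("primary")
        raise RuntimeError("primary down")

    async def working_fallback():
        calls.append("fallback")
        return {"source": "fallback"}

    result = await call_with_fallback(failing_primary, working_fallback, "search")
    assert result == {"source": "fallback"}
    assert calls == ["primary", "fallback"]
```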
Testing Retry Limits
Make sure retries stop. Don’t retry forever.
```python
async def test_retry_limits():
    """Test that retries stop at limit."""
    max_attempts = 3
    attempt_count = 0

    async def failing_func():
        nonlocal attempt_count
        attempt_count += 1
        raise Exception("Always fails")

    try:
        await retry_with_backoff(failing_func, max_attempts=max_attempts)
        assert False, "Should have raised exception"
    except Exception:
        # Should have tried max_attempts times
        assert attempt_count == max_attempts
```
Ensuring Handoffs Work
Simulate repeated failure. Make sure tickets are created. Notifications are sent.
```python
async def test_handoff_on_repeated_failure():
    """Test that handoff happens on repeated failure."""
    escalation_manager = EscalationManager(max_retries=3)
    tickets_created = []

    # Mock ticket creation and alerting so no real systems are hit
    async def mock_create_ticket(error, context):
        tickets_created.append({"error": str(error)})
        return "ticket_test"

    async def mock_send_alert(error, context, ticket_id):
        pass

    escalation_manager._create_ticket = mock_create_ticket
    escalation_manager._send_alert = mock_send_alert

    # Simulate 3 failures of the same operation
    for _ in range(3):
        await escalation_manager.handle_error(
            Exception("Test error"),
            {"operation": "test"}
        )

    # Threshold reached on the third failure - exactly one ticket created
    assert len(tickets_created) == 1
```
Practical Checklist
Here’s a checklist you can use:
- Timeouts set per tool?
- Retry rules documented and implemented?
- Fallbacks defined for critical paths?
- Human approvals defined for risky actions?
- Error logging + dashboards in place?
- Failure test suite that covers at least the happy path and the main failure paths?
Putting It All Together: A Complete Example
Let’s build a simple agent that handles failures well. It calls tools. It has timeouts. It retries. It falls back. It escalates.
The Agent Loop
```python
import asyncio
import time
import logging
from typing import Dict, List, Optional, Callable, Any
from enum import Enum
from dataclasses import dataclass

logger = logging.getLogger(__name__)

class AgentState(Enum):
    RUNNING = "running"
    WAITING_APPROVAL = "waiting_approval"
    ESCALATED = "escalated"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class ToolConfig:
    name: str
    timeout: float
    max_retries: int
    requires_approval: bool = False
class FailureFirstAgent:
    """An agent designed to handle failures gracefully."""

    def __init__(
        self,
        tool_configs: Dict[str, ToolConfig],
        workflow_timeout: float = 300.0,
        escalation_threshold: int = 3
    ):
        self.tool_configs = tool_configs
        self.workflow_timeout = workflow_timeout
        self.escalation_threshold = escalation_threshold
        self.state = AgentState.RUNNING
        self.start_time = None
        self.error_counts = {}
        self.executed_steps = []
    async def run(self, user_input: str, plan: List[Dict]) -> Dict:
        """Run the agent workflow."""
        self.start_time = time.time()
        self.state = AgentState.RUNNING
        try:
            for step in plan:
                # Check workflow timeout
                if self._workflow_timed_out():
                    return await self._handle_timeout(user_input, plan)
                # Execute step
                result = await self._execute_step(step, user_input, plan)
                if result.get("requires_approval"):
                    self.state = AgentState.WAITING_APPROVAL
                    return result
                if result.get("escalated"):
                    self.state = AgentState.ESCALATED
                    return result
                self.executed_steps.append({
                    "step": step,
                    "result": result,
                    "timestamp": time.time()
                })
            self.state = AgentState.COMPLETED
            return {
                "status": "completed",
                "result": "Workflow completed successfully",
                "steps_executed": len(self.executed_steps)
            }
        except Exception as e:
            self.state = AgentState.FAILED
            return await self._handle_error(e, user_input, plan)
    async def _execute_step(self, step: Dict, user_input: str, plan: List[Dict]) -> Dict:
        """Execute a single step with failure handling."""
        tool_name = step.get("tool")
        tool_config = self.tool_configs.get(tool_name)
        if not tool_config:
            raise ValueError(f"Unknown tool: {tool_name}")
        # Check if approval needed
        if tool_config.requires_approval:
            return await self._request_approval(step, user_input, plan)
        # Execute with retry and timeout
        return await self._call_tool_with_retry(
            tool_name,
            tool_config,
            step.get("params", {})
        )
    async def _call_tool_with_retry(
        self,
        tool_name: str,
        config: ToolConfig,
        params: Dict
    ) -> Dict:
        """Call tool with retry and timeout."""
        last_error = None
        for attempt in range(config.max_retries):
            try:
                # Call with timeout
                result = await asyncio.wait_for(
                    self._call_tool(tool_name, params),
                    timeout=config.timeout
                )
                # Success - reset error count
                error_key = f"{tool_name}:{type(last_error).__name__ if last_error else 'success'}"
                self.error_counts[error_key] = 0
                return {
                    "status": "success",
                    "result": result,
                    "attempts": attempt + 1
                }
            except asyncio.TimeoutError as e:
                last_error = e
                logger.warning(f"Tool {tool_name} timed out (attempt {attempt + 1})")
                # Check if should escalate
                if await self._should_escalate(tool_name, e):
                    return await self._escalate(e, tool_name, params)
            except Exception as e:
                last_error = e
                logger.warning(f"Tool {tool_name} failed: {e} (attempt {attempt + 1})")
                # Don't retry certain errors
                if not self._should_retry(e):
                    raise
                # Check if should escalate
                if await self._should_escalate(tool_name, e):
                    return await self._escalate(e, tool_name, params)
            # Wait before retry
            if attempt < config.max_retries - 1:
                delay = min(1.0 * (2 ** attempt), 60.0)
                await asyncio.sleep(delay)
        # All retries exhausted
        raise last_error
    async def _call_tool(self, tool_name: str, params: Dict) -> Any:
        """Call a tool (implement based on your tools)."""
        # This is a placeholder - implement based on your actual tools
        if tool_name == "search":
            return await self._search_tool(params)
        elif tool_name == "update":
            return await self._update_tool(params)
        else:
            raise ValueError(f"Unknown tool: {tool_name}")

    async def _search_tool(self, params: Dict) -> Dict:
        """Example search tool."""
        query = params.get("query")
        # Simulate tool call
        await asyncio.sleep(0.1)
        return {"results": [f"Result for {query}"]}

    async def _update_tool(self, params: Dict) -> Dict:
        """Example update tool."""
        record_id = params.get("record_id")
        # Simulate tool call
        await asyncio.sleep(0.1)
        return {"updated": record_id, "status": "success"}

    def _should_retry(self, error: Exception) -> bool:
        """Decide if error should be retried."""
        # Don't retry client errors
        if isinstance(error, ValueError):
            return False
        if isinstance(error, KeyError):
            return False
        # Retry timeouts and server errors
        return True
    async def _should_escalate(self, tool_name: str, error: Exception) -> bool:
        """Check if error should be escalated."""
        error_key = f"{tool_name}:{type(error).__name__}"
        self.error_counts[error_key] = self.error_counts.get(error_key, 0) + 1
        return self.error_counts[error_key] >= self.escalation_threshold

    async def _escalate(self, error: Exception, tool_name: str, params: Dict) -> Dict:
        """Escalate to human."""
        logger.error(f"Escalating error: {error} for tool {tool_name}")
        # Package context
        context = {
            "error": str(error),
            "error_type": type(error).__name__,
            "tool": tool_name,
            "params": params,
            "executed_steps": self.executed_steps
        }
        # Create ticket (in real implementation)
        ticket_id = f"ticket_{int(time.time())}"
        return {
            "status": "escalated",
            "escalated": True,
            "ticket_id": ticket_id,
            "context": context,
            "message": "Error escalated to human for review"
        }
    async def _request_approval(self, step: Dict, user_input: str, plan: List[Dict]) -> Dict:
        """Request approval for a step."""
        approval_context = {
            "step": step,
            "user_input": user_input,
            "plan": plan,
            "executed_steps": self.executed_steps
        }
        # In real implementation, send to approval system
        approval_id = f"approval_{int(time.time())}"
        return {
            "status": "waiting_approval",
            "requires_approval": True,
            "approval_id": approval_id,
            "context": approval_context
        }

    def _workflow_timed_out(self) -> bool:
        """Check if workflow has timed out."""
        if not self.start_time:
            return False
        elapsed = time.time() - self.start_time
        return elapsed >= self.workflow_timeout

    async def _handle_timeout(self, user_input: str, plan: List[Dict]) -> Dict:
        """Handle workflow timeout."""
        logger.warning("Workflow timed out")
        return {
            "status": "timeout",
            "message": "Workflow exceeded time limit",
            "steps_executed": len(self.executed_steps),
            "steps_remaining": len(plan) - len(self.executed_steps)
        }

    async def _handle_error(self, error: Exception, user_input: str, plan: List[Dict]) -> Dict:
        """Handle workflow error."""
        logger.error(f"Workflow failed: {error}")
        return {
            "status": "failed",
            "error": str(error),
            "error_type": type(error).__name__,
            "steps_executed": len(self.executed_steps),
            "context": {
                "user_input": user_input,
                "plan": plan,
                "executed_steps": self.executed_steps
            }
        }
```
Usage Example
```python
import asyncio

# Configure tools
tool_configs = {
    "search": ToolConfig(
        name="search",
        timeout=10.0,
        max_retries=3,
        requires_approval=False
    ),
    "update": ToolConfig(
        name="update",
        timeout=5.0,
        max_retries=2,
        requires_approval=True  # Updates need approval
    )
}

# Create agent
agent = FailureFirstAgent(
    tool_configs=tool_configs,
    workflow_timeout=300.0,
    escalation_threshold=3
)

# Define plan
plan = [
    {"tool": "search", "params": {"query": "test"}},
    {"tool": "update", "params": {"record_id": "123", "data": {"status": "active"}}}
]

# Run agent (await needs a running event loop)
async def main():
    result = await agent.run("Process this record", plan)
    print(result)

asyncio.run(main())
```
Summary
Failure-first design isn’t about preventing failures. It’s about handling them well.
Start with simple patterns:
- Timeouts per tool and per workflow
- Smart retries with backoff
- Fallbacks for critical paths
- Human handoffs for risky actions
- Good logging and observability
Test failures in staging. Inject errors. See what happens. Make sure your agent fails safely.
The code examples above give you a foundation. Adapt them to your needs. Start simple. Add complexity only when you need it.
Remember: Things will fail. The question is how they fail. Make sure they fail safely.