By Yusuf Elborey

Replayable Agent Workflows: Checkpoints, Time-Travel, and Regression Tests for Tool-Using Agents

agents, debugging, checkpoints, time-travel, durable-execution, reproducibility, testing, opentelemetry, langgraph, python, observability

Replayable Agent Workflows

Agents fail in production. They make bad tool choices. They get stuck in loops. They produce wrong answers. And when you try to debug them, you can’t reproduce the failure.

This article shows you how to make agents reproducible. You’ll see how to checkpoint every step, replay failed runs, fork from any point, and turn production incidents into regression tests.

The Problem: Agents Are Hard to Debug

Debugging agents is different from debugging normal code. Normal code is deterministic. Same input, same output. Agents are not.

Non-Determinism Everywhere

Agents have multiple sources of randomness:

  • Model sampling: Temperature > 0 means different outputs each time
  • Tool variability: APIs return different results (weather changes, stock prices move, search results shift)
  • Network timing: Retries, timeouts, rate limits
  • Concurrent state: Multiple agents or users modifying shared state
# Same prompt, different outputs
response1 = llm.generate("Summarize this doc", temperature=0.7)
response2 = llm.generate("Summarize this doc", temperature=0.7)
# response1 != response2

# Same tool call, different results
weather1 = get_weather("San Francisco")  # "Sunny, 65°F"
weather2 = get_weather("San Francisco")  # "Cloudy, 63°F" (5 minutes later)

You can’t just re-run the agent and expect the same behavior.

Side Effects Make Things Worse

Agents don’t just read. They write. They send emails. They charge credit cards. They commit code. They delete files.

# Dangerous side effects
agent.run("Send reminder emails to all overdue customers")
# - Sends 500 emails
# - Can't undo
# - Can't safely replay

agent.run("Refund the last 10 orders")
# - Charges credit cards
# - Updates database
# - Triggers webhooks
# - Can't replay without real consequences

When debugging, you can’t just replay these operations. You’ll send duplicate emails. You’ll double-charge customers. You’ll corrupt state.

“It Worked Yesterday” Incidents

The worst debugging scenario: it worked yesterday, it fails today, and you can’t figure out why.

Maybe the model changed. Maybe a tool API changed. Maybe the data changed. Maybe it was just random. You don’t know. You can’t reproduce it. You can’t fix it.

# Production incident
run_id = "run_abc123"
# Agent failed at step 7
# Error: "Tool 'search_docs' returned empty results"
# Why? Was the query bad? Did the index break? Was it a network issue?
# Can't tell. Can't replay. Can't debug.

Without reproducibility, you’re guessing. You make a change. You hope it fixes the problem. You deploy. You wait. Maybe it works. Maybe it doesn’t.

Define “Replayable”

A replayable agent emits events for every step. You can replay those events and get the same state transitions.

Every Step Emits an Event

An event captures everything needed to reproduce a step:

  • Input state: Messages, variables, context
  • Decision: Which tool to call, which branch to take
  • Tool call: Tool name, arguments, timestamp
  • Tool result: Output, errors, duration
  • New state: Updated messages, variables, context
class StepEvent:
    def __init__(
        self,
        step_id: str,
        input_state: dict,
        decision: str,
        tool_name: str,
        tool_args: dict,
        tool_result: dict,
        output_state: dict,
        timestamp: datetime
    ):
        self.step_id = step_id
        self.input_state = input_state
        self.decision = decision
        self.tool_name = tool_name
        self.tool_args = tool_args
        self.tool_result = tool_result
        self.output_state = output_state
        self.timestamp = timestamp

Every step emits one event. The event stream is the complete history of the run.
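
To make the event stream concrete, here is a minimal recorder sketch: it appends one JSON line per step to a per-run log file. The EVENTS_DIR location and append_event helper are illustrative, not part of any particular framework.

import json
import os

EVENTS_DIR = "events"  # illustrative location for per-run event logs

def append_event(run_id: str, event: StepEvent) -> None:
    """Append one step event to the run's JSON Lines log."""
    os.makedirs(EVENTS_DIR, exist_ok=True)
    record = {
        "step_id": event.step_id,
        "input_state": event.input_state,
        "decision": event.decision,
        "tool_name": event.tool_name,
        "tool_args": event.tool_args,
        "tool_result": event.tool_result,
        "output_state": event.output_state,
        "timestamp": event.timestamp.isoformat(),
    }
    with open(os.path.join(EVENTS_DIR, f"{run_id}.jsonl"), "a") as f:
        f.write(json.dumps(record) + "\n")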

Replay Means Re-Running the Event Stream

Replay doesn’t mean re-executing the agent. It means re-running the recorded events.

def replay_run(run_id: str):
    """Replay a run from recorded events."""
    events = load_events(run_id)
    state = {}
    
    for event in events:
        # Restore input state
        state = event.input_state
        
        # Use recorded tool result (don't re-execute)
        tool_result = event.tool_result
        
        # Apply state transition
        state = event.output_state
        
        print(f"Step {event.step_id}: {event.tool_name} -> {tool_result}")
    
    return state

Replay uses recorded tool results. It doesn’t re-call APIs. It doesn’t re-execute side effects. It just replays the state transitions.
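
The load_events helper used above (and throughout this article) can be as simple as reading that log back. A minimal sketch, paired with the append_event sketch earlier; SimpleNamespace is just a convenient stand-in for StepEvent-style attribute access.

import json
import os
from datetime import datetime
from types import SimpleNamespace

def load_events(run_id: str) -> list:
    """Load the recorded step events for a run, oldest first."""
    path = os.path.join(EVENTS_DIR, f"{run_id}.jsonl")
    events = []
    with open(path) as f:
        for line in f:
            record = json.loads(line)
            record["timestamp"] = datetime.fromisoformat(record["timestamp"])
            # SimpleNamespace gives attribute access (event.tool_name, ...)
            events.append(SimpleNamespace(**record))
    return events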

Checkpoint Design

Checkpoints store everything needed to resume or replay a run.

What to Store Per Step

Store these fields for each checkpoint:

  • Run metadata: run_id, user_id, start_time, agent_version
  • Step metadata: step_id, step_number, timestamp
  • Messages: Full conversation history up to this point
  • Tool calls: Tool name, arguments, result, duration, errors
  • Model config: Model name, temperature, max_tokens, prompt version
  • State: All variables, context, flags
class Checkpoint:
    def __init__(
        self,
        run_id: str,
        step_id: str,
        step_number: int,
        timestamp: datetime,
        messages: list[dict],
        tool_calls: list[dict],
        model_config: dict,
        state: dict,
        agent_version: str
    ):
        self.run_id = run_id
        self.step_id = step_id
        self.step_number = step_number
        self.timestamp = timestamp
        self.messages = messages
        self.tool_calls = tool_calls
        self.model_config = model_config
        self.state = state
        self.agent_version = agent_version

Store enough to resume from any step. Store enough to understand what happened. Store enough to debug.

Where to Store: Database + Blob Store

Use a database for metadata and a blob store for large payloads.

class CheckpointStore:
    def __init__(self, db, blob_store):
        self.db = db
        self.blob_store = blob_store
    
    def save_checkpoint(self, checkpoint: Checkpoint):
        """Save checkpoint to DB + blob store."""
        # Save metadata to DB
        self.db.execute("""
            INSERT INTO checkpoints (
                run_id, step_id, step_number, timestamp, agent_version
            ) VALUES (?, ?, ?, ?, ?)
        """, (
            checkpoint.run_id,
            checkpoint.step_id,
            checkpoint.step_number,
            checkpoint.timestamp,
            checkpoint.agent_version
        ))
        
        # Save large payloads to blob store
        blob_key = f"{checkpoint.run_id}/{checkpoint.step_id}"
        self.blob_store.put(blob_key, {
            "messages": checkpoint.messages,
            "tool_calls": checkpoint.tool_calls,
            "model_config": checkpoint.model_config,
            "state": checkpoint.state
        })

    def load_checkpoint(self, run_id: str, step_id: str) -> Checkpoint:
        """Load checkpoint from DB + blob store."""
        # Load metadata from DB
        row = self.db.execute("""
            SELECT step_number, timestamp, agent_version
            FROM checkpoints
            WHERE run_id = ? AND step_id = ?
        """, (run_id, step_id)).fetchone()
        
        # Load payloads from blob store
        blob_key = f"{run_id}/{step_id}"
        blob_data = self.blob_store.get(blob_key)
        
        return Checkpoint(
            run_id=run_id,
            step_id=step_id,
            step_number=row[0],
            timestamp=row[1],
            messages=blob_data["messages"],
            tool_calls=blob_data["tool_calls"],
            model_config=blob_data["model_config"],
            state=blob_data["state"],
            agent_version=row[2]
        )

Database for queries. Blob store for large data. Keep them in sync.
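
For a concrete starting point, here is one possible wiring, assuming SQLite for metadata and the local filesystem standing in for a blob store. The table layout mirrors the columns save_checkpoint writes; FileBlobStore is illustrative.

import json
import os
import sqlite3

# Metadata table matching the columns save_checkpoint writes
db = sqlite3.connect("checkpoints.db")
db.execute("""
    CREATE TABLE IF NOT EXISTS checkpoints (
        run_id        TEXT NOT NULL,
        step_id       TEXT NOT NULL,
        step_number   INTEGER NOT NULL,
        timestamp     TEXT NOT NULL,
        agent_version TEXT NOT NULL,
        PRIMARY KEY (run_id, step_id)
    )
""")

class FileBlobStore:
    """Filesystem stand-in for an object store (S3, GCS, ...)."""
    def __init__(self, root: str = "blobs"):
        self.root = root

    def put(self, key: str, value: dict) -> None:
        path = os.path.join(self.root, f"{key}.json")
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "w") as f:
            json.dump(value, f)

    def get(self, key: str) -> dict:
        with open(os.path.join(self.root, f"{key}.json")) as f:
            return json.load(f)

checkpoint_store = CheckpointStore(db=db, blob_store=FileBlobStore())
# Remember to call db.commit() after writes if you use sqlite3 directly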


Redaction Boundaries

Don’t store secrets or PII in checkpoints. Redact them.

import re

def redact_sensitive_data(data: dict) -> dict:
    """Redact PII and secrets from checkpoint data."""
    redacted = data.copy()
    
    # Redact API keys
    if "api_key" in redacted:
        redacted["api_key"] = "REDACTED"
    
    # Redact email addresses
    if "email" in redacted:
        redacted["email"] = re.sub(
            r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',
            'user@REDACTED.com',
            redacted["email"]
        )
    
    # Redact credit card numbers
    if "credit_card" in redacted:
        redacted["credit_card"] = "XXXX-XXXX-XXXX-XXXX"
    
    return redacted

Redact before storing. Keep a separate secure store for sensitive data if you need it for debugging.
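
The helper above only covers known top-level keys. Checkpoint payloads are usually nested (message lists, tool arguments), so in practice you likely want a recursive pass. A sketch, assuming a small set of sensitive key names; extend the key set and patterns for your own data.

import re

EMAIL_RE = re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}')
SENSITIVE_KEYS = {"api_key", "password", "credit_card", "ssn"}

def redact_nested(value):
    """Recursively redact sensitive keys and email-like strings."""
    if isinstance(value, dict):
        return {
            k: "REDACTED" if k in SENSITIVE_KEYS else redact_nested(v)
            for k, v in value.items()
        }
    if isinstance(value, list):
        return [redact_nested(v) for v in value]
    if isinstance(value, str):
        return EMAIL_RE.sub("user@REDACTED.com", value)
    return value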

Retention Rules

Don’t store checkpoints forever. Set retention policies.

from datetime import datetime, timedelta

RETENTION_POLICIES = {
    "successful_runs": timedelta(days=7),
    "failed_runs": timedelta(days=30),
    "flagged_runs": timedelta(days=90),
    "test_runs": timedelta(days=1)
}

def cleanup_old_checkpoints():
    """Delete checkpoints older than retention policy."""
    for run_type, retention in RETENTION_POLICIES.items():
        cutoff = datetime.now() - retention
        
        db.execute("""
            DELETE FROM checkpoints
            WHERE run_type = ? AND timestamp < ?
        """, (run_type, cutoff))

Keep failed runs longer. Keep successful runs shorter. Keep test runs shortest.

Determinism Rules That Actually Hold Up

You can’t make agents fully deterministic. But you can make them deterministic enough to replay.

Wrap Side Effects Into Idempotent Tasks

Make side effects idempotent. Same input, same effect (even if called multiple times).

class IdempotentTask:
    def __init__(self, task_id: str):
        self.task_id = task_id
        self.executed = set()
    
    def execute(self, operation: callable, *args, **kwargs):
        """Execute operation idempotently."""
        # Check if already executed
        op_key = f"{self.task_id}:{operation.__name__}:{args}:{kwargs}"
        
        if op_key in self.executed:
            print(f"Task {op_key} already executed, skipping")
            return
        
        # Execute
        result = operation(*args, **kwargs)
        
        # Mark as executed
        self.executed.add(op_key)
        
        return result

# Usage
task = IdempotentTask("send_email_task_123")
task.execute(send_email, to="user@example.com", subject="Reminder")
task.execute(send_email, to="user@example.com", subject="Reminder")  # Skipped

Idempotent tasks can be replayed safely. They won’t duplicate side effects.
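
One caveat: the in-memory executed set above disappears on restart, which is exactly when replays happen. A durable variant stores idempotency keys in the database. A sketch, with an illustrative executed_ops table.

import sqlite3

class DurableIdempotentTask:
    def __init__(self, db: sqlite3.Connection, task_id: str):
        self.db = db
        self.task_id = task_id
        self.db.execute(
            "CREATE TABLE IF NOT EXISTS executed_ops (op_key TEXT PRIMARY KEY)"
        )

    def execute(self, operation: callable, *args, **kwargs):
        """Execute operation at most once, surviving restarts."""
        op_key = f"{self.task_id}:{operation.__name__}:{args}:{sorted(kwargs.items())}"
        # INSERT OR IGNORE is atomic: only the first caller claims the key
        cursor = self.db.execute(
            "INSERT OR IGNORE INTO executed_ops (op_key) VALUES (?)", (op_key,)
        )
        self.db.commit()
        if cursor.rowcount == 0:
            print(f"Task {op_key} already executed, skipping")
            return None
        return operation(*args, **kwargs)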

Separate Read Tools from Write Tools

Read tools are safe to replay. Write tools are dangerous.

class ToolRegistry:
    def __init__(self):
        self.read_tools = {}
        self.write_tools = {}
    
    def register_read_tool(self, name: str, func: callable):
        """Register a read-only tool."""
        self.read_tools[name] = func
    
    def register_write_tool(self, name: str, func: callable):
        """Register a write tool (has side effects)."""
        self.write_tools[name] = func
    
    def is_write_tool(self, name: str) -> bool:
        """Check if tool has side effects."""
        return name in self.write_tools

# Register tools
registry = ToolRegistry()
registry.register_read_tool("search_docs", search_docs)
registry.register_read_tool("get_weather", get_weather)
registry.register_write_tool("send_email", send_email)
registry.register_write_tool("charge_card", charge_card)

When replaying, skip write tools. Use recorded results instead.
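
A sketch of what that dispatch might look like, combining the registry with recorded step results; the replaying flag and recorded_result parameter are illustrative.

def call_tool(registry: ToolRegistry, name: str, args: dict,
              recorded_result: dict = None, replaying: bool = False):
    """Dispatch a tool call, never repeating side effects during replay."""
    if replaying:
        if registry.is_write_tool(name):
            # Never re-run side effects; return what actually happened
            return recorded_result
        # Read tools are safe to re-execute, but the recording (when present)
        # keeps replay deterministic even if the underlying data has changed
        if recorded_result is not None:
            return recorded_result
        return registry.read_tools[name](**args)
    # Live execution
    func = registry.write_tools.get(name) or registry.read_tools[name]
    return func(**args)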

Freeze Randomness Where Possible

Use seeds for model sampling. Record tool results for everything else.

def run_agent_with_seed(query: str, seed: int = 42):
    """Run agent with fixed random seed."""
    # Set seed for model
    model_config = {
        "temperature": 0.7,
        "seed": seed  # Some models support this
    }
    
    # Run agent
    response = llm.generate(query, **model_config)
    return response

# Same seed, same output (if model supports it)
response1 = run_agent_with_seed("Summarize this", seed=42)
response2 = run_agent_with_seed("Summarize this", seed=42)
# response1 == response2 (hopefully)

Not all models support seeds. For those that don’t, record the actual output and replay it.

class RecordedTool:
    def __init__(self, tool_func: callable, recorder):
        self.tool_func = tool_func
        self.recorder = recorder
    
    def __call__(self, *args, **kwargs):
        """Call tool and record result."""
        result = self.tool_func(*args, **kwargs)
        self.recorder.record(self.tool_func.__name__, args, kwargs, result)
        return result

Record everything that’s non-deterministic. Replay from recordings.

Time-Travel Debugging

Time-travel debugging lets you resume from any checkpoint, fork the run, and compare outcomes.

Resume from Checkpoint

Load a checkpoint and continue from there.

def resume_from_checkpoint(run_id: str, step_id: str):
    """Resume agent execution from a checkpoint."""
    # Load checkpoint
    checkpoint = checkpoint_store.load_checkpoint(run_id, step_id)
    
    # Restore state
    state = checkpoint.state
    messages = checkpoint.messages
    
    # Continue execution
    agent = Agent(state=state, messages=messages)
    result = agent.run()
    
    return result

# Resume from step 5
result = resume_from_checkpoint("run_abc123", "step_5")

This lets you continue a failed run. Fix the issue. Resume. See if it works.

Fork the Run with Modified State

Load a checkpoint, modify the state, and continue. See what happens.

import uuid

def fork_run(run_id: str, step_id: str, state_modifications: dict):
    """Fork a run with modified state."""
    # Load checkpoint
    checkpoint = checkpoint_store.load_checkpoint(run_id, step_id)
    
    # Apply modifications
    state = checkpoint.state.copy()
    state.update(state_modifications)
    
    # Create new run ID for fork
    fork_run_id = f"{run_id}_fork_{uuid.uuid4().hex[:8]}"
    
    # Continue with modified state
    agent = Agent(state=state, messages=checkpoint.messages)
    result = agent.run(run_id=fork_run_id)
    
    return fork_run_id, result

# Fork and remove a document from context
fork_id, result = fork_run(
    "run_abc123",
    "step_5",
    {"retrieved_docs": []}  # Remove docs
)

Forking lets you test “what if” scenarios. What if we removed this doc? What if we changed this parameter? What if we used a different tool?

Compare Forks: Where Did They Diverge?

Run two forks and compare where they diverged.

def compare_runs(run_id_1: str, run_id_2: str):
    """Compare two runs and find where they diverged."""
    events_1 = load_events(run_id_1)
    events_2 = load_events(run_id_2)
    
    divergence_point = None
    
    for i, (e1, e2) in enumerate(zip(events_1, events_2)):
        if e1.tool_name != e2.tool_name or e1.tool_args != e2.tool_args:
            divergence_point = i
            break
    
    if divergence_point is None:
        print("Runs are identical")
        return
    
    print(f"Runs diverged at step {divergence_point}")
    print(f"Run 1: {events_1[divergence_point].tool_name}({events_1[divergence_point].tool_args})")
    print(f"Run 2: {events_2[divergence_point].tool_name}({events_2[divergence_point].tool_args})")
    
    return divergence_point

# Compare original and fork
compare_runs("run_abc123", "run_abc123_fork_a1b2c3d4")

This shows you exactly where behavior changed. Which tool choice was different. Which argument was different. Why the outcome changed.
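
Once you know the divergence step, diffing the output states at that step shows how downstream context differed. A small optional extension, using the same load_events and compare_runs helpers.

def diff_states(state_1: dict, state_2: dict) -> dict:
    """Return keys whose values differ between two state snapshots."""
    keys = set(state_1) | set(state_2)
    return {
        k: {"run_1": state_1.get(k), "run_2": state_2.get(k)}
        for k in keys
        if state_1.get(k) != state_2.get(k)
    }

step = compare_runs("run_abc123", "run_abc123_fork_a1b2c3d4")
if step is not None:
    events_1 = load_events("run_abc123")
    events_2 = load_events("run_abc123_fork_a1b2c3d4")
    print(diff_states(events_1[step].output_state, events_2[step].output_state))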

Turn Production Runs Into Regression Tests

The best test cases come from production. Real failures. Real edge cases. Real user queries.

Golden Traces: Store Real “Bad” Runs

When a run fails or produces a bad result, save it as a golden trace.

class GoldenTrace:
    def __init__(self, run_id: str, description: str, expected_outcome: str):
        self.run_id = run_id
        self.description = description
        self.expected_outcome = expected_outcome
    
    def save(self):
        """Save golden trace for regression testing."""
        db.execute("""
            INSERT INTO golden_traces (run_id, description, expected_outcome)
            VALUES (?, ?, ?)
        """, (self.run_id, self.description, self.expected_outcome))

# Save a bad run
trace = GoldenTrace(
    run_id="run_abc123",
    description="Agent got stuck in loop when docs were empty",
    expected_outcome="Should refuse and say 'no docs found'"
)
trace.save()

Golden traces become regression tests. Every time you change the agent, replay the golden traces. Make sure they pass.

Record/Replay Tool Stubs

Record tool calls during production. Replay them in tests.

import json
import os

class ToolRecorder:
    def __init__(self):
        self.recordings = []
    
    def record(self, tool_name: str, args: dict, result: dict):
        """Record a tool call."""
        self.recordings.append({
            "tool_name": tool_name,
            "args": args,
            "result": result
        })
    
    def save(self, run_id: str):
        """Save recordings to file."""
        os.makedirs("recordings", exist_ok=True)
        with open(f"recordings/{run_id}.json", "w") as f:
            json.dump(self.recordings, f)

class ToolReplayer:
    def __init__(self, run_id: str):
        with open(f"recordings/{run_id}.json", "r") as f:
            self.recordings = json.load(f)
        self.index = 0
    
    def replay(self, tool_name: str, args: dict) -> dict:
        """Replay a recorded tool call."""
        recording = self.recordings[self.index]
        self.index += 1
        
        # Verify tool name and args match
        assert recording["tool_name"] == tool_name
        assert recording["args"] == args
        
        return recording["result"]

In tests, use the replayer instead of real tools. Tests run fast. Tests are deterministic. Tests don’t hit real APIs.
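
A sketch of wiring the replayer into a pytest test; build_agent and its tool_backend parameter are hypothetical stand-ins for however your agent accepts a tool-calling function.

import pytest

@pytest.fixture
def replayer():
    return ToolReplayer("run_abc123")

def test_replayed_run_follows_recorded_tool_sequence(replayer):
    # build_agent / tool_backend are hypothetical: pass the replayer wherever
    # your agent accepts a tool-calling function
    agent = build_agent(tool_backend=replayer.replay)
    result = agent.run("What is the refund policy?")

    # The replayer walks its recordings in order; a different tool sequence
    # or different arguments trips the asserts inside replay()
    assert replayer.index == len(replayer.recordings)
    assert result is not None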

Snapshot Tests for Intermediate States

Test intermediate states, not just final answers.

def test_agent_intermediate_states():
    """Test that agent reaches expected intermediate states."""
    # Replay run
    events = load_events("run_abc123")
    
    # Check state at step 3
    assert events[2].output_state["retrieved_docs_count"] == 5
    
    # Check state at step 5
    assert events[4].output_state["selected_tool"] == "summarize"
    
    # Check final state
    assert events[-1].output_state["answer_length"] > 100

Snapshot tests catch regressions in intermediate behavior. Not just final output.
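
If you prefer stored snapshots over hand-written assertions, a minimal file-based variant works too: the first run writes the snapshot, later runs must match it. Paths and helper names are illustrative.

import json
import os

SNAPSHOT_DIR = "snapshots"

def assert_matches_snapshot(run_id: str, step_index: int, state: dict):
    """Compare a step's output state against a stored snapshot."""
    os.makedirs(SNAPSHOT_DIR, exist_ok=True)
    path = os.path.join(SNAPSHOT_DIR, f"{run_id}_step{step_index}.json")
    if not os.path.exists(path):
        # First run: record the snapshot
        with open(path, "w") as f:
            json.dump(state, f, indent=2, sort_keys=True)
        return
    with open(path) as f:
        expected = json.load(f)
    assert state == expected, f"State at step {step_index} changed"

# Usage inside a test
events = load_events("run_abc123")
assert_matches_snapshot("run_abc123", 2, events[2].output_state)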

Add Observability That’s Worth Keeping

Emit traces, logs, and metrics that help you debug.

Traces for Every Run

Create a trace per run. Add spans for each step and tool call.

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, BatchSpanProcessor

# Setup tracer (register an exporter so spans are actually emitted)
provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)

def run_agent_with_tracing(query: str, run_id: str):
    """Run agent with OpenTelemetry tracing."""
    with tracer.start_as_current_span("agent_run") as run_span:
        run_span.set_attribute("run_id", run_id)
        run_span.set_attribute("query", query)
        
        # Step 1: Retrieve docs
        with tracer.start_as_current_span("retrieve_docs") as step_span:
            step_span.set_attribute("step_number", 1)
            docs = retrieve_docs(query)
            step_span.set_attribute("docs_count", len(docs))
        
        # Step 2: Rerank
        with tracer.start_as_current_span("rerank") as step_span:
            step_span.set_attribute("step_number", 2)
            ranked_docs = rerank(docs, query)
        
        # Step 3: Generate
        with tracer.start_as_current_span("generate") as step_span:
            step_span.set_attribute("step_number", 3)
            answer = generate(ranked_docs, query)
            step_span.set_attribute("answer_length", len(answer))
        
        return answer

Traces show you the full execution path. Which steps ran. How long they took. What attributes they had.

Emit Spans for Tool Calls, Retries, Errors

Add spans for tool calls. Track latency, errors, retries.

import json
import time

def call_tool_with_tracing(tool_name: str, args: dict):
    """Call tool with tracing."""
    with tracer.start_as_current_span(f"tool_{tool_name}") as span:
        span.set_attribute("tool_name", tool_name)
        span.set_attribute("args", json.dumps(args))
        
        start_time = time.time()
        
        try:
            result = call_tool(tool_name, args)
            span.set_attribute("success", True)
            span.set_attribute("result_size", len(str(result)))
            return result
        except Exception as e:
            span.set_attribute("success", False)
            span.set_attribute("error", str(e))
            span.record_exception(e)
            raise
        finally:
            duration = time.time() - start_time
            span.set_attribute("duration_ms", duration * 1000)

Tool spans show you which tools were slow. Which tools failed. Which tools were retried.

Align to GenAI Semantic Conventions

Use OpenTelemetry semantic conventions for GenAI.

# GenAI semantic conventions
GENAI_SYSTEM = "gen_ai.system"  # e.g. "openai"
GENAI_REQUEST_MODEL = "gen_ai.request.model"  # e.g. "gpt-4"
GENAI_REQUEST_TEMPERATURE = "gen_ai.request.temperature"
GENAI_REQUEST_MAX_TOKENS = "gen_ai.request.max_tokens"
GENAI_RESPONSE_FINISH_REASONS = "gen_ai.response.finish_reasons"  # array-valued
GENAI_USAGE_INPUT_TOKENS = "gen_ai.usage.input_tokens"
GENAI_USAGE_OUTPUT_TOKENS = "gen_ai.usage.output_tokens"

def call_llm_with_tracing(prompt: str, model: str, temperature: float):
    """Call LLM with GenAI semantic conventions."""
    with tracer.start_as_current_span("llm_call") as span:
        span.set_attribute(GENAI_SYSTEM, "openai")
        span.set_attribute(GENAI_REQUEST_MODEL, model)
        span.set_attribute(GENAI_REQUEST_TEMPERATURE, temperature)
        span.set_attribute(GENAI_REQUEST_MAX_TOKENS, 1000)
        
        response = llm.generate(prompt, model=model, temperature=temperature)
        
        span.set_attribute(GENAI_RESPONSE_FINISH_REASONS, [response.finish_reason])
        span.set_attribute(GENAI_USAGE_INPUT_TOKENS, response.usage.input_tokens)
        span.set_attribute(GENAI_USAGE_OUTPUT_TOKENS, response.usage.output_tokens)
        
        return response.text

Semantic conventions make traces portable. Tools can parse them. Dashboards can visualize them. Alerts can trigger on them.

Operational Checklist

Here’s how to use this in production.

Incident Workflow

When an incident happens:

  1. Locate the run: Find the run_id from logs or user report
  2. Replay the run: See what happened step by step
  3. Fork from the failure point: Try different fixes
  4. Compare forks: See which fix works
  5. Add regression test: Save the run as a golden trace
  6. Deploy the fix: Update prompt, tool, or policy
  7. Verify: Replay the golden trace, confirm it passes
def handle_incident(run_id: str):
    """Handle a production incident."""
    # 1. Replay
    print("Replaying run...")
    replay_run(run_id)
    
    # 2. Fork with fix
    print("Forking with fix...")
    fork_id, result = fork_run(run_id, "step_5", {"max_retries": 3})
    
    # 3. Compare
    print("Comparing runs...")
    compare_runs(run_id, fork_id)
    
    # 4. Save as golden trace
    print("Saving golden trace...")
    trace = GoldenTrace(run_id, "Incident XYZ", "Should succeed")
    trace.save()
    
    print("Incident handled")

Cost Controls

Checkpoints cost money. Storage costs. Query costs. Set limits.

# Sampling: Only checkpoint 10% of successful runs
import hashlib

def should_checkpoint(run_id: str, success: bool) -> bool:
    """Decide if we should checkpoint this run."""
    if not success:
        return True  # Always checkpoint failures
    
    # Sample 10% of successes. Use a stable hash: Python's built-in hash()
    # is randomized per process, so it can't give a consistent sample.
    return int(hashlib.sha1(run_id.encode()).hexdigest(), 16) % 10 == 0

# Compression: Compress large payloads
import gzip
import json

def compress_checkpoint(checkpoint: Checkpoint) -> bytes:
    """Compress checkpoint data."""
    data = json.dumps({
        "messages": checkpoint.messages,
        "state": checkpoint.state
    })
    return gzip.compress(data.encode())

# Store only deltas
def store_delta(prev_checkpoint: Checkpoint, curr_checkpoint: Checkpoint):
    """Store only what changed."""
    delta = {
        "added_messages": curr_checkpoint.messages[len(prev_checkpoint.messages):],
        "state_changes": {
            k: v for k, v in curr_checkpoint.state.items()
            if prev_checkpoint.state.get(k) != v
        }
    }
    return delta

Sample. Compress. Store deltas. Keep costs down.
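
If you store deltas, replay has to rebuild full state from them. A sketch of the inverse of store_delta, assuming deltas are kept in step order.

def apply_deltas(base_messages: list, base_state: dict, deltas: list) -> tuple:
    """Rebuild messages and state by folding deltas onto a base checkpoint."""
    messages = list(base_messages)
    state = dict(base_state)
    for delta in deltas:
        messages.extend(delta["added_messages"])
        state.update(delta["state_changes"])
    return messages, state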

Real Incident Walkthrough

Here’s a real example. An agent got stuck in a loop. Let’s debug it.

The Incident

User reports: “Agent keeps searching the same docs over and over. Never gives an answer.”

Run ID: run_xyz789

Step 1: Replay the Run

events = load_events("run_xyz789")

for i, event in enumerate(events):
    print(f"Step {i}: {event.tool_name}({event.tool_args})")

# Output:
# Step 0: search_docs({"query": "refund policy"})
# Step 1: search_docs({"query": "refund policy"})
# Step 2: search_docs({"query": "refund policy"})
# Step 3: search_docs({"query": "refund policy"})
# ...
# (repeats 20 times, then times out)

The agent is stuck in a loop. It keeps calling search_docs with the same query.

Step 2: Inspect the Tool Results

# Check what search_docs returned
for event in events[:5]:
    print(f"Tool result: {event.tool_result}")

# Output:
# Tool result: {"docs": [], "count": 0}
# Tool result: {"docs": [], "count": 0}
# Tool result: {"docs": [], "count": 0}
# ...

The tool returns empty results every time. The agent doesn’t know how to handle empty results. It just retries.

Step 3: Fork with a Fix

The fix: Add a check for empty results. If empty, refuse instead of retrying.

# Fork from step 1 with modified agent logic
fork_id, result = fork_run(
    "run_xyz789",
    "step_1",
    {"agent_version": "v2_with_empty_check"}
)

# Check fork result
fork_events = load_events(fork_id)
print(f"Fork completed in {len(fork_events)} steps")
print(f"Final answer: {fork_events[-1].output_state['answer']}")

# Output:
# Fork completed in 2 steps
# Final answer: "I don't have any documents about refund policy. Please check with support."

The fork works. The agent refuses when docs are empty. No loop.

Step 4: Add Regression Test

def test_empty_docs_no_loop():
    """Test that agent doesn't loop when docs are empty."""
    # Replay the original bad run
    events = load_events("run_xyz789")
    
    # Should not have more than 3 search attempts
    search_count = sum(1 for e in events if e.tool_name == "search_docs")
    assert search_count <= 3, f"Agent looped {search_count} times"
    
    # Should refuse when docs are empty
    final_answer = events[-1].output_state.get("answer", "")
    assert "don't have" in final_answer.lower() or "no documents" in final_answer.lower()

# Run test
test_empty_docs_no_loop()

The test fails on the old version. It passes on the new version. Perfect.

Step 5: Deploy and Verify

Deploy the fix. Replay the golden trace. Confirm it passes.

# Deploy v2
deploy_agent_version("v2_with_empty_check")

# Replay golden trace
replay_run("run_xyz789")

# Verify it doesn't loop
events = load_events("run_xyz789_replay")
assert len(events) <= 3

Fixed. Tested. Deployed. Verified. Done.

Code Samples

The code repository includes six runnable examples:

  1. Minimal Agent Graph: Agent with step boundaries that emit events
  2. Checkpoint Store: Interface + SQLite implementation
  3. Record/Replay Tool Wrapper: Records tool calls, replays from fixtures
  4. Time-Travel Runner: Resume from checkpoint, fork state, continue
  5. Pytest Regression Harness: Replays known-bad runs, asserts tool sequence and state
  6. OpenTelemetry Instrumentation: Traces per run, spans per step/tool

See the GitHub repository for complete, runnable code.

Summary

Agents are hard to debug because they’re non-deterministic and have side effects. You can’t just re-run them and expect the same behavior.

Make agents replayable by:

  1. Emitting events for every step
  2. Storing checkpoints with full state
  3. Recording tool results
  4. Making side effects idempotent
  5. Separating read tools from write tools

Use time-travel debugging to:

  1. Resume from any checkpoint
  2. Fork runs with modified state
  3. Compare where runs diverged

Turn production incidents into regression tests by:

  1. Saving failed runs as golden traces
  2. Recording tool calls for replay
  3. Testing intermediate states, not just final output

Add observability with:

  1. OpenTelemetry traces per run
  2. Spans for steps, tools, retries, errors
  3. GenAI semantic conventions

When an incident happens: locate the run, replay it, fork with fixes, compare outcomes, add a regression test, deploy, and verify.

Replayable agents are debuggable agents. Debuggable agents are fixable agents. Fixable agents are reliable agents.

Start by adding checkpoints. Record tool results. Build replay capability. Your future self will thank you.
