Multi-Agent Mesh Observability: Tracing One Task Across 5 Agents with OpenTelemetry
One user request. Five agents. Three tools. Two retries. Where did it break?
You check your logs. You see entries from different agents. Different timestamps. Different correlation IDs. Nothing connects. You can’t see the full story.
This is the problem with multi-agent systems. One request bounces between agents. Each agent calls tools. Tools fail and retry. Agents delegate to other agents. By the time you need to debug, you’re looking at scattered logs with no clear path.
OpenTelemetry fixes this. You get one trace. One story. One view of what happened.
The problem in one picture
Here’s what happens in a typical multi-agent workflow:
User Request
↓
Planner Agent (decides what to do)
↓
Tool Agent (calls external API)
↓
Verifier Agent (checks the result)
↓
Summarizer Agent (formats the output)
↓
Response
Each agent runs independently. Each tool call happens separately. Each retry creates a new log entry. Without correlation, you can’t connect them.
What breaks today
Missing correlation IDs: Each agent generates its own request ID. Agent A has ID “abc-123”. Agent B has ID “xyz-789”. They’re not connected.
Partial logs: You see Agent A’s logs. You see Tool X’s logs. You don’t see how they relate. You don’t see the delegation chain.
No latency breakdown: The request took 5 seconds. Where did the time go? You don’t know. Was it Agent A? Tool X? The network? You’re guessing.
Retry loops are invisible: An agent retries a tool call three times. You see three separate log entries. You don’t see they’re retries of the same call.
Error propagation is unclear: Tool X fails. Agent B doesn’t handle it. Agent A gets bad data. The error message says “Agent A failed.” But Agent A didn’t fail. Tool X failed. You can’t see that.
What “good” looks like
A single trace shows everything:
```
Trace: trace-abc-123
├── Span: user-request (root)
│   ├── Span: planner-agent
│   │   ├── Span: tool-call-search
│   │   └── Span: tool-call-validate
│   ├── Span: tool-agent
│   │   ├── Span: api-call (retry: 1)
│   │   └── Span: api-call (retry: 2, success)
│   ├── Span: verifier-agent
│   │   └── Span: validation-check
│   └── Span: summarizer-agent
│       └── Span: format-output
```
One trace. One story. You can answer:
- “Where did time go?” → Look at span durations
- “Which agent caused retries?” → See the retry spans under tool-agent
- “Which tool is flaky?” → Check error rates per tool type
- “What was the full path?” → Follow the span tree
The agent mesh tracing model
OpenTelemetry provides a standard way to trace multi-agent systems. You need to define a few conventions.
Naming scheme
Pick attribute names and stick with them:
Agent attributes:
- agent.name: The agent identifier (e.g., “planner-agent”)
- agent.role: The agent’s role (e.g., “planner”, “tool-executor”, “verifier”)
- conversation.id: The conversation or session ID
- workflow.id: The workflow instance ID
Tool attributes:
- tool.name: The tool identifier (e.g., “search-api”)
- tool.type: The tool category (e.g., “http”, “database”, “llm”)
- tool.target: The target resource (e.g., “https://api.example.com/search”)
Span naming:
- Agent spans: agent.{name}.{action} (e.g., “agent.planner.decide”)
- Tool spans: tool.{name}.{action} (e.g., “tool.search-api.query”)
- Workflow spans: workflow.{workflow_id}.{step}
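To keep these names consistent across every agent and service, it helps to build them in one place. A tiny helper sketch (the function names are illustrative, not part of any library):

```python
def agent_span_name(agent: str, action: str) -> str:
    """Build a span name following the agent.{name}.{action} convention."""
    return f"agent.{agent}.{action}"


def tool_span_name(tool: str, action: str) -> str:
    """Build a span name following the tool.{name}.{action} convention."""
    return f"tool.{tool}.{action}"


def workflow_span_name(workflow_id: str, step: str) -> str:
    """Build a span name following the workflow.{workflow_id}.{step} convention."""
    return f"workflow.{workflow_id}.{step}"
```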
What becomes a span vs. an event
Spans represent operations with duration:
- Agent processing a task
- Tool invocation
- Agent-to-agent delegation
- Retry attempts
Events represent discrete moments:
- Agent decision point
- Tool result received
- Error occurred
- Budget limit reached
Use spans for timing. Use events for markers.
Context propagation across agents
This is the critical part. You need to pass trace context between agents.
OpenTelemetry uses W3C Trace Context. The traceparent header carries the trace ID, span ID, and flags. You pass it in:
Message envelopes between agents:
```json
{
  "message": "Please search for user data",
  "traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
}
```
Tool invocation payloads:
```json
{
  "tool": "search-api",
  "params": {...},
  "traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
}
```
Queue messages (if async):
```json
{
  "task": {...},
  "metadata": {
    "traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
  }
}
```
Each agent extracts the traceparent, creates a child span, and passes it forward.
A tiny working reference implementation
Let’s build a simple 5-agent system with explicit tracing. You can run this locally and see traces in Jaeger.
Architecture
Five agents:
- Planner Agent: Receives user request, decides what to do
- Tool Agent: Calls external tools
- Verifier Agent: Validates results
- Summarizer Agent: Formats output
- Mesh Router: Routes messages between agents, preserves trace context
Each agent creates spans. Each tool call creates a nested span. The mesh router passes trace context.
Setup
First, install dependencies:
```bash
pip install opentelemetry-api opentelemetry-sdk opentelemetry-exporter-jaeger
```
Create the OpenTelemetry setup:
```python
# src/tracing/setup.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.resources import Resource

def setup_tracing(service_name: str):
    resource = Resource.create({"service.name": service_name})
    provider = TracerProvider(resource=resource)

    # Export to Jaeger
    jaeger_exporter = JaegerExporter(
        agent_host_name="localhost",
        agent_port=6831,
    )
    processor = BatchSpanProcessor(jaeger_exporter)
    provider.add_span_processor(processor)

    trace.set_tracer_provider(provider)
    return trace.get_tracer(__name__)
```
Agent base class with tracing
All agents inherit from a base class that handles tracing:
```python
# src/agents/base_agent.py
from opentelemetry import context, trace
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from typing import Dict, Any, Optional

class BaseAgent:
    def __init__(self, name: str, role: str, tracer):
        self.name = name
        self.role = role
        self.tracer = tracer

    def extract_trace_context(self, message: Dict[str, Any]) -> Optional[Any]:
        """Extract trace context from message envelope."""
        traceparent = message.get("traceparent")
        if not traceparent:
            return None
        carrier = {"traceparent": traceparent}
        return TraceContextTextMapPropagator().extract(carrier)

    def inject_trace_context(self, ctx: Any) -> str:
        """Inject trace context into message envelope."""
        carrier = {}
        TraceContextTextMapPropagator().inject(carrier, context=ctx)
        return carrier.get("traceparent", "")

    def process(self, message: Dict[str, Any]) -> Dict[str, Any]:
        """Process a message with tracing."""
        # Extract trace context
        parent_context = self.extract_trace_context(message)

        # Start span
        with self.tracer.start_as_current_span(
            f"agent.{self.name}.process",
            context=parent_context
        ) as span:
            # Set agent attributes
            span.set_attribute("agent.name", self.name)
            span.set_attribute("agent.role", self.role)
            span.set_attribute("conversation.id", message.get("conversation_id", ""))
            span.set_attribute("workflow.id", message.get("workflow_id", ""))

            # Process the message
            result = self._do_process(message)

            # Inject the current trace context into the result
            result["traceparent"] = self.inject_trace_context(context.get_current())
            return result

    def _do_process(self, message: Dict[str, Any]) -> Dict[str, Any]:
        """Subclasses implement this."""
        raise NotImplementedError
```
Tool calls with tracing
Tools create nested spans:
```python
# src/tools/base_tool.py
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from typing import Dict, Any, Optional
import time

class BaseTool:
    def __init__(self, name: str, tool_type: str, tracer):
        self.name = name
        self.tool_type = tool_type
        self.tracer = tracer

    def call(self, params: Dict[str, Any], traceparent: Optional[str] = None) -> Dict[str, Any]:
        """Call the tool with tracing."""
        # Extract trace context if provided
        parent_context = None
        if traceparent:
            carrier = {"traceparent": traceparent}
            parent_context = TraceContextTextMapPropagator().extract(carrier)

        # Start span
        with self.tracer.start_as_current_span(
            f"tool.{self.name}.call",
            context=parent_context
        ) as span:
            # Set tool attributes
            span.set_attribute("tool.name", self.name)
            span.set_attribute("tool.type", self.tool_type)
            span.set_attribute("tool.target", params.get("target", ""))

            # Call the tool
            start_time = time.time()
            try:
                result = self._do_call(params)
                duration = (time.time() - start_time) * 1000
                span.set_attribute("tool.duration_ms", duration)
                span.set_attribute("tool.success", True)
                return result
            except Exception as e:
                duration = (time.time() - start_time) * 1000
                span.set_attribute("tool.duration_ms", duration)
                span.set_attribute("tool.success", False)
                span.set_attribute("tool.error", str(e))
                span.record_exception(e)
                raise

    def _do_call(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Subclasses implement this."""
        raise NotImplementedError
```
Mesh router with context propagation
The mesh router forwards messages and preserves trace context:
```python
# src/mesh/router.py
from typing import Dict, Any, List

class MeshRouter:
    def __init__(self, agents: Dict[str, Any], tracer):
        self.agents = agents
        self.tracer = tracer

    def route(self, message: Dict[str, Any], route: List[str]) -> Dict[str, Any]:
        """Route message through agent chain, preserving trace context."""
        current_message = message
        for agent_name in route:
            agent = self.agents.get(agent_name)
            if not agent:
                raise ValueError(f"Agent {agent_name} not found")
            # Process with current agent; trace context travels in the
            # message envelope's traceparent field.
            current_message = agent.process(current_message)
        return current_message
```
Example: Planner Agent
```python
# src/agents/planner_agent.py
from src.agents.base_agent import BaseAgent
from typing import Dict, Any

class PlannerAgent(BaseAgent):
    def _do_process(self, message: Dict[str, Any]) -> Dict[str, Any]:
        user_request = message.get("request", "")

        # Agent logic: decide what to do
        plan = {
            "steps": [
                {"agent": "tool-agent", "action": "search", "query": user_request},
                {"agent": "verifier-agent", "action": "validate"},
                {"agent": "summarizer-agent", "action": "format"}
            ]
        }
        return {
            "plan": plan,
            "conversation_id": message.get("conversation_id"),
            "workflow_id": message.get("workflow_id")
        }
```
Example: Tool Agent with retries
```python
# src/agents/tool_agent.py
from src.agents.base_agent import BaseAgent
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from typing import Dict, Any
import time

class ToolAgent(BaseAgent):
    def __init__(self, name: str, role: str, tracer, search_tool):
        super().__init__(name, role, tracer)
        self.search_tool = search_tool

    def _do_process(self, message: Dict[str, Any]) -> Dict[str, Any]:
        plan = message.get("plan", {})
        steps = plan.get("steps", [])
        tool_step = next((s for s in steps if s.get("agent") == "tool-agent"), None)
        if not tool_step:
            return message

        # Call tool with retries
        max_retries = 3
        result = None
        for attempt in range(max_retries):
            with self.tracer.start_as_current_span(
                f"tool-agent.retry.{attempt + 1}"
            ) as retry_span:
                retry_span.set_attribute("retry.attempt", attempt + 1)
                retry_span.set_attribute("retry.max_attempts", max_retries)

                # Inject the *current* context so the tool span nests under
                # this retry span, not under the incoming message's span.
                carrier = {}
                TraceContextTextMapPropagator().inject(carrier)
                try:
                    result = self.search_tool.call(
                        {"query": tool_step.get("query", "")},
                        traceparent=carrier.get("traceparent", "")
                    )
                    retry_span.set_attribute("retry.success", True)
                    break
                except Exception:
                    retry_span.set_attribute("retry.success", False)
                    if attempt == max_retries - 1:
                        raise
                    time.sleep(0.1 * (attempt + 1))

        message["tool_result"] = result
        return message
```
Running the example
Create a main script:
```python
# examples/run_example.py
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from src.tracing.setup import setup_tracing
from src.agents.planner_agent import PlannerAgent
from src.agents.tool_agent import ToolAgent
from src.agents.verifier_agent import VerifierAgent
from src.agents.summarizer_agent import SummarizerAgent
from src.mesh.router import MeshRouter
from src.tools.search_tool import SearchTool

def main():
    # Setup tracing
    tracer = setup_tracing("multi-agent-mesh")

    # Create tools
    search_tool = SearchTool("search-api", "http", tracer)

    # Create agents
    planner = PlannerAgent("planner-agent", "planner", tracer)
    tool_agent = ToolAgent("tool-agent", "tool-executor", tracer, search_tool)
    verifier = VerifierAgent("verifier-agent", "verifier", tracer)
    summarizer = SummarizerAgent("summarizer-agent", "summarizer", tracer)

    agents = {
        "planner-agent": planner,
        "tool-agent": tool_agent,
        "verifier-agent": verifier,
        "summarizer-agent": summarizer
    }

    # Create router
    router = MeshRouter(agents, tracer)

    # Create user request with root span
    with tracer.start_as_current_span("user-request") as root_span:
        root_span.set_attribute("user.id", "user-123")
        root_span.set_attribute("request.type", "search")

        message = {
            "request": "Find information about Python",
            "conversation_id": "conv-abc-123",
            "workflow_id": "workflow-xyz-789"
        }

        # Inject trace context
        carrier = {}
        TraceContextTextMapPropagator().inject(carrier)
        message["traceparent"] = carrier.get("traceparent", "")

        # Route through agents
        result = router.route(message, [
            "planner-agent",
            "tool-agent",
            "verifier-agent",
            "summarizer-agent"
        ])
        print(f"Result: {result}")

if __name__ == "__main__":
    main()
```
OpenTelemetry Collector configuration
For local testing, use this minimal config:
```yaml
# otel-collector-config.yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

processors:
  batch:

exporters:
  jaeger:
    endpoint: jaeger:14250
    tls:
      insecure: true
  logging:
    loglevel: debug

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [jaeger, logging]
```
Docker Compose for local testing
```yaml
# docker-compose.yml
version: '3.8'

services:
  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"    # UI
      - "6831:6831/udp"  # Agent
      - "14250:14250"    # Collector
    environment:
      - COLLECTOR_OTLP_ENABLED=true

  otel-collector:
    image: otel/opentelemetry-collector:latest
    command: ["--config=/etc/otel-collector-config.yaml"]
    volumes:
      - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
    ports:
      - "4317:4317"  # OTLP gRPC
      - "4318:4318"  # OTLP HTTP
    depends_on:
      - jaeger
```
Expected output
Run the example:
```bash
docker-compose up -d
python examples/run_example.py
```
Open Jaeger UI at http://localhost:16686. You should see:
One trace with this structure:
```
Trace: trace-abc-123
├── Span: user-request (root, 5000ms)
│   ├── Span: agent.planner-agent.process (100ms)
│   ├── Span: agent.tool-agent.process (2000ms)
│   │   ├── Span: tool-agent.retry.1 (500ms, failed)
│   │   └── Span: tool-agent.retry.2 (1500ms, success)
│   │       └── Span: tool.search-api.call (1500ms)
│   ├── Span: agent.verifier-agent.process (200ms)
│   │   └── Span: tool.validation-check.call (200ms)
│   └── Span: agent.summarizer-agent.process (2700ms)
│       └── Span: tool.format-output.call (2700ms)
```
You can see:
- Total time: 5000ms
- Tool agent took 2000ms (with one retry)
- Retry spans show the failure and success
- Each agent and tool has its own span
What to record (without leaking data)
You need to be careful about what you record. Prompts can contain PII. Tool outputs can be sensitive.
Redaction rules
Create a redaction function:
```python
# src/tracing/redaction.py
import re
from typing import Any, Dict

def redact_pii(text: str) -> str:
    """Redact PII from text."""
    # Email
    text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '[EMAIL]', text)
    # Phone
    text = re.sub(r'\b\d{3}-\d{3}-\d{4}\b', '[PHONE]', text)
    # SSN
    text = re.sub(r'\b\d{3}-\d{2}-\d{4}\b', '[SSN]', text)
    return text

def redact_attributes(attributes: Dict[str, Any]) -> Dict[str, Any]:
    """Redact sensitive attributes."""
    redacted = {}
    sensitive_keys = ['prompt', 'user_input', 'tool_output', 'api_key']
    for key, value in attributes.items():
        if key in sensitive_keys:
            if isinstance(value, str):
                redacted[key] = redact_pii(value)
            else:
                redacted[key] = '[REDACTED]'
        else:
            redacted[key] = value
    return redacted
```
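Rules like these are worth unit-testing, since a subtly wrong regex leaks PII silently. A minimal self-contained check (the email and phone patterns are repeated inline so the snippet runs on its own):

```python
import re

def redact_pii(text: str) -> str:
    """Mirror of the email/phone rules from src/tracing/redaction.py."""
    text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '[EMAIL]', text)
    text = re.sub(r'\b\d{3}-\d{3}-\d{4}\b', '[PHONE]', text)
    return text

redacted = redact_pii("Reach jane.doe@example.com or 555-123-4567")
```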
Storing prompts safely
Don’t store full prompts in spans. Store hashes or samples:
```python
import hashlib
from typing import Any, Dict
from src.tracing.redaction import redact_pii

def safe_prompt_attribute(prompt: str, max_length: int = 100) -> Dict[str, Any]:
    """Create safe prompt attributes."""
    prompt_hash = hashlib.sha256(prompt.encode()).hexdigest()[:16]
    prompt_sample = prompt[:max_length] + "..." if len(prompt) > max_length else prompt
    return {
        "prompt.hash": prompt_hash,
        "prompt.sample": redact_pii(prompt_sample),
        "prompt.length": len(prompt)
    }
```
Debug mode vs. production mode
Use environment variables to control verbosity:
```python
import os

DEBUG_MODE = os.getenv("OTEL_DEBUG", "false").lower() == "true"

def set_prompt_attribute(span, prompt: str):
    """Set prompt attribute based on mode."""
    if DEBUG_MODE:
        span.set_attribute("prompt.full", redact_pii(prompt))
    else:
        attrs = safe_prompt_attribute(prompt)
        for key, value in attrs.items():
            span.set_attribute(key, value)
```
Golden signals for multi-agent systems
Beyond basic tracing, track these metrics:
Hop count distribution
How many agents does a request touch?
```python
# src/metrics/hop_counter.py
from opentelemetry import metrics

meter = metrics.get_meter(__name__)

hop_counter = meter.create_counter(
    "agent.hops.count",
    description="Number of agent hops per request"
)

# In mesh router
hop_counter.add(len(route), {"workflow.id": workflow_id})
```
Tool error rate per tool type
Which tools are flaky?
```python
tool_error_counter = meter.create_counter(
    "tool.errors.count",
    description="Tool errors by type"
)

# In tool call
if error:
    tool_error_counter.add(1, {
        "tool.type": tool_type,
        "tool.name": tool_name,
        "error.type": type(error).__name__
    })
```
Retry loops and “agent ping-pong”
Detect when agents keep delegating to each other:
```python
delegation_counter = meter.create_counter(
    "agent.delegations.count",
    description="Agent-to-agent delegations"
)

# Track delegation chain
delegation_chain = message.get("delegation_chain", [])
if len(delegation_chain) > 10:
    # Alert: possible ping-pong
    pass
```
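Raw chain length is a blunt signal: a legitimately deep workflow can trip it. A sharper check looks for actual alternation between a pair of agents. A sketch (`detect_ping_pong` is an illustrative helper, not a library function):

```python
def detect_ping_pong(delegation_chain, max_repeats: int = 3) -> bool:
    """Return True when the chain alternates like A -> B -> A -> B more than
    max_repeats times in a row, which usually means two agents are stuck
    delegating to each other."""
    repeats = 0
    for i in range(2, len(delegation_chain)):
        if delegation_chain[i] == delegation_chain[i - 2]:
            repeats += 1
            if repeats > max_repeats:
                return True
        else:
            repeats = 0
    return False
```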
Token/cost estimates per trace
Even if approximate, track costs:
```python
token_counter = meter.create_counter(
    "agent.tokens.used",
    description="Tokens used per trace"
)

# After LLM call. Note: trace_id is a high-cardinality attribute; if your
# metrics backend struggles with it, record cost as a span attribute instead.
token_counter.add(tokens_used, {
    "agent.name": agent_name,
    "model": model_name,
    "trace_id": trace_id
})
```
A short checklist you can copy
If you do only 5 things, do these:
- Use W3C Trace Context: Pass traceparent in all message envelopes between agents
- Create spans for agents and tools: One span per agent step, one span per tool call
- Set standard attributes: agent.name, agent.role, tool.name, tool.type
- Redact sensitive data: Hash prompts, redact PII, use debug mode only in dev
- Export to a tracing backend: Jaeger, Tempo, or your APM tool
That’s it. These five things give you end-to-end visibility.
Production considerations
Sampling
In production, you don’t need to trace every request. Sample:
```python
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased

# Sample 10% of traces
sampler = TraceIdRatioBased(0.1)
provider = TracerProvider(sampler=sampler)
```
Backend choice
- Jaeger: Good for development, simple setup
- Tempo: Grafana integration, good for scale
- Datadog/New Relic: Commercial, includes APM features
- OTel Collector: Flexible, can export to multiple backends
Performance overhead
Tracing adds overhead. Measure it:
- Span creation: ~1-5 microseconds
- Span export: ~10-50 microseconds (batched)
- Total overhead: <1% for most workloads
If overhead is too high, reduce sampling or disable in hot paths.
Conclusion
Multi-agent systems are hard to debug. Scattered logs don’t help. You need end-to-end traces.
OpenTelemetry gives you that. One trace. One story. One view of what happened.
The key is context propagation. Pass traceparent between agents. Create spans for operations. Set standard attributes. That’s it.
You can answer “where did time go?” You can see retry loops. You can find flaky tools. You can debug like normal software.
Start with the five things. Add more as you need it. But start with tracing. It’s the foundation.