Event-Driven AI Agents: Harnessing Reactive Pipelines for Real-Time Autonomy
Traditional AI agents work like a restaurant kitchen where every order waits in line. You submit a request, wait for processing, and get a response. This synchronous model works fine for simple tasks, but breaks down when you need real-time responsiveness, scalability, or coordination between multiple agents.
Event-driven AI agents flip this model entirely. Instead of waiting for direct requests, they react to events as they happen. A market data event triggers a trading agent. A user action triggers a recommendation agent. A system alert triggers a monitoring agent. The agents become reactive, autonomous, and capable of near-real-time decision-making.
This isn’t just about speed—though that’s important. Event-driven architectures enable systems that can scale horizontally, handle unpredictable loads, and coordinate complex multi-agent workflows without centralized orchestration. They’re the foundation for truly autonomous AI systems that can operate in dynamic, real-world environments.
The Problem with Synchronous AI
Most AI systems today follow a request-response pattern. You send a prompt, the system processes it, and you get a response. This works for chatbots and simple applications, but creates several problems when building production AI systems.
First, it doesn’t scale. Every request ties up resources until completion. You can’t easily distribute load across multiple agents or handle sudden spikes in demand. The system becomes a bottleneck.
Second, it’s not reactive. Agents can’t respond to changing conditions or external events unless explicitly polled. They miss opportunities and can’t adapt to real-time situations.
Third, coordination is difficult. Getting multiple agents to work together requires complex orchestration logic. You end up with centralized controllers that become single points of failure.
Event-driven architectures solve these problems by making agents reactive to their environment rather than passive responders to requests.
Reactive Agent Principles
Event-driven AI agents follow a fundamentally different design pattern. Instead of the traditional “Input → Process → Output” cycle, they use a “Perception → Trigger → Action” loop that runs continuously.
The perception layer monitors for relevant events. This could be webhook notifications, database changes, message queue events, or signals from other agents. The key is that agents are always listening, not waiting to be called.
When an event matches the agent’s trigger conditions, it activates. The agent processes the event context, makes decisions, and takes actions. These actions might generate new events that other agents can react to, creating a distributed system of autonomous components.
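In miniature, the loop can be sketched like this (a minimal sketch with illustrative names, not tied to any particular broker):

import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Dict
async def reactive_agent(
    events: AsyncIterator[Dict[str, Any]],              # perception: a stream of events
    trigger: Callable[[Dict[str, Any]], bool],          # trigger: is this event relevant?
    action: Callable[[Dict[str, Any]], Awaitable[Any]]  # action: async handler for matches
) -> None:
    """The Perception → Trigger → Action loop, running continuously."""
    async for event in events:
        if trigger(event):
            # Fire and keep listening; actions run concurrently.
            asyncio.create_task(action(event))

Everything that follows elaborates this loop: the perception layer becomes a message consumer, the trigger becomes a subscription filter, and the action becomes an agent callback.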
This pattern enables several powerful capabilities:
Asynchronous Processing: Agents can handle multiple events concurrently without blocking each other. A single agent can process dozens of events simultaneously.
Loose Coupling: Agents don’t need to know about each other directly. They communicate through events, making the system more modular and easier to maintain.
Fault Tolerance: If one agent fails, others continue operating. Events can be retried, and failed processing doesn’t crash the entire system.
Scalability: You can add more agents or scale existing ones based on event volume. The system grows organically with demand.
Real-time Responsiveness: Agents react immediately to events rather than waiting for polling cycles or scheduled processing.
Designing an Event Bus for Agents
The event bus is the nervous system of an event-driven AI architecture. It needs to be fast, reliable, and capable of handling the complex routing requirements of AI agents.
For AI systems, the event bus must support several key features:
Event Schema Validation: AI agents generate and consume structured data. The bus needs to validate event schemas and ensure type safety.
Priority Queues: Some events are more urgent than others. A security alert should be processed before a routine status update.
Dead Letter Queues: Failed events need to be captured and retried. AI processing can fail for many reasons—rate limits, model errors, resource constraints.
Event Replay: Agents need to be able to replay events for debugging, training, or recovery scenarios.
Backpressure Handling: When agents can’t keep up with event volume, the system needs graceful degradation rather than crashes.
Here’s a practical implementation using FastAPI and Kafka:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from typing import Dict, List, Optional, Any
from datetime import datetime
import asyncio
import json
from kafka import KafkaProducer, KafkaConsumer  # provided by the kafka-python package
from kafka.errors import KafkaError
import uuid
# Event schemas for AI agents
class AgentEvent(BaseModel):
event_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
event_type: str
source_agent: str
target_agents: List[str] = []
priority: int = Field(default=5, ge=1, le=10) # 1=highest, 10=lowest
payload: Dict[str, Any]
timestamp: datetime = Field(default_factory=datetime.utcnow)
correlation_id: Optional[str] = None
retry_count: int = 0
max_retries: int = 3
class AgentResponse(BaseModel):
response_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
original_event_id: str
agent_id: str
status: str # success, failure, partial
result: Dict[str, Any]
timestamp: datetime = Field(default_factory=datetime.utcnow)
processing_time_ms: float
class EventBus:
def __init__(self, kafka_servers: List[str], topic_prefix: str = "ai_agents"):
self.kafka_servers = kafka_servers
self.topic_prefix = topic_prefix
# Initialize Kafka producer
self.producer = KafkaProducer(
bootstrap_servers=kafka_servers,
value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8') if k else None,
retries=3,
acks='all', # Wait for all replicas to acknowledge
compression_type='gzip'
)
# Event routing rules
self.routing_rules = {}
self.agent_subscriptions = {}
# Dead letter queue for failed events
self.dlq_topic = f"{topic_prefix}_dlq"
def register_agent(self, agent_id: str, event_types: List[str],
priority: int = 5, max_concurrent: int = 10):
"""Register an agent to receive specific event types."""
self.agent_subscriptions[agent_id] = {
'event_types': event_types,
'priority': priority,
'max_concurrent': max_concurrent,
'active_tasks': 0
}
# Create topic for this agent if it doesn't exist
topic_name = f"{self.topic_prefix}_{agent_id}"
self.routing_rules[agent_id] = topic_name
async def publish_event(self, event: AgentEvent) -> str:
"""Publish an event to the bus."""
try:
# Validate event schema
event_dict = event.dict()
# Determine target topics based on routing rules
target_topics = []
for agent_id in event.target_agents:
if agent_id in self.routing_rules:
target_topics.append(self.routing_rules[agent_id])
# If no specific targets, broadcast to all subscribed agents
if not target_topics:
for agent_id, subscription in self.agent_subscriptions.items():
if event.event_type in subscription['event_types']:
target_topics.append(self.routing_rules[agent_id])
# Publish to all target topics
for topic in target_topics:
future = self.producer.send(
topic,
key=event.event_id,
value=event_dict
)
future.add_callback(self._on_send_success)
future.add_errback(self._on_send_error, event)
return event.event_id
except Exception as e:
# Send to dead letter queue
await self._send_to_dlq(event, str(e))
raise HTTPException(status_code=500, detail=f"Failed to publish event: {str(e)}")
def _on_send_success(self, record_metadata):
"""Callback for successful message send."""
print(f"Event published to {record_metadata.topic} at offset {record_metadata.offset}")
def _on_send_error(self, exception, event):
"""Callback for failed message send."""
print(f"Failed to publish event {event.event_id}: {exception}")
# Could implement retry logic here
async def _send_to_dlq(self, event: AgentEvent, error: str):
"""Send failed event to dead letter queue."""
dlq_event = event.dict()
dlq_event['error'] = error
dlq_event['dlq_timestamp'] = datetime.utcnow().isoformat()
try:
self.producer.send(
self.dlq_topic,
key=event.event_id,
value=dlq_event
)
except Exception as e:
print(f"Failed to send to DLQ: {e}")
async def consume_events(self, agent_id: str, callback):
"""Consume events for a specific agent."""
if agent_id not in self.routing_rules:
raise ValueError(f"Agent {agent_id} not registered")
topic = self.routing_rules[agent_id]
consumer = KafkaConsumer(
topic,
bootstrap_servers=self.kafka_servers,
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
group_id=f"agent_{agent_id}",
auto_offset_reset='latest',
enable_auto_commit=True,
max_poll_records=10 # Process in batches
)
        try:
            # NOTE: kafka-python's consumer iteration below is blocking; in
            # production, run this loop in a dedicated thread or use aiokafka
            # so it doesn't stall the event loop.
            for message in consumer:
event_data = message.value
event = AgentEvent(**event_data)
# Check if agent can handle more concurrent tasks
subscription = self.agent_subscriptions.get(agent_id, {})
                if subscription.get('active_tasks', 0) >= subscription.get('max_concurrent', 10):
                    # NOTE: with auto-commit enabled, a skipped event is effectively
                    # dropped; a production bus would pause the consumer or requeue.
                    print(f"Agent {agent_id} at capacity, skipping event {event.event_id}")
                    continue
# Process event asynchronously
asyncio.create_task(self._process_event(agent_id, event, callback))
except Exception as e:
print(f"Error consuming events for {agent_id}: {e}")
finally:
consumer.close()
async def _process_event(self, agent_id: str, event: AgentEvent, callback):
"""Process a single event with the agent callback."""
subscription = self.agent_subscriptions.get(agent_id, {})
subscription['active_tasks'] = subscription.get('active_tasks', 0) + 1
try:
start_time = datetime.utcnow()
result = await callback(event)
processing_time = (datetime.utcnow() - start_time).total_seconds() * 1000
# Create response
response = AgentResponse(
original_event_id=event.event_id,
agent_id=agent_id,
status="success",
result=result,
processing_time_ms=processing_time
)
# Publish response back to bus
response_event = AgentEvent(
event_type="agent_response",
source_agent=agent_id,
target_agents=[event.source_agent] if event.source_agent else [],
payload=response.dict(),
correlation_id=event.event_id
)
await self.publish_event(response_event)
except Exception as e:
print(f"Error processing event {event.event_id}: {e}")
# Implement retry logic
if event.retry_count < event.max_retries:
event.retry_count += 1
await asyncio.sleep(2 ** event.retry_count) # Exponential backoff
await self.publish_event(event)
else:
await self._send_to_dlq(event, str(e))
finally:
subscription['active_tasks'] = max(0, subscription['active_tasks'] - 1)
# FastAPI application
app = FastAPI(title="Event-Driven AI Agent Bus", version="1.0.0")
# Initialize event bus
event_bus = EventBus(
kafka_servers=["localhost:9092"],
topic_prefix="ai_agents"
)
@app.post("/events")
async def publish_event(event: AgentEvent):
"""Publish an event to the bus."""
event_id = await event_bus.publish_event(event)
return {"event_id": event_id, "status": "published"}
@app.post("/agents/{agent_id}/register")
async def register_agent(agent_id: str, event_types: List[str],
priority: int = 5, max_concurrent: int = 10):
"""Register an agent to receive events."""
event_bus.register_agent(agent_id, event_types, priority, max_concurrent)
return {"agent_id": agent_id, "status": "registered"}
@app.get("/agents/{agent_id}/status")
async def get_agent_status(agent_id: str):
"""Get current status of an agent."""
subscription = event_bus.agent_subscriptions.get(agent_id, {})
return {
"agent_id": agent_id,
"active_tasks": subscription.get('active_tasks', 0),
"max_concurrent": subscription.get('max_concurrent', 0),
"event_types": subscription.get('event_types', [])
}
This implementation provides a solid foundation for event-driven AI agents. The event bus handles routing, retries, dead letter queues, and backpressure management. Agents can register for specific event types and process them asynchronously.
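To see the flow end to end, here's a rough usage sketch. It reuses the event_bus instance from the FastAPI app above, assumes the Kafka broker from its config is reachable, and the agent and service names are invented:

# Register a hypothetical agent, then publish a test event for it.
event_bus.register_agent("pricing_agent", event_types=["market_data"])
async def on_market_data(event: AgentEvent):
    print(f"pricing_agent got {event.payload}")
    return {"handled": True}
async def demo():
    # In a real deployment, event_bus.consume_events("pricing_agent", on_market_data)
    # would run in its own worker process; here we just publish the triggering event.
    await event_bus.publish_event(AgentEvent(
        event_type="market_data",
        source_agent="ingest_service",
        payload={"symbol": "AAPL", "price": 187.5, "volume": 120_000}
    ))
asyncio.run(demo())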
Agent Subscription and Reaction Model
The subscription model determines how agents discover and react to relevant events. This is crucial for building scalable, maintainable event-driven systems.
Agents need flexible ways to express what events they care about. Simple type-based subscriptions work for basic cases, but AI agents often need more sophisticated filtering based on content, context, or patterns.
Here’s how to implement a powerful subscription and reaction system:
from typing import Callable, Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum
import re
from jsonpath_ng.ext import parse as parse_jsonpath  # the ext parser also supports filter expressions
class SubscriptionType(Enum):
EVENT_TYPE = "event_type"
CONTENT_FILTER = "content_filter"
JSON_PATH = "json_path"
REGEX = "regex"
COMPOSITE = "composite"
@dataclass
class SubscriptionFilter:
filter_type: SubscriptionType
pattern: str
case_sensitive: bool = False
negate: bool = False
class AgentSubscription:
def __init__(self, agent_id: str):
self.agent_id = agent_id
self.filters: List[SubscriptionFilter] = []
self.callback: Optional[Callable] = None
self.priority: int = 5
self.max_concurrent: int = 10
self.active_tasks: int = 0
def add_filter(self, filter_config: SubscriptionFilter):
"""Add a filter to the subscription."""
self.filters.append(filter_config)
    def matches(self, event: AgentEvent) -> bool:
        """Check if an event matches this subscription (all filters must pass)."""
        if not self.filters:
            return True
        for filter_config in self.filters:
            matched = self._evaluate_filter(filter_config, event)
            # A negated filter passes exactly when the underlying filter does not.
            if matched == filter_config.negate:
                return False
        return True
def _evaluate_filter(self, filter_config: SubscriptionFilter, event: AgentEvent) -> bool:
"""Evaluate a single filter against an event."""
try:
if filter_config.filter_type == SubscriptionType.EVENT_TYPE:
return self._match_event_type(filter_config, event)
elif filter_config.filter_type == SubscriptionType.CONTENT_FILTER:
return self._match_content_filter(filter_config, event)
elif filter_config.filter_type == SubscriptionType.JSON_PATH:
return self._match_json_path(filter_config, event)
elif filter_config.filter_type == SubscriptionType.REGEX:
return self._match_regex(filter_config, event)
elif filter_config.filter_type == SubscriptionType.COMPOSITE:
return self._match_composite(filter_config, event)
except Exception as e:
print(f"Error evaluating filter {filter_config.pattern}: {e}")
return False
return False
def _match_event_type(self, filter_config: SubscriptionFilter, event: AgentEvent) -> bool:
"""Match against event type."""
pattern = filter_config.pattern
if not filter_config.case_sensitive:
pattern = pattern.lower()
event_type = event.event_type.lower()
else:
event_type = event.event_type
return pattern == event_type or (filter_config.pattern.endswith('*') and
event_type.startswith(pattern[:-1]))
def _match_content_filter(self, filter_config: SubscriptionFilter, event: AgentEvent) -> bool:
"""Match against content in the payload."""
content = json.dumps(event.payload, default=str)
if not filter_config.case_sensitive:
content = content.lower()
pattern = filter_config.pattern.lower()
else:
pattern = filter_config.pattern
return pattern in content
    def _match_json_path(self, filter_config: SubscriptionFilter, event: AgentEvent) -> bool:
        """Match using JSONPath expressions, evaluated against the event payload."""
        jsonpath_expr = parse_jsonpath(filter_config.pattern)
        matches = jsonpath_expr.find(event.payload)
        return len(matches) > 0
def _match_regex(self, filter_config: SubscriptionFilter, event: AgentEvent) -> bool:
"""Match using regular expressions."""
content = json.dumps(event.payload, default=str)
flags = 0 if filter_config.case_sensitive else re.IGNORECASE
return bool(re.search(filter_config.pattern, content, flags))
    def _match_composite(self, filter_config: SubscriptionFilter, event: AgentEvent) -> bool:
        """Match using composite logic (AND/OR operations)."""
        # Placeholder: a real implementation would parse expressions like
        # "(type=alert AND priority>5) OR source=monitoring". Until then,
        # composite filters are permissive and match every event.
        return True
class SubscriptionManager:
def __init__(self):
self.subscriptions: Dict[str, AgentSubscription] = {}
self.event_routing: Dict[str, List[str]] = {}
def register_agent(self, agent_id: str, callback: Callable,
priority: int = 5, max_concurrent: int = 10) -> AgentSubscription:
"""Register a new agent subscription."""
subscription = AgentSubscription(agent_id)
subscription.callback = callback
subscription.priority = priority
subscription.max_concurrent = max_concurrent
self.subscriptions[agent_id] = subscription
return subscription
def add_filter(self, agent_id: str, filter_config: SubscriptionFilter):
"""Add a filter to an agent's subscription."""
if agent_id in self.subscriptions:
self.subscriptions[agent_id].add_filter(filter_config)
self._rebuild_routing()
def _rebuild_routing(self):
"""Rebuild the event routing table based on current subscriptions."""
self.event_routing = {}
for agent_id, subscription in self.subscriptions.items():
# This is a simplified routing - in practice, you'd want more sophisticated indexing
for filter_config in subscription.filters:
if filter_config.filter_type == SubscriptionType.EVENT_TYPE:
event_type = filter_config.pattern
if event_type not in self.event_routing:
self.event_routing[event_type] = []
if agent_id not in self.event_routing[event_type]:
self.event_routing[event_type].append(agent_id)
def get_subscribers(self, event: AgentEvent) -> List[str]:
"""Get all agents that should receive this event."""
subscribers = []
for agent_id, subscription in self.subscriptions.items():
if subscription.matches(event):
subscribers.append(agent_id)
return subscribers
async def route_event(self, event: AgentEvent) -> List[str]:
"""Route an event to all matching subscribers."""
subscribers = self.get_subscribers(event)
processed_agents = []
for agent_id in subscribers:
subscription = self.subscriptions[agent_id]
# Check concurrency limits
if subscription.active_tasks >= subscription.max_concurrent:
print(f"Agent {agent_id} at capacity, skipping event {event.event_id}")
continue
# Process event asynchronously
asyncio.create_task(self._process_with_agent(agent_id, event))
processed_agents.append(agent_id)
return processed_agents
async def _process_with_agent(self, agent_id: str, event: AgentEvent):
"""Process an event with a specific agent."""
subscription = self.subscriptions[agent_id]
subscription.active_tasks += 1
try:
if subscription.callback:
await subscription.callback(event)
except Exception as e:
print(f"Error processing event {event.event_id} with agent {agent_id}: {e}")
finally:
subscription.active_tasks = max(0, subscription.active_tasks - 1)
# Example: Trading Agent with Complex Subscriptions
class TradingAgent:
def __init__(self, agent_id: str, subscription_manager: SubscriptionManager):
self.agent_id = agent_id
self.subscription_manager = subscription_manager
self.positions = {}
self.setup_subscriptions()
def setup_subscriptions(self):
"""Set up event subscriptions for the trading agent."""
subscription = self.subscription_manager.register_agent(
self.agent_id,
self.handle_event,
priority=3, # High priority for trading
max_concurrent=5
)
        # NOTE: all filters on a single subscription are ANDed together, so this
        # subscription matches market-data events for the watched symbols only.
        # Independent triggers (alerts, order updates) belong on separate
        # subscriptions that can share the same callback.
        # Match market data events (a trailing * enables prefix matching)
        subscription.add_filter(SubscriptionFilter(
            filter_type=SubscriptionType.EVENT_TYPE,
            pattern="market_data*"
        ))
        # Require a symbol field in the payload (JSONPath existence check)
        subscription.add_filter(SubscriptionFilter(
            filter_type=SubscriptionType.JSON_PATH,
            pattern="$.symbol"
        ))
        # Restrict to the symbols this agent trades
        subscription.add_filter(SubscriptionFilter(
            filter_type=SubscriptionType.REGEX,
            pattern=r'"symbol":\s*"(AAPL|MSFT|GOOGL)"',
            case_sensitive=True
        ))
async def handle_event(self, event: AgentEvent):
"""Handle incoming events for the trading agent."""
print(f"Trading agent {self.agent_id} processing event: {event.event_type}")
if event.event_type == "market_data":
await self.process_market_data(event.payload)
elif event.event_type == "alert":
await self.process_alert(event.payload)
elif event.event_type == "order_update":
await self.process_order_update(event.payload)
async def process_market_data(self, payload: Dict[str, Any]):
"""Process market data and make trading decisions."""
symbol = payload.get('symbol')
price = payload.get('price')
volume = payload.get('volume')
if not all([symbol, price, volume]):
return
# Simple trading logic based on volume spikes
if volume > payload.get('avg_volume', 0) * 1.5:
print(f"High volume detected for {symbol}: {volume}")
# Trigger buy/sell decision based on price movement
await self.evaluate_position(symbol, price, volume)
async def process_alert(self, payload: Dict[str, Any]):
"""Process high-priority alerts."""
alert_type = payload.get('type')
message = payload.get('message')
print(f"Alert received: {alert_type} - {message}")
if alert_type == "risk_limit_exceeded":
await self.close_all_positions()
elif alert_type == "market_halt":
await self.pause_trading()
async def evaluate_position(self, symbol: str, price: float, volume: int):
"""Evaluate whether to open, close, or adjust a position."""
# This would contain your actual trading logic
print(f"Evaluating position for {symbol} at {price} with volume {volume}")
async def close_all_positions(self):
"""Close all open positions."""
print("Closing all positions due to risk limit")
async def pause_trading(self):
"""Pause all trading activities."""
print("Pausing trading due to market halt")
This subscription model provides powerful filtering capabilities that allow agents to express exactly what events they care about. The trading agent example shows how complex business logic can be built on top of simple event subscriptions.
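Wiring the pieces together looks roughly like this (a sketch; the agent name and payload values are invented for illustration):

manager = SubscriptionManager()
trader = TradingAgent("trader_1", manager)
market_event = AgentEvent(
    event_type="market_data",
    source_agent="market_feed",
    payload={"symbol": "AAPL", "price": 187.5,
             "volume": 250_000, "avg_volume": 120_000}
)
async def demo():
    # route_event fans the event out to every subscription whose filters match
    processed = await manager.route_event(market_event)
    await asyncio.sleep(0)  # yield once so the fire-and-forget handler task runs
    print(f"Routed to: {processed}")
asyncio.run(demo())

Because the volume in this payload is more than 1.5× the average, the trading agent's handler flags the spike and calls evaluate_position.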
Multi-Agent Event Collaboration
Real-world AI systems rarely involve single agents. Most applications require multiple agents working together, each with specialized capabilities. Event-driven architectures excel at enabling this kind of loose coupling and emergent behavior.
The key to successful multi-agent collaboration is designing clear event contracts and communication patterns. Agents need to know what events to expect, what format they’ll be in, and how to handle failures gracefully.
Here’s how to implement robust multi-agent collaboration:
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass
from enum import Enum
import asyncio
from datetime import datetime, timedelta
import uuid
class CollaborationPattern(Enum):
REQUEST_RESPONSE = "request_response"
PUBLISH_SUBSCRIBE = "publish_subscribe"
EVENT_CHOREOGRAPHY = "event_choreography"
SAGA_PATTERN = "saga_pattern"
WORKFLOW_ORCHESTRATION = "workflow_orchestration"
@dataclass
class AgentCapability:
agent_id: str
capability_type: str
input_schema: Dict[str, Any]
output_schema: Dict[str, Any]
max_concurrent: int = 5
timeout_seconds: int = 30
class MultiAgentCoordinator:
def __init__(self, event_bus: EventBus):
self.event_bus = event_bus
self.agent_capabilities: Dict[str, AgentCapability] = {}
self.active_collaborations: Dict[str, Dict] = {}
self.event_handlers: Dict[str, Callable] = {}
def register_agent_capability(self, capability: AgentCapability):
"""Register an agent's capabilities."""
self.agent_capabilities[capability.agent_id] = capability
# Set up event handlers for this agent
self.event_handlers[capability.agent_id] = self._create_agent_handler(capability)
def _create_agent_handler(self, capability: AgentCapability) -> Callable:
"""Create an event handler for an agent."""
async def handler(event: AgentEvent):
try:
# Validate input schema
if not self._validate_schema(event.payload, capability.input_schema):
await self._send_error_response(event, "Invalid input schema")
return
# Process the request
result = await self._process_agent_request(capability, event)
# Send response
response_event = AgentEvent(
event_type="agent_response",
source_agent=capability.agent_id,
target_agents=[event.source_agent] if event.source_agent else [],
                    payload={
                        "request_id": event.event_id,
                        # Echo any collaboration id so handle_collaboration_response
                        # can route this result back to the right collaboration.
                        "collaboration_id": event.payload.get("collaboration_id"),
                        "result": result,
                        "status": "success"
                    },
correlation_id=event.event_id
)
await self.event_bus.publish_event(response_event)
except Exception as e:
await self._send_error_response(event, str(e))
return handler
def _validate_schema(self, payload: Dict[str, Any], schema: Dict[str, Any]) -> bool:
"""Validate payload against schema (simplified implementation)."""
# In practice, you'd use a proper JSON schema validator
required_fields = schema.get('required', [])
return all(field in payload for field in required_fields)
async def _process_agent_request(self, capability: AgentCapability, event: AgentEvent) -> Dict[str, Any]:
"""Process a request with an agent (placeholder implementation)."""
# This would contain the actual agent processing logic
return {"processed": True, "agent_id": capability.agent_id}
async def _send_error_response(self, event: AgentEvent, error_message: str):
"""Send an error response back to the requesting agent."""
error_event = AgentEvent(
event_type="agent_error",
source_agent="coordinator",
target_agents=[event.source_agent] if event.source_agent else [],
            payload={
                "request_id": event.event_id,
                "collaboration_id": event.payload.get("collaboration_id"),
                "error": error_message,
                "status": "error"
            },
correlation_id=event.event_id
)
await self.event_bus.publish_event(error_event)
async def initiate_collaboration(self, pattern: CollaborationPattern,
participants: List[str],
initial_payload: Dict[str, Any]) -> str:
"""Initiate a multi-agent collaboration."""
collaboration_id = str(uuid.uuid4())
        collaboration = {
            "id": collaboration_id,
            "pattern": pattern,
            "participants": participants,
            "data": initial_payload,  # saga steps read their input payload from here
            "status": "active",
            "start_time": datetime.utcnow(),
            "results": {},
            "errors": []
        }
self.active_collaborations[collaboration_id] = collaboration
if pattern == CollaborationPattern.REQUEST_RESPONSE:
await self._handle_request_response(collaboration_id, participants, initial_payload)
elif pattern == CollaborationPattern.EVENT_CHOREOGRAPHY:
await self._handle_event_choreography(collaboration_id, participants, initial_payload)
elif pattern == CollaborationPattern.SAGA_PATTERN:
await self._handle_saga_pattern(collaboration_id, participants, initial_payload)
return collaboration_id
async def _handle_request_response(self, collaboration_id: str,
participants: List[str],
payload: Dict[str, Any]):
"""Handle request-response collaboration pattern."""
if not participants:
return
# Send request to first participant
primary_agent = participants[0]
request_event = AgentEvent(
event_type="collaboration_request",
source_agent="coordinator",
target_agents=[primary_agent],
payload={
"collaboration_id": collaboration_id,
"participants": participants,
"data": payload
}
)
await self.event_bus.publish_event(request_event)
async def _handle_event_choreography(self, collaboration_id: str,
participants: List[str],
payload: Dict[str, Any]):
"""Handle event choreography pattern."""
# In event choreography, agents react to events without central coordination
# Each agent subscribes to relevant events and acts independently
for agent_id in participants:
choreography_event = AgentEvent(
event_type="choreography_start",
source_agent="coordinator",
target_agents=[agent_id],
payload={
"collaboration_id": collaboration_id,
"participants": participants,
"data": payload
}
)
await self.event_bus.publish_event(choreography_event)
async def _handle_saga_pattern(self, collaboration_id: str,
participants: List[str],
payload: Dict[str, Any]):
"""Handle saga pattern for distributed transactions."""
saga_steps = []
# Define saga steps based on participants
for i, agent_id in enumerate(participants):
step = {
"step_id": f"step_{i}",
"agent_id": agent_id,
"action": "process",
"compensating_action": "rollback",
"status": "pending"
}
saga_steps.append(step)
collaboration = self.active_collaborations[collaboration_id]
collaboration["saga_steps"] = saga_steps
# Start the saga
await self._execute_saga_step(collaboration_id, 0)
async def _execute_saga_step(self, collaboration_id: str, step_index: int):
"""Execute a single step in a saga."""
collaboration = self.active_collaborations.get(collaboration_id)
if not collaboration or step_index >= len(collaboration["saga_steps"]):
return
step = collaboration["saga_steps"][step_index]
agent_id = step["agent_id"]
# Send request to agent
request_event = AgentEvent(
event_type="saga_step",
source_agent="coordinator",
target_agents=[agent_id],
payload={
"collaboration_id": collaboration_id,
"step_id": step["step_id"],
"action": step["action"],
"data": collaboration.get("data", {})
}
)
await self.event_bus.publish_event(request_event)
async def handle_collaboration_response(self, event: AgentEvent):
"""Handle responses from agents in a collaboration."""
payload = event.payload
collaboration_id = payload.get("collaboration_id")
if not collaboration_id or collaboration_id not in self.active_collaborations:
return
collaboration = self.active_collaborations[collaboration_id]
if event.event_type == "agent_response":
await self._handle_agent_success(collaboration, event)
elif event.event_type == "agent_error":
await self._handle_agent_error(collaboration, event)
async def _handle_agent_success(self, collaboration: Dict, event: AgentEvent):
"""Handle successful agent response."""
payload = event.payload
collaboration["results"][event.source_agent] = payload.get("result", {})
if collaboration["pattern"] == CollaborationPattern.SAGA_PATTERN:
await self._handle_saga_success(collaboration, event)
else:
# For other patterns, check if collaboration is complete
if len(collaboration["results"]) >= len(collaboration["participants"]):
await self._complete_collaboration(collaboration["id"])
async def _handle_saga_success(self, collaboration: Dict, event: AgentEvent):
"""Handle successful saga step."""
step_id = event.payload.get("step_id")
saga_steps = collaboration.get("saga_steps", [])
# Mark current step as completed
for step in saga_steps:
if step["step_id"] == step_id:
step["status"] = "completed"
break
# Execute next step
next_step_index = self._find_next_pending_step(saga_steps)
if next_step_index is not None:
await self._execute_saga_step(collaboration["id"], next_step_index)
else:
# All steps completed
await self._complete_collaboration(collaboration["id"])
async def _handle_agent_error(self, collaboration: Dict, event: AgentEvent):
"""Handle agent error in collaboration."""
collaboration["errors"].append({
"agent_id": event.source_agent,
"error": event.payload.get("error", "Unknown error"),
"timestamp": datetime.utcnow()
})
if collaboration["pattern"] == CollaborationPattern.SAGA_PATTERN:
await self._handle_saga_error(collaboration, event)
else:
# For other patterns, mark collaboration as failed
collaboration["status"] = "failed"
await self._notify_collaboration_complete(collaboration)
async def _handle_saga_error(self, collaboration: Dict, event: AgentEvent):
"""Handle error in saga pattern - initiate compensation."""
print(f"Saga error in collaboration {collaboration['id']}, initiating compensation")
        # Compensate every step that completed before the failure. Completed
        # steps always precede the failed one, so walking the list in reverse
        # and skipping steps that never ran undoes them in LIFO order.
        saga_steps = collaboration.get("saga_steps", [])
        for step in reversed(saga_steps):
            if step["status"] == "completed":
                await self._execute_compensating_action(collaboration["id"], step)
collaboration["status"] = "compensated"
await self._notify_collaboration_complete(collaboration)
async def _execute_compensating_action(self, collaboration_id: str, step: Dict):
"""Execute compensating action for a saga step."""
agent_id = step["agent_id"]
compensate_event = AgentEvent(
event_type="saga_compensate",
source_agent="coordinator",
target_agents=[agent_id],
payload={
"collaboration_id": collaboration_id,
"step_id": step["step_id"],
"action": step["compensating_action"]
}
)
await self.event_bus.publish_event(compensate_event)
def _find_next_pending_step(self, saga_steps: List[Dict]) -> Optional[int]:
"""Find the next pending step in a saga."""
for i, step in enumerate(saga_steps):
if step["status"] == "pending":
return i
return None
async def _complete_collaboration(self, collaboration_id: str):
"""Mark collaboration as completed."""
collaboration = self.active_collaborations[collaboration_id]
collaboration["status"] = "completed"
collaboration["end_time"] = datetime.utcnow()
await self._notify_collaboration_complete(collaboration)
async def _notify_collaboration_complete(self, collaboration: Dict):
"""Notify all participants that collaboration is complete."""
completion_event = AgentEvent(
event_type="collaboration_complete",
source_agent="coordinator",
target_agents=collaboration["participants"],
payload={
"collaboration_id": collaboration["id"],
"status": collaboration["status"],
"results": collaboration["results"],
"errors": collaboration["errors"]
}
)
await self.event_bus.publish_event(completion_event)
# Clean up
del self.active_collaborations[collaboration["id"]]
# Example: E-commerce Order Processing with Multiple Agents
class EcommerceOrderProcessor:
def __init__(self, coordinator: MultiAgentCoordinator):
self.coordinator = coordinator
self.setup_agents()
def setup_agents(self):
"""Set up the various agents needed for order processing."""
# Inventory Agent
inventory_capability = AgentCapability(
agent_id="inventory_agent",
capability_type="inventory_management",
input_schema={
"type": "object",
"required": ["product_id", "quantity"],
"properties": {
"product_id": {"type": "string"},
"quantity": {"type": "integer", "minimum": 1}
}
},
output_schema={
"type": "object",
"properties": {
"available": {"type": "boolean"},
"reserved_quantity": {"type": "integer"}
}
}
)
# Payment Agent
payment_capability = AgentCapability(
agent_id="payment_agent",
capability_type="payment_processing",
input_schema={
"type": "object",
"required": ["amount", "payment_method"],
"properties": {
"amount": {"type": "number"},
"payment_method": {"type": "string"}
}
},
output_schema={
"type": "object",
"properties": {
"transaction_id": {"type": "string"},
"status": {"type": "string"}
}
}
)
# Shipping Agent
shipping_capability = AgentCapability(
agent_id="shipping_agent",
capability_type="shipping_management",
input_schema={
"type": "object",
"required": ["order_id", "shipping_address"],
"properties": {
"order_id": {"type": "string"},
"shipping_address": {"type": "object"}
}
},
output_schema={
"type": "object",
"properties": {
"tracking_number": {"type": "string"},
"estimated_delivery": {"type": "string"}
}
}
)
# Register all agents
self.coordinator.register_agent_capability(inventory_capability)
self.coordinator.register_agent_capability(payment_capability)
self.coordinator.register_agent_capability(shipping_capability)
async def process_order(self, order_data: Dict[str, Any]) -> str:
"""Process an e-commerce order using multiple agents."""
# Use saga pattern for distributed transaction
participants = ["inventory_agent", "payment_agent", "shipping_agent"]
collaboration_id = await self.coordinator.initiate_collaboration(
pattern=CollaborationPattern.SAGA_PATTERN,
participants=participants,
initial_payload=order_data
)
return collaboration_id
This multi-agent collaboration system provides several powerful patterns:
Request-Response: Simple one-to-one communication between agents.
Event Choreography: Agents react to events independently without central coordination.
Saga Pattern: Distributed transactions with compensation logic for handling failures.
Workflow Orchestration: Central coordination of complex multi-step processes. The coordinator declares this pattern in CollaborationPattern but only wires up the first three; orchestration follows the same event-and-response shape as the saga handler.
The e-commerce example shows how these patterns work in practice, with different agents handling inventory, payment, and shipping while maintaining data consistency through the saga pattern.
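Kicking the whole thing off is a single call (a rough driver; the field names and values are illustrative, and it assumes a reachable Kafka broker for the underlying EventBus):

async def demo_order():
    bus = EventBus(kafka_servers=["localhost:9092"])
    coordinator = MultiAgentCoordinator(bus)
    processor = EcommerceOrderProcessor(coordinator)
    collaboration_id = await processor.process_order({
        "order_id": "ord_1042",
        "product_id": "sku_777",
        "quantity": 2,
        "amount": 59.90,
        "payment_method": "card",
        "shipping_address": {"city": "Berlin", "zip": "10115"}
    })
    print(f"Started saga {collaboration_id}")
    # Agent responses flow back through coordinator.handle_collaboration_response,
    # which advances the saga step by step or triggers compensation on failure.
asyncio.run(demo_order())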
Conclusion and Future Patterns
Event-driven AI agents represent a fundamental shift in how we build intelligent systems. Instead of centralized, synchronous architectures, we get distributed, reactive systems that can scale and adapt to real-world conditions.
The technology is maturing rapidly. Message brokers like Kafka and NATS are becoming more sophisticated. Event streaming platforms like Apache Pulsar provide advanced features like geo-replication and multi-tenancy. Cloud providers are offering managed event services that handle the complexity of distributed systems.
We’re also seeing the emergence of specialized frameworks for event-driven AI. LangChain’s event system allows agents to react to external events. AutoGen supports multi-agent conversations with event-driven coordination. Custom frameworks like the one we built provide more control but require more implementation work.
The future belongs to hybrid architectures that combine event-driven reactivity with cognitive control. Agents will react to events in real-time while also maintaining long-term goals and strategic thinking. We’ll see systems that can switch between reactive and proactive modes based on context and requirements.
Event-driven AI agents are just the beginning. As the technology matures, we’ll see even more sophisticated patterns emerge. The key is starting simple and building complexity gradually. The patterns and code samples in this article provide a solid foundation for building your own event-driven AI systems.
The question isn’t whether to adopt event-driven architectures—it’s how quickly you can start building systems that think and react as dynamically as the world around them.