By Appropri8 Team

Event-Driven AI Agents: Harnessing Reactive Pipelines for Real-Time Autonomy

Tags: ai, machine-learning, architecture, event-driven, microservices, real-time

Traditional AI agents work like a restaurant kitchen where every order waits in line. You submit a request, wait for processing, and get a response. This synchronous model works fine for simple tasks, but breaks down when you need real-time responsiveness, scalability, or coordination between multiple agents.

Event-driven AI agents flip this model entirely. Instead of waiting for direct requests, they react to events as they happen. A market data event triggers a trading agent. A user action triggers a recommendation agent. A system alert triggers a monitoring agent. The agents become reactive, autonomous, and capable of near-real-time decision-making.

This isn’t just about speed, though that matters. Event-driven architectures let systems scale horizontally, absorb unpredictable load, and coordinate multi-agent workflows with less reliance on a central orchestrator. That makes them a natural foundation for AI systems that have to operate autonomously in dynamic, real-world environments.

The Problem with Synchronous AI

Most AI systems today follow a request-response pattern. You send a prompt, the system processes it, and you get a response. This works for chatbots and simple applications, but creates several problems when building production AI systems.

First, it scales poorly. Every request ties up resources until it completes, so load is hard to spread across multiple agents and sudden spikes in demand pile up behind slow requests. The request path becomes a bottleneck.

Second, it’s not reactive. Agents can’t respond to changing conditions or external events unless explicitly polled. They miss opportunities and can’t adapt to real-time situations.

Third, coordination is difficult. Getting multiple agents to work together requires complex orchestration logic. You end up with centralized controllers that become single points of failure.

Event-driven architectures solve these problems by making agents reactive to their environment rather than passive responders to requests.

Reactive Agent Principles

Event-driven AI agents follow a fundamentally different design pattern. Instead of the traditional “Input → Process → Output” cycle, they use a “Perception → Trigger → Action” loop that runs continuously.

The perception layer monitors for relevant events. This could be webhook notifications, database changes, message queue events, or signals from other agents. The key is that agents are always listening, not waiting to be called.

When an event matches the agent’s trigger conditions, it activates. The agent processes the event context, makes decisions, and takes actions. These actions might generate new events that other agents can react to, creating a distributed system of autonomous components.
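The loop can be sketched in a few lines of asyncio. This is a minimal single-process illustration under stated assumptions, not a production design: `Event`, `ReactiveAgent`, and `run_loop` are hypothetical names, an `asyncio.Queue` stands in for the perception layer, each trigger is a plain predicate, and the loop stops when the queue is empty, whereas a real agent would keep listening.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Dict, List


@dataclass
class Event:
    # Hypothetical minimal event shape for this sketch
    event_type: str
    payload: Dict[str, Any] = field(default_factory=dict)


@dataclass
class ReactiveAgent:
    name: str
    trigger: Callable[[Event], bool]                   # trigger condition
    action: Callable[[Event], Awaitable[List[Event]]]  # returns follow-up events, if any


async def run_loop(agents: List[ReactiveAgent], inbox: "asyncio.Queue[Event]") -> List[str]:
    """Drain the inbox; events emitted by actions go back into it for other agents."""
    log: List[str] = []
    while not inbox.empty():
        event = await inbox.get()                            # perception
        for agent in agents:
            if agent.trigger(event):                         # trigger
                log.append(f"{agent.name} <- {event.event_type}")
                for follow_up in await agent.action(event):  # action
                    await inbox.put(follow_up)
    return log


async def request_scale_out(event: Event) -> List[Event]:
    return [Event("scale_request", {"service": event.payload["service"]})]


async def scale_out(event: Event) -> List[Event]:
    return []  # a real agent would call an infrastructure API here


AGENTS = [
    ReactiveAgent("monitor",
                  lambda e: e.event_type == "cpu_alert" and e.payload.get("cpu", 0) > 90,
                  request_scale_out),
    ReactiveAgent("scaler", lambda e: e.event_type == "scale_request", scale_out),
]


async def main() -> List[str]:
    inbox: "asyncio.Queue[Event]" = asyncio.Queue()
    await inbox.put(Event("cpu_alert", {"service": "api", "cpu": 97}))
    return await run_loop(AGENTS, inbox)


print(asyncio.run(main()))  # ['monitor <- cpu_alert', 'scaler <- scale_request']
```

The monitor never references the scaler; it only emits a `scale_request` event, and whichever agent triggers on that type picks it up.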

This pattern enables several powerful capabilities:

Asynchronous Processing: Agents can handle multiple events concurrently without blocking each other. A single agent can keep many events in flight at once, up to its configured concurrency limit.

Loose Coupling: Agents don’t need to know about each other directly. They communicate through events, making the system more modular and easier to maintain.

Fault Tolerance: If one agent fails, others continue operating. Events can be retried, and failed processing doesn’t crash the entire system.

Scalability: You can add more agents or scale existing ones based on event volume. The system grows organically with demand.

Real-time Responsiveness: Agents react immediately to events rather than waiting for polling cycles or scheduled processing.
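Asynchronous processing and bounded concurrency usually go together: an agent accepts many events but limits how many it works on at once. A minimal asyncio sketch, assuming one semaphore per agent; `handle` and `process_all` are hypothetical names, and `max_concurrent` mirrors the parameter of the same name in the event bus code later on.

```python
import asyncio
from typing import Dict, List


async def handle(event_id: int, limiter: asyncio.Semaphore, stats: Dict[str, int]) -> int:
    async with limiter:                  # waits here while max_concurrent handlers are busy
        stats["in_flight"] += 1
        stats["peak"] = max(stats["peak"], stats["in_flight"])
        await asyncio.sleep(0.01)        # stand-in for a model call or other I/O
        stats["in_flight"] -= 1
        return event_id


async def process_all(event_ids: List[int], max_concurrent: int) -> Dict[str, int]:
    limiter = asyncio.Semaphore(max_concurrent)
    stats = {"in_flight": 0, "peak": 0}
    results = await asyncio.gather(*(handle(i, limiter, stats) for i in event_ids))
    stats["processed"] = len(results)
    return stats


stats = asyncio.run(process_all(list(range(50)), max_concurrent=5))
print(stats["processed"], stats["peak"])  # 50 5
```

Events beyond the limit wait for a free slot instead of being dropped, which is the simplest form of backpressure.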

Designing an Event Bus for Agents

The event bus is the nervous system of an event-driven AI architecture. It needs to be fast, reliable, and capable of handling the complex routing requirements of AI agents.

For AI systems, the event bus must support several key features:

Event Schema Validation: AI agents generate and consume structured data. The bus needs to validate event schemas and ensure type safety.

Priority Queues: Some events are more urgent than others. A security alert should be processed before a routine status update.

Dead Letter Queues: Events that keep failing after their retries need to be captured rather than silently dropped, so they can be inspected and reprocessed. AI processing can fail for many reasons: rate limits, model errors, resource constraints.

Event Replay: Agents need to be able to replay events for debugging, training, or recovery scenarios.

Backpressure Handling: When agents can’t keep up with event volume, the system needs graceful degradation rather than crashes.
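Before bringing in Kafka, it can help to see three of these features working together in a single process. The sketch below is a hypothetical in-memory bus (`InMemoryBus` and `BusEvent` are illustrative names, not part of the Kafka code that follows): `asyncio.PriorityQueue` orders events by priority, its `maxsize` bound makes publishers wait when consumers fall behind, and events that exhaust their retries land in a dead-letter list. Schema validation and replay are left out, and nothing here is durable.

```python
import asyncio
import itertools
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Dict, List, Tuple


@dataclass
class BusEvent:
    event_type: str
    priority: int                  # 1 = most urgent, 10 = least urgent
    payload: Dict[str, Any] = field(default_factory=dict)
    retry_count: int = 0


class InMemoryBus:
    """Hypothetical single-process bus: priority order, bounded queue, dead letters."""

    def __init__(self, max_pending: int = 100, max_retries: int = 3):
        # A bounded queue gives backpressure: publish() waits while the queue is full
        self.queue: asyncio.PriorityQueue = asyncio.PriorityQueue(maxsize=max_pending)
        self.dead_letters: List[Tuple[BusEvent, str]] = []
        self.max_retries = max_retries
        self._seq = itertools.count()  # tie-breaker: FIFO within a priority, never compares events

    async def publish(self, event: BusEvent) -> None:
        await self.queue.put((event.priority, next(self._seq), event))

    async def drain(self, handler: Callable[[BusEvent], Awaitable[None]]) -> None:
        """Process queued events, most urgent first, until the queue is empty."""
        while not self.queue.empty():
            _, _, event = await self.queue.get()
            try:
                await handler(event)
            except Exception as exc:
                if event.retry_count < self.max_retries:
                    event.retry_count += 1
                    await self.publish(event)  # requeue; real code would add a backoff delay
                else:
                    self.dead_letters.append((event, str(exc)))


async def main():
    # Create the bus inside the running loop (safest for asyncio queues before Python 3.10)
    bus = InMemoryBus(max_pending=10, max_retries=2)
    handled: List[str] = []

    async def handler(event: BusEvent) -> None:
        if event.event_type == "bad_payload":
            raise ValueError("model rejected the payload")
        handled.append(event.event_type)

    await bus.publish(BusEvent("status_update", priority=8))
    await bus.publish(BusEvent("bad_payload", priority=5))
    await bus.publish(BusEvent("security_alert", priority=1))
    await bus.drain(handler)
    return handled, [(e.event_type, e.retry_count) for e, _ in bus.dead_letters]


print(asyncio.run(main()))  # (['security_alert', 'status_update'], [('bad_payload', 2)])
```

The retried `bad_payload` event is processed ahead of `status_update` because retries keep their original priority, and after `max_retries` it ends up in `dead_letters` with its error message instead of disappearing.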

Here’s a practical implementation using FastAPI and Kafka, via the kafka-python client:

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from typing import Dict, List, Optional, Any
from datetime import datetime
import asyncio
import json
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import uuid

# Event schemas for AI agents
class AgentEvent(BaseModel):
    event_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    event_type: str
    source_agent: str
    target_agents: List[str] = []
    priority: int = Field(default=5, ge=1, le=10)  # 1=highest, 10=lowest
    payload: Dict[str, Any]
    timestamp: datetime = Field(default_factory=datetime.utcnow)
    correlation_id: Optional[str] = None
    retry_count: int = 0
    max_retries: int = 3

class AgentResponse(BaseModel):
    response_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    original_event_id: str
    agent_id: str
    status: str  # success, failure, partial
    result: Dict[str, Any]
    timestamp: datetime = Field(default_factory=datetime.utcnow)
    processing_time_ms: float

class EventBus:
    def __init__(self, kafka_servers: List[str], topic_prefix: str = "ai_agents"):
        self.kafka_servers = kafka_servers
        self.topic_prefix = topic_prefix
        
        # Initialize Kafka producer
        self.producer = KafkaProducer(
            bootstrap_servers=kafka_servers,
            value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            retries=3,
            acks='all',  # Wait for all replicas to acknowledge
            compression_type='gzip'
        )
        
        # Event routing rules
        self.routing_rules = {}
        self.agent_subscriptions = {}
        
        # Dead letter queue for failed events
        self.dlq_topic = f"{topic_prefix}_dlq"
        
    def register_agent(self, agent_id: str, event_types: List[str], 
                      priority: int = 5, max_concurrent: int = 10):
        """Register an agent to receive specific event types."""
        self.agent_subscriptions[agent_id] = {
            'event_types': event_types,
            'priority': priority,
            'max_concurrent': max_concurrent,
            'active_tasks': 0
        }
        
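        # Map the agent to its own topic. Kafka only creates the topic on first use if
        # the broker allows auto-creation (auto.create.topics.enable); otherwise create
        # it up front, for example with kafka-python's KafkaAdminClient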
        topic_name = f"{self.topic_prefix}_{agent_id}"
        self.routing_rules[agent_id] = topic_name
        
    async def publish_event(self, event: AgentEvent) -> str:
        """Publish an event to the bus."""
        try:
            # Pydantic validated the fields when the event was constructed; serialize it
            event_dict = event.dict()
            
            # Determine target topics based on routing rules
            target_topics = []
            for agent_id in event.target_agents:
                if agent_id in self.routing_rules:
                    target_topics.append(self.routing_rules[agent_id])
                else:
                    print(f"Unknown target agent {agent_id}; event {event.event_id} not routed to it")
            
            # Broadcast only when the event names no targets at all. An event addressed
            # to unregistered agents must not fall through to every subscriber
            if not event.target_agents:
                for agent_id, subscription in self.agent_subscriptions.items():
                    if event.event_type in subscription['event_types']:
                        target_topics.append(self.routing_rules[agent_id])
            
            # Publish to all target topics
            for topic in target_topics:
                future = self.producer.send(
                    topic,
                    key=event.event_id,
                    value=event_dict
                )
                future.add_callback(self._on_send_success)
                future.add_errback(self._on_send_error, event)
            
            return event.event_id
            
        except Exception as e:
            # Send to dead letter queue
            await self._send_to_dlq(event, str(e))
            raise HTTPException(status_code=500, detail=f"Failed to publish event: {str(e)}")
    
    def _on_send_success(self, record_metadata):
        """Callback for successful message send."""
        print(f"Event published to {record_metadata.topic} at offset {record_metadata.offset}")
    
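    def _on_send_error(self, event, exception):
        """Callback for failed message send.

        kafka-python binds extra errback arguments first (via functools.partial),
        so add_errback(f, event) ends up calling f(event, exception).
        """
        print(f"Failed to publish event {event.event_id}: {exception}")
        # Could implement retry logic here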
    
    async def _send_to_dlq(self, event: AgentEvent, error: str):
        """Send failed event to dead letter queue."""
        dlq_event = event.dict()
        dlq_event['error'] = error
        dlq_event['dlq_timestamp'] = datetime.utcnow().isoformat()
        
        try:
            self.producer.send(
                self.dlq_topic,
                key=event.event_id,
                value=dlq_event
            )
        except Exception as e:
            print(f"Failed to send to DLQ: {e}")
    
    async def consume_events(self, agent_id: str, callback):
        """Consume events for a specific agent.

        kafka-python is a blocking client, so each poll runs in a worker thread
        (asyncio.to_thread, Python 3.9+). Iterating the consumer directly inside
        this coroutine would block the event loop, and the processing tasks
        created below would never get a chance to run.
        """
        if agent_id not in self.routing_rules:
            raise ValueError(f"Agent {agent_id} not registered")
        
        topic = self.routing_rules[agent_id]
        consumer = KafkaConsumer(
            topic,
            bootstrap_servers=self.kafka_servers,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            group_id=f"agent_{agent_id}",
            auto_offset_reset='latest',
            enable_auto_commit=True,  # Offsets may commit before processing finishes (at-most-once)
            max_poll_records=10  # Fetch at most 10 records per poll
        )
        subscription = self.agent_subscriptions[agent_id]
        in_flight = set()  # Keep references so running tasks are not garbage-collected
        
        try:
            while True:
                batches = await asyncio.to_thread(consumer.poll, timeout_ms=1000)
                for records in batches.values():
                    for message in records:
                        event = AgentEvent(**message.value)
                        
                        # Backpressure: wait for a free slot instead of dropping the event
                        while subscription['active_tasks'] >= subscription['max_concurrent']:
                            await asyncio.sleep(0.05)
                        
                        # Reserve the slot before scheduling so the capacity check stays accurate
                        subscription['active_tasks'] += 1
                        task = asyncio.create_task(self._process_event(agent_id, event, callback))
                        in_flight.add(task)
                        task.add_done_callback(in_flight.discard)
                
        except Exception as e:
            print(f"Error consuming events for {agent_id}: {e}")
        finally:
            consumer.close()
    
    async def _process_event(self, agent_id: str, event: AgentEvent, callback):
        """Process a single event; consume_events has already reserved a concurrency slot."""
        subscription = self.agent_subscriptions[agent_id]
        
        try:
            start_time = datetime.utcnow()
            result = await callback(event)
            processing_time = (datetime.utcnow() - start_time).total_seconds() * 1000
            
            # Create response
            response = AgentResponse(
                original_event_id=event.event_id,
                agent_id=agent_id,
                status="success",
                result=result or {},  # AgentResponse.result must be a dict
                processing_time_ms=processing_time
            )
            
            # Publish response back to bus
            response_event = AgentEvent(
                event_type="agent_response",
                source_agent=agent_id,
                target_agents=[event.source_agent] if event.source_agent else [],
                payload=response.dict(),
                correlation_id=event.event_id
            )
            
            await self.publish_event(response_event)
            
        except Exception as e:
            print(f"Error processing event {event.event_id}: {e}")
            
            # Retry with exponential backoff (2s, 4s, 8s). Send the retry straight to this
            # agent's topic: publish_event() would re-route it and could redeliver it to
            # every agent subscribed to the event type
            if event.retry_count < event.max_retries:
                event.retry_count += 1
                await asyncio.sleep(2 ** event.retry_count)
                self.producer.send(self.routing_rules[agent_id], key=event.event_id, value=event.dict())
            else:
                await self._send_to_dlq(event, str(e))
        
        finally:
            subscription['active_tasks'] = max(0, subscription['active_tasks'] - 1)

# FastAPI application
app = FastAPI(title="Event-Driven AI Agent Bus", version="1.0.0")

# Initialize event bus
event_bus = EventBus(
    kafka_servers=["localhost:9092"],
    topic_prefix="ai_agents"
)

@app.post("/events")
async def publish_event(event: AgentEvent):
    """Publish an event to the bus."""
    event_id = await event_bus.publish_event(event)
    return {"event_id": event_id, "status": "published"}

@app.post("/agents/{agent_id}/register")
async def register_agent(agent_id: str, event_types: List[str], 
                        priority: int = 5, max_concurrent: int = 10):
    """Register an agent to receive events."""
    event_bus.register_agent(agent_id, event_types, priority, max_concurrent)
    return {"agent_id": agent_id, "status": "registered"}

@app.get("/agents/{agent_id}/status")
async def get_agent_status(agent_id: str):
    """Get current status of an agent."""
    subscription = event_bus.agent_subscriptions.get(agent_id, {})
    return {
        "agent_id": agent_id,
        "active_tasks": subscription.get('active_tasks', 0),
        "max_concurrent": subscription.get('max_concurrent', 0),
        "event_types": subscription.get('event_types', [])
    }

This implementation is a starting point for event-driven AI agents. The bus routes events to per-agent topics, retries failed processing with exponential backoff, moves events that exhaust their retries to a dead letter queue, and caps how many events each agent processes at once. Agents register for specific event types and process them asynchronously.
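Kafka has no per-message priority, so the `priority` field on `AgentEvent` travels with each event but does not change delivery order. A common workaround is one topic per priority tier, drained most-urgent first. The sketch below covers only the routing decision; the three tiers, their cut-offs (1-3, 4-7, 8-10), the `_high`/`_normal`/`_low` suffixes, and both function names are hypothetical choices, not part of the bus above.

```python
from typing import Dict, Optional

# Hypothetical tiers, most urgent first
TIER_ORDER = ("high", "normal", "low")


def priority_topic(agent_topic: str, priority: int) -> str:
    """Map AgentEvent.priority (1 = highest, 10 = lowest) to a tier-specific topic."""
    if not 1 <= priority <= 10:
        raise ValueError(f"priority must be between 1 and 10, got {priority}")
    if priority <= 3:
        tier = "high"
    elif priority <= 7:
        tier = "normal"
    else:
        tier = "low"
    return f"{agent_topic}_{tier}"


def next_topic_to_drain(agent_topic: str, backlog: Dict[str, int]) -> Optional[str]:
    """Strict priority: the most urgent tier topic with pending messages, or None."""
    for tier in TIER_ORDER:
        topic = f"{agent_topic}_{tier}"
        if backlog.get(topic, 0) > 0:
            return topic
    return None


print(priority_topic("ai_agents_trader", 2))  # ai_agents_trader_high
print(next_topic_to_drain("ai_agents_trader",
                          {"ai_agents_trader_low": 40, "ai_agents_trader_normal": 2}))
# ai_agents_trader_normal
```

In `publish_event`, the per-agent topic would become `priority_topic(self.routing_rules[agent_id], event.priority)`. On the consuming side, kafka-python's `KafkaConsumer.pause()` and `resume()` are one way to hold back the lower tiers while the high tier has a backlog. Strict priority can starve the low tier under sustained load, so a weighted scheme is a common refinement.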

Agent Subscription and Reaction Model

The subscription model determines how agents discover and react to relevant events. This is crucial for building scalable, maintainable event-driven systems.

Agents need flexible ways to express what events they care about. Simple type-based subscriptions work for basic cases, but AI agents often need more sophisticated filtering based on content, context, or patterns.

Here’s one way to implement a subscription and reaction system that filters by event type, payload content, JSONPath, or regular expression:

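import asyncio
import json
import re
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Dict, List, Any, Optional

import jsonpath_ng

# AgentEvent is the Pydantic model defined in the event bus example above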

class SubscriptionType(Enum):
    EVENT_TYPE = "event_type"
    CONTENT_FILTER = "content_filter"
    JSON_PATH = "json_path"
    REGEX = "regex"
    COMPOSITE = "composite"

@dataclass
class SubscriptionFilter:
    filter_type: SubscriptionType
    pattern: str
    case_sensitive: bool = False
    negate: bool = False

class AgentSubscription:
    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.filters: List[SubscriptionFilter] = []
        self.callback: Optional[Callable] = None
        self.priority: int = 5
        self.max_concurrent: int = 10
        self.active_tasks: int = 0
        
    def add_filter(self, filter_config: SubscriptionFilter):
        """Add a filter to the subscription."""
        self.filters.append(filter_config)
    
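    def matches(self, event: AgentEvent) -> bool:
        """Check if an event matches this subscription.

        Positive filters are alternatives: the event must satisfy at least one.
        Negated filters are exclusions: the event must satisfy none of them.
        """
        if not self.filters:
            return True
        
        include = [f for f in self.filters if not f.negate]
        exclude = [f for f in self.filters if f.negate]
        
        if any(self._evaluate_filter(f, event) for f in exclude):
            return False
        # With only exclusion filters, everything not excluded matches
        return not include or any(self._evaluate_filter(f, event) for f in include)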
    
    def _evaluate_filter(self, filter_config: SubscriptionFilter, event: AgentEvent) -> bool:
        """Evaluate a single filter against an event."""
        try:
            if filter_config.filter_type == SubscriptionType.EVENT_TYPE:
                return self._match_event_type(filter_config, event)
            elif filter_config.filter_type == SubscriptionType.CONTENT_FILTER:
                return self._match_content_filter(filter_config, event)
            elif filter_config.filter_type == SubscriptionType.JSON_PATH:
                return self._match_json_path(filter_config, event)
            elif filter_config.filter_type == SubscriptionType.REGEX:
                return self._match_regex(filter_config, event)
            elif filter_config.filter_type == SubscriptionType.COMPOSITE:
                return self._match_composite(filter_config, event)
        except Exception as e:
            print(f"Error evaluating filter {filter_config.pattern}: {e}")
            return False
        
        return False
    
    def _match_event_type(self, filter_config: SubscriptionFilter, event: AgentEvent) -> bool:
        """Match against event type."""
        pattern = filter_config.pattern
        if not filter_config.case_sensitive:
            pattern = pattern.lower()
            event_type = event.event_type.lower()
        else:
            event_type = event.event_type
        
        return pattern == event_type or (filter_config.pattern.endswith('*') and 
                                       event_type.startswith(pattern[:-1]))
    
    def _match_content_filter(self, filter_config: SubscriptionFilter, event: AgentEvent) -> bool:
        """Match against content in the payload."""
        content = json.dumps(event.payload, default=str)
        if not filter_config.case_sensitive:
            content = content.lower()
            pattern = filter_config.pattern.lower()
        else:
            pattern = filter_config.pattern
        
        return pattern in content
    
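    def _match_json_path(self, filter_config: SubscriptionFilter, event: AgentEvent) -> bool:
        """Match if a JSONPath expression finds anything in the event payload.

        Paths are evaluated against event.payload, so write "$.symbol", not
        "$.payload.symbol"; envelope fields such as priority are not visible here.
        The base jsonpath_ng parser has no [?(...)] filter support; jsonpath_ng.ext.parse adds it.
        """
        jsonpath_expr = jsonpath_ng.parse(filter_config.pattern)
        matches = jsonpath_expr.find(event.payload)
        return len(matches) > 0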
    
    def _match_regex(self, filter_config: SubscriptionFilter, event: AgentEvent) -> bool:
        """Match using regular expressions."""
        content = json.dumps(event.payload, default=str)
        flags = 0 if filter_config.case_sensitive else re.IGNORECASE
        return bool(re.search(filter_config.pattern, content, flags))
    
    def _match_composite(self, filter_config: SubscriptionFilter, event: AgentEvent) -> bool:
        """Match using composite logic (AND/OR operations)."""
        # This would parse complex expressions like "(type=alert AND priority>5) OR source=monitoring"
        # Implementation depends on your specific requirements. Until then, fail closed:
        # returning True here would make every composite filter match every event
        return False

class SubscriptionManager:
    def __init__(self):
        self.subscriptions: Dict[str, AgentSubscription] = {}
        self.event_routing: Dict[str, List[str]] = {}
    
    def register_agent(self, agent_id: str, callback: Callable, 
                      priority: int = 5, max_concurrent: int = 10) -> AgentSubscription:
        """Register a new agent subscription."""
        subscription = AgentSubscription(agent_id)
        subscription.callback = callback
        subscription.priority = priority
        subscription.max_concurrent = max_concurrent
        
        self.subscriptions[agent_id] = subscription
        return subscription
    
    def add_filter(self, agent_id: str, filter_config: SubscriptionFilter):
        """Add a filter to an agent's subscription."""
        if agent_id in self.subscriptions:
            self.subscriptions[agent_id].add_filter(filter_config)
            self._rebuild_routing()
    
    def _rebuild_routing(self):
        """Rebuild the event routing table based on current subscriptions."""
        self.event_routing = {}
        for agent_id, subscription in self.subscriptions.items():
            # Simplified: only EVENT_TYPE filters are indexed, and get_subscribers() below
            # still calls matches() on every subscription instead of consulting this table
            for filter_config in subscription.filters:
                if filter_config.filter_type == SubscriptionType.EVENT_TYPE:
                    event_type = filter_config.pattern
                    if event_type not in self.event_routing:
                        self.event_routing[event_type] = []
                    if agent_id not in self.event_routing[event_type]:
                        self.event_routing[event_type].append(agent_id)
    
    def get_subscribers(self, event: AgentEvent) -> List[str]:
        """Get all agents that should receive this event."""
        subscribers = []
        
        for agent_id, subscription in self.subscriptions.items():
            if subscription.matches(event):
                subscribers.append(agent_id)
        
        return subscribers
    
    async def route_event(self, event: AgentEvent) -> List[str]:
        """Route an event to all matching subscribers."""
        subscribers = self.get_subscribers(event)
        processed_agents = []
        
        for agent_id in subscribers:
            subscription = self.subscriptions[agent_id]
            
            # Check concurrency limits
            if subscription.active_tasks >= subscription.max_concurrent:
                print(f"Agent {agent_id} at capacity, skipping event {event.event_id}")
                continue
            
            # Reserve the slot before scheduling: the task only starts once the event loop
            # regains control, so counting inside the task would let a burst of
            # route_event() calls overshoot max_concurrent
            subscription.active_tasks += 1
            asyncio.create_task(self._process_with_agent(agent_id, event))
            processed_agents.append(agent_id)
        
        return processed_agents
    
    async def _process_with_agent(self, agent_id: str, event: AgentEvent):
        """Process an event with a specific agent (route_event has already reserved a slot)."""
        subscription = self.subscriptions[agent_id]
        
        try:
            if subscription.callback:
                await subscription.callback(event)
        except Exception as e:
            print(f"Error processing event {event.event_id} with agent {agent_id}: {e}")
        finally:
            subscription.active_tasks = max(0, subscription.active_tasks - 1)

# Example: Trading Agent with Complex Subscriptions
class TradingAgent:
    def __init__(self, agent_id: str, subscription_manager: SubscriptionManager):
        self.agent_id = agent_id
        self.subscription_manager = subscription_manager
        self.positions = {}
        self.setup_subscriptions()
    
    def setup_subscriptions(self):
        """Set up event subscriptions for the trading agent."""
        subscription = self.subscription_manager.register_agent(
            self.agent_id, 
            self.handle_event,
            priority=3,  # High priority for trading
            max_concurrent=5
        )
        
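        # Subscribe to market data events
        subscription.add_filter(SubscriptionFilter(
            filter_type=SubscriptionType.EVENT_TYPE,
            pattern="market_data"
        ))
        
        # Subscribe to alerts. Priority lives on the event envelope, which payload
        # filters cannot see, so the "high priority only" check is in handle_event
        subscription.add_filter(SubscriptionFilter(
            filter_type=SubscriptionType.EVENT_TYPE,
            pattern="alert"
        ))
        
        # Subscribe to order updates, which handle_event also processes
        subscription.add_filter(SubscriptionFilter(
            filter_type=SubscriptionType.EVENT_TYPE,
            pattern="order_update"
        ))
        
        # Subscribe to events for specific symbols. Payload filters run against the
        # json.dumps text of the payload, e.g. '"symbol": "AAPL"'. The parentheses keep the
        # alternation intact (in ^AAPL|MSFT|GOOGL$, ^ applies only to AAPL and $ only to GOOGL)
        subscription.add_filter(SubscriptionFilter(
            filter_type=SubscriptionType.REGEX,
            pattern=r'"symbol": "(AAPL|MSFT|GOOGL)"',
            case_sensitive=True
        ))
        
        # Subscribe to events containing "volatility" in the content
        subscription.add_filter(SubscriptionFilter(
            filter_type=SubscriptionType.CONTENT_FILTER,
            pattern="volatility",
            case_sensitive=False
        ))
    
    async def handle_event(self, event: AgentEvent):
        """Handle incoming events for the trading agent."""
        print(f"Trading agent {self.agent_id} processing event: {event.event_type}")
        
        if event.event_type == "market_data":
            await self.process_market_data(event.payload)
        elif event.event_type == "alert" and event.priority < 3:
            # Only urgent alerts (priority 1-2; 1 is the highest) trigger risk actions
            await self.process_alert(event.payload)
        elif event.event_type == "order_update":
            await self.process_order_update(event.payload)
    
    async def process_order_update(self, payload: Dict[str, Any]):
        """Track fills and cancellations (placeholder, like the other handlers)."""
        print(f"Order update received: {payload}")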
    
    async def process_market_data(self, payload: Dict[str, Any]):
        """Process market data and make trading decisions."""
        symbol = payload.get('symbol')
        price = payload.get('price')
        volume = payload.get('volume')
        
        if not all([symbol, price, volume]):
            return
        
        # Simple trading logic based on volume spikes; skip events without a volume baseline
        avg_volume = payload.get('avg_volume')
        if avg_volume and volume > avg_volume * 1.5:
            print(f"High volume detected for {symbol}: {volume}")
            # Trigger buy/sell decision based on price movement
            await self.evaluate_position(symbol, price, volume)
    
    async def process_alert(self, payload: Dict[str, Any]):
        """Process high-priority alerts."""
        alert_type = payload.get('type')
        message = payload.get('message')
        
        print(f"Alert received: {alert_type} - {message}")
        
        if alert_type == "risk_limit_exceeded":
            await self.close_all_positions()
        elif alert_type == "market_halt":
            await self.pause_trading()
    
    async def evaluate_position(self, symbol: str, price: float, volume: int):
        """Evaluate whether to open, close, or adjust a position."""
        # This would contain your actual trading logic
        print(f"Evaluating position for {symbol} at {price} with volume {volume}")
    
    async def close_all_positions(self):
        """Close all open positions."""
        print("Closing all positions due to risk limit")
    
    async def pause_trading(self):
        """Pause all trading activities."""
        print("Pausing trading due to market halt")

This subscription model lets each agent declare which events it wants using event types (with trailing `*` wildcards), payload substrings, JSONPath expressions, or regular expressions. The trading agent shows domain logic layered on top of those declarations: the filters decide what reaches the agent, and `handle_event` decides what to do with it.
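`get_subscribers()` calls `matches()` on every subscription for every event, which is fine for a handful of agents but grows linearly with their number; the comment in `_rebuild_routing` hints at indexing. A minimal sketch of such an index, assuming lowercase event types (matching the default case-insensitive comparison in `_match_event_type`) and trailing-`*` wildcards; `EventTypeIndex` is a hypothetical name. It only narrows the candidate set, so each candidate's full `matches()` check still has to run, and it is only a safe shortcut for agents that cannot match without passing an event-type filter. Agents that subscribe through content, regex, or JSONPath filters go into the unindexed set.

```python
from collections import defaultdict
from typing import Dict, Set


class EventTypeIndex:
    """Hypothetical pre-filter: narrows which subscriptions need a full matches() check."""

    def __init__(self) -> None:
        self.exact: Dict[str, Set[str]] = defaultdict(set)     # "market_data" -> agents
        self.prefixes: Dict[str, Set[str]] = defaultdict(set)  # "market_" (from "market_*") -> agents
        self.unindexed: Set[str] = set()                       # agents without a usable type filter

    def add_type_pattern(self, agent_id: str, pattern: str) -> None:
        pattern = pattern.lower()
        if pattern.endswith("*"):
            self.prefixes[pattern[:-1]].add(agent_id)
        else:
            self.exact[pattern].add(agent_id)

    def add_unindexed(self, agent_id: str) -> None:
        self.unindexed.add(agent_id)

    def candidates(self, event_type: str) -> Set[str]:
        event_type = event_type.lower()
        found = set(self.exact.get(event_type, ()))    # .get avoids creating empty entries
        for prefix, agents in self.prefixes.items():   # wildcard lists are usually short
            if event_type.startswith(prefix):
                found |= agents
        return found | self.unindexed


index = EventTypeIndex()
index.add_type_pattern("trader", "market_data")
index.add_type_pattern("risk", "market_*")
index.add_unindexed("news_watcher")  # e.g. subscribes only via a content filter

print(sorted(index.candidates("MARKET_DATA")))   # ['news_watcher', 'risk', 'trader']
print(sorted(index.candidates("order_update")))  # ['news_watcher']
```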

Multi-Agent Event Collaboration

Real-world AI systems rarely involve single agents. Most applications require multiple agents working together, each with specialized capabilities. Event-driven architectures excel at enabling this kind of loose coupling and emergent behavior.

The key to successful multi-agent collaboration is designing clear event contracts and communication patterns. Agents need to know what events to expect, what format they’ll be in, and how to handle failures gracefully.
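Request-response exchanges over a bus depend on the `correlation_id` field: the requester has to recognise which incoming response belongs to which request, and give up after a deadline. A minimal asyncio sketch, assuming requests are keyed by the request event's `event_id` (which the handlers in the coordinator below copy into `correlation_id`) and that the requester's response handler calls `resolve()`; `PendingRequests` is a hypothetical helper, and `timeout_seconds` plays the role of `AgentCapability.timeout_seconds`.

```python
import asyncio
import uuid
from typing import Any, Dict


class PendingRequests:
    """Hypothetical helper: pairs responses with requests by correlation_id, with a deadline."""

    def __init__(self) -> None:
        self._waiting: Dict[str, "asyncio.Future[Dict[str, Any]]"] = {}

    def open(self, request_id: str) -> None:
        """Register a request before publishing it, so an early reply is not lost."""
        self._waiting[request_id] = asyncio.get_running_loop().create_future()

    def resolve(self, correlation_id: str, payload: Dict[str, Any]) -> bool:
        """Call from the response handler; returns False for unknown, duplicate, or late replies."""
        future = self._waiting.get(correlation_id)
        if future is None or future.done():
            return False
        future.set_result(payload)
        return True

    async def wait(self, request_id: str, timeout_seconds: float) -> Dict[str, Any]:
        """Wait for the reply; raises asyncio.TimeoutError if none arrives in time."""
        future = self._waiting[request_id]
        try:
            return await asyncio.wait_for(future, timeout_seconds)
        finally:
            self._waiting.pop(request_id, None)  # forget the request either way


async def main():
    pending = PendingRequests()
    loop = asyncio.get_running_loop()

    # A responder that answers after 10 ms (stands in for an agent_response event)
    answered = str(uuid.uuid4())
    pending.open(answered)
    loop.call_later(0.01, pending.resolve, answered, {"status": "success"})
    reply = await pending.wait(answered, timeout_seconds=1.0)

    # A request nobody answers
    ignored = str(uuid.uuid4())
    pending.open(ignored)
    try:
        await pending.wait(ignored, timeout_seconds=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    late = pending.resolve(ignored, {"status": "success"})  # arrives after the deadline
    return reply, timed_out, late


print(asyncio.run(main()))  # ({'status': 'success'}, True, False)
```

Calling `open()` before publishing the request matters: if a fast agent replies before `wait()` starts, the future already holds the result and `wait()` returns immediately.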

Here’s a coordinator that registers agent capabilities, validates incoming requests against them, and starts collaborations using several of these patterns:

from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass
from enum import Enum
import asyncio
from datetime import datetime, timedelta
import uuid

class CollaborationPattern(Enum):
    REQUEST_RESPONSE = "request_response"
    PUBLISH_SUBSCRIBE = "publish_subscribe"
    EVENT_CHOREOGRAPHY = "event_choreography"
    SAGA_PATTERN = "saga_pattern"
    WORKFLOW_ORCHESTRATION = "workflow_orchestration"

@dataclass
class AgentCapability:
    agent_id: str
    capability_type: str
    input_schema: Dict[str, Any]
    output_schema: Dict[str, Any]
    max_concurrent: int = 5
    timeout_seconds: int = 30

class MultiAgentCoordinator:
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        self.agent_capabilities: Dict[str, AgentCapability] = {}
        self.active_collaborations: Dict[str, Dict] = {}
        self.event_handlers: Dict[str, Callable] = {}
        
    def register_agent_capability(self, capability: AgentCapability):
        """Register an agent's capabilities."""
        self.agent_capabilities[capability.agent_id] = capability
        
        # Set up event handlers for this agent
        self.event_handlers[capability.agent_id] = self._create_agent_handler(capability)
    
    def _create_agent_handler(self, capability: AgentCapability) -> Callable:
        """Create an event handler for an agent."""
        async def handler(event: AgentEvent):
            try:
                # Validate input schema
                if not self._validate_schema(event.payload, capability.input_schema):
                    await self._send_error_response(event, "Invalid input schema")
                    return
                
                # Process the request
                result = await self._process_agent_request(capability, event)
                
                # Send response
                response_event = AgentEvent(
                    event_type="agent_response",
                    source_agent=capability.agent_id,
                    target_agents=[event.source_agent] if event.source_agent else [],
                    payload={
                        "request_id": event.event_id,
                        "result": result,
                        "status": "success"
                    },
                    correlation_id=event.event_id
                )
                
                await self.event_bus.publish_event(response_event)
                
            except Exception as e:
                await self._send_error_response(event, str(e))
        
        return handler
    
    def _validate_schema(self, payload: Dict[str, Any], schema: Dict[str, Any]) -> bool:
        """Validate payload against schema (simplified implementation)."""
        # In practice, you'd use a proper JSON schema validator
        required_fields = schema.get('required', [])
        return all(field in payload for field in required_fields)
    
    async def _process_agent_request(self, capability: AgentCapability, event: AgentEvent) -> Dict[str, Any]:
        """Process a request with an agent (placeholder implementation)."""
        # This would contain the actual agent processing logic
        return {"processed": True, "agent_id": capability.agent_id}
    
    async def _send_error_response(self, event: AgentEvent, error_message: str):
        """Send an error response back to the requesting agent."""
        error_event = AgentEvent(
            event_type="agent_error",
            source_agent="coordinator",
            target_agents=[event.source_agent] if event.source_agent else [],
            payload={
                "request_id": event.event_id,
                "error": error_message,
                "status": "error"
            },
            correlation_id=event.event_id
        )
        
        await self.event_bus.publish_event(error_event)
    
    async def initiate_collaboration(self, pattern: CollaborationPattern, 
                                   participants: List[str], 
                                   initial_payload: Dict[str, Any]) -> str:
        """Initiate a multi-agent collaboration."""
        collaboration_id = str(uuid.uuid4())
        
        collaboration = {
            "id": collaboration_id,
            "pattern": pattern,
            "participants": participants,
            "status": "active",
            "start_time": datetime.utcnow(),
            "results": {},
            "errors": []
        }
        
        self.active_collaborations[collaboration_id] = collaboration
        
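        if pattern == CollaborationPattern.REQUEST_RESPONSE:
            await self._handle_request_response(collaboration_id, participants, initial_payload)
        elif pattern == CollaborationPattern.EVENT_CHOREOGRAPHY:
            await self._handle_event_choreography(collaboration_id, participants, initial_payload)
        elif pattern == CollaborationPattern.SAGA_PATTERN:
            await self._handle_saga_pattern(collaboration_id, participants, initial_payload)
        else:
            # No handler for this pattern: drop the record instead of leaving an
            # "active" collaboration that can never make progress
            del self.active_collaborations[collaboration_id]
            raise ValueError(f"Unsupported collaboration pattern: {pattern}")
        
        return collaboration_id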
    
    async def _handle_request_response(self, collaboration_id: str, 
                                     participants: List[str], 
                                     payload: Dict[str, Any]):
        """Handle request-response collaboration pattern."""
        if not participants:
            return
        
        # Send request to first participant
        primary_agent = participants[0]
        request_event = AgentEvent(
            event_type="collaboration_request",
            source_agent="coordinator",
            target_agents=[primary_agent],
            payload={
                "collaboration_id": collaboration_id,
                "participants": participants,
                "data": payload
            }
        )
        
        await self.event_bus.publish_event(request_event)
    
    async def _handle_event_choreography(self, collaboration_id: str, 
                                       participants: List[str], 
                                       payload: Dict[str, Any]):
        """Handle event choreography pattern."""
        # In event choreography, agents react to events without central coordination
        # Each agent subscribes to relevant events and acts independently
        
        for agent_id in participants:
            choreography_event = AgentEvent(
                event_type="choreography_start",
                source_agent="coordinator",
                target_agents=[agent_id],
                payload={
                    "collaboration_id": collaboration_id,
                    "participants": participants,
                    "data": payload
                }
            )
            
            await self.event_bus.publish_event(choreography_event)
    
    async def _handle_saga_pattern(self, collaboration_id: str, 
                                 participants: List[str], 
                                 payload: Dict[str, Any]):
        """Handle saga pattern for distributed transactions."""
        saga_steps = []
        
        # Define saga steps based on participants
        for i, agent_id in enumerate(participants):
            step = {
                "step_id": f"step_{i}",
                "agent_id": agent_id,
                "action": "process",
                "compensating_action": "rollback",
                "status": "pending"
            }
            saga_steps.append(step)
        
        collaboration = self.active_collaborations[collaboration_id]
        collaboration["saga_steps"] = saga_steps
        
        # Start the saga
        await self._execute_saga_step(collaboration_id, 0)
    
    async def _execute_saga_step(self, collaboration_id: str, step_index: int):
        """Execute a single step in a saga."""
        collaboration = self.active_collaborations.get(collaboration_id)
        if not collaboration or step_index >= len(collaboration["saga_steps"]):
            return
        
        step = collaboration["saga_steps"][step_index]
        agent_id = step["agent_id"]
        
        # Send request to agent
        request_event = AgentEvent(
            event_type="saga_step",
            source_agent="coordinator",
            target_agents=[agent_id],
            payload={
                "collaboration_id": collaboration_id,
                "step_id": step["step_id"],
                "action": step["action"],
                "data": collaboration.get("data", {})
            }
        )
        
        await self.event_bus.publish_event(request_event)
    
    async def handle_collaboration_response(self, event: AgentEvent):
        """Handle responses from agents in a collaboration."""
        payload = event.payload
        collaboration_id = payload.get("collaboration_id")
        
        if not collaboration_id or collaboration_id not in self.active_collaborations:
            return
        
        collaboration = self.active_collaborations[collaboration_id]
        
        if event.event_type == "agent_response":
            await self._handle_agent_success(collaboration, event)
        elif event.event_type == "agent_error":
            await self._handle_agent_error(collaboration, event)
    
    async def _handle_agent_success(self, collaboration: Dict, event: AgentEvent):
        """Handle successful agent response."""
        payload = event.payload
        collaboration["results"][event.source_agent] = payload.get("result", {})
        
        if collaboration["pattern"] == CollaborationPattern.SAGA_PATTERN:
            await self._handle_saga_success(collaboration, event)
        else:
            # For other patterns, check if collaboration is complete
            if len(collaboration["results"]) >= len(collaboration["participants"]):
                await self._complete_collaboration(collaboration["id"])
    
    async def _handle_saga_success(self, collaboration: Dict, event: AgentEvent):
        """Handle successful saga step."""
        step_id = event.payload.get("step_id")
        saga_steps = collaboration.get("saga_steps", [])
        
        # Mark current step as completed
        for step in saga_steps:
            if step["step_id"] == step_id:
                step["status"] = "completed"
                break
        
        # Execute next step
        next_step_index = self._find_next_pending_step(saga_steps)
        if next_step_index is not None:
            await self._execute_saga_step(collaboration["id"], next_step_index)
        else:
            # All steps completed
            await self._complete_collaboration(collaboration["id"])
    
    async def _handle_agent_error(self, collaboration: Dict, event: AgentEvent):
        """Handle agent error in collaboration."""
        collaboration["errors"].append({
            "agent_id": event.source_agent,
            "error": event.payload.get("error", "Unknown error"),
            "timestamp": datetime.utcnow()
        })
        
        if collaboration["pattern"] == CollaborationPattern.SAGA_PATTERN:
            await self._handle_saga_error(collaboration, event)
        else:
            # For other patterns, mark collaboration as failed
            collaboration["status"] = "failed"
            await self._notify_collaboration_complete(collaboration)
    
    async def _handle_saga_error(self, collaboration: Dict, event: AgentEvent):
        """Handle error in saga pattern - initiate compensation."""
        print(f"Saga error in collaboration {collaboration['id']}, initiating compensation")
        
        failed_step_id = event.payload.get("step_id")
        saga_steps = collaboration.get("saga_steps", [])
        
        # Undo completed steps in reverse order. Steps run one at a time, so only
        # steps before the failed one can be "completed"; later steps are still
        # "pending" and need no compensation.
        for step in reversed(saga_steps):
            if step["step_id"] == failed_step_id:
                step["status"] = "failed"
            elif step["status"] == "completed":
                await self._execute_compensating_action(collaboration["id"], step)
                step["status"] = "compensated"
        
        collaboration["status"] = "compensated"
        await self._notify_collaboration_complete(collaboration)
    
    async def _execute_compensating_action(self, collaboration_id: str, step: Dict):
        """Execute compensating action for a saga step."""
        agent_id = step["agent_id"]
        
        compensate_event = AgentEvent(
            event_type="saga_compensate",
            source_agent="coordinator",
            target_agents=[agent_id],
            payload={
                "collaboration_id": collaboration_id,
                "step_id": step["step_id"],
                "action": step["compensating_action"]
            }
        )
        
        await self.event_bus.publish_event(compensate_event)
    
    def _find_next_pending_step(self, saga_steps: List[Dict]) -> Optional[int]:
        """Find the next pending step in a saga."""
        for i, step in enumerate(saga_steps):
            if step["status"] == "pending":
                return i
        return None
    
    async def _complete_collaboration(self, collaboration_id: str):
        """Mark collaboration as completed."""
        collaboration = self.active_collaborations[collaboration_id]
        collaboration["status"] = "completed"
        collaboration["end_time"] = datetime.utcnow()
        
        await self._notify_collaboration_complete(collaboration)
    
    async def _notify_collaboration_complete(self, collaboration: Dict):
        """Notify all participants that collaboration is complete."""
        completion_event = AgentEvent(
            event_type="collaboration_complete",
            source_agent="coordinator",
            target_agents=collaboration["participants"],
            payload={
                "collaboration_id": collaboration["id"],
                "status": collaboration["status"],
                "results": collaboration["results"],
                "errors": collaboration["errors"]
            }
        )
        
        await self.event_bus.publish_event(completion_event)
        
        # Clean up
        del self.active_collaborations[collaboration["id"]]

# Example: E-commerce Order Processing with Multiple Agents
class EcommerceOrderProcessor:
    def __init__(self, coordinator: MultiAgentCoordinator):
        self.coordinator = coordinator
        self.setup_agents()
    
    def setup_agents(self):
        """Set up the various agents needed for order processing."""
        
        # Inventory Agent
        inventory_capability = AgentCapability(
            agent_id="inventory_agent",
            capability_type="inventory_management",
            input_schema={
                "type": "object",
                "required": ["product_id", "quantity"],
                "properties": {
                    "product_id": {"type": "string"},
                    "quantity": {"type": "integer", "minimum": 1}
                }
            },
            output_schema={
                "type": "object",
                "properties": {
                    "available": {"type": "boolean"},
                    "reserved_quantity": {"type": "integer"}
                }
            }
        )
        
        # Payment Agent
        payment_capability = AgentCapability(
            agent_id="payment_agent",
            capability_type="payment_processing",
            input_schema={
                "type": "object",
                "required": ["amount", "payment_method"],
                "properties": {
                    "amount": {"type": "number"},
                    "payment_method": {"type": "string"}
                }
            },
            output_schema={
                "type": "object",
                "properties": {
                    "transaction_id": {"type": "string"},
                    "status": {"type": "string"}
                }
            }
        )
        
        # Shipping Agent
        shipping_capability = AgentCapability(
            agent_id="shipping_agent",
            capability_type="shipping_management",
            input_schema={
                "type": "object",
                "required": ["order_id", "shipping_address"],
                "properties": {
                    "order_id": {"type": "string"},
                    "shipping_address": {"type": "object"}
                }
            },
            output_schema={
                "type": "object",
                "properties": {
                    "tracking_number": {"type": "string"},
                    "estimated_delivery": {"type": "string"}
                }
            }
        )
        
        # Register all agents
        self.coordinator.register_agent_capability(inventory_capability)
        self.coordinator.register_agent_capability(payment_capability)
        self.coordinator.register_agent_capability(shipping_capability)
    
    async def process_order(self, order_data: Dict[str, Any]) -> str:
        """Process an e-commerce order using multiple agents."""
        
        # Use saga pattern for distributed transaction
        participants = ["inventory_agent", "payment_agent", "shipping_agent"]
        
        collaboration_id = await self.coordinator.initiate_collaboration(
            pattern=CollaborationPattern.SAGA_PATTERN,
            participants=participants,
            initial_payload=order_data
        )
        
        return collaboration_id
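Because every saga step is meant to receive the same order payload, `order_data` has to satisfy all three input schemas at once. A quick pre-flight check can catch a missing field before the saga starts. The sketch below is hypothetical: `missing_fields` is not part of the coordinator, it reads only the `required` lists copied from the schemas above (a full validator, such as the `jsonschema` package, would also check types and the `minimum` on `quantity`), and the order values are made up.

```python
from typing import Any, Dict, List

# Required fields copied from each AgentCapability's input_schema above
REQUIRED_FIELDS: Dict[str, List[str]] = {
    "inventory_agent": ["product_id", "quantity"],
    "payment_agent": ["amount", "payment_method"],
    "shipping_agent": ["order_id", "shipping_address"],
}


def missing_fields(order_data: Dict[str, Any]) -> Dict[str, List[str]]:
    """Return, per agent, the required fields absent from order_data."""
    gaps: Dict[str, List[str]] = {}
    for agent_id, fields in REQUIRED_FIELDS.items():
        absent = [field for field in fields if field not in order_data]
        if absent:
            gaps[agent_id] = absent
    return gaps


# Hypothetical order that satisfies all three schemas
order = {
    "order_id": "ord-1001",
    "product_id": "sku-42",
    "quantity": 2,
    "amount": 59.98,
    "payment_method": "card",
    "shipping_address": {"city": "Berlin", "postal_code": "10115"},
}

print(missing_fields(order))  # {}
print(missing_fields({"product_id": "sku-42", "quantity": 2}))
# {'payment_agent': ['amount', 'payment_method'], 'shipping_agent': ['order_id', 'shipping_address']}
```

Running a check like this in `process_order` before calling `initiate_collaboration` would turn a mid-saga failure, and the compensation it triggers, into an immediate error.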

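This multi-agent collaboration system is built around four collaboration patterns:

Request-Response: Simple one-to-one communication: the coordinator sends a request to a single primary agent and waits for its reply.

Event Choreography: Agents react to events independently, without a central coordinator directing each step.

Saga Pattern: A distributed transaction runs as a sequence of local steps; if one step fails, the steps that already completed are undone by compensating actions.

Workflow Orchestration: Central coordination of complex multi-step processes. `initiate_collaboration` above has no branch for this pattern, so it needs its own handler.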
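To see event choreography with no coordinator in the loop, here is a toy in-memory bus in which each agent subscribes to the event that should trigger it and publishes its own result. `InMemoryBus` and the event names (`order_placed`, `stock_reserved`, and so on) are hypothetical stand-ins, not the event bus API used earlier.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, Dict, List

Handler = Callable[[Dict[str, Any]], Awaitable[None]]


class InMemoryBus:
    """Hypothetical stand-in for a real broker: fan-out by event type."""

    def __init__(self) -> None:
        self.subscribers: Dict[str, List[Handler]] = defaultdict(list)
        self.log: List[str] = []  # order in which events were published

    def subscribe(self, event_type: str, handler: Handler) -> None:
        self.subscribers[event_type].append(handler)

    async def publish(self, event_type: str, payload: Dict[str, Any]) -> None:
        self.log.append(event_type)
        # Every subscriber reacts on its own; nobody tells it what comes next
        await asyncio.gather(*(h(payload) for h in self.subscribers[event_type]))


async def main() -> List[str]:
    bus = InMemoryBus()

    async def inventory(payload: Dict[str, Any]) -> None:
        await bus.publish("stock_reserved", payload)

    async def payment(payload: Dict[str, Any]) -> None:
        await bus.publish("payment_captured", payload)

    async def shipping(payload: Dict[str, Any]) -> None:
        await bus.publish("shipment_created", payload)

    # Choreography: each agent listens only for the event that triggers it
    bus.subscribe("order_placed", inventory)
    bus.subscribe("stock_reserved", payment)
    bus.subscribe("payment_captured", shipping)

    await bus.publish("order_placed", {"order_id": "ord-1001"})
    return bus.log


print(asyncio.run(main()))
# ['order_placed', 'stock_reserved', 'payment_captured', 'shipment_created']
```

No component knows the full sequence: adding, say, a fraud-check agent means subscribing it to `payment_captured` rather than editing a central workflow. The trade-off is that the end-to-end flow is only visible by tracing the events.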

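The `EcommerceOrderProcessor` example puts the saga pattern to work: the inventory, payment, and shipping agents each handle one step of the order, and if a later step fails, the steps that already succeeded are compensated (for instance, by releasing reserved stock or refunding a payment). The goal is an order that ends up either fully processed or rolled back, never half-finished.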
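Stripped of the event plumbing, the saga's core rule is: run the steps in order, and on failure undo every completed step in reverse. The synchronous sketch below shows only that rule; plain functions stand in for the agents, and the step names and the failing shipment are hypothetical.

```python
from typing import Callable, List, Tuple

# (step name, action, compensating action)
Step = Tuple[str, Callable[[], None], Callable[[], None]]


def run_saga(steps: List[Step], log: List[str]) -> bool:
    """Run steps in order; on failure, compensate completed steps in reverse."""
    completed: List[Step] = []
    for step in steps:
        name, action, _ = step
        try:
            action()
        except Exception as exc:
            log.append(f"{name} failed: {exc}")
            # Undo only what actually succeeded, newest first
            for done_name, _, compensate in reversed(completed):
                compensate()
                log.append(f"compensated {done_name}")
            return False
        completed.append(step)
        log.append(f"{name} ok")
    return True


def fail_shipping() -> None:
    raise RuntimeError("address rejected")  # hypothetical failure


log: List[str] = []
ok = run_saga(
    [
        ("reserve_inventory", lambda: None, lambda: None),
        ("charge_payment", lambda: None, lambda: None),
        ("create_shipment", fail_shipping, lambda: None),
    ],
    log,
)
print(ok)  # False
print(log)
# ['reserve_inventory ok', 'charge_payment ok', 'create_shipment failed: address rejected',
#  'compensated charge_payment', 'compensated reserve_inventory']
```

Compensating in reverse order keeps dependencies intact: the payment is refunded before the stock it paid for is released.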

Conclusion and Future Patterns

Event-driven AI agents represent a fundamental shift in how we build intelligent systems. Instead of centralized, synchronous architectures, we get distributed, reactive systems that can scale and adapt to real-world conditions.

The technology is maturing rapidly. Message brokers like Kafka and NATS (via JetStream) offer durable, replayable event streams that agents can consume at their own pace. Event streaming platforms like Apache Pulsar provide built-in geo-replication and multi-tenancy. Cloud providers offer managed event services, such as Amazon EventBridge and Google Cloud Pub/Sub, that take on much of the operational burden of running distributed messaging infrastructure.

We’re also seeing frameworks aimed at event-driven AI. LangChain’s callback and event-streaming APIs let applications hook into each step of an agent run. AutoGen supports multi-agent conversations, and its newer releases are built on an asynchronous, event-driven runtime. Custom frameworks like the one we built provide more control but require more implementation work.

The future belongs to hybrid architectures that combine event-driven reactivity with cognitive control. Agents will react to events in real-time while also maintaining long-term goals and strategic thinking. We’ll see systems that can switch between reactive and proactive modes based on context and requirements.
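One way to picture an agent that switches between reactive and proactive modes is a loop that waits briefly for an event and, if none arrives, spends the cycle on its own goal. The sketch below is hypothetical: `hybrid_agent`, the `price_alert` event, the timeout value, and the `plan_step` placeholder are illustrative, not part of any framework.

```python
import asyncio
from typing import List


async def hybrid_agent(events: asyncio.Queue, idle_timeout: float,
                       max_cycles: int) -> List[str]:
    """Hypothetical agent loop: react to events, otherwise work toward a goal."""
    actions: List[str] = []
    for _ in range(max_cycles):
        try:
            # Reactive mode: handle an incoming event if one arrives in time
            event = await asyncio.wait_for(events.get(), timeout=idle_timeout)
            actions.append(f"react:{event}")
        except asyncio.TimeoutError:
            # Proactive mode: no event arrived, so advance the long-term goal
            actions.append("proactive:plan_step")
    return actions


async def main() -> List[str]:
    queue: asyncio.Queue = asyncio.Queue()
    queue.put_nowait("price_alert")  # one queued event, then the queue stays empty
    return await hybrid_agent(queue, idle_timeout=0.01, max_cycles=3)


print(asyncio.run(main()))
# ['react:price_alert', 'proactive:plan_step', 'proactive:plan_step']
```

A real agent would weigh more than a timeout when choosing a mode (event priority, deadlines, progress on its goals), but a timeout on the event queue is a simple way to let both modes share one loop.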

Event-driven AI agents are just the beginning. As the technology matures, we’ll see even more sophisticated patterns emerge. The key is starting simple and building complexity gradually. The patterns and code samples in this article provide a solid foundation for building your own event-driven AI systems.

The question isn’t whether to adopt event-driven architectures—it’s how quickly you can start building systems that think and react as dynamically as the world around them.
