Adaptive Role Negotiation in Multi-Agent Systems
You have three agents working together. One plans tasks. One executes them. One reviews the results. This works fine when tasks are predictable. But what happens when the workload shifts? The planner gets overwhelmed. The executor sits idle. The reviewer can’t keep up.
Static role assignments break down under uncertainty. Real work isn’t predictable. Tasks change. Priorities shift. Agents need to adapt. They need to negotiate who does what, when, and why.
This is the problem with fixed roles in multi-agent systems. You assign roles at the start, and agents stick to them. Even when it doesn’t make sense. Even when someone else could do the job better. Even when the situation demands flexibility.
Adaptive role negotiation changes that. Agents evaluate their capabilities in real time. They negotiate roles based on current context. They reassign responsibilities when conditions change. They self-organize under uncertainty.
This article explains how adaptive role negotiation works and how to build it.
Introduction: Why Static Roles Limit Multi-Agent Systems
Most multi-agent systems use static roles. You define roles upfront. Planner, Executor, Critic. Each agent gets assigned a role. They stick to it for the entire workflow.
This is simple. It’s also brittle. When conditions change, the system can’t adapt. The planner might be better at execution for a specific task. The executor might have better planning insights. But they can’t switch because roles are fixed.
Consider a software development workflow. You have a Planner agent that breaks down features into tasks. An Executor agent that writes code. A Reviewer agent that checks quality. This works for standard features. But what about a complex refactoring? The Executor might understand the codebase better and should lead planning. The Reviewer might need to execute some validation tests. Static roles prevent this flexibility.
Real-world teams don’t work this way. People switch roles based on context. A developer might lead planning for their area of expertise. A designer might execute prototypes. A manager might review code. Roles are fluid, not fixed.
Multi-agent systems should work the same way. Agents should negotiate roles based on:
- Current capabilities and expertise
- Workload distribution
- Task complexity and requirements
- Historical performance in different roles
- Real-time context and constraints
Real-World Analogs
Swarm robotics shows how this works. Robots don’t have fixed roles. They negotiate tasks based on proximity, battery level, and capability. A robot near a target takes the execution role. A robot with low battery switches to a support role. Roles emerge from negotiation, not assignment.
Distributed project management works similarly. Team members don’t have fixed job titles. They take on roles based on what needs doing. Someone might lead planning for one project and execute tasks for another. Roles adapt to context.
Multi-agent systems need this flexibility. Static assignments work for simple, predictable workflows. But real work is messy. Tasks overlap. Priorities conflict. Agents need to reorganize themselves.
The Challenge of Dynamic Role Adaptation
Dynamic role adaptation isn’t free. It has costs. Agents need to communicate. They need to evaluate options. They need to reach consensus. This coordination overhead can slow things down.
There’s a tradeoff between coordination cost and adaptability. More negotiation means more flexibility, but also more overhead. Less negotiation means faster execution, but less adaptability.
The challenge is finding the right balance. When should agents negotiate? When should they stick to current roles? How do you avoid negotiation loops where agents keep switching roles?
Coordination Cost vs Adaptability
Every role negotiation has a cost. Agents exchange messages. They evaluate proposals. They make decisions. This takes time and resources.
If agents negotiate too often, coordination overhead dominates. The system spends more time negotiating than working. Tasks get delayed. Throughput drops.
If agents never negotiate, they miss opportunities. A better agent might be idle while a worse agent struggles. Work gets stuck. Efficiency suffers.
You need negotiation triggers. Agents should negotiate when:
- Current role assignments are clearly suboptimal
- Workload is imbalanced
- A better-suited agent becomes available
- Task requirements change significantly
- Performance metrics indicate problems
They shouldn’t negotiate when:
- Current assignments are working well
- Negotiation cost exceeds potential benefit
- Tasks are nearly complete
- System is under heavy load
Examples Where Role-Switching Improves Throughput
Here’s a concrete example. You have a multi-agent system processing customer support tickets. Agent A is the Planner. It categorizes tickets and assigns them. Agent B is the Executor. It responds to tickets. Agent C is the Reviewer. It checks response quality.
Normally this works. But then a complex technical ticket arrives. Agent B has deep technical knowledge. It should lead planning for this ticket. Agent A should execute the simpler follow-up tasks. But with static roles, Agent B can’t plan. It has to wait for Agent A’s plan, even though Agent A doesn’t understand the technical details.
With adaptive roles, Agent B proposes taking the Planner role for this ticket. Agent A agrees and switches to Executor. The system processes the ticket faster because the right agent is in the right role.
Another example: workload balancing. Agent A (Planner) has a queue of 50 tasks to plan. Agent B (Executor) has finished its queue and is idle. With static roles, Agent B waits while Agent A struggles. With adaptive roles, Agent B can temporarily take on some planning tasks. The system balances itself.
Role-switching improves throughput when:
- Agents have different capabilities for different tasks
- Workloads are imbalanced
- Task requirements don’t match current role assignments
- Agents can learn from role-switching experiences
Architectural Design
Adaptive role negotiation needs an architecture that supports dynamic reassignment. You need a Role Manager that coordinates negotiations. You need a negotiation protocol that agents can follow. You need memory systems that track role performance over time.
Role Manager: The Meta-Agent
The Role Manager is a meta-agent that coordinates role negotiations. It doesn’t execute tasks. It manages who does what. It evaluates role proposals. It makes reassignment decisions.
The Role Manager maintains:
- Current role assignments for each agent
- Agent capabilities and expertise profiles
- Historical performance in different roles
- Workload distribution across agents
- Negotiation history and outcomes
When an agent wants to negotiate a role, it sends a proposal to the Role Manager. The proposal includes:
- Desired role and rationale
- Current role and why it wants to switch
- Capability evidence (past performance, expertise)
- Expected benefit (throughput, quality, efficiency)
The Role Manager evaluates proposals using:
- Capability matching (does the agent fit the role?)
- Workload balance (will this improve distribution?)
- Historical performance (has this agent done well in this role?)
- System state (is this a good time to switch?)
If the proposal makes sense, the Role Manager approves it and reassigns roles. If not, it explains why and suggests alternatives.
Negotiation via Shared Context Embeddings
Agents need shared context to negotiate effectively. They need to understand:
- What tasks need doing
- What capabilities are required
- Who has those capabilities
- What the current state is
Shared context embeddings encode this information. Each agent maintains embeddings of:
- Task requirements
- Their own capabilities
- Other agents’ capabilities (learned from observation)
- Current system state
When negotiating, agents compare embeddings. They find similarity between task requirements and agent capabilities. They identify mismatches between current roles and optimal assignments.
Here’s how it works. Agent A wants to switch from Executor to Planner. It creates an embedding of the current task requirements. It compares this to embeddings of its own planning capabilities and the current Planner’s capabilities. If its planning embedding is more similar to task requirements, it proposes a switch.
The Role Manager uses these embeddings to evaluate proposals. It can also proactively suggest role changes when it detects mismatches.
Confidence Thresholds
Not every role negotiation should happen. You need confidence thresholds. An agent should only propose a role change if it’s confident the change will help. The Role Manager should only approve if it’s confident the proposal is good.
Confidence comes from:
- Capability similarity scores (how well does the agent match the role?)
- Historical performance (has this worked before?)
- Current system metrics (are things working well now?)
- Proposal quality (is the rationale sound?)
If confidence is below threshold, agents stick to current roles. This prevents unnecessary negotiation overhead.
Temporal Memory for Contextual Negotiation
Agents need memory to negotiate effectively. They need to remember:
- Past role assignments and outcomes
- What worked and what didn’t
- How different agents performed in different roles
- Patterns in task requirements and role needs
Temporal memory tracks this over time. It maintains a timeline of:
- Role assignments and switches
- Task completions and performance metrics
- Negotiation outcomes and their effects
- Agent capability evolution
When negotiating, agents query this memory. They find similar past situations. They see what roles worked then. They use this to inform current proposals.
The Role Manager also uses temporal memory. It identifies patterns. It learns which role assignments work best for which task types. It proactively suggests changes based on historical patterns.
Implementation Walkthrough
Let’s build an adaptive role negotiation system. We’ll create a RoleManager that coordinates role assignments. We’ll implement agents that can negotiate roles. We’ll add a message-passing layer for coordination.
Setting Up the Base Architecture
First, define the core structures:
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import asyncio
import json
import time
class Role(Enum):
"""Available roles in the system."""
PLANNER = "planner"
EXECUTOR = "executor"
CRITIC = "critic"
IDLE = "idle"
@dataclass
class AgentCapability:
"""Represents an agent's capability in a role."""
role: Role
confidence: float # 0.0 to 1.0
historical_performance: float # Average performance score
expertise_areas: List[str]
last_performed: Optional[datetime] = None
@dataclass
class RoleProposal:
"""A proposal to change roles."""
agent_id: str
current_role: Role
proposed_role: Role
rationale: str
confidence: float
expected_benefit: float
capability_evidence: Dict[str, Any]
timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class Task:
"""A task that needs to be completed."""
task_id: str
description: str
requirements: List[str] # Required capabilities
priority: int
status: str = "pending"
assigned_role: Optional[Role] = None
assigned_agent: Optional[str] = None
@dataclass
class AgentMessage:
"""Message between agents."""
sender_id: str
receiver_id: str
message_type: str
content: Dict[str, Any]
timestamp: datetime = field(default_factory=datetime.now)
message_id: str = field(default_factory=lambda: f"msg-{int(time.time())}")
Implementing the Role Manager
The RoleManager coordinates role negotiations:
class RoleManager:
"""Manages role assignments and negotiations."""
def __init__(self):
self.role_assignments: Dict[str, Role] = {} # agent_id -> role
self.agent_capabilities: Dict[str, List[AgentCapability]] = {} # agent_id -> capabilities
self.negotiation_history: List[RoleProposal] = []
self.performance_history: Dict[Tuple[str, Role], List[float]] = {} # (agent_id, role) -> performance scores
self.confidence_threshold = 0.6
self.negotiation_cooldown = 60 # seconds between negotiations
def register_agent(self, agent_id: str, initial_role: Role, capabilities: List[AgentCapability]):
"""Register an agent with initial role and capabilities."""
self.role_assignments[agent_id] = initial_role
self.agent_capabilities[agent_id] = capabilities
def evaluate_proposal(self, proposal: RoleProposal) -> Tuple[bool, str]:
"""Evaluate a role change proposal."""
# Check cooldown
recent_proposals = [
p for p in self.negotiation_history
if p.agent_id == proposal.agent_id
and (datetime.now() - p.timestamp).seconds < self.negotiation_cooldown
]
if recent_proposals:
return False, "Too soon after last negotiation"
# Check confidence threshold
if proposal.confidence < self.confidence_threshold:
return False, f"Confidence too low: {proposal.confidence:.2f}"
# Check if agent has capability for proposed role
agent_caps = self.agent_capabilities.get(proposal.agent_id, [])
proposed_cap = next(
(cap for cap in agent_caps if cap.role == proposal.proposed_role),
None
)
if not proposed_cap or proposed_cap.confidence < 0.5:
return False, "Agent lacks capability for proposed role"
# Check workload balance
current_assignments = self._get_role_distribution()
proposed_assignments = current_assignments.copy()
proposed_assignments[proposal.current_role] -= 1
proposed_assignments[proposal.proposed_role] += 1
balance_score = self._evaluate_workload_balance(proposed_assignments)
current_balance = self._evaluate_workload_balance(current_assignments)
if balance_score < current_balance:
return False, "Proposal would worsen workload balance"
# Check historical performance
hist_key = (proposal.agent_id, proposal.proposed_role)
if hist_key in self.performance_history:
avg_performance = sum(self.performance_history[hist_key]) / len(self.performance_history[hist_key])
if avg_performance < 0.5:
return False, "Poor historical performance in this role"
# Proposal looks good
return True, "Approved"
def approve_proposal(self, proposal: RoleProposal):
"""Approve and execute a role change."""
# Update role assignment
old_role = self.role_assignments[proposal.agent_id]
self.role_assignments[proposal.agent_id] = proposal.proposed_role
# Record negotiation
self.negotiation_history.append(proposal)
return old_role, proposal.proposed_role
def _get_role_distribution(self) -> Dict[Role, int]:
"""Get current distribution of agents across roles."""
distribution = {role: 0 for role in Role}
for role in self.role_assignments.values():
distribution[role] += 1
return distribution
def _evaluate_workload_balance(self, distribution: Dict[Role, int]) -> float:
"""Evaluate how balanced the workload is. Higher is better."""
# Simple balance metric: lower variance is better
values = [v for v in distribution.values() if v > 0]
if not values:
return 1.0
mean = sum(values) / len(values)
variance = sum((v - mean) ** 2 for v in values) / len(values)
# Convert variance to balance score (inverse relationship)
return 1.0 / (1.0 + variance)
def record_performance(self, agent_id: str, role: Role, performance_score: float):
"""Record performance for an agent in a role."""
key = (agent_id, role)
if key not in self.performance_history:
self.performance_history[key] = []
self.performance_history[key].append(performance_score)
# Keep only recent history (last 100)
if len(self.performance_history[key]) > 100:
self.performance_history[key] = self.performance_history[key][-100:]
def suggest_role_change(self, agent_id: str, current_task: Task) -> Optional[Role]:
"""Proactively suggest a role change based on task requirements."""
agent_caps = self.agent_capabilities.get(agent_id, [])
current_role = self.role_assignments.get(agent_id)
# Find best matching role for this task
best_role = None
best_score = 0.0
for cap in agent_caps:
# Check if agent's expertise matches task requirements
matching_expertise = sum(
1 for req in current_task.requirements
if req in cap.expertise_areas
)
match_score = matching_expertise / len(current_task.requirements) if current_task.requirements else 0
total_score = match_score * cap.confidence * cap.historical_performance
if total_score > best_score:
best_score = total_score
best_role = cap.role
# Only suggest if it's different from current role and score is high
if best_role and best_role != current_role and best_score > 0.7:
return best_role
return None
Implementing Negotiating Agents
Agents that can negotiate roles:
class NegotiatingAgent:
"""Base agent that can negotiate roles."""
def __init__(self, agent_id: str, role_manager: RoleManager):
self.agent_id = agent_id
self.role_manager = role_manager
self.current_role: Optional[Role] = None
self.message_queue: asyncio.Queue = asyncio.Queue()
self.task_queue: List[Task] = []
self.performance_scores: List[float] = []
async def propose_role_change(self, new_role: Role, rationale: str, task: Optional[Task] = None) -> bool:
"""Propose a role change to the role manager."""
if not self.current_role:
return False
# Calculate confidence based on capabilities
capabilities = self.role_manager.agent_capabilities.get(self.agent_id, [])
target_cap = next((cap for cap in capabilities if cap.role == new_role), None)
if not target_cap:
return False
confidence = target_cap.confidence * target_cap.historical_performance
# Estimate expected benefit
expected_benefit = self._estimate_benefit(new_role, task)
proposal = RoleProposal(
agent_id=self.agent_id,
current_role=self.current_role,
proposed_role=new_role,
rationale=rationale,
confidence=confidence,
expected_benefit=expected_benefit,
capability_evidence={
"capability_confidence": target_cap.confidence,
"historical_performance": target_cap.historical_performance,
"expertise_match": len(set(target_cap.expertise_areas) & set(task.requirements)) if task else 0
}
)
# Evaluate proposal
approved, reason = self.role_manager.evaluate_proposal(proposal)
if approved:
old_role, new_role_actual = self.role_manager.approve_proposal(proposal)
self.current_role = new_role_actual
return True
else:
return False
def _estimate_benefit(self, new_role: Role, task: Optional[Task]) -> float:
"""Estimate the benefit of switching to a new role."""
# Simple heuristic: benefit is higher if task requirements match role
if not task:
return 0.5 # Neutral benefit
capabilities = self.role_manager.agent_capabilities.get(self.agent_id, [])
target_cap = next((cap for cap in capabilities if cap.role == new_role), None)
if not target_cap:
return 0.0
# Benefit based on expertise match
matching = sum(1 for req in task.requirements if req in target_cap.expertise_areas)
match_ratio = matching / len(task.requirements) if task.requirements else 0
return match_ratio * target_cap.confidence
async def process_task(self, task: Task) -> Dict[str, Any]:
"""Process a task in the current role."""
# Check if role change would be beneficial
suggested_role = self.role_manager.suggest_role_change(self.agent_id, task)
if suggested_role and suggested_role != self.current_role:
# Try to negotiate role change
success = await self.propose_role_change(
suggested_role,
f"Task requirements better match {suggested_role.value} role",
task
)
if success:
# Role changed, process with new role
pass
# Process task based on current role
result = await self._execute_role_task(task)
# Record performance
performance_score = self._evaluate_performance(task, result)
self.role_manager.record_performance(self.agent_id, self.current_role, performance_score)
self.performance_scores.append(performance_score)
return result
async def _execute_role_task(self, task: Task) -> Dict[str, Any]:
"""Execute a task based on current role."""
if self.current_role == Role.PLANNER:
return await self._plan_task(task)
elif self.current_role == Role.EXECUTOR:
return await self._execute_task(task)
elif self.current_role == Role.CRITIC:
return await self._critique_task(task)
else:
return {"status": "error", "message": "No role assigned"}
async def _plan_task(self, task: Task) -> Dict[str, Any]:
"""Plan a task (Planner role)."""
# Simulate planning
await asyncio.sleep(0.1)
return {
"status": "planned",
"task_id": task.task_id,
"plan": f"Plan for: {task.description}",
"steps": ["step1", "step2", "step3"]
}
async def _execute_task(self, task: Task) -> Dict[str, Any]:
"""Execute a task (Executor role)."""
# Simulate execution
await asyncio.sleep(0.2)
return {
"status": "completed",
"task_id": task.task_id,
"result": f"Executed: {task.description}"
}
async def _critique_task(self, task: Task) -> Dict[str, Any]:
"""Critique a task (Critic role)."""
# Simulate critique
await asyncio.sleep(0.15)
return {
"status": "reviewed",
"task_id": task.task_id,
"feedback": f"Review of: {task.description}",
"score": 0.85
}
def _evaluate_performance(self, task: Task, result: Dict[str, Any]) -> float:
"""Evaluate performance on a task."""
# Simple performance metric
if result.get("status") in ["completed", "planned", "reviewed"]:
return 0.8 + (hash(task.task_id) % 20) / 100 # Simulated variation
return 0.3
Message-Passing Layer
Agents need to communicate. Here’s an async message bus:
class MessageBus:
"""Async message bus for agent communication."""
def __init__(self):
self.agents: Dict[str, NegotiatingAgent] = {}
self.message_history: List[AgentMessage] = []
def register_agent(self, agent: NegotiatingAgent):
"""Register an agent with the message bus."""
self.agents[agent.agent_id] = agent
async def send_message(self, message: AgentMessage):
"""Send a message to an agent."""
receiver = self.agents.get(message.receiver_id)
if receiver:
await receiver.message_queue.put(message)
self.message_history.append(message)
async def broadcast(self, sender_id: str, message_type: str, content: Dict[str, Any]):
"""Broadcast a message to all agents."""
message = AgentMessage(
sender_id=sender_id,
receiver_id="*", # Broadcast
message_type=message_type,
content=content
)
for agent_id, agent in self.agents.items():
if agent_id != sender_id:
await agent.message_queue.put(message)
self.message_history.append(message)
Dynamic Role Reassignment Logic
Here’s the core logic for dynamic role reassignment:
class AdaptiveRoleSystem:
"""Complete system with adaptive role negotiation."""
def __init__(self):
self.role_manager = RoleManager()
self.message_bus = MessageBus()
self.agents: Dict[str, NegotiatingAgent] = {}
def add_agent(self, agent_id: str, initial_role: Role, capabilities: List[AgentCapability]):
"""Add an agent to the system."""
agent = NegotiatingAgent(agent_id, self.role_manager)
agent.current_role = initial_role
self.agents[agent_id] = agent
self.role_manager.register_agent(agent_id, initial_role, capabilities)
self.message_bus.register_agent(agent)
async def process_tasks(self, tasks: List[Task]):
"""Process a list of tasks with adaptive role assignment."""
results = []
for task in tasks:
# Find best agent for this task based on current roles
best_agent = self._find_best_agent_for_task(task)
if best_agent:
# Process task
result = await best_agent.process_task(task)
results.append(result)
# Check if role rebalancing is needed
await self._check_and_rebalance()
return results
def _find_best_agent_for_task(self, task: Task) -> Optional[NegotiatingAgent]:
"""Find the best agent for a task based on current roles and capabilities."""
best_agent = None
best_score = 0.0
for agent_id, agent in self.agents.items():
if agent.current_role == Role.IDLE:
continue
# Check if agent's role matches task needs
role_match = self._evaluate_role_match(agent.current_role, task)
# Check agent capabilities
capabilities = self.role_manager.agent_capabilities.get(agent_id, [])
current_cap = next(
(cap for cap in capabilities if cap.role == agent.current_role),
None
)
if current_cap:
score = role_match * current_cap.confidence * current_cap.historical_performance
if score > best_score:
best_score = score
best_agent = agent
return best_agent
def _evaluate_role_match(self, role: Role, task: Task) -> float:
"""Evaluate how well a role matches task requirements."""
# Simple heuristic: different roles match different task types
task_lower = task.description.lower()
if role == Role.PLANNER:
return 1.0 if any(word in task_lower for word in ["plan", "design", "organize"]) else 0.5
elif role == Role.EXECUTOR:
return 1.0 if any(word in task_lower for word in ["execute", "implement", "build"]) else 0.5
elif role == Role.CRITIC:
return 1.0 if any(word in task_lower for word in ["review", "check", "validate"]) else 0.5
else:
return 0.3
async def _check_and_rebalance(self):
"""Check if role rebalancing is needed and trigger negotiations."""
# Get current workload distribution
distribution = self.role_manager._get_role_distribution()
# Check for imbalance
active_roles = [count for role, count in distribution.items() if role != Role.IDLE and count > 0]
if not active_roles:
return
max_load = max(active_roles)
min_load = min(active_roles)
# If imbalance is significant, suggest rebalancing
if max_load - min_load > 1:
# Find overloaded and underloaded agents
role_counts = {role: 0 for role in Role}
for agent in self.agents.values():
if agent.current_role:
role_counts[agent.current_role] += 1
# Trigger negotiations for rebalancing
await self._trigger_rebalancing_negotiations(role_counts)
async def _trigger_rebalancing_negotiations(self, role_counts: Dict[Role, int]):
"""Trigger role negotiations to rebalance workload."""
# Find overloaded roles
max_role = max(role_counts.items(), key=lambda x: x[1])[0]
min_role = min(
(r, c) for r, c in role_counts.items()
if r != Role.IDLE and c > 0
)[0]
# Find agents that could switch
for agent_id, agent in self.agents.items():
if agent.current_role == max_role:
# Check if agent can take min_role
capabilities = self.role_manager.agent_capabilities.get(agent_id, [])
target_cap = next((cap for cap in capabilities if cap.role == min_role), None)
if target_cap and target_cap.confidence > 0.6:
# Propose role change
await agent.propose_role_change(
min_role,
"Rebalancing workload",
None
)
Evaluation & Metrics
How do you measure if adaptive role negotiation is working? You need metrics that capture collaboration efficiency, task completion, and system performance.
Collaboration Efficiency
Collaboration efficiency measures how well agents work together. Key metrics:
- Role-fit score: How well agents match their assigned roles for current tasks
- Negotiation overhead: Time spent negotiating vs time spent working
- Coordination latency: Delay from task arrival to assignment
- Role-switch frequency: How often roles change (too much or too little?)
class CollaborationMetrics:
"""Track collaboration efficiency metrics."""
def __init__(self):
self.role_fit_scores: List[float] = []
self.negotiation_times: List[float] = []
self.coordination_latencies: List[float] = []
self.role_switches: int = 0
self.total_negotiations: int = 0
def record_role_fit(self, agent_id: str, role: Role, task: Task, fit_score: float):
"""Record how well an agent's role fits a task."""
self.role_fit_scores.append(fit_score)
def record_negotiation(self, duration: float, successful: bool):
"""Record a negotiation event."""
self.negotiation_times.append(duration)
self.total_negotiations += 1
if successful:
self.role_switches += 1
def record_coordination_latency(self, latency: float):
"""Record time from task arrival to assignment."""
self.coordination_latencies.append(latency)
def get_efficiency_report(self) -> Dict[str, Any]:
"""Get summary of collaboration efficiency."""
return {
"avg_role_fit": sum(self.role_fit_scores) / len(self.role_fit_scores) if self.role_fit_scores else 0,
"avg_negotiation_time": sum(self.negotiation_times) / len(self.negotiation_times) if self.negotiation_times else 0,
"avg_coordination_latency": sum(self.coordination_latencies) / len(self.coordination_latencies) if self.coordination_latencies else 0,
"role_switch_rate": self.role_switches / self.total_negotiations if self.total_negotiations > 0 else 0,
"total_negotiations": self.total_negotiations
}
Task Completion Rate
Task completion rate measures throughput. How many tasks get completed? How fast? With what quality?
class TaskCompletionMetrics:
"""Track task completion metrics."""
def __init__(self):
self.completed_tasks: int = 0
self.failed_tasks: int = 0
self.task_times: List[float] = []
self.task_qualities: List[float] = []
def record_completion(self, task: Task, duration: float, quality: float):
"""Record a completed task."""
self.completed_tasks += 1
self.task_times.append(duration)
self.task_qualities.append(quality)
def record_failure(self, task: Task):
"""Record a failed task."""
self.failed_tasks += 1
def get_completion_report(self) -> Dict[str, Any]:
"""Get task completion summary."""
total = self.completed_tasks + self.failed_tasks
return {
"completion_rate": self.completed_tasks / total if total > 0 else 0,
"avg_completion_time": sum(self.task_times) / len(self.task_times) if self.task_times else 0,
"avg_quality": sum(self.task_qualities) / len(self.task_qualities) if self.task_qualities else 0,
"total_tasks": total
}
Latency Impact
Role negotiation adds latency. You need to measure whether the benefits outweigh the costs.
def measure_latency_impact(system: AdaptiveRoleSystem, tasks: List[Task]) -> Dict[str, float]:
"""Measure latency impact of adaptive role negotiation."""
import time
# Measure with negotiation
start = time.time()
results_with_negotiation = await system.process_tasks(tasks)
time_with_negotiation = time.time() - start
# Measure without negotiation (static roles)
# This would require a separate system with fixed roles
# For simulation, estimate based on negotiation overhead
negotiation_overhead = sum(
len(system.role_manager.negotiation_history) * 0.1 # 100ms per negotiation
)
estimated_static_time = time_with_negotiation - negotiation_overhead
latency_impact = {
"time_with_negotiation": time_with_negotiation,
"estimated_static_time": estimated_static_time,
"negotiation_overhead": negotiation_overhead,
"overhead_percentage": (negotiation_overhead / time_with_negotiation * 100) if time_with_negotiation > 0 else 0
}
return latency_impact
Best Practices & Pitfalls
Adaptive role negotiation is powerful, but it has pitfalls. Here’s how to avoid common problems.
Avoiding Deadlocks
Deadlocks happen when agents can’t agree on role changes. Agent A wants Agent B’s role. Agent B wants Agent A’s role. They’re stuck.
Prevent deadlocks by:
- Using a central Role Manager that makes final decisions
- Implementing negotiation timeouts
- Adding priority rules (e.g., agents with better capabilities win)
- Detecting circular proposals and breaking them
def detect_deadlock(role_manager: RoleManager, recent_proposals: List[RoleProposal]) -> bool:
"""Detect if proposals form a deadlock."""
# Check for circular proposals
# Agent A wants B's role, B wants A's role
proposal_map = {p.agent_id: p.proposed_role for p in recent_proposals}
for agent_id, proposed_role in proposal_map.items():
# Find who currently has the proposed role
current_holder = next(
(aid for aid, role in role_manager.role_assignments.items() if role == proposed_role),
None
)
if current_holder and current_holder in proposal_map:
# Check if current holder wants this agent's role
if role_manager.role_assignments.get(agent_id) == proposal_map.get(current_holder):
return True # Deadlock detected
return False
Preventing Oscillating Role Reassignment
Agents might keep switching roles back and forth. This wastes time and creates instability.
Prevent oscillation by:
- Adding cooldown periods between negotiations
- Requiring minimum confidence improvements
- Tracking role stability over time
- Penalizing frequent switches
class OscillationPrevention:
"""Prevent oscillating role reassignments."""
def __init__(self, role_manager: RoleManager):
self.role_manager = role_manager
self.role_history: Dict[str, List[Tuple[Role, datetime]]] = {} # agent_id -> [(role, time)]
self.max_switches_per_period = 3
self.switch_period_seconds = 300 # 5 minutes
def should_allow_switch(self, agent_id: str, new_role: Role) -> bool:
"""Check if a role switch should be allowed."""
if agent_id not in self.role_history:
self.role_history[agent_id] = []
history = self.role_history[agent_id]
now = datetime.now()
# Filter recent switches
recent_switches = [
(role, time) for role, time in history
if (now - time).seconds < self.switch_period_seconds
]
# Check if switching back to a recent role (oscillation)
if recent_switches:
recent_roles = [role for role, _ in recent_switches]
if new_role in recent_roles:
return False # Oscillating
# Check switch frequency
if len(recent_switches) >= self.max_switches_per_period:
return False # Too many switches
return True
def record_switch(self, agent_id: str, new_role: Role):
"""Record a role switch."""
if agent_id not in self.role_history:
self.role_history[agent_id] = []
self.role_history[agent_id].append((new_role, datetime.now()))
Confidence Calibration
Confidence scores need to be accurate. Overconfident agents propose bad changes. Underconfident agents miss good opportunities.
Calibrate confidence by:
- Tracking prediction accuracy over time
- Adjusting confidence based on historical performance
- Using ensemble methods (multiple confidence estimators)
- Regularly validating confidence against outcomes
class ConfidenceCalibrator:
"""Calibrate confidence scores based on historical accuracy."""
def __init__(self):
self.prediction_history: List[Tuple[float, bool]] = [] # (confidence, was_correct)
def record_prediction(self, confidence: float, was_correct: bool):
"""Record a confidence prediction and outcome."""
self.prediction_history.append((confidence, was_correct))
# Keep only recent history
if len(self.prediction_history) > 1000:
self.prediction_history = self.prediction_history[-1000:]
def calibrate_confidence(self, raw_confidence: float) -> float:
"""Calibrate a confidence score based on historical accuracy."""
if not self.prediction_history:
return raw_confidence
# Calculate calibration factor
# If we predicted 0.8 but were only right 60% of the time, we're overconfident
confidence_buckets = {}
for conf, correct in self.prediction_history:
bucket = round(conf * 10) / 10 # Round to 0.1
if bucket not in confidence_buckets:
confidence_buckets[bucket] = []
confidence_buckets[bucket].append(correct)
# Calculate actual accuracy for each confidence level
calibration_map = {}
for bucket, outcomes in confidence_buckets.items():
actual_accuracy = sum(outcomes) / len(outcomes)
calibration_map[bucket] = actual_accuracy
# Apply calibration
raw_bucket = round(raw_confidence * 10) / 10
if raw_bucket in calibration_map:
# Adjust confidence to match historical accuracy
calibrated = calibration_map[raw_bucket]
return calibrated
return raw_confidence
Unit Test: Demonstrating Adaptive Behavior
Here’s a test that shows the system adapting:
import pytest
import asyncio
@pytest.mark.asyncio
async def test_adaptive_role_negotiation():
"""Test that agents can negotiate and adapt roles."""
system = AdaptiveRoleSystem()
# Create agents with different capabilities
agent_a_caps = [
AgentCapability(Role.PLANNER, 0.9, 0.85, ["planning", "design"]),
AgentCapability(Role.EXECUTOR, 0.6, 0.7, ["implementation"]),
AgentCapability(Role.CRITIC, 0.5, 0.6, ["review"])
]
agent_b_caps = [
AgentCapability(Role.PLANNER, 0.5, 0.6, ["planning"]),
AgentCapability(Role.EXECUTOR, 0.9, 0.9, ["implementation", "coding"]),
AgentCapability(Role.CRITIC, 0.7, 0.75, ["review", "testing"])
]
# Add agents with initial roles
system.add_agent("agent_a", Role.EXECUTOR, agent_a_caps) # Wrong initial role
system.add_agent("agent_b", Role.PLANNER, agent_b_caps) # Wrong initial role
# Create tasks that don't match initial roles
tasks = [
Task("task1", "Plan the architecture for new feature", ["planning", "design"], 1),
Task("task2", "Implement the user interface", ["implementation", "coding"], 1),
]
# Process tasks - agents should negotiate role changes
results = await system.process_tasks(tasks)
# Verify that roles adapted
assert system.role_manager.role_assignments["agent_a"] == Role.PLANNER
assert system.role_manager.role_assignments["agent_b"] == Role.EXECUTOR
# Verify tasks were completed
assert len(results) == 2
assert all(r.get("status") in ["planned", "completed"] for r in results)
# Verify negotiation happened
assert len(system.role_manager.negotiation_history) > 0
@pytest.mark.asyncio
async def test_confidence_scoring():
"""Test that confidence scoring affects role negotiation."""
system = AdaptiveRoleSystem()
# Agent with high confidence in Planner role
high_conf_caps = [AgentCapability(Role.PLANNER, 0.95, 0.9, ["planning"])]
system.add_agent("high_conf", Role.EXECUTOR, high_conf_caps)
# Agent with low confidence in Planner role
low_conf_caps = [AgentCapability(Role.PLANNER, 0.4, 0.5, ["planning"])]
system.add_agent("low_conf", Role.EXECUTOR, low_conf_caps)
task = Task("task1", "Plan the system architecture", ["planning"], 1)
# High confidence agent should successfully negotiate
high_agent = system.agents["high_conf"]
success_high = await high_agent.propose_role_change(
Role.PLANNER,
"High confidence in planning",
task
)
assert success_high == True
# Low confidence agent should fail negotiation
low_agent = system.agents["low_conf"]
success_low = await low_agent.propose_role_change(
Role.PLANNER,
"Low confidence in planning",
task
)
assert success_low == False
Simulation: Task Assignment with Confidence Scoring
Here’s a simulation showing the system in action:
async def simulate_adaptive_system():
"""Simulate adaptive role negotiation system."""
system = AdaptiveRoleSystem()
metrics = CollaborationMetrics()
completion_metrics = TaskCompletionMetrics()
# Create diverse agents
agents_config = [
("planner_specialist", Role.PLANNER, [
AgentCapability(Role.PLANNER, 0.95, 0.9, ["planning", "architecture"]),
AgentCapability(Role.EXECUTOR, 0.5, 0.6, ["basic"]),
]),
("executor_specialist", Role.EXECUTOR, [
AgentCapability(Role.PLANNER, 0.4, 0.5, ["basic"]),
AgentCapability(Role.EXECUTOR, 0.95, 0.92, ["implementation", "coding"]),
]),
("flexible_agent", Role.CRITIC, [
AgentCapability(Role.PLANNER, 0.7, 0.75, ["planning"]),
AgentCapability(Role.EXECUTOR, 0.75, 0.8, ["implementation"]),
AgentCapability(Role.CRITIC, 0.8, 0.85, ["review", "testing"]),
]),
]
for agent_id, initial_role, caps in agents_config:
system.add_agent(agent_id, initial_role, caps)
# Generate diverse tasks
tasks = [
Task(f"task_{i}", desc, reqs, 1)
for i, (desc, reqs) in enumerate([
("Plan the microservices architecture", ["planning", "architecture"]),
("Implement the API endpoints", ["implementation", "coding"]),
("Review the code quality", ["review", "testing"]),
("Design the database schema", ["planning", "architecture"]),
("Write unit tests", ["implementation", "coding"]),
])
]
# Process tasks
start_time = time.time()
results = await system.process_tasks(tasks)
total_time = time.time() - start_time
# Generate report
print("=== Adaptive Role Negotiation Simulation ===")
print(f"\nTotal time: {total_time:.2f}s")
print(f"\nFinal role assignments:")
for agent_id, role in system.role_manager.role_assignments.items():
print(f" {agent_id}: {role.value}")
print(f"\nNegotiations: {len(system.role_manager.negotiation_history)}")
print(f"Role switches: {sum(1 for _ in system.role_manager.negotiation_history)}")
print(f"\nTask completion:")
print(f" Completed: {completion_metrics.completed_tasks}")
print(f" Failed: {completion_metrics.failed_tasks}")
return system, results
# Run simulation
# asyncio.run(simulate_adaptive_system())
Conclusion: Future of Self-Organizing Agent Societies
Adaptive role negotiation enables self-organizing agent systems. Agents don’t need fixed roles. They negotiate. They adapt. They optimize themselves.
This matters as multi-agent systems scale. With fixed roles, you need to manually assign agents to tasks. You need to predict workload. You need to handle imbalances. With adaptive negotiation, agents handle this themselves.
The future is self-organizing agent societies. Agents that form teams dynamically. Agents that negotiate responsibilities. Agents that learn from experience and improve over time.
This requires better negotiation protocols. More sophisticated capability matching. Smarter confidence estimation. But the foundation is here. Role negotiation works. It improves throughput. It handles uncertainty. It scales.
The goal isn’t perfect role assignment. It’s good enough assignment that adapts to change. Adaptive role negotiation delivers that. It makes multi-agent systems more flexible, more efficient, and more resilient.
If you’re building multi-agent systems, consider adaptive roles. Start with simple negotiation. Add confidence thresholds. Track performance. Iterate. The benefits are real. Agents work better when they can adapt.
Discussion
Loading comments...