Multi-Agent Orchestration
An orchestration system for complex workflows that involve multiple specialized agents. It covers task decomposition, intelligent handoffs between agents, and result synthesis.
Core Classes
EnhancedMultiAgentOrchestrator
Description: Main orchestrator for managing multi-agent workflows
Parameters:
- max_concurrent_agents (int): Maximum concurrent agents (default: 5)
- timeout_seconds (int): Workflow timeout (default: 300)
- enable_persistence (bool): Enable workflow persistence (default: True)
- retry_failed_tasks (bool): Retry failed tasks (default: True)
Returns: EnhancedMultiAgentOrchestrator instance
Example:
from recoagent.agents import EnhancedMultiAgentOrchestrator, AgentCapability, TaskType

# Create orchestrator
orchestrator = EnhancedMultiAgentOrchestrator(
    max_concurrent_agents=3,
    timeout_seconds=600,
    enable_persistence=True
)

# Register agents
# (research_agent is an agent instance created elsewhere)
orchestrator.register_agent(
    "research_agent",
    research_agent,
    AgentCapability(
        agent_id="research_agent",
        agent_name="Research Agent",
        task_types=[TaskType.RESEARCH, TaskType.ANALYSIS],
        description="Conducts research and analysis"
    )
)

# Execute workflow
result = orchestrator.execute_workflow({
    "query": "Research AI trends and create a summary",
    "user_id": "user_123"
})
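
How the orchestrator enforces `max_concurrent_agents` and `timeout_seconds` internally is not described here. As a rough illustration of what such limits usually mean, the following standalone sketch caps concurrency with `asyncio.Semaphore` and applies one overall deadline with `asyncio.wait_for`. The helper `run_with_limit` and the stand-in `fake_agent` are hypothetical names for this sketch and are not part of `recoagent`.

```python
import asyncio


async def run_with_limit(agent_calls, max_concurrent_agents=5, timeout_seconds=300):
    """Run zero-argument coroutine functions, at most `max_concurrent_agents` at once."""
    semaphore = asyncio.Semaphore(max_concurrent_agents)

    async def guarded(call):
        # Each call waits here until one of the permits is free.
        async with semaphore:
            return await call()

    # One deadline for the whole batch; raises TimeoutError if it is exceeded.
    return await asyncio.wait_for(
        asyncio.gather(*(guarded(call) for call in agent_calls)),
        timeout=timeout_seconds,
    )


# Hypothetical stand-in agents that record how many run at the same time.
stats = {"running": 0, "peak": 0}


async def fake_agent(name):
    stats["running"] += 1
    stats["peak"] = max(stats["peak"], stats["running"])
    await asyncio.sleep(0.01)  # simulate work
    stats["running"] -= 1
    return f"{name}: done"


def make_call(name):
    async def call():
        return await fake_agent(name)
    return call


results = asyncio.run(
    run_with_limit(
        [make_call(f"agent_{i}") for i in range(6)],
        max_concurrent_agents=3,
        timeout_seconds=5,
    )
)
print(results)
print("peak concurrency:", stats["peak"])
```

`asyncio.gather` returns results in the order the calls were passed in, so the output order does not depend on which agent finishes first.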
TaskDecomposer
Description: Decomposes complex tasks into subtasks for different agents
Parameters:
- max_subtasks (int): Maximum number of subtasks (default: 10)
- complexity_threshold (float): Threshold for task complexity (default: 0.7)
Returns: TaskDecomposer instance
Example:
from recoagent.agents import TaskDecomposer, TaskType
# Create task decomposer
decomposer = TaskDecomposer(
    max_subtasks=5,
    complexity_threshold=0.6
)

# Decompose complex task
task_plan = decomposer.decompose_task({
    "query": "Analyze market trends and create a business report",
    "context": "Technology sector analysis"
})

print(f"Created {len(task_plan.subtasks)} subtasks")
for subtask in task_plan.subtasks:
    print(f"- {subtask.task_type}: {subtask.description}")
HandoffProtocol
Description: Manages handoffs between agents with context preservation
Parameters:
- context_retention (bool): Retain context across handoffs (default: True)
- handoff_timeout (int): Handoff timeout in seconds (default: 30)
Returns: HandoffProtocol instance
Example:
from recoagent.agents import HandoffProtocol, HandoffContext
# Create handoff protocol
handoff_protocol = HandoffProtocol(
    context_retention=True,
    handoff_timeout=60
)

# Create handoff context
context = HandoffContext(
    from_agent="research_agent",
    to_agent="writing_agent",
    context_data={"research_results": "..."},
    priority="high"
)
# Execute handoff
result = handoff_protocol.execute_handoff(context)
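
The behavior of `handoff_timeout` is only described as a timeout in seconds. One plausible reading, sketched below without any `recoagent` imports, is that the receiving agent must accept the context within that window or the handoff is reported as failed. `handoff_with_timeout`, the receiver functions, and the returned dictionary shape are assumptions made for this sketch.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout


def handoff_with_timeout(receive, context_data, handoff_timeout=30):
    """Call `receive(context_data)`; report a timeout if it takes too long."""
    executor = ThreadPoolExecutor(max_workers=1)
    # Pass a shallow copy so the receiver cannot rebind the sender's top-level keys.
    future = executor.submit(receive, dict(context_data))
    try:
        received = future.result(timeout=handoff_timeout)
        return {"status": "success", "context_data": received}
    except FutureTimeout:
        return {
            "status": "timeout",
            "error_message": f"no response within {handoff_timeout}s",
        }
    finally:
        # Do not block on the worker; a late receiver finishes in the background.
        executor.shutdown(wait=False)


# Hypothetical receivers standing in for a target agent.
def fast_receiver(ctx):
    return {**ctx, "accepted_by": "writing_agent"}


def slow_receiver(ctx):
    time.sleep(0.3)
    return ctx


ok = handoff_with_timeout(fast_receiver, {"research_results": "..."}, handoff_timeout=1)
late = handoff_with_timeout(slow_receiver, {"research_results": "..."}, handoff_timeout=0.05)
print(ok["status"], late["status"])
```

A thread cannot be forcibly stopped in Python, so this pattern only stops waiting for the receiver; it does not cancel work that has already started.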
ResultSynthesizer
Description: Synthesizes results from multiple agents into coherent output
Parameters:
- synthesis_strategy (str): Synthesis strategy ("merge", "hierarchical", "weighted")
- quality_threshold (float): Minimum quality threshold (default: 0.7)
Returns: ResultSynthesizer instance
Example:
from recoagent.agents import ResultSynthesizer, AgentResult
# Create result synthesizer
synthesizer = ResultSynthesizer(
    synthesis_strategy="hierarchical",
    quality_threshold=0.8
)

# Collect agent results
agent_results = [
    AgentResult(
        agent_id="research_agent",
        result="Research findings...",
        confidence=0.9,
        metadata={"sources": 5}
    ),
    AgentResult(
        agent_id="analysis_agent",
        result="Analysis results...",
        confidence=0.85,
        metadata={"metrics": 3}
    )
]
# Synthesize results
synthesized = synthesizer.synthesize_results(agent_results)
print(f"Synthesized result: {synthesized.result}")
print(f"Overall confidence: {synthesized.confidence}")
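
The synthesis strategies are named above but their algorithms are not specified. To make the idea of a "weighted" strategy concrete, here is a minimal standalone sketch that drops results below a threshold and weights the rest by their confidence. The `SketchResult` dataclass and `synthesize_weighted` function are hypothetical, and treating `quality_threshold` as a filter on per-agent confidence is an assumption, not the documented behavior of `ResultSynthesizer`.

```python
from dataclasses import dataclass, field


@dataclass
class SketchResult:
    """Hypothetical stand-in mirroring the fields used in the AgentResult examples."""
    agent_id: str
    result: str
    confidence: float
    metadata: dict = field(default_factory=dict)


def synthesize_weighted(results, quality_threshold=0.7):
    """Combine results, weighting each kept result by its share of total confidence."""
    kept = [r for r in results if r.confidence >= quality_threshold]
    if not kept:
        return {"result": "", "confidence": 0.0, "weights": {}, "sources": []}

    total = sum(r.confidence for r in kept)
    weights = {r.agent_id: r.confidence / total for r in kept}
    # Overall confidence is the weight-averaged confidence of the kept results.
    overall = sum(weights[r.agent_id] * r.confidence for r in kept)
    # Most confident result first in the combined text.
    ordered = sorted(kept, key=lambda r: r.confidence, reverse=True)
    return {
        "result": "\n".join(r.result for r in ordered),
        "confidence": overall,
        "weights": weights,
        "sources": [r.agent_id for r in ordered],
    }


combined = synthesize_weighted(
    [
        SketchResult("research_agent", "Research findings...", 0.9),
        SketchResult("analysis_agent", "Analysis results...", 0.85),
        SketchResult("draft_agent", "Rough notes...", 0.5),
    ],
    quality_threshold=0.8,
)
print(combined["sources"])
print(round(combined["confidence"], 3))
```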
Usage Examples
Basic Multi-Agent Workflow
from recoagent.agents import EnhancedMultiAgentOrchestrator, AgentCapability, TaskType
# Create orchestrator
orchestrator = EnhancedMultiAgentOrchestrator()

# Register specialized agents
orchestrator.register_agent(
    "research_agent",
    research_agent,
    AgentCapability(
        agent_id="research_agent",
        agent_name="Research Agent",
        task_types=[TaskType.RESEARCH],
        description="Conducts research and gathers information"
    )
)
orchestrator.register_agent(
    "analysis_agent",
    analysis_agent,
    AgentCapability(
        agent_id="analysis_agent",
        agent_name="Analysis Agent",
        task_types=[TaskType.ANALYSIS],
        description="Analyzes data and provides insights"
    )
)
orchestrator.register_agent(
    "writing_agent",
    writing_agent,
    AgentCapability(
        agent_id="writing_agent",
        agent_name="Writing Agent",
        task_types=[TaskType.WRITING],
        description="Creates written content and reports"
    )
)

# Execute complex workflow
result = orchestrator.execute_workflow({
    "query": "Research AI trends, analyze the data, and create a comprehensive report",
    "user_id": "user_123",
    "requirements": {
        "report_length": "2000 words",
        "include_charts": True,
        "deadline": "2024-01-15"
    }
})
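
This example relies on the constructor defaults, which include `retry_failed_tasks=True`. The retry policy itself (attempt count, delay) is not documented here. A common pattern for retrying a failed agent task is a bounded loop with exponential backoff, sketched below with hypothetical names (`run_with_retries`, `flaky_writing_task`) and assumed values for the attempt limit and delay.

```python
import time


def run_with_retries(task, max_attempts=3, base_delay=0.01):
    """Call `task()` until it succeeds or `max_attempts` is reached."""
    for attempt in range(1, max_attempts + 1):
        try:
            return task()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error to the caller
            # Exponential backoff: base_delay, 2x, 4x, ...
            time.sleep(base_delay * 2 ** (attempt - 1))


# Hypothetical task that fails twice before succeeding.
attempt_log = []


def flaky_writing_task():
    attempt_log.append(len(attempt_log) + 1)
    if len(attempt_log) < 3:
        raise RuntimeError("simulated agent failure")
    return "report ready"


outcome = run_with_retries(flaky_writing_task, max_attempts=3)
print(outcome, attempt_log)
```

Retrying is only safe for tasks that can be repeated without side effects, so a real policy would likely distinguish transient errors from permanent ones.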
Advanced Task Decomposition
from recoagent.agents import TaskDecomposer, TaskType
# Create advanced decomposer
decomposer = TaskDecomposer(
    max_subtasks=8,
    complexity_threshold=0.5
)

# Decompose complex business task
task_plan = decomposer.decompose_task({
    "query": "Create a comprehensive market analysis for AI startups",
    "context": {
        "industry": "artificial intelligence",
        "focus": "startups",
        "geography": "North America",
        "timeframe": "2023-2024"
    },
    "requirements": {
        "include_competitors": True,
        "include_funding_data": True,
        "include_technology_trends": True,
        "format": "executive_summary"
    }
})

# Review decomposition
print(f"Task Plan: {task_plan.description}")
print(f"Complexity Score: {task_plan.complexity_score}")
print(f"Estimated Duration: {task_plan.estimated_duration} minutes")
for i, subtask in enumerate(task_plan.subtasks, 1):
    print(f"{i}. {subtask.task_type.value}: {subtask.description}")
    print(f" Agent: {subtask.assigned_agent}")
    print(f" Priority: {subtask.priority}")
    print(f" Dependencies: {subtask.dependencies}")
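
Each subtask above reports a `dependencies` field, which implies an execution order. The format of that field is not specified, so the following standalone sketch assumes a plain mapping from subtask name to the set of names it depends on, and uses the standard library's `graphlib.TopologicalSorter` (Python 3.9+) to group subtasks into batches that could run in parallel. The subtask names and `execution_batches` helper are hypothetical.

```python
from graphlib import TopologicalSorter


def execution_batches(dependencies):
    """Group subtasks so each batch depends only on subtasks in earlier batches."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for stable output
        batches.append(ready)
        sorter.done(*ready)  # mark the batch complete to unlock its dependents
    return batches


# Hypothetical plan: subtask name -> names it depends on.
plan = {
    "gather_funding_data": set(),
    "survey_competitors": set(),
    "analyze_trends": {"gather_funding_data", "survey_competitors"},
    "write_summary": {"analyze_trends"},
}

batches = execution_batches(plan)
for number, batch in enumerate(batches, 1):
    print(number, batch)
```

Subtasks within one batch have no dependencies on each other, so they are natural candidates for concurrent execution up to whatever concurrency limit applies.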
Intelligent Agent Handoffs
from recoagent.agents import HandoffProtocol, HandoffContext, HandoffStatus
# Create handoff protocol
handoff_protocol = HandoffProtocol(
    context_retention=True,
    handoff_timeout=120
)

# Simulate agent handoff
def simulate_workflow():
    # Research agent completes its task
    research_context = HandoffContext(
        from_agent="research_agent",
        to_agent="analysis_agent",
        context_data={
            "research_results": {
                "market_size": "$50B",
                "growth_rate": "25%",
                "key_players": ["OpenAI", "Anthropic", "Google"],
                "sources": ["report1.pdf", "article2.html"]
            },
            "user_requirements": {
                "focus_areas": ["funding", "technology", "competition"]
            }
        },
        priority="high",
        metadata={"research_quality": 0.9}
    )

    # Execute handoff
    handoff_result = handoff_protocol.execute_handoff(research_context)
    if handoff_result.status == HandoffStatus.SUCCESS:
        print("Handoff successful")
        print(f"Context transferred: {len(handoff_result.context_data)} items")

        # Analysis agent processes with context
        analysis_result = analysis_agent.process_with_context(
            handoff_result.context_data
        )

        # Handoff to writing agent
        writing_context = HandoffContext(
            from_agent="analysis_agent",
            to_agent="writing_agent",
            context_data={
                "research_results": research_context.context_data["research_results"],
                "analysis_results": analysis_result,
                "user_requirements": research_context.context_data["user_requirements"]
            },
            priority="high"
        )
        final_handoff = handoff_protocol.execute_handoff(writing_context)
        return final_handoff
    else:
        print(f"Handoff failed: {handoff_result.error_message}")
        return None

# Execute workflow
result = simulate_workflow()
Result Synthesis and Quality Control
from recoagent.agents import ResultSynthesizer, AgentResult
# Create synthesizer with quality control
synthesizer = ResultSynthesizer(
    synthesis_strategy="weighted",
    quality_threshold=0.8
)

# Collect results from multiple agents
agent_results = [
    AgentResult(
        agent_id="research_agent",
        result="Market research findings: AI market growing at 25% CAGR...",
        confidence=0.95,
        metadata={
            "sources": 10,
            "data_quality": "high",
            "coverage": "comprehensive"
        }
    ),
    AgentResult(
        agent_id="analysis_agent",
        result="Analysis: Key trends include LLM adoption, enterprise AI...",
        confidence=0.88,
        metadata={
            "analysis_depth": "detailed",
            "methodology": "statistical",
            "insights": 15
        }
    ),
    AgentResult(
        agent_id="writing_agent",
        result="Executive Summary: The AI startup market shows strong growth...",
        confidence=0.92,
        metadata={
            "word_count": 2000,
            "sections": 5,
            "readability": "high"
        }
    )
]

# Synthesize with quality checks
synthesized = synthesizer.synthesize_results(agent_results)
print(f"Final Result Quality: {synthesized.quality_score}")
print(f"Overall Confidence: {synthesized.confidence}")
print(f"Synthesis Strategy: {synthesized.synthesis_strategy}")

# Quality control
if synthesized.quality_score >= 0.8:
    print("✅ Quality threshold met - result approved")
    final_result = synthesized.result
else:
    print("❌ Quality threshold not met - result needs improvement")
    # Trigger quality improvement workflow
    improved_result = synthesizer.improve_quality(synthesized)
    final_result = improved_result.result
Workflow Persistence and Recovery
from recoagent.agents import EnhancedMultiAgentOrchestrator, WorkflowPersistence, WorkflowStatus

# Create workflow persistence
persistence = WorkflowPersistence(
    storage_backend="redis",
    checkpoint_interval=30  # seconds
)

# Start persistent workflow
workflow_id = "market_analysis_001"
orchestrator = EnhancedMultiAgentOrchestrator(
    enable_persistence=True,
    persistence=persistence
)

# Execute workflow with persistence
try:
    result = orchestrator.execute_workflow({
        "query": "Complex market analysis",
        "workflow_id": workflow_id,
        "user_id": "user_123"
    })
except Exception as e:
    print(f"Workflow interrupted: {e}")
    # Resume workflow
    resumed_result = orchestrator.resume_workflow(workflow_id)
    print(f"Workflow resumed and completed: {resumed_result.status}")

# Check workflow status
status = persistence.get_workflow_status(workflow_id)
print(f"Workflow Status: {status.status}")
print(f"Progress: {status.progress_percentage}%")
print(f"Checkpoints: {len(status.checkpoints)}")
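
The checkpoint format and resume logic used by `WorkflowPersistence` are not documented here. To illustrate the general checkpoint-and-resume idea behind the example above, the sketch below saves completed steps to a local JSON file (a deliberate simplification in place of the Redis backend) and skips them on the next run. `run_with_checkpoints` and the step functions are hypothetical.

```python
import json
import tempfile
from pathlib import Path


def run_with_checkpoints(workflow_id, steps, checkpoint_dir):
    """Run (name, function) steps in order, saving results after each one.

    Re-running with the same workflow_id skips steps already recorded.
    """
    path = Path(checkpoint_dir) / f"{workflow_id}.json"
    state = json.loads(path.read_text()) if path.exists() else {"completed": {}}
    for name, step in steps:
        if name in state["completed"]:
            continue  # finished in an earlier run
        state["completed"][name] = step()
        path.write_text(json.dumps(state))  # checkpoint after every step
    return state["completed"]


# Hypothetical steps: the analysis step fails on its first call only.
calls = []


def research():
    calls.append("research")
    return "findings"


def flaky_analysis():
    calls.append("analysis")
    if calls.count("analysis") == 1:
        raise RuntimeError("simulated interruption")
    return "insights"


steps = [("research", research), ("analysis", flaky_analysis)]

with tempfile.TemporaryDirectory() as checkpoint_dir:
    try:
        run_with_checkpoints("market_analysis_001", steps, checkpoint_dir)
    except RuntimeError as error:
        print(f"Workflow interrupted: {error}")
    # The second run resumes from the checkpoint and does not repeat research.
    completed = run_with_checkpoints("market_analysis_001", steps, checkpoint_dir)

print(completed)
print(calls)
```

Checkpointing after every step keeps the sketch simple; an interval-based policy such as `checkpoint_interval` trades some repeated work on resume for fewer writes.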
API Reference
EnhancedMultiAgentOrchestrator Methods
register_agent(agent_id: str, agent: Any, capability: AgentCapability) -> None
Register an agent with the orchestrator
Parameters:
- agent_id (str): Unique agent identifier
- agent (Any): Agent instance
- capability (AgentCapability): Agent capabilities
execute_workflow(workflow_request: Dict) -> Dict
Execute a multi-agent workflow
Parameters:
workflow_request(Dict): Workflow request with query and context
Returns: Workflow execution result
resume_workflow(workflow_id: str) -> Dict
Resume an interrupted workflow
Parameters:
workflow_id(str): Workflow identifier
Returns: Workflow execution result
TaskDecomposer Methods
decompose_task(task_request: Dict) -> TaskPlan
Decompose a complex task into subtasks
Parameters:
task_request(Dict): Task request with query and context
Returns: TaskPlan with subtasks
HandoffProtocol Methods
execute_handoff(context: HandoffContext) -> HandoffResult
Execute handoff between agents
Parameters:
context(HandoffContext): Handoff context
Returns: HandoffResult with status and data
ResultSynthesizer Methods
synthesize_results(agent_results: List[AgentResult]) -> SynthesizedResult
Synthesize results from multiple agents
Parameters:
agent_results(List[AgentResult]): Results from agents
Returns: SynthesizedResult with combined output
See Also
- Agent Graphs - Individual agent state machines
- Agent Tools - Available tools for agents
- Agent Policies - Safety and governance
- Agent Callbacks - Monitoring and metrics
- Agent Middleware - Request processing