Skip to main content

Multi-Agent Orchestration

Advanced multi-agent orchestration system for complex workflows involving multiple specialized agents, task decomposition, intelligent handoffs, and result synthesis.

Core Classes

EnhancedMultiAgentOrchestrator

Description: Main orchestrator for managing multi-agent workflows

Parameters:

  • max_concurrent_agents (int): Maximum concurrent agents (default: 5)
  • timeout_seconds (int): Workflow timeout (default: 300)
  • enable_persistence (bool): Enable workflow persistence (default: True)
  • retry_failed_tasks (bool): Retry failed tasks (default: True)

Returns: EnhancedMultiAgentOrchestrator instance

Example:

from recoagent.agents import EnhancedMultiAgentOrchestrator, AgentCapability, TaskType

# Create orchestrator
orchestrator = EnhancedMultiAgentOrchestrator(
max_concurrent_agents=3,
timeout_seconds=600,
enable_persistence=True
)

# Register agents
orchestrator.register_agent(
"research_agent",
research_agent,
AgentCapability(
agent_id="research_agent",
agent_name="Research Agent",
task_types=[TaskType.RESEARCH, TaskType.ANALYSIS],
description="Conducts research and analysis"
)
)

# Execute workflow
result = orchestrator.execute_workflow({
"query": "Research AI trends and create a summary",
"user_id": "user_123"
})

TaskDecomposer

Description: Decomposes complex tasks into subtasks for different agents

Parameters:

  • max_subtasks (int): Maximum number of subtasks (default: 10)
  • complexity_threshold (float): Threshold for task complexity (default: 0.7)

Returns: TaskDecomposer instance

Example:

from recoagent.agents import TaskDecomposer, TaskType

# Create task decomposer
decomposer = TaskDecomposer(
max_subtasks=5,
complexity_threshold=0.6
)

# Decompose complex task
task_plan = decomposer.decompose_task({
"query": "Analyze market trends and create a business report",
"context": "Technology sector analysis"
})

print(f"Created {len(task_plan.subtasks)} subtasks")
for subtask in task_plan.subtasks:
print(f"- {subtask.task_type}: {subtask.description}")

HandoffProtocol

Description: Manages handoffs between agents with context preservation

Parameters:

  • context_retention (bool): Retain context across handoffs (default: True)
  • handoff_timeout (int): Handoff timeout in seconds (default: 30)

Returns: HandoffProtocol instance

Example:

from recoagent.agents import HandoffProtocol, HandoffContext

# Create handoff protocol
handoff_protocol = HandoffProtocol(
context_retention=True,
handoff_timeout=60
)

# Create handoff context
context = HandoffContext(
from_agent="research_agent",
to_agent="writing_agent",
context_data={"research_results": "..."},
priority="high"
)

# Execute handoff
result = handoff_protocol.execute_handoff(context)

ResultSynthesizer

Description: Synthesizes results from multiple agents into coherent output

Parameters:

  • synthesis_strategy (str): Synthesis strategy ("merge", "hierarchical", "weighted")
  • quality_threshold (float): Minimum quality threshold (default: 0.7)

Returns: ResultSynthesizer instance

Example:

from recoagent.agents import ResultSynthesizer, AgentResult

# Create result synthesizer
synthesizer = ResultSynthesizer(
synthesis_strategy="hierarchical",
quality_threshold=0.8
)

# Collect agent results
agent_results = [
AgentResult(
agent_id="research_agent",
result="Research findings...",
confidence=0.9,
metadata={"sources": 5}
),
AgentResult(
agent_id="analysis_agent",
result="Analysis results...",
confidence=0.85,
metadata={"metrics": 3}
)
]

# Synthesize results
synthesized = synthesizer.synthesize_results(agent_results)
print(f"Synthesized result: {synthesized.result}")
print(f"Overall confidence: {synthesized.confidence}")

Usage Examples

Basic Multi-Agent Workflow

from recoagent.agents import EnhancedMultiAgentOrchestrator, AgentCapability, TaskType

# Create orchestrator
orchestrator = EnhancedMultiAgentOrchestrator()

# Register specialized agents
orchestrator.register_agent(
"research_agent",
research_agent,
AgentCapability(
agent_id="research_agent",
agent_name="Research Agent",
task_types=[TaskType.RESEARCH],
description="Conducts research and gathers information"
)
)

orchestrator.register_agent(
"analysis_agent",
analysis_agent,
AgentCapability(
agent_id="analysis_agent",
agent_name="Analysis Agent",
task_types=[TaskType.ANALYSIS],
description="Analyzes data and provides insights"
)
)

orchestrator.register_agent(
"writing_agent",
writing_agent,
AgentCapability(
agent_id="writing_agent",
agent_name="Writing Agent",
task_types=[TaskType.WRITING],
description="Creates written content and reports"
)
)

# Execute complex workflow
result = orchestrator.execute_workflow({
"query": "Research AI trends, analyze the data, and create a comprehensive report",
"user_id": "user_123",
"requirements": {
"report_length": "2000 words",
"include_charts": True,
"deadline": "2024-01-15"
}
})

Advanced Task Decomposition

from recoagent.agents import TaskDecomposer, TaskType

# Create advanced decomposer
decomposer = TaskDecomposer(
max_subtasks=8,
complexity_threshold=0.5
)

# Decompose complex business task
task_plan = decomposer.decompose_task({
"query": "Create a comprehensive market analysis for AI startups",
"context": {
"industry": "artificial intelligence",
"focus": "startups",
"geography": "North America",
"timeframe": "2023-2024"
},
"requirements": {
"include_competitors": True,
"include_funding_data": True,
"include_technology_trends": True,
"format": "executive_summary"
}
})

# Review decomposition
print(f"Task Plan: {task_plan.description}")
print(f"Complexity Score: {task_plan.complexity_score}")
print(f"Estimated Duration: {task_plan.estimated_duration} minutes")

for i, subtask in enumerate(task_plan.subtasks, 1):
print(f"{i}. {subtask.task_type.value}: {subtask.description}")
print(f" Agent: {subtask.assigned_agent}")
print(f" Priority: {subtask.priority}")
print(f" Dependencies: {subtask.dependencies}")

Intelligent Agent Handoffs

from recoagent.agents import HandoffProtocol, HandoffContext, HandoffStatus

# Create handoff protocol
handoff_protocol = HandoffProtocol(
context_retention=True,
handoff_timeout=120
)

# Simulate agent handoff
def simulate_workflow():
# Research agent completes its task
research_context = HandoffContext(
from_agent="research_agent",
to_agent="analysis_agent",
context_data={
"research_results": {
"market_size": "$50B",
"growth_rate": "25%",
"key_players": ["OpenAI", "Anthropic", "Google"],
"sources": ["report1.pdf", "article2.html"]
},
"user_requirements": {
"focus_areas": ["funding", "technology", "competition"]
}
},
priority="high",
metadata={"research_quality": 0.9}
)

# Execute handoff
handoff_result = handoff_protocol.execute_handoff(research_context)

if handoff_result.status == HandoffStatus.SUCCESS:
print("Handoff successful")
print(f"Context transferred: {len(handoff_result.context_data)} items")

# Analysis agent processes with context
analysis_result = analysis_agent.process_with_context(
handoff_result.context_data
)

# Handoff to writing agent
writing_context = HandoffContext(
from_agent="analysis_agent",
to_agent="writing_agent",
context_data={
"research_results": research_context.context_data["research_results"],
"analysis_results": analysis_result,
"user_requirements": research_context.context_data["user_requirements"]
},
priority="high"
)

final_handoff = handoff_protocol.execute_handoff(writing_context)
return final_handoff
else:
print(f"Handoff failed: {handoff_result.error_message}")
return None

# Execute workflow
result = simulate_workflow()

Result Synthesis and Quality Control

from recoagent.agents import ResultSynthesizer, AgentResult

# Create synthesizer with quality control
synthesizer = ResultSynthesizer(
synthesis_strategy="weighted",
quality_threshold=0.8
)

# Collect results from multiple agents
agent_results = [
AgentResult(
agent_id="research_agent",
result="Market research findings: AI market growing at 25% CAGR...",
confidence=0.95,
metadata={
"sources": 10,
"data_quality": "high",
"coverage": "comprehensive"
}
),
AgentResult(
agent_id="analysis_agent",
result="Analysis: Key trends include LLM adoption, enterprise AI...",
confidence=0.88,
metadata={
"analysis_depth": "detailed",
"methodology": "statistical",
"insights": 15
}
),
AgentResult(
agent_id="writing_agent",
result="Executive Summary: The AI startup market shows strong growth...",
confidence=0.92,
metadata={
"word_count": 2000,
"sections": 5,
"readability": "high"
}
)
]

# Synthesize with quality checks
synthesized = synthesizer.synthesize_results(agent_results)

print(f"Final Result Quality: {synthesized.quality_score}")
print(f"Overall Confidence: {synthesized.confidence}")
print(f"Synthesis Strategy: {synthesized.synthesis_strategy}")

# Quality control
if synthesized.quality_score >= 0.8:
print("✅ Quality threshold met - result approved")
final_result = synthesized.result
else:
print("❌ Quality threshold not met - result needs improvement")
# Trigger quality improvement workflow
improved_result = synthesizer.improve_quality(synthesized)
final_result = improved_result.result

Workflow Persistence and Recovery

from recoagent.agents import WorkflowPersistence, WorkflowStatus

# Create workflow persistence
persistence = WorkflowPersistence(
storage_backend="redis",
checkpoint_interval=30 # seconds
)

# Start persistent workflow
workflow_id = "market_analysis_001"
orchestrator = EnhancedMultiAgentOrchestrator(
enable_persistence=True,
persistence=persistence
)

# Execute workflow with persistence
try:
result = orchestrator.execute_workflow({
"query": "Complex market analysis",
"workflow_id": workflow_id,
"user_id": "user_123"
})
except Exception as e:
print(f"Workflow interrupted: {e}")

# Resume workflow
resumed_result = orchestrator.resume_workflow(workflow_id)
print(f"Workflow resumed and completed: {resumed_result.status}")

# Check workflow status
status = persistence.get_workflow_status(workflow_id)
print(f"Workflow Status: {status.status}")
print(f"Progress: {status.progress_percentage}%")
print(f"Checkpoints: {len(status.checkpoints)}")

API Reference

EnhancedMultiAgentOrchestrator Methods

register_agent(agent_id: str, agent: Any, capability: AgentCapability) -> None

Register an agent with the orchestrator

Parameters:

  • agent_id (str): Unique agent identifier
  • agent (Any): Agent instance
  • capability (AgentCapability): Agent capabilities

execute_workflow(workflow_request: Dict) -> Dict

Execute a multi-agent workflow

Parameters:

  • workflow_request (Dict): Workflow request with query and context

Returns: Workflow execution result

resume_workflow(workflow_id: str) -> Dict

Resume an interrupted workflow

Parameters:

  • workflow_id (str): Workflow identifier

Returns: Workflow execution result

TaskDecomposer Methods

decompose_task(task_request: Dict) -> TaskPlan

Decompose a complex task into subtasks

Parameters:

  • task_request (Dict): Task request with query and context

Returns: TaskPlan with subtasks

HandoffProtocol Methods

execute_handoff(context: HandoffContext) -> HandoffResult

Execute handoff between agents

Parameters:

  • context (HandoffContext): Handoff context

Returns: HandoffResult with status and data

ResultSynthesizer Methods

synthesize_results(agent_results: List[AgentResult]) -> SynthesizedResult

Synthesize results from multiple agents

Parameters:

  • agent_results (List[AgentResult]): Results from agents

Returns: SynthesizedResult with combined output

See Also