"""
Orchestration Engine for Multi-Agent Workflows
Coordinates parallel and sequential agent execution with intelligent caching
"""
import asyncio
import json
import time
from typing import Dict, List, Any, Optional
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from pathlib import Path
import logging
from .db_wrapper import ThreadSafeDB
from .cache_manager import CacheManager
from .agent_templates import AgentTemplateManager
from .clean_search import CleanSmartCodeSearch
from .git_analyzer import GitAnalyzer
from .dependency_analyzer import DependencyAnalyzer
from .usage_analyzer import UsageAnalyzer
logger = logging.getLogger(__name__)
class OrchestrationEngine:
"""
    Main orchestration engine for coordinating multi-agent workflows
    Provides parallel and sequential agent execution, result caching, and error handling
"""
def __init__(self, db: ThreadSafeDB, project_root: str = ".", max_workers: int = 4):
"""
Initialize orchestration engine
Args:
db: Thread-safe database connection
project_root: Root directory of project to analyze
max_workers: Maximum parallel workers
"""
self.db = db
self.project_root = Path(project_root)
self.cache = CacheManager(db, ttl_seconds=3600, max_size_mb=100)
self.agents = AgentTemplateManager(db, self.project_root)
self.executor = ThreadPoolExecutor(max_workers=max_workers)
# Initialize analysis services for agent access
self.search = CleanSmartCodeSearch(str(self.project_root))
self.git = GitAnalyzer(self.project_root)
self.deps = DependencyAnalyzer(str(self.project_root))
self.usage = UsageAnalyzer(self.project_root)
# Load orchestration flows
self.flows = {}
self.load_orchestration_flows()
# Track execution metrics
self.metrics = {
'flows_executed': 0,
'agents_executed': 0,
'cache_hits': 0,
'errors': 0,
'total_time_ms': 0
}
def load_orchestration_flows(self):
"""Load predefined orchestration flows from database and defaults"""
# Load from database
with self.db.get_connection() as conn:
cursor = conn.execute("""
SELECT name, description, agent_sequence, parallel_groups,
cache_strategy, trigger_pattern
FROM orchestration_flows
ORDER BY success_count DESC
""")
for row in cursor:
flow = {
'name': row[0],
'description': row[1],
'agent_sequence': json.loads(row[2]) if row[2] else [],
'parallel_groups': json.loads(row[3]) if row[3] else [],
'cache_strategy': row[4] or 'conservative',
'trigger_pattern': row[5]
}
self.flows[flow['name']] = flow
# Add default high-value flows if not in database
default_flows = [
{
'name': 'instant_review',
'description': 'Multi-agent code review orchestration (920/1000 value)',
'parallel_groups': [
['import_analyzer', 'complexity_analyzer', 'function_extractor']
],
'agent_sequence': ['test_gap_finder', 'duplicate_detector'],
'cache_strategy': 'aggressive'
},
{
'name': 'debt_orchestrator',
'description': 'Technical debt analysis and prioritization (900/1000 value)',
'parallel_groups': [
['complexity_analyzer', 'duplicate_detector']
],
'agent_sequence': ['import_analyzer'],
'cache_strategy': 'aggressive'
},
{
'name': 'test_gap_analyzer',
'description': 'Find untested code and suggest tests (880/1000 value)',
'agent_sequence': [
'function_extractor',
'test_gap_finder',
'test_generator'
],
'parallel_groups': [],
'cache_strategy': 'conservative'
},
{
'name': 'import_optimizer',
'description': 'Analyze and optimize imports (650/1000 value)',
'agent_sequence': ['import_analyzer'],
'parallel_groups': [],
'cache_strategy': 'aggressive'
}
]
for flow in default_flows:
if flow['name'] not in self.flows:
self.flows[flow['name']] = flow
self.register_flow(flow)
def register_flow(self, flow: Dict):
"""Register a new orchestration flow in the database"""
with self.db.get_connection() as conn:
conn.execute("""
INSERT OR REPLACE INTO orchestration_flows
(name, description, agent_sequence, parallel_groups, cache_strategy)
VALUES (?, ?, ?, ?, ?)
""", (
flow['name'],
flow.get('description', ''),
json.dumps(flow.get('agent_sequence', [])),
json.dumps(flow.get('parallel_groups', [])),
flow.get('cache_strategy', 'conservative')
))
conn.commit()
async def execute_flow(self, flow_name: str, inputs: Dict) -> Dict:
"""
Execute an orchestration flow with caching
Args:
flow_name: Name of the flow to execute
inputs: Input parameters for the flow
Returns:
Dictionary containing results from all agents
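        Example (illustrative; assumes an initialized engine and the built-in flows):
            >>> engine = OrchestrationEngine(db)  # doctest: +SKIP
            >>> result = asyncio.run(engine.execute_flow('import_optimizer', {'pattern': '*.py'}))  # doctest: +SKIP
            >>> sorted(result)  # doctest: +SKIP
            ['agents', 'description', 'execution_ms', 'flow_name', 'timestamp']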
"""
start_time = time.time()
self.metrics['flows_executed'] += 1
# Get flow definition
flow = self.flows.get(flow_name)
if not flow:
raise ValueError(f"Flow '{flow_name}' not found. Available flows: {list(self.flows.keys())}")
        # Probe the cache for a previously stored result for this flow + inputs.
        # The compute callback returns None, so a cache miss falls through to
        # executing the flow below instead of computing anything here.
        cache_key = f"flow_{flow_name}"
        cached_result = self.cache.get_or_compute(
            cache_key,
            inputs,
            lambda i: None,  # probe only; nothing is computed on a miss
            force_refresh=False
        )
if cached_result is not None:
self.metrics['cache_hits'] += 1
logger.info(f"Flow '{flow_name}' returned from cache")
return cached_result
results = {
'flow_name': flow_name,
'description': flow.get('description', ''),
'timestamp': datetime.now().isoformat(),
'agents': {}
}
# Current inputs that get passed between agents
current_inputs = inputs.copy()
try:
# Execute parallel groups first
if flow.get('parallel_groups'):
for group_idx, group in enumerate(flow['parallel_groups']):
logger.info(f"Executing parallel group {group_idx + 1}: {group}")
group_results = await self.execute_parallel(group, current_inputs)
results['agents'].update(group_results)
# Add results to inputs for next stages
for agent_name, result in group_results.items():
current_inputs[f"{agent_name}_result"] = result
# Execute sequential agents
for agent_name in flow.get('agent_sequence', []):
logger.info(f"Executing sequential agent: {agent_name}")
agent_result = await self.execute_agent(agent_name, current_inputs)
results['agents'][agent_name] = agent_result
# Add result to inputs for next agent
current_inputs[f"{agent_name}_result"] = agent_result
# Cache the complete flow result
self.cache.get_or_compute(
cache_key,
inputs,
lambda i: results,
force_refresh=True # Store the result
)
            # Update flow success metrics. SQLite evaluates every SET expression
            # against the pre-update row, so the running average is computed with
            # the old success_count before it is incremented.
with self.db.get_connection() as conn:
conn.execute("""
UPDATE orchestration_flows
SET success_count = success_count + 1,
avg_duration_ms =
(avg_duration_ms * success_count + ?) / (success_count + 1)
WHERE name = ?
""", (int((time.time() - start_time) * 1000), flow_name))
conn.commit()
except Exception as e:
self.metrics['errors'] += 1
logger.error(f"Error in flow '{flow_name}': {str(e)}")
# Update failure metrics
with self.db.get_connection() as conn:
conn.execute("""
UPDATE orchestration_flows
SET failure_count = failure_count + 1
WHERE name = ?
""", (flow_name,))
conn.commit()
results['error'] = str(e)
# Record execution time
execution_time = int((time.time() - start_time) * 1000)
results['execution_ms'] = execution_time
self.metrics['total_time_ms'] += execution_time
return results
async def execute_parallel(self, agent_names: List[str], inputs: Dict) -> Dict:
"""
Execute multiple agents in parallel
Args:
agent_names: List of agent names to execute
inputs: Input parameters for all agents
Returns:
Dictionary mapping agent names to their results
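        Example (illustrative): running ['import_analyzer', 'complexity_analyzer']
        with {'pattern': '*.py'} yields {'import_analyzer': {...}, 'complexity_analyzer': {...}};
        an agent that raises is mapped to {'error': '<message>'} rather than aborting the group.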
"""
tasks = []
for agent_name in agent_names:
task = asyncio.create_task(self.execute_agent(agent_name, inputs))
tasks.append((agent_name, task))
results = {}
for agent_name, task in tasks:
try:
result = await task
results[agent_name] = result
except Exception as e:
logger.error(f"Error in parallel agent '{agent_name}': {str(e)}")
results[agent_name] = {'error': str(e)}
self.metrics['errors'] += 1
return results
async def execute_agent(self, agent_name: str, inputs: Dict) -> Any:
"""
Execute a single agent with caching
Args:
agent_name: Name of the agent to execute
inputs: Input parameters for the agent
Returns:
Agent execution result
"""
self.metrics['agents_executed'] += 1
# Get agent template
agent = self.agents.get_agent(agent_name)
if not agent:
# Try to create a simple aggregator if it doesn't exist
if 'aggregator' in agent_name.lower():
return self._aggregate_results(inputs)
raise ValueError(f"Agent '{agent_name}' not found")
# Use cache for individual agent results
def compute_agent(i):
return self._run_agent_sync(agent, i)
        result = await asyncio.get_running_loop().run_in_executor(
self.executor,
self.cache.get_or_compute,
agent_name,
inputs,
compute_agent,
False # Don't force refresh
)
return result
def _run_agent_sync(self, agent, inputs: Dict) -> Any:
"""
Synchronously execute agent template code
Args:
agent: Agent template
inputs: Input parameters
Returns:
Agent execution result
"""
try:
# Use the agent template manager's execution
return self.agents.execute_agent(agent.name, inputs)
except Exception as e:
logger.error(f"Error executing agent '{agent.name}': {str(e)}")
raise
def _aggregate_results(self, inputs: Dict) -> Dict:
"""
Simple result aggregator for review flows
Args:
inputs: Dictionary containing results from other agents
Returns:
Aggregated results with summary
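        Example (illustrative): an input such as
            {'import_analyzer_result': {'unused': ['os'], 'count': 1}}
        produces one 'unused_import' issue, metrics {'import_analyzer_count': 1},
        and a summary with total_issues == 1.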
"""
aggregated = {
'summary': {},
'issues': [],
'suggestions': [],
'metrics': {}
}
# Aggregate results from all agent outputs in inputs
for key, value in inputs.items():
if key.endswith('_result') and isinstance(value, dict):
# Extract issues
if 'unused' in value:
for item in value.get('unused', []):
aggregated['issues'].append({
'type': 'unused_import',
'severity': 'low',
'detail': item
})
# Extract complexity issues
if 'complexity' in value and value.get('complexity', 0) > 10:
aggregated['issues'].append({
'type': 'high_complexity',
'severity': 'medium',
'complexity': value['complexity']
})
# Extract test gaps
if 'untested' in value:
for func in value.get('untested', []):
aggregated['issues'].append({
'type': 'missing_test',
'severity': 'medium',
'function': func
})
# Collect metrics
if 'count' in value:
aggregated['metrics'][key.replace('_result', '_count')] = value['count']
# Generate summary
aggregated['summary'] = {
'total_issues': len(aggregated['issues']),
'high_severity': len([i for i in aggregated['issues'] if i.get('severity') == 'high']),
'medium_severity': len([i for i in aggregated['issues'] if i.get('severity') == 'medium']),
'low_severity': len([i for i in aggregated['issues'] if i.get('severity') == 'low'])
}
return aggregated
def get_metrics(self) -> Dict:
"""Get orchestration engine metrics"""
cache_stats = self.cache.get_stats()
return {
'orchestration': self.metrics,
'cache': cache_stats,
'flows_available': len(self.flows)
}
def list_flows(self) -> List[Dict]:
"""List all available orchestration flows"""
flows_list = []
for name, flow in self.flows.items():
flows_list.append({
'name': name,
'description': flow.get('description', ''),
'agents': len(flow.get('agent_sequence', [])) +
sum(len(g) for g in flow.get('parallel_groups', [])),
'cache_strategy': flow.get('cache_strategy', 'conservative')
})
return flows_list
async def warm_cache(self, patterns: Optional[List[str]] = None):
"""
Warm the cache with common operations
Args:
patterns: List of file patterns to warm cache for
"""
if patterns is None:
patterns = ['*.py', '*.js', '*.ts']
logger.info("Warming cache for common operations...")
# Common flows to warm
warm_flows = ['import_optimizer', 'test_gap_analyzer']
for pattern in patterns:
for flow_name in warm_flows:
try:
# Execute flow to warm cache
await self.execute_flow(flow_name, {'pattern': pattern})
                except Exception as e:
                    # Warming is best-effort; skip flows that fail and keep going
                    logger.debug(f"Cache warming skipped for '{flow_name}' ({pattern}): {e}")
logger.info("Cache warming complete")
def shutdown(self):
"""Cleanup resources"""
self.executor.shutdown(wait=True)
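

# Illustrative usage sketch, not exercised by the library itself. The
# ThreadSafeDB constructor arguments are an assumption; adapt them to the
# real db_wrapper API (and run as `python -m <package>.orchestrator`) before use.
if __name__ == "__main__":
    async def _demo():
        db = ThreadSafeDB("analysis.db")  # hypothetical constructor signature
        engine = OrchestrationEngine(db, project_root=".", max_workers=4)
        try:
            result = await engine.execute_flow("import_optimizer", {"pattern": "*.py"})
            print(json.dumps(result, indent=2, default=str))
        finally:
            engine.shutdown()

    asyncio.run(_demo())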