
Smart Code Search MCP Server

orchestrator.py (16.5 kB)
""" Orchestration Engine for Multi-Agent Workflows Coordinates parallel and sequential agent execution with intelligent caching """ import asyncio import json import time import hashlib from typing import Dict, List, Any, Optional, Callable, Union from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime from pathlib import Path import logging from .db_wrapper import ThreadSafeDB from .cache_manager import CacheManager from .agent_templates import AgentTemplateManager from .clean_search import CleanSmartCodeSearch from .git_analyzer import GitAnalyzer from .dependency_analyzer import DependencyAnalyzer from .usage_analyzer import UsageAnalyzer logger = logging.getLogger(__name__) class OrchestrationEngine: """ Main orchestration engine for coordinating multi-agent workflows Provides parallel execution, caching, and error handling """ def __init__(self, db: ThreadSafeDB, project_root: str = ".", max_workers: int = 4): """ Initialize orchestration engine Args: db: Thread-safe database connection project_root: Root directory of project to analyze max_workers: Maximum parallel workers """ self.db = db self.project_root = Path(project_root) self.cache = CacheManager(db, ttl_seconds=3600, max_size_mb=100) self.agents = AgentTemplateManager(db, self.project_root) self.executor = ThreadPoolExecutor(max_workers=max_workers) # Initialize analysis services for agent access self.search = CleanSmartCodeSearch(str(self.project_root)) self.git = GitAnalyzer(self.project_root) self.deps = DependencyAnalyzer(str(self.project_root)) self.usage = UsageAnalyzer(self.project_root) # Load orchestration flows self.flows = {} self.load_orchestration_flows() # Track execution metrics self.metrics = { 'flows_executed': 0, 'agents_executed': 0, 'cache_hits': 0, 'errors': 0, 'total_time_ms': 0 } def load_orchestration_flows(self): """Load predefined orchestration flows from database and defaults""" # Load from database with self.db.get_connection() as conn: cursor = conn.execute(""" SELECT name, description, agent_sequence, parallel_groups, cache_strategy, trigger_pattern FROM orchestration_flows ORDER BY success_count DESC """) for row in cursor: flow = { 'name': row[0], 'description': row[1], 'agent_sequence': json.loads(row[2]) if row[2] else [], 'parallel_groups': json.loads(row[3]) if row[3] else [], 'cache_strategy': row[4] or 'conservative', 'trigger_pattern': row[5] } self.flows[flow['name']] = flow # Add default high-value flows if not in database default_flows = [ { 'name': 'instant_review', 'description': 'Multi-agent code review orchestration (920/1000 value)', 'parallel_groups': [ ['import_analyzer', 'complexity_analyzer', 'function_extractor'] ], 'agent_sequence': ['test_gap_finder', 'duplicate_detector'], 'cache_strategy': 'aggressive' }, { 'name': 'debt_orchestrator', 'description': 'Technical debt analysis and prioritization (900/1000 value)', 'parallel_groups': [ ['complexity_analyzer', 'duplicate_detector'] ], 'agent_sequence': ['import_analyzer'], 'cache_strategy': 'aggressive' }, { 'name': 'test_gap_analyzer', 'description': 'Find untested code and suggest tests (880/1000 value)', 'agent_sequence': [ 'function_extractor', 'test_gap_finder', 'test_generator' ], 'parallel_groups': [], 'cache_strategy': 'conservative' }, { 'name': 'import_optimizer', 'description': 'Analyze and optimize imports (650/1000 value)', 'agent_sequence': ['import_analyzer'], 'parallel_groups': [], 'cache_strategy': 'aggressive' } ] for flow in default_flows: if flow['name'] not in 
self.flows: self.flows[flow['name']] = flow self.register_flow(flow) def register_flow(self, flow: Dict): """Register a new orchestration flow in the database""" with self.db.get_connection() as conn: conn.execute(""" INSERT OR REPLACE INTO orchestration_flows (name, description, agent_sequence, parallel_groups, cache_strategy) VALUES (?, ?, ?, ?, ?) """, ( flow['name'], flow.get('description', ''), json.dumps(flow.get('agent_sequence', [])), json.dumps(flow.get('parallel_groups', [])), flow.get('cache_strategy', 'conservative') )) conn.commit() async def execute_flow(self, flow_name: str, inputs: Dict) -> Dict: """ Execute an orchestration flow with caching Args: flow_name: Name of the flow to execute inputs: Input parameters for the flow Returns: Dictionary containing results from all agents """ start_time = time.time() self.metrics['flows_executed'] += 1 # Get flow definition flow = self.flows.get(flow_name) if not flow: raise ValueError(f"Flow '{flow_name}' not found. Available flows: {list(self.flows.keys())}") # Check cache for entire flow result cache_key = f"flow_{flow_name}" cached_result = self.cache.get_or_compute( cache_key, inputs, lambda i: None, # Don't compute yet force_refresh=False ) if cached_result is not None: self.metrics['cache_hits'] += 1 logger.info(f"Flow '{flow_name}' returned from cache") return cached_result results = { 'flow_name': flow_name, 'description': flow.get('description', ''), 'timestamp': datetime.now().isoformat(), 'agents': {} } # Current inputs that get passed between agents current_inputs = inputs.copy() try: # Execute parallel groups first if flow.get('parallel_groups'): for group_idx, group in enumerate(flow['parallel_groups']): logger.info(f"Executing parallel group {group_idx + 1}: {group}") group_results = await self.execute_parallel(group, current_inputs) results['agents'].update(group_results) # Add results to inputs for next stages for agent_name, result in group_results.items(): current_inputs[f"{agent_name}_result"] = result # Execute sequential agents for agent_name in flow.get('agent_sequence', []): logger.info(f"Executing sequential agent: {agent_name}") agent_result = await self.execute_agent(agent_name, current_inputs) results['agents'][agent_name] = agent_result # Add result to inputs for next agent current_inputs[f"{agent_name}_result"] = agent_result # Cache the complete flow result self.cache.get_or_compute( cache_key, inputs, lambda i: results, force_refresh=True # Store the result ) # Update flow success metrics with self.db.get_connection() as conn: conn.execute(""" UPDATE orchestration_flows SET success_count = success_count + 1, avg_duration_ms = (avg_duration_ms * success_count + ?) / (success_count + 1) WHERE name = ? """, (int((time.time() - start_time) * 1000), flow_name)) conn.commit() except Exception as e: self.metrics['errors'] += 1 logger.error(f"Error in flow '{flow_name}': {str(e)}") # Update failure metrics with self.db.get_connection() as conn: conn.execute(""" UPDATE orchestration_flows SET failure_count = failure_count + 1 WHERE name = ? 
""", (flow_name,)) conn.commit() results['error'] = str(e) # Record execution time execution_time = int((time.time() - start_time) * 1000) results['execution_ms'] = execution_time self.metrics['total_time_ms'] += execution_time return results async def execute_parallel(self, agent_names: List[str], inputs: Dict) -> Dict: """ Execute multiple agents in parallel Args: agent_names: List of agent names to execute inputs: Input parameters for all agents Returns: Dictionary mapping agent names to their results """ tasks = [] for agent_name in agent_names: task = asyncio.create_task(self.execute_agent(agent_name, inputs)) tasks.append((agent_name, task)) results = {} for agent_name, task in tasks: try: result = await task results[agent_name] = result except Exception as e: logger.error(f"Error in parallel agent '{agent_name}': {str(e)}") results[agent_name] = {'error': str(e)} self.metrics['errors'] += 1 return results async def execute_agent(self, agent_name: str, inputs: Dict) -> Any: """ Execute a single agent with caching Args: agent_name: Name of the agent to execute inputs: Input parameters for the agent Returns: Agent execution result """ self.metrics['agents_executed'] += 1 # Get agent template agent = self.agents.get_agent(agent_name) if not agent: # Try to create a simple aggregator if it doesn't exist if 'aggregator' in agent_name.lower(): return self._aggregate_results(inputs) raise ValueError(f"Agent '{agent_name}' not found") # Use cache for individual agent results def compute_agent(i): return self._run_agent_sync(agent, i) result = await asyncio.get_event_loop().run_in_executor( self.executor, self.cache.get_or_compute, agent_name, inputs, compute_agent, False # Don't force refresh ) return result def _run_agent_sync(self, agent, inputs: Dict) -> Any: """ Synchronously execute agent template code Args: agent: Agent template inputs: Input parameters Returns: Agent execution result """ try: # Use the agent template manager's execution return self.agents.execute_agent(agent.name, inputs) except Exception as e: logger.error(f"Error executing agent '{agent.name}': {str(e)}") raise def _aggregate_results(self, inputs: Dict) -> Dict: """ Simple result aggregator for review flows Args: inputs: Dictionary containing results from other agents Returns: Aggregated results with summary """ aggregated = { 'summary': {}, 'issues': [], 'suggestions': [], 'metrics': {} } # Aggregate results from all agent outputs in inputs for key, value in inputs.items(): if key.endswith('_result') and isinstance(value, dict): # Extract issues if 'unused' in value: for item in value.get('unused', []): aggregated['issues'].append({ 'type': 'unused_import', 'severity': 'low', 'detail': item }) # Extract complexity issues if 'complexity' in value and value.get('complexity', 0) > 10: aggregated['issues'].append({ 'type': 'high_complexity', 'severity': 'medium', 'complexity': value['complexity'] }) # Extract test gaps if 'untested' in value: for func in value.get('untested', []): aggregated['issues'].append({ 'type': 'missing_test', 'severity': 'medium', 'function': func }) # Collect metrics if 'count' in value: aggregated['metrics'][key.replace('_result', '_count')] = value['count'] # Generate summary aggregated['summary'] = { 'total_issues': len(aggregated['issues']), 'high_severity': len([i for i in aggregated['issues'] if i.get('severity') == 'high']), 'medium_severity': len([i for i in aggregated['issues'] if i.get('severity') == 'medium']), 'low_severity': len([i for i in aggregated['issues'] if 
i.get('severity') == 'low']) } return aggregated def get_metrics(self) -> Dict: """Get orchestration engine metrics""" cache_stats = self.cache.get_stats() return { 'orchestration': self.metrics, 'cache': cache_stats, 'flows_available': len(self.flows) } def list_flows(self) -> List[Dict]: """List all available orchestration flows""" flows_list = [] for name, flow in self.flows.items(): flows_list.append({ 'name': name, 'description': flow.get('description', ''), 'agents': len(flow.get('agent_sequence', [])) + sum(len(g) for g in flow.get('parallel_groups', [])), 'cache_strategy': flow.get('cache_strategy', 'conservative') }) return flows_list async def warm_cache(self, patterns: Optional[List[str]] = None): """ Warm the cache with common operations Args: patterns: List of file patterns to warm cache for """ if patterns is None: patterns = ['*.py', '*.js', '*.ts'] logger.info("Warming cache for common operations...") # Common flows to warm warm_flows = ['import_optimizer', 'test_gap_analyzer'] for pattern in patterns: for flow_name in warm_flows: try: # Execute flow to warm cache await self.execute_flow(flow_name, {'pattern': pattern}) except: pass # Ignore errors during warming logger.info("Cache warming complete") def shutdown(self): """Cleanup resources""" self.executor.shutdown(wait=True)
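
For orientation, here is a minimal usage sketch of the engine. It is an illustration only: the package import path (scs_mcp), the ThreadSafeDB constructor taking a database file path, and the pre-existing orchestration_flows table are all assumptions not shown on this page, so adapt the names to the actual project layout.

import asyncio

# Assumed module paths; the listing above only shows relative imports within the package.
from scs_mcp.db_wrapper import ThreadSafeDB
from scs_mcp.orchestrator import OrchestrationEngine


async def main():
    # Assumption: ThreadSafeDB is constructed from a path to the SQLite database
    # that already contains the orchestration_flows and agent tables.
    db = ThreadSafeDB("scs.db")
    engine = OrchestrationEngine(db, project_root=".", max_workers=4)

    # Flows loaded from the database plus the built-in defaults
    # (instant_review, debt_orchestrator, test_gap_analyzer, import_optimizer).
    for flow in engine.list_flows():
        print(flow['name'], '-', flow['description'])

    # Run a flow; 'pattern' is passed through to the individual agents.
    results = await engine.execute_flow('instant_review', {'pattern': '*.py'})
    print(results['execution_ms'], 'ms,', len(results['agents']), 'agents ran')

    # Aggregate metrics: flow/agent counts, cache hits, errors, total time.
    print(engine.get_metrics())

    engine.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

Because execute_flow caches the whole flow result and each agent result individually, a second call with the same inputs should return from the cache and increment the cache_hits metric rather than re-running the agents.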
