Skip to main content
Glama
delegation.py (17.2 kB)
"""Delegation engine for routing tasks to orchestrators.""" import re import logging from typing import Any from datetime import datetime from .config import DelegationConfig, DelegationRule from .orchestrator import OrchestratorRegistry from .retry import retry_with_backoff from .logging_config import delegation_logger logger = logging.getLogger(__name__) class DelegationResult: """Result of a delegation operation.""" def __init__( self, query: str, orchestrator: str, delegated_to: str | None, rule: DelegationRule | None, output: str, error: str, success: bool, duration: float, ): self.query = query self.orchestrator = orchestrator self.delegated_to = delegated_to self.rule = rule self.output = output self.error = error self.success = success self.duration = duration self.timestamp = datetime.now() def __repr__(self) -> str: delegation = f" -> {self.delegated_to}" if self.delegated_to else "" return f"<DelegationResult {self.orchestrator}{delegation}: {self.success}>" class DelegationEngine: """Engine for delegating tasks based on rules.""" def __init__(self, config: DelegationConfig, registry: OrchestratorRegistry): self.config = config self.registry = registry self.history: list[DelegationResult] = [] async def _execute_with_fallback( self, query: str, ranked_agents: list[str], tried_agents: list[str] = None, progress_callback: Any = None, timeout: int | None = None, ) -> tuple[str, str, str, int]: """ Execute query with automatic fallback to next best agent on failure. 
Returns: tuple: (target_agent, stdout, stderr, returncode) """ if tried_agents is None: tried_agents = [] for agent in ranked_agents: if agent in tried_agents: continue try: logger.info(f"Executing: {agent}") stdout, stderr, returncode = await self.registry.execute( agent, query, timeout=timeout, progress_callback=progress_callback ) if returncode == 0: logger.info(f"Success: {agent} completed task") return agent, stdout, stderr, returncode # Failed but can try fallback logger.warning(f"Fallback: {agent} failed (rc={returncode}) → trying next agent") tried_agents.append(agent) except (TimeoutError, RuntimeError, Exception) as e: error_type = type(e).__name__ logger.warning(f"Fallback: {agent} error ({error_type}) → trying next agent") tried_agents.append(agent) continue # All agents failed raise RuntimeError(f"All agents failed. Tried: {', '.join(tried_agents)}") async def process( self, query: str, force_delegate: str | None = None, progress_callback: Any = None, guidance_only: bool = False, ) -> DelegationResult: """ Process a query with delegation logic and automatic fallback. 
Args: query: User query/task force_delegate: Force delegation to specific orchestrator progress_callback: Optional async callback for progress reporting guidance_only: If True, return routing guidance without executing Returns: DelegationResult """ start = datetime.now() orchestrator = "claude" # Determine delegation and get ranked agents target, rule = self._determine_delegation(query, force_delegate) # If guidance_only mode, return routing recommendation without executing if guidance_only: if target == orchestrator: return DelegationResult( query=query, orchestrator=orchestrator, delegated_to=None, rule=rule, output="HANDLE_DIRECTLY", error="", success=True, duration=0.0, ) else: return DelegationResult( query=query, orchestrator=orchestrator, delegated_to=target, rule=rule, output=f"DELEGATE_TO: {target}", error="", success=True, duration=0.0, ) # Get full ranking for fallback if force_delegate: ranked_agents = [force_delegate] else: ranked_agents = self._rank_by_capabilities(query) # Ensure target is executed first if target in ranked_agents: ranked_agents.remove(target) ranked_agents.insert(0, target) if not ranked_agents: ranked_agents = [orchestrator] # Get recommended timeout based on task classification _, recommended_timeout = self._classify_task(query) # Log delegation start delegated_to = target if target != orchestrator else None delegation_logger.delegation_start(orchestrator, query, delegated_to) # Execute with fallback try: actual_agent, stdout, stderr, returncode = await self._execute_with_fallback( query, ranked_agents, progress_callback=progress_callback, timeout=recommended_timeout ) success = returncode == 0 # Update delegated_to if we fell back to different agent if actual_agent != target: logger.info(f"Fallback chain: {target} → {actual_agent}") delegated_to = actual_agent if actual_agent != orchestrator else None else: delegated_to = target if target != orchestrator else None except Exception as e: # All agents failed stdout = "" stderr = 
str(e) success = False actual_agent = target logger.error(f"All agents failed: {e}") duration = (datetime.now() - start).total_seconds() result = DelegationResult( query=query, orchestrator=orchestrator, delegated_to=delegated_to, rule=rule, output=stdout, error=stderr, success=success, duration=duration, ) # Log result if success: delegation_logger.delegation_success(orchestrator, delegated_to, duration) else: delegation_logger.delegation_failure(orchestrator, delegated_to, stderr, duration) if self.config.log_delegations: self.history.append(result) return result def _estimate_task_complexity(self, query: str) -> str: """ Estimate task complexity to determine if delegation overhead is worth it. Returns: "simple" | "medium" | "complex" Simple tasks: Claude handles directly (delegation overhead > token savings) Medium/Complex tasks: Delegate to specialized agents (token savings > overhead) """ query_lower = query.lower() # SIMPLE: Read-only operations and single-step deterministic commands # These don't benefit from AI - just execute directly simple_patterns = [ r"^git\s+status\s*$", r"^git\s+log", r"^git\s+show", r"^git\s+diff\s+[\w\./\-]+\s*$", # Single file diff r"^git\s+branch\s*(-a|-r)?\s*$", r"^git\s+remote", r"^git\s+stash\s+(list|show)?\s*$", r"^git\s+checkout\s+[\w\-/]+\s*$", # Simple branch switch r"^git\s+checkout\s+-b\s+[\w\-/]+\s*$", # Create branch r"^git\s+add\s+[\w\./\-]+\s*$", # Add specific files r"^git\s+pull\s*$", # Simple pull (no conflicts mentioned) r"^gh\s+pr\s+(view|list)", r"^gh\s+issue\s+list", r"^gh\s+repo\s+view", ] # COMPLEX: Multi-step workflows, content generation, safety-critical operations # These have high token costs or need AI decision-making complex_indicators = [ # Git operations requiring intelligence "commit", # Needs message generation "create a commit", "commit message", "amend", "rebase", "cherry-pick", "squash", "merge conflict", "resolve conflict", "git history", "clean up", "--force", "force push", "force-with-lease", 
# GitHub operations requiring content generation "create pr", "create pull request", "pr create", "pull request", "create issue", "issue create", "pr review", "review pr", "create release", "release create", # Multi-step workflows "create a pr for", "commit and push", "push my changes", "stage and commit", ] # MEDIUM: Operations that might need error handling but aren't always complex medium_indicators = [ "push -u", "set-upstream", "push origin", "push --tags", "merge", # Might have conflicts "revert", "tag -a", "checkout -b.*origin", # Track remote branch ] # Check simple patterns first for pattern in simple_patterns: if re.match(pattern, query, re.IGNORECASE): logger.debug(f"Complexity: SIMPLE (pattern match: {pattern})") return "simple" # Check complex indicators for indicator in complex_indicators: if indicator in query_lower: logger.debug(f"Complexity: COMPLEX (indicator: {indicator})") return "complex" # Check medium indicators for indicator in medium_indicators: if indicator in query_lower: logger.debug(f"Complexity: MEDIUM (indicator: {indicator})") return "medium" # Default: if query mentions git/github at all, it's medium # Otherwise let task classification determine routing if "git" in query_lower or "gh " in query_lower: logger.debug("Complexity: MEDIUM (default git/gh command)") return "medium" # Not a git/github command - let normal routing decide return "medium" def _classify_task(self, query: str) -> tuple[str, int]: """Classify task type and return recommended timeout.""" query_lower = query.lower() keywords = { "security_audit": ["security", "vulnerability", "audit", "cve", "exploit", "penetration"], "vulnerability_scan": ["scan", "vulnerability", "vuln", "security issue"], "code_review": ["review", "code quality", "best practice", "lint"], "architecture": ["architecture", "design", "system design", "structure"], "refactoring": ["refactor", "restructure", "clean up", "improve code"], "quick_fix": ["fix", "bug", "error", "issue", "broken"], 
"documentation": ["document", "docs", "readme", "guide", "explain"], "testing": ["test", "unittest", "integration test", "e2e"], "performance": ["performance", "optimize", "speed", "latency", "benchmark"], "git_workflow": ["commit", "push", "rebase", "merge", "cherry-pick", "squash", "git history"], "github_operations": ["pull request", "pr create", "pr review", "issue create", "release"], } # Timeout presets based on task complexity TIMEOUT_PRESETS = { "quick_fix": 60, # 1 min - simple bug fixes "refactoring": 300, # 5 min - code refactoring "security_audit": 600, # 10 min - comprehensive security review "code_review": 600, # 10 min - full code review "performance": 900, # 15 min - profiling/optimization "testing": 300, # 5 min - test generation "documentation": 180, # 3 min - documentation writing "architecture": 300, # 5 min - design work "vulnerability_scan": 300, # 5 min - automated scanning "git_workflow": 180, # 3 min - git operations "github_operations": 240, # 4 min - GitHub API operations "general": 300, # 5 min - default } for task_type, terms in keywords.items(): if any(term in query_lower for term in terms): timeout = TIMEOUT_PRESETS.get(task_type, 300) return task_type, timeout return "general", 300 def _rank_by_capabilities(self, query: str) -> list[str]: """Rank agents by capability scores for this query.""" task_type, _ = self._classify_task(query) # Unpack tuple, ignore timeout scores = [] for name, config in self.config.orchestrators.items(): if not config.enabled: continue # Get capability score for this task type capability_score = getattr(config.capabilities, task_type, 0.5) # Simple scoring: capability is primary factor score = capability_score scores.append((name, score)) # Sort by score descending scores.sort(key=lambda x: x[1], reverse=True) # Log ranking for transparency if scores: ranking_str = ", ".join([f"{name} ({score:.2f})" for name, score in scores[:3]]) logger.info(f"Task: {task_type} | Ranked: {ranking_str}") # Return agent names 
in ranked order return [name for name, _ in scores] def _determine_delegation( self, query: str, force_delegate: str | None, ) -> tuple[str, DelegationRule | None]: """ Determine which orchestrator should handle the query using capability-based routing. Returns: tuple: (target_orchestrator, matching_rule) """ # Force delegation overrides everything if force_delegate: logger.info(f"Routing: FORCED → {force_delegate}") return force_delegate, None # Check task complexity first - simple tasks handled directly by Claude complexity = self._estimate_task_complexity(query) if complexity == "simple": logger.info(f"Routing: SIMPLE task → claude (delegation overhead not worth it)") return "claude", None # Check explicit delegation rules rule = self.config.find_delegation_rule(query) if rule: logger.info(f"Routing: {rule.pattern} → {rule.delegate_to} (rule-based)") return rule.delegate_to, rule # Use capability-based routing for medium/complex tasks if self.config.routing_strategy in ["capability", "hybrid"]: ranked = self._rank_by_capabilities(query) if ranked: task_type, _ = self._classify_task(query) # Unpack tuple # If top ranked agent is Claude, check if delegation is still worth it if ranked[0] == "claude" and complexity == "medium": logger.info(f"Routing: {task_type} → claude (best match, medium complexity)") return "claude", None logger.info(f"Routing: {task_type} [{complexity}] → {ranked[0]} (capability-based)") return ranked[0], None # Fallback to primary orchestrator logger.info(f"Routing: DEFAULT → claude") return "claude", None def get_statistics(self) -> dict[str, Any]: """Get delegation statistics.""" if not self.history: return {"total": 0, "by_orchestrator": {}, "delegations": 0} by_orchestrator: dict[str, int] = {} delegations = 0 for result in self.history: target = result.delegated_to or result.orchestrator by_orchestrator[target] = by_orchestrator.get(target, 0) + 1 if result.delegated_to: delegations += 1 return { "total": len(self.history), 
"by_orchestrator": by_orchestrator, "delegations": delegations, "delegation_rate": delegations / len(self.history) * 100, "success_rate": sum(r.success for r in self.history) / len(self.history) * 100, "avg_duration": sum(r.duration for r in self.history) / len(self.history), } def clear_history(self) -> None: """Clear delegation history.""" self.history.clear()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/carlosduplar/multi-agent-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.