# delegation.py
"""Delegation engine for routing tasks to orchestrators."""
import re
import logging
from typing import Any
from datetime import datetime
from .config import DelegationConfig, DelegationRule
from .orchestrator import OrchestratorRegistry
from .retry import retry_with_backoff
from .logging_config import delegation_logger
logger = logging.getLogger(__name__)
class DelegationResult:
    """Outcome of one delegation attempt.

    Records the routed query, which orchestrator owned it, the agent it
    was handed to (if any), the matching rule, the captured output and
    error streams, and timing information.
    """

    def __init__(
        self,
        query: str,
        orchestrator: str,
        delegated_to: str | None,
        rule: DelegationRule | None,
        output: str,
        error: str,
        success: bool,
        duration: float,
    ):
        # Routing inputs.
        self.query = query
        self.orchestrator = orchestrator
        self.delegated_to = delegated_to
        self.rule = rule
        # Execution outcome.
        self.output = output
        self.error = error
        self.success = success
        self.duration = duration
        # Wall-clock moment this result object was created.
        self.timestamp = datetime.now()

    def __repr__(self) -> str:
        if self.delegated_to:
            return f"<DelegationResult {self.orchestrator} -> {self.delegated_to}: {self.success}>"
        return f"<DelegationResult {self.orchestrator}: {self.success}>"
class DelegationEngine:
    """Engine for delegating tasks based on rules.

    Routing priority, highest first:
      1. explicit ``force_delegate`` override
      2. complexity short-circuit (simple tasks stay with Claude)
      3. explicit delegation rules from config
      4. capability-based ranking (when routing_strategy allows)

    Execution automatically falls back to the next-ranked agent when the
    chosen one fails or raises.
    """

    def __init__(self, config: DelegationConfig, registry: OrchestratorRegistry):
        self.config = config
        self.registry = registry
        # Completed results; only appended when config.log_delegations is set.
        self.history: list[DelegationResult] = []

    async def _execute_with_fallback(
        self,
        query: str,
        ranked_agents: list[str],
        tried_agents: list[str] | None = None,
        progress_callback: Any = None,
        timeout: int | None = None,
    ) -> tuple[str, str, str, int]:
        """
        Execute query with automatic fallback to next best agent on failure.

        Args:
            query: User query/task to execute.
            ranked_agents: Agent names in preference order.
            tried_agents: Agents already attempted; these are skipped.
            progress_callback: Optional async callback for progress reporting.
            timeout: Per-agent execution timeout in seconds, if any.

        Returns:
            tuple: (target_agent, stdout, stderr, returncode)

        Raises:
            RuntimeError: If every ranked agent fails or errors.
        """
        if tried_agents is None:
            tried_agents = []
        for agent in ranked_agents:
            if agent in tried_agents:
                continue
            try:
                logger.info(f"Executing: {agent}")
                stdout, stderr, returncode = await self.registry.execute(
                    agent, query, timeout=timeout, progress_callback=progress_callback
                )
                if returncode == 0:
                    logger.info(f"Success: {agent} completed task")
                    return agent, stdout, stderr, returncode
                # Failed but can try fallback
                logger.warning(f"Fallback: {agent} failed (rc={returncode}) → trying next agent")
                tried_agents.append(agent)
            # Broad on purpose: any agent error (timeout, runtime, transport)
            # should trigger fallback to the next candidate, not abort.
            except Exception as e:
                error_type = type(e).__name__
                logger.warning(f"Fallback: {agent} error ({error_type}) → trying next agent")
                tried_agents.append(agent)
        # All agents failed
        raise RuntimeError(f"All agents failed. Tried: {', '.join(tried_agents)}")

    async def process(
        self,
        query: str,
        force_delegate: str | None = None,
        progress_callback: Any = None,
        guidance_only: bool = False,
    ) -> DelegationResult:
        """
        Process a query with delegation logic and automatic fallback.

        Args:
            query: User query/task
            force_delegate: Force delegation to specific orchestrator
            progress_callback: Optional async callback for progress reporting
            guidance_only: If True, return routing guidance without executing

        Returns:
            DelegationResult
        """
        start = datetime.now()
        orchestrator = "claude"
        # Determine delegation and get ranked agents
        target, rule = self._determine_delegation(query, force_delegate)
        # If guidance_only mode, return routing recommendation without executing
        if guidance_only:
            delegated = None if target == orchestrator else target
            return DelegationResult(
                query=query,
                orchestrator=orchestrator,
                delegated_to=delegated,
                rule=rule,
                output="HANDLE_DIRECTLY" if delegated is None else f"DELEGATE_TO: {target}",
                error="",
                success=True,
                duration=0.0,
            )
        # Get full ranking for fallback; a forced target gets no fallback chain.
        if force_delegate:
            ranked_agents = [force_delegate]
        else:
            ranked_agents = self._rank_by_capabilities(query)
        # Ensure target is executed first
        if target in ranked_agents:
            ranked_agents.remove(target)
        ranked_agents.insert(0, target)
        if not ranked_agents:
            ranked_agents = [orchestrator]
        # Get recommended timeout based on task classification
        _, recommended_timeout = self._classify_task(query)
        # Log delegation start
        delegated_to = target if target != orchestrator else None
        delegation_logger.delegation_start(orchestrator, query, delegated_to)
        # Execute with fallback
        try:
            actual_agent, stdout, stderr, returncode = await self._execute_with_fallback(
                query, ranked_agents, progress_callback=progress_callback, timeout=recommended_timeout
            )
            success = returncode == 0
            # Update delegated_to if we fell back to different agent
            if actual_agent != target:
                logger.info(f"Fallback chain: {target} → {actual_agent}")
                delegated_to = actual_agent if actual_agent != orchestrator else None
            else:
                delegated_to = target if target != orchestrator else None
        except Exception as e:
            # All agents failed; report the chain failure as the error text.
            stdout = ""
            stderr = str(e)
            success = False
            logger.error(f"All agents failed: {e}")
        duration = (datetime.now() - start).total_seconds()
        result = DelegationResult(
            query=query,
            orchestrator=orchestrator,
            delegated_to=delegated_to,
            rule=rule,
            output=stdout,
            error=stderr,
            success=success,
            duration=duration,
        )
        # Log result
        if success:
            delegation_logger.delegation_success(orchestrator, delegated_to, duration)
        else:
            delegation_logger.delegation_failure(orchestrator, delegated_to, stderr, duration)
        if self.config.log_delegations:
            self.history.append(result)
        return result

    def _estimate_task_complexity(self, query: str) -> str:
        """
        Estimate task complexity to determine if delegation overhead is worth it.

        Returns:
            "simple" | "medium" | "complex"

        Simple tasks: Claude handles directly (delegation overhead > token savings)
        Medium/Complex tasks: Delegate to specialized agents (token savings > overhead)
        """
        query_lower = query.lower()
        # SIMPLE: Read-only operations and single-step deterministic commands
        # These don't benefit from AI - just execute directly
        simple_patterns = [
            r"^git\s+status\s*$",
            r"^git\s+log",
            r"^git\s+show",
            r"^git\s+diff\s+[\w\./\-]+\s*$",  # Single file diff
            r"^git\s+branch\s*(-a|-r)?\s*$",
            r"^git\s+remote",
            r"^git\s+stash\s+(list|show)?\s*$",
            r"^git\s+checkout\s+[\w\-/]+\s*$",  # Simple branch switch
            r"^git\s+checkout\s+-b\s+[\w\-/]+\s*$",  # Create branch
            r"^git\s+add\s+[\w\./\-]+\s*$",  # Add specific files
            r"^git\s+pull\s*$",  # Simple pull (no conflicts mentioned)
            r"^gh\s+pr\s+(view|list)",
            r"^gh\s+issue\s+list",
            r"^gh\s+repo\s+view",
        ]
        # COMPLEX: Multi-step workflows, content generation, safety-critical operations
        # These have high token costs or need AI decision-making
        complex_indicators = [
            # Git operations requiring intelligence
            "commit",  # Needs message generation
            "create a commit",
            "commit message",
            "amend",
            "rebase",
            "cherry-pick",
            "squash",
            "merge conflict",
            "resolve conflict",
            "git history",
            "clean up",
            "--force",
            "force push",
            "force-with-lease",
            # GitHub operations requiring content generation
            "create pr",
            "create pull request",
            "pr create",
            "pull request",
            "create issue",
            "issue create",
            "pr review",
            "review pr",
            "create release",
            "release create",
            # Multi-step workflows
            "create a pr for",
            "commit and push",
            "push my changes",
            "stage and commit",
        ]
        # MEDIUM: Operations that might need error handling but aren't always complex
        medium_indicators = [
            "push -u",
            "set-upstream",
            "push origin",
            "push --tags",
            "merge",  # Might have conflicts
            "revert",
            "tag -a",
            "checkout -b.*origin",  # Track remote branch
        ]
        # Check simple patterns first
        for pattern in simple_patterns:
            if re.match(pattern, query, re.IGNORECASE):
                logger.debug(f"Complexity: SIMPLE (pattern match: {pattern})")
                return "simple"
        # Check complex indicators
        for indicator in complex_indicators:
            if indicator in query_lower:
                logger.debug(f"Complexity: COMPLEX (indicator: {indicator})")
                return "complex"
        # Check medium indicators
        for indicator in medium_indicators:
            if indicator in query_lower:
                logger.debug(f"Complexity: MEDIUM (indicator: {indicator})")
                return "medium"
        # Default: if query mentions git/github at all, it's medium
        # Otherwise let task classification determine routing
        if "git" in query_lower or "gh " in query_lower:
            logger.debug("Complexity: MEDIUM (default git/gh command)")
            return "medium"
        # Not a git/github command - "medium" here just means "let normal
        # capability routing decide"; nothing ever defaults to "simple".
        return "medium"

    def _classify_task(self, query: str) -> tuple[str, int]:
        """Classify task type and return recommended timeout.

        First keyword match wins, in dict insertion order, so the more
        specific categories (security, review) are checked before the
        catch-all ones (quick_fix, general).
        """
        query_lower = query.lower()
        keywords = {
            "security_audit": ["security", "vulnerability", "audit", "cve", "exploit", "penetration"],
            "vulnerability_scan": ["scan", "vulnerability", "vuln", "security issue"],
            "code_review": ["review", "code quality", "best practice", "lint"],
            "architecture": ["architecture", "design", "system design", "structure"],
            "refactoring": ["refactor", "restructure", "clean up", "improve code"],
            "quick_fix": ["fix", "bug", "error", "issue", "broken"],
            "documentation": ["document", "docs", "readme", "guide", "explain"],
            "testing": ["test", "unittest", "integration test", "e2e"],
            "performance": ["performance", "optimize", "speed", "latency", "benchmark"],
            "git_workflow": ["commit", "push", "rebase", "merge", "cherry-pick", "squash", "git history"],
            "github_operations": ["pull request", "pr create", "pr review", "issue create", "release"],
        }
        # Timeout presets based on task complexity
        TIMEOUT_PRESETS = {
            "quick_fix": 60,  # 1 min - simple bug fixes
            "refactoring": 300,  # 5 min - code refactoring
            "security_audit": 600,  # 10 min - comprehensive security review
            "code_review": 600,  # 10 min - full code review
            "performance": 900,  # 15 min - profiling/optimization
            "testing": 300,  # 5 min - test generation
            "documentation": 180,  # 3 min - documentation writing
            "architecture": 300,  # 5 min - design work
            "vulnerability_scan": 300,  # 5 min - automated scanning
            "git_workflow": 180,  # 3 min - git operations
            "github_operations": 240,  # 4 min - GitHub API operations
            "general": 300,  # 5 min - default
        }
        for task_type, terms in keywords.items():
            if any(term in query_lower for term in terms):
                timeout = TIMEOUT_PRESETS.get(task_type, 300)
                return task_type, timeout
        return "general", 300

    def _rank_by_capabilities(self, query: str) -> list[str]:
        """Rank enabled agents by capability score for this query's task type."""
        task_type, _ = self._classify_task(query)  # Unpack tuple, ignore timeout
        scores = []
        for name, config in self.config.orchestrators.items():
            if not config.enabled:
                continue
            # Capability score for this task type; 0.5 = neutral default
            # when the orchestrator config doesn't declare this capability.
            capability_score = getattr(config.capabilities, task_type, 0.5)
            scores.append((name, capability_score))
        # Sort by score descending
        scores.sort(key=lambda x: x[1], reverse=True)
        # Log top-3 ranking for transparency
        if scores:
            ranking_str = ", ".join([f"{name} ({score:.2f})" for name, score in scores[:3]])
            logger.info(f"Task: {task_type} | Ranked: {ranking_str}")
        # Return agent names in ranked order
        return [name for name, _ in scores]

    def _determine_delegation(
        self,
        query: str,
        force_delegate: str | None,
    ) -> tuple[str, DelegationRule | None]:
        """
        Determine which orchestrator should handle the query using capability-based routing.

        Returns:
            tuple: (target_orchestrator, matching_rule)
        """
        # Force delegation overrides everything
        if force_delegate:
            logger.info(f"Routing: FORCED → {force_delegate}")
            return force_delegate, None
        # Check task complexity first - simple tasks handled directly by Claude
        complexity = self._estimate_task_complexity(query)
        if complexity == "simple":
            logger.info("Routing: SIMPLE task → claude (delegation overhead not worth it)")
            return "claude", None
        # Check explicit delegation rules
        rule = self.config.find_delegation_rule(query)
        if rule:
            logger.info(f"Routing: {rule.pattern} → {rule.delegate_to} (rule-based)")
            return rule.delegate_to, rule
        # Use capability-based routing for medium/complex tasks
        if self.config.routing_strategy in ["capability", "hybrid"]:
            ranked = self._rank_by_capabilities(query)
            if ranked:
                task_type, _ = self._classify_task(query)  # Unpack tuple
                # If top ranked agent is Claude, check if delegation is still worth it
                if ranked[0] == "claude" and complexity == "medium":
                    logger.info(f"Routing: {task_type} → claude (best match, medium complexity)")
                    return "claude", None
                logger.info(f"Routing: {task_type} [{complexity}] → {ranked[0]} (capability-based)")
                return ranked[0], None
        # Fallback to primary orchestrator
        logger.info("Routing: DEFAULT → claude")
        return "claude", None

    def get_statistics(self) -> dict[str, Any]:
        """Get delegation statistics.

        Returns counts per effective handler (delegated agent, or the
        orchestrator itself when not delegated), plus delegation/success
        rates as percentages and the mean duration in seconds.
        """
        if not self.history:
            return {"total": 0, "by_orchestrator": {}, "delegations": 0}
        by_orchestrator: dict[str, int] = {}
        delegations = 0
        for result in self.history:
            target = result.delegated_to or result.orchestrator
            by_orchestrator[target] = by_orchestrator.get(target, 0) + 1
            if result.delegated_to:
                delegations += 1
        return {
            "total": len(self.history),
            "by_orchestrator": by_orchestrator,
            "delegations": delegations,
            "delegation_rate": delegations / len(self.history) * 100,
            "success_rate": sum(r.success for r in self.history) / len(self.history) * 100,
            "avg_duration": sum(r.duration for r in self.history) / len(self.history),
        }

    def clear_history(self) -> None:
        """Clear delegation history."""
        self.history.clear()