"""Intelligent router using tiny LLM for call routing."""
import asyncio
import json
import logging
from typing import Any, Dict, List
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
logger = logging.getLogger(__name__)
class IntelligentRouter:
"""Intelligent router for MCP calls using tiny LLM."""
def __init__(self):
from ..utils.config import Config
self.capabilities: Dict[str, Dict] = {}
self._model_loaded = False
self.config = Config()
self._acp_enabled = self.config.get('acp_enabled', True)
self.tokenizer = None
self.model = None
# Initialize dynamic registries (lazy loading)
self._capability_registry = None
self._execution_registry = None
self._system_context = None
# System prompt will be built dynamically
self.system_prompt = None
async def _get_registries(self):
"""Lazy load registries with persistence."""
if self._capability_registry is None:
from katamari_mcp.router.capability_registry import CapabilityRegistry
from katamari_mcp.router.registry_persistence import get_persistence
persistence = get_persistence()
self._capability_registry = CapabilityRegistry(persistence)
await self._capability_registry.load_capabilities()
if self._execution_registry is None:
from katamari_mcp.router.execution_patterns import ExecutionPatternRegistry
from katamari_mcp.router.registry_persistence import get_persistence
persistence = get_persistence()
self._execution_registry = ExecutionPatternRegistry(persistence)
await self._execution_registry.load_patterns()
if self._system_context is None:
from katamari_mcp.router.system_context import SystemContextManager
from katamari_mcp.router.registry_persistence import get_persistence
persistence = get_persistence()
self._system_context = SystemContextManager(persistence)
await self._system_context.load_context()
return self._capability_registry, self._execution_registry, self._system_context
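    # NOTE: the registries loaded above are assumed to expose the methods this
    # class calls elsewhere (inferred from usage, not a documented contract):
    #   CapabilityRegistry: get_all_capabilities(), get_capability(name),
    #       evaluate_operation_safety(operation), get_safe_capabilities()
    #   ExecutionPatternRegistry: get_all_patterns()
    #   SystemContextManager: get_system_overview()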
async def _build_dynamic_system_prompt(self) -> str:
"""Build dynamic system prompt from registries."""
if self.system_prompt is None:
# Get registries (lazy loading)
cap_registry, exec_registry, sys_context = await self._get_registries()
# Get capabilities from registry
capabilities = cap_registry.get_all_capabilities()
# Get execution patterns
patterns = exec_registry.get_all_patterns()
# Get system context
context = sys_context.get_system_overview()
# Build capabilities section with heuristics
capabilities_text = "## Available Capabilities:\n\n"
for cap in capabilities:
capabilities_text += f"- **{cap.name}**: {cap.description}\n"
capabilities_text += f" - Use for: {', '.join(cap.tags)}\n"
# Add heuristic information if available
if hasattr(cap, 'risk_level'):
capabilities_text += f" - Risk Level: {cap.risk_level}\n"
if hasattr(cap, 'approval_required'):
capabilities_text += f" - Approval Required: {cap.approval_required}\n"
if hasattr(cap, 'testing_level'):
capabilities_text += f" - Testing Level: {cap.testing_level}\n"
if cap.input_schema:
required_args = cap.input_schema.get('required', [])
if required_args:
capabilities_text += f" - Required args: {', '.join(required_args)}\n"
capabilities_text += "\n"
# Build execution patterns section
patterns_text = "## Execution Patterns:\n\n"
for pattern in patterns:
patterns_text += f"- **{pattern.name}**: {pattern.description}\n"
patterns_text += f" - Strategy: {pattern.strategy.value}\n"
patterns_text += f" - Use when: {', '.join(pattern.examples[:1]) if pattern.examples else 'Complex workflows'}\n\n"
# Build system context section
context_text = f"## System Context:\n\n"
context_text += f"- System Health: {context.get('health_score', 'Unknown')}/100\n"
context_text += f"- Active Workflows: {context.get('active_workflows', 0)}\n"
context_text += f"- Available Capabilities: {len(capabilities)}\n\n"
# Combine into full system prompt with dynamic content
self.system_prompt = f"""You are an intelligent MCP (Model Context Protocol) router with advanced reasoning capabilities. Your role is to analyze user requests and determine the optimal routing strategy using dynamic execution patterns.
## Core Responsibilities:
1. **Intent Analysis**: Understand user goals beyond literal tool names
2. **Capability Matching**: Map requests to the most appropriate capabilities
3. **Execution Strategy Selection**: Choose optimal execution pattern (direct, sequential, parallel, adaptive)
4. **Argument Enhancement**: Suggest argument improvements when beneficial
5. **Workflow Composition**: Identify opportunities for multi-capability workflows
6. **Error Prevention**: Catch potential issues before execution
7. **Safety-Aware Routing**: Apply heuristic safety checks for all operations
{capabilities_text}
{patterns_text}
{context_text}
## Routing Strategy:
### 1. Direct Mapping:
If the request explicitly names a capability, validate arguments and proceed with direct execution.
### 2. Intent Inference:
Analyze the user's goal and suggest the best capability even if not explicitly named.
### 3. Execution Pattern Selection:
Based on request complexity, choose appropriate execution strategy.
### 4. Workflow Detection:
Identify requests that could benefit from multiple capabilities in sequence.
### 5. Argument Enhancement:
Suggest improvements to arguments for better results.
### 6. Safety-First Routing:
Always check heuristic safety levels before execution. High-risk operations require approval.
## Response Format:
Provide a structured analysis with:
- **analysis**: Brief reasoning about the request
- **recommended_capability**: Primary capability to use
- **execution_strategy**: DIRECT/SEQUENTIAL/PARALLEL/ADAPTIVE
- **argument_suggestions**: Optional improvements to arguments
- **workflow_suggestions**: Optional multi-capability workflow
- **confidence**: High/Medium/Low confidence in routing decision
- **safety_assessment**: Safe/NeedsApproval/Blocked based on heuristics
## Examples:
User: "Check how the system is doing"
→ analysis: User wants system status information
→ recommended_capability: acp_inspect
→ execution_strategy: DIRECT
→ confidence: High
→ safety_assessment: Safe
User: "Search for python tutorials and extract the main points"
→ analysis: User wants web research followed by content extraction
→ recommended_capability: web_search (first), then web_scrape
→ execution_strategy: SEQUENTIAL
→ workflow_suggestions: ["web_search", "web_scrape"]
→ confidence: High
→ safety_assessment: Safe
User: "Compare performance of all capabilities"
→ analysis: User wants comprehensive performance analysis across multiple capabilities
→ recommended_capability: acp_performance_metrics
→ execution_strategy: PARALLEL
→ workflow_suggestions: ["acp_performance_metrics", "acp_feedback_summary"]
→ confidence: Medium
→ safety_assessment: Safe
Remember: You are a router+executor with heuristic governance. Your job is intelligent routing, execution strategy selection, safety assessment, and guidance."""
return self.system_prompt
async def load_model(self):
"""Load the tiny LLM model for routing."""
try:
model_name = "Qwen/Qwen2-0.5B-Instruct"
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype=torch.float16,
device_map="auto"
)
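        # NOTE: float16 with device_map="auto" assumes an accelerator (or
        # accelerate-managed offload) is available; on CPU-only hosts,
        # torch.float32 is the safer dtype for generation.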
self._model_loaded = True
logger.info(f"Router model {model_name} loaded successfully")
except Exception as e:
logger.error(f"Failed to load model: {e}")
self._model_loaded = False
async def _route_with_llm(self, tool_name: str, arguments: Dict[str, Any]) -> str:
"""Use tiny LLM to route and enhance the call."""
if not self._model_loaded:
await self.load_model()
if not self._model_loaded or self.model is None or self.tokenizer is None:
# Fallback to simple routing
return await self._route_simple(tool_name, arguments)
try:
# Build dynamic system prompt if needed
if self.system_prompt is None:
self.system_prompt = await self._build_dynamic_system_prompt()
# Create enhanced routing prompt
user_request = f"Tool: {tool_name}\nArguments: {arguments}"
# Combine system prompt with user request
full_prompt = f"{self.system_prompt}\n\n## User Request:\n{user_request}\n\n## Routing Analysis:"
# Generate routing decision with more tokens for detailed analysis
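            # NOTE: right-side truncation at max_length=1024 can clip the tail
            # of full_prompt (the user request) when the dynamic system prompt
            # is long; a larger budget or left-side truncation may be safer.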
inputs = self.tokenizer(full_prompt, return_tensors="pt", truncation=True, max_length=1024)
# Move inputs to same device as model
device = next(self.model.parameters()).device
input_ids = inputs['input_ids'].to(device)
attention_mask = inputs.get('attention_mask')
if attention_mask is not None:
attention_mask = attention_mask.to(device)
with torch.no_grad():
outputs = self.model.generate(
input_ids=input_ids,
                    max_new_tokens=200,  # room for a detailed analysis
                    temperature=0.3,  # low temperature keeps routing decisions stable
do_sample=True,
pad_token_id=self.tokenizer.eos_token_id,
attention_mask=attention_mask
)
response = self.tokenizer.decode(outputs[0][input_ids.shape[1]:], skip_special_tokens=True)
logger.info(f"LLM routing analysis: {response}")
# Parse the LLM response to extract routing decision
routing_decision = self._parse_routing_response(response, tool_name, arguments)
# Apply heuristic safety checks
safety_check = await self._apply_heuristic_safety_checks(tool_name, arguments)
# Determine optimal execution strategy using heuristics
execution_strategy = await self._determine_execution_strategy(
routing_decision.get('recommended_capability', tool_name),
routing_decision.get('argument_suggestions', arguments),
routing_decision.get('execution_strategy', 'DIRECT')
)
logger.info(f"LLM analysis: {routing_decision.get('analysis', 'No analysis')}")
logger.info(f"LLM suggested: {routing_decision.get('recommended_capability', tool_name)}")
logger.info(f"Heuristic safety check: {safety_check}")
logger.info(f"Execution strategy: {execution_strategy}")
# Apply routing decision with heuristic constraints
final_tool = routing_decision.get('recommended_capability', tool_name)
final_args = routing_decision.get('argument_suggestions', arguments)
# Override with safer options if heuristics require it
if not safety_check.get('safe_to_execute', True):
logger.warning(f"Heuristic safety check failed for {final_tool}")
# Fall back to safer capability or require approval
if safety_check.get('safer_alternative'):
final_tool = safety_check['safer_alternative']
logger.info(f"Using safer alternative: {final_tool}")
else:
return f"Execution blocked by heuristic safety checks: {safety_check.get('reason', 'Unknown safety concern')}"
# Execute with heuristic-enhanced routing and strategy
return await self._execute_with_strategy(final_tool, final_args, execution_strategy, routing_decision)
except Exception as e:
logger.error(f"LLM routing failed: {e}")
return await self._route_simple(tool_name, arguments)
async def list_capabilities(self) -> List[Dict]:
"""List all available capabilities using dynamic registry with heuristics."""
if not self._model_loaded:
await self.load_model()
# Get capabilities from registry with heuristics
cap_registry, _, _ = await self._get_registries()
capabilities = cap_registry.get_all_capabilities()
# Convert to dict format for MCP compatibility
return [cap.to_dict() for cap in capabilities]
async def route_call(self, tool_name: str, arguments: Dict[str, Any]) -> str:
"""Route a call to the appropriate capability."""
if not self._model_loaded:
await self.load_model()
# Use LLM for intelligent routing
return await self._route_with_llm(tool_name, arguments)
async def _route_simple(self, tool_name: str, arguments: Dict[str, Any]) -> str:
"""Simple routing fallback."""
if tool_name == "web_search":
return await self._handle_web_search(arguments)
elif tool_name == "web_scrape":
return await self._handle_web_scrape(arguments)
elif tool_name.startswith("acp_"):
return await self._handle_acp_call(tool_name, arguments)
else:
raise ValueError(f"Unknown capability: {tool_name}")
async def _handle_web_search(self, arguments: Dict[str, Any]) -> str:
"""Handle web search calls."""
from ..capabilities.web_search import web_search
query = arguments.get('query', '')
max_results = arguments.get('max_results', 5)
try:
result = await web_search(query, max_results)
return f"Web search results: {result}"
except Exception as e:
logger.error(f"Web search failed: {e}")
return f"Web search error: {str(e)}"
async def _handle_web_scrape(self, arguments: Dict[str, Any]) -> str:
"""Handle web scraping calls."""
from ..capabilities.web_scrape import web_scrape
        url = arguments.get('url', '')
        output_format = arguments.get('format', 'markdown')  # avoid shadowing built-in format()
        try:
            result = await web_scrape(url, output_format)
return f"Web scraping results: {result}"
except Exception as e:
logger.error(f"Web scraping failed: {e}")
return f"Web scraping error: {str(e)}"
async def _handle_acp_call(self, tool_name: str, arguments: Dict[str, Any]) -> str:
"""Handle ACP capability calls."""
        # Import here to avoid circular imports; the feedback, learning, and
        # performance helpers are imported inside the branches that use them.
        from ..acp import ACPController
acp_controller = ACPController()
try:
if tool_name == "acp_inspect":
result = await acp_controller.inspect_capabilities()
return f"System inspection: {result}"
elif tool_name == "acp_propose":
need = arguments.get("need", "")
context = arguments.get("context", {})
proposal = await acp_controller.propose_capability(need, context)
return f"Capability proposal: {proposal}"
elif tool_name == "acp_compose":
capabilities = arguments.get("capabilities", [])
workflow_name = arguments.get("workflow_name", "")
workflow = await acp_controller.compose_workflow(capabilities, workflow_name)
return f"Composed workflow: {workflow}"
elif tool_name == "acp_heal":
issue = arguments.get("issue", {})
healing_plan = await acp_controller.heal_system(issue)
return f"Healing plan: {healing_plan}"
# Phase 2 feedback endpoints
elif tool_name == "acp_feedback_submit":
from ..acp.feedback import FeedbackCollector, FeedbackSubmission, FeedbackType, FeedbackSource
from ..acp.adaptive_learning import AdaptiveLearningEngine
capability_id = arguments.get("capability_id", "")
rating = arguments.get("rating")
comment = arguments.get("comment")
learning_engine = AdaptiveLearningEngine(self.config)
feedback_collector = FeedbackCollector(self.config, learning_engine)
submission = FeedbackSubmission(
capability_id=capability_id,
execution_id=None,
feedback_type=FeedbackType.USER_SATISFACTION,
source=FeedbackSource.DIRECT_USER,
rating=rating,
comment=comment
)
result = await feedback_collector.submit_feedback(submission)
return f"Feedback submitted for {capability_id}: {result}"
elif tool_name == "acp_feedback_summary":
from ..acp.feedback import FeedbackCollector
from ..acp.adaptive_learning import AdaptiveLearningEngine
capability_id = arguments.get("capability_id")
days_back = arguments.get("days_back", 30)
learning_engine = AdaptiveLearningEngine(self.config)
feedback_collector = FeedbackCollector(self.config, learning_engine)
summary = await feedback_collector.get_feedback_summary(
capability_id=capability_id,
days_back=days_back
)
return f"Feedback summary for {capability_id or 'all capabilities'}: {summary}"
elif tool_name == "acp_performance_metrics":
from ..acp.performance_tracker import PerformanceTracker
capability_id = arguments.get("capability_id")
days_back = arguments.get("days_back", 7)
perf_tracker = PerformanceTracker(self.config)
if capability_id:
performance_data = await perf_tracker.get_capability_performance(capability_id)
else:
performance_data = await perf_tracker.get_performance_summary(days_back)
return f"Performance metrics for {capability_id or 'all capabilities'}: {performance_data}"
elif tool_name == "acp_learning_summary":
from ..acp.adaptive_learning import AdaptiveLearningEngine
learning_engine = AdaptiveLearningEngine(self.config)
summary = await learning_engine.get_learning_summary()
return f"Learning summary: {summary}"
else:
raise ValueError(f"Unknown ACP capability: {tool_name}")
except Exception as e:
return f"ACP operation failed: {str(e)}"
async def format_error(self, error: Exception) -> str:
"""Format error message using LLM for better context."""
if not self._model_loaded or not self.tokenizer or not self.model:
return f"Error: {str(error)}\n\nPlease check your inputs and try again."
try:
prompt = f"""Format this error message to be more helpful and actionable for a user:
Error: {str(error)}
Error Type: {type(error).__name__}
Provide a clear explanation of what went wrong and suggest specific steps to fix it."""
            inputs = self.tokenizer(prompt, return_tensors="pt", truncation=True, max_length=512)
            # Keep inputs on the same device as the model (device_map="auto"
            # may have placed it on an accelerator).
            device = next(self.model.parameters()).device
            input_ids = inputs.input_ids.to(device)
            with torch.no_grad():
                outputs = self.model.generate(
                    input_ids,
                    max_new_tokens=200,
                    temperature=0.7,
                    do_sample=True,
                    pad_token_id=self.tokenizer.eos_token_id
                )
            # Decode only the generated tokens; slicing the decoded string by
            # len(prompt) can misalign after re-tokenization.
            return self.tokenizer.decode(outputs[0][input_ids.shape[1]:], skip_special_tokens=True).strip()
except Exception as e:
logger.warning(f"Failed to format error with LLM: {e}")
return f"Error: {str(error)}\n\nPlease check your inputs and try again."
def _parse_routing_response(self, response: str, original_tool: str, original_args: Dict[str, Any]) -> Dict[str, Any]:
"""Parse LLM routing response to extract structured decision."""
routing_decision = {
'analysis': '',
'recommended_capability': original_tool,
            'execution_strategy': 'DIRECT',
'argument_suggestions': original_args,
'workflow_suggestions': [],
'confidence': 'Medium'
}
try:
# Extract key information from response
lines = response.strip().split('\n')
for line in lines:
line = line.strip()
# Look for structured fields
if line.lower().startswith('analysis:'):
routing_decision['analysis'] = line.split(':', 1)[1].strip()
elif line.lower().startswith('recommended_capability:'):
cap = line.split(':', 1)[1].strip()
# Validate that capability exists
valid_caps = ['web_search', 'web_scrape', 'acp_inspect', 'acp_propose',
'acp_compose', 'acp_heal', 'acp_feedback_submit',
'acp_feedback_summary', 'acp_performance_metrics',
'acp_learning_summary']
if cap in valid_caps:
routing_decision['recommended_capability'] = cap
                elif line.lower().startswith('execution_strategy:'):
                    strategy = line.split(':', 1)[1].strip().upper()
                    if strategy in ('DIRECT', 'SEQUENTIAL', 'PARALLEL', 'ADAPTIVE'):
                        routing_decision['execution_strategy'] = strategy
                elif line.lower().startswith('argument_suggestions:'):
                    # Try to parse JSON argument suggestions
                    args_str = line.split(':', 1)[1].strip()
                    if args_str.startswith('{') and args_str.endswith('}'):
                        try:
                            routing_decision['argument_suggestions'] = json.loads(args_str)
                        except json.JSONDecodeError:
                            pass
                elif line.lower().startswith('workflow_suggestions:'):
                    # Try to parse workflow suggestions
                    workflow_str = line.split(':', 1)[1].strip()
                    if workflow_str.startswith('[') and workflow_str.endswith(']'):
                        try:
                            routing_decision['workflow_suggestions'] = json.loads(workflow_str)
                        except json.JSONDecodeError:
                            pass
elif line.lower().startswith('confidence:'):
conf = line.split(':', 1)[1].strip().lower()
if conf in ['high', 'medium', 'low']:
routing_decision['confidence'] = conf.capitalize()
# If no structured format found, try to extract capability name from text
if routing_decision['recommended_capability'] == original_tool:
# Look for capability names in response
valid_caps = ['web_search', 'web_scrape', 'acp_inspect', 'acp_propose',
'acp_compose', 'acp_heal', 'acp_feedback_submit',
'acp_feedback_summary', 'acp_performance_metrics',
'acp_learning_summary']
for cap in valid_caps:
if cap in response.lower() and cap != original_tool:
routing_decision['recommended_capability'] = cap
break
except Exception as e:
logger.warning(f"Failed to parse routing response: {e}")
return routing_decision
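    # Illustrative example of a structured response this parser accepts (the
    # model is prompted for this shape, but its output is not guaranteed):
    #   analysis: User wants system status information
    #   recommended_capability: acp_inspect
    #   execution_strategy: DIRECT
    #   confidence: High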
async def _determine_execution_strategy(self, tool_name: str, arguments: Dict[str, Any], llm_suggestion: str = "DIRECT") -> str:
"""Determine optimal execution strategy using heuristics."""
try:
            # Get the capability registry for strategy evaluation
            cap_registry, _, _ = await self._get_registries()
            # Get capability metadata
            capability = cap_registry.get_capability(tool_name)
            if not capability:
                return "DIRECT"  # Fallback for unknown capabilities
# Base strategy from heuristics
risk_level = capability.risk_level
complexity = capability.complexity
            # Simple capabilities with low risk -> DIRECT
            if risk_level == "low" and complexity == "simple":
                return "DIRECT"
            # Medium complexity or risk -> SEQUENTIAL if workflow detected
            if complexity == "moderate" or risk_level == "medium":
# Check if this might be part of a workflow
if any(keyword in tool_name.lower() for keyword in ["compose", "heal", "performance", "feedback"]):
return "SEQUENTIAL"
return "DIRECT"
# High complexity or multi-step operations -> ADAPTIVE
if complexity in ["complex", "architectural"] or risk_level == "high":
return "ADAPTIVE"
# Analytical operations that might benefit from parallel processing
if any(keyword in tool_name.lower() for keyword in ["performance", "feedback", "metrics"]):
return "PARALLEL"
# Web operations that might be chained
if tool_name.startswith("web_"):
# If this is a web operation and arguments suggest multiple URLs/queries
if isinstance(arguments.get('query'), list) or isinstance(arguments.get('urls'), list):
return "PARALLEL"
return "DIRECT"
# Respect LLM suggestion if it makes sense
valid_strategies = ["DIRECT", "SEQUENTIAL", "PARALLEL", "ADAPTIVE"]
if llm_suggestion.upper() in valid_strategies:
return llm_suggestion.upper()
return "DIRECT" # Default fallback
except Exception as e:
logger.warning(f"Failed to determine execution strategy: {e}")
return "DIRECT"
async def _apply_heuristic_safety_checks(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Apply heuristic safety checks to routing decisions."""
try:
# Get registries for safety evaluation
cap_registry, _, _ = await self._get_registries()
# Get capability metadata
capability = cap_registry.get_capability(tool_name)
if not capability:
return {"safe_to_execute": False, "reason": f"Unknown capability: {tool_name}"}
# Evaluate operation safety
operation = {
"type": "execute",
"target": tool_name,
"arguments": arguments,
"dependencies": capability.dependencies or []
}
safety_evaluation = cap_registry.evaluate_operation_safety(operation)
# Determine if execution is safe
safe_to_execute = (
safety_evaluation["can_auto_approve"] and
not safety_evaluation["needs_manual_approval"] and
safety_evaluation["safety_score"] >= 0.5
)
result = {
"safe_to_execute": safe_to_execute,
"safety_score": safety_evaluation["safety_score"],
"heuristic_tags": safety_evaluation["heuristic_tags"],
"needs_approval": safety_evaluation["needs_manual_approval"],
"needs_testing": safety_evaluation["needs_parallel_testing"]
}
# Suggest safer alternatives if needed
if not safe_to_execute:
safer_caps = cap_registry.get_safe_capabilities()
if safer_caps:
result["safer_alternative"] = safer_caps[0].name
result["reason"] = f"Original operation requires manual approval. Consider using {safer_caps[0].name} instead."
else:
result["reason"] = "Operation requires manual approval and no safer alternatives available."
return result
except Exception as e:
logger.error(f"Heuristic safety check failed: {e}")
return {"safe_to_execute": False, "reason": f"Safety check error: {str(e)}"}
async def _execute_with_strategy(self, tool_name: str, arguments: Dict[str, Any], strategy: str, routing_decision: Dict[str, Any]) -> str:
"""Execute capability using the specified strategy."""
try:
if strategy == "DIRECT":
# Simple direct execution
return await self._route_simple(tool_name, arguments)
elif strategy == "SEQUENTIAL":
# Sequential execution for workflows
workflow_suggestions = routing_decision.get('workflow_suggestions', [])
if workflow_suggestions:
results = []
for step in workflow_suggestions:
if isinstance(step, str):
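                            # Only the primary tool receives the original
                            # arguments; other workflow steps run with empty
                            # args and rely on handler defaults.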
step_args = arguments if step == tool_name else {}
result = await self._route_simple(step, step_args)
results.append(f"{step}: {result}")
return "Sequential workflow results:\n" + "\n".join(results)
else:
return await self._route_simple(tool_name, arguments)
elif strategy == "PARALLEL":
# Parallel execution for independent operations
workflow_suggestions = routing_decision.get('workflow_suggestions', [tool_name])
if len(workflow_suggestions) > 1:
# Execute in parallel
tasks = []
for step in workflow_suggestions:
if isinstance(step, str):
step_args = arguments if step == tool_name else {}
task = asyncio.create_task(self._route_simple(step, step_args))
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
formatted_results = []
for i, result in enumerate(results):
step_name = workflow_suggestions[i]
if isinstance(result, Exception):
formatted_results.append(f"{step_name}: ERROR - {str(result)}")
else:
formatted_results.append(f"{step_name}: {result}")
return "Parallel execution results:\n" + "\n".join(formatted_results)
else:
return await self._route_simple(tool_name, arguments)
elif strategy == "ADAPTIVE":
# Adaptive execution based on results
# Start with the primary capability
primary_result = await self._route_simple(tool_name, arguments)
# Analyze result and decide on next steps
workflow_suggestions = routing_decision.get('workflow_suggestions', [])
if workflow_suggestions and "error" not in primary_result.lower():
# Continue with workflow if primary succeeded
additional_results = []
for step in workflow_suggestions[1:]: # Skip first (already executed)
if isinstance(step, str):
step_args = arguments if step == tool_name else {}
result = await self._route_simple(step, step_args)
additional_results.append(f"{step}: {result}")
if additional_results:
return f"Adaptive execution:\nPrimary: {primary_result}\nAdditional:\n" + "\n".join(additional_results)
return primary_result
else:
# Unknown strategy, fall back to direct
logger.warning(f"Unknown execution strategy: {strategy}, falling back to DIRECT")
return await self._route_simple(tool_name, arguments)
except Exception as e:
logger.error(f"Strategy execution failed: {e}")
# Fallback to simple execution
return await self._route_simple(tool_name, arguments)
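

if __name__ == "__main__":
    # Minimal usage sketch, assuming this module lives inside the katamari_mcp
    # package (run via `python -m ...` so the relative imports resolve) and the
    # Qwen2-0.5B weights can be downloaded; the query below is illustrative only.
    async def _demo() -> None:
        router = IntelligentRouter()
        capabilities = await router.list_capabilities()
        print(f"{len(capabilities)} capabilities registered")
        result = await router.route_call(
            "web_search", {"query": "model context protocol", "max_results": 3}
        )
        print(result)

    asyncio.run(_demo())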