
Katamari MCP Server

by ciphernaut
intelligent_router.py • 34.6 kB
"""Intelligent router using tiny LLM for call routing.""" import asyncio import json import logging from typing import Dict, List, Optional, Any import torch from transformers import AutoTokenizer, AutoModelForCausalLM logger = logging.getLogger(__name__) class IntelligentRouter: """Intelligent router for MCP calls using tiny LLM.""" def __init__(self): from ..utils.config import Config self.capabilities: Dict[str, Dict] = {} self._model_loaded = False self.config = Config() self._acp_enabled = self.config.get('acp_enabled', True) self.tokenizer = None self.model = None # Initialize dynamic registries (lazy loading) self._capability_registry = None self._execution_registry = None self._system_context = None # System prompt will be built dynamically self.system_prompt = None async def _get_registries(self): """Lazy load registries with persistence.""" if self._capability_registry is None: from katamari_mcp.router.capability_registry import CapabilityRegistry from katamari_mcp.router.registry_persistence import get_persistence persistence = get_persistence() self._capability_registry = CapabilityRegistry(persistence) await self._capability_registry.load_capabilities() if self._execution_registry is None: from katamari_mcp.router.execution_patterns import ExecutionPatternRegistry from katamari_mcp.router.registry_persistence import get_persistence persistence = get_persistence() self._execution_registry = ExecutionPatternRegistry(persistence) await self._execution_registry.load_patterns() if self._system_context is None: from katamari_mcp.router.system_context import SystemContextManager from katamari_mcp.router.registry_persistence import get_persistence persistence = get_persistence() self._system_context = SystemContextManager(persistence) await self._system_context.load_context() return self._capability_registry, self._execution_registry, self._system_context async def _build_dynamic_system_prompt(self) -> str: """Build dynamic system prompt from registries.""" if self.system_prompt is None: # Get registries (lazy loading) cap_registry, exec_registry, sys_context = await self._get_registries() # Get capabilities from registry capabilities = cap_registry.get_all_capabilities() # Get execution patterns patterns = exec_registry.get_all_patterns() # Get system context context = sys_context.get_system_overview() # Build capabilities section with heuristics capabilities_text = "## Available Capabilities:\n\n" for cap in capabilities: capabilities_text += f"- **{cap.name}**: {cap.description}\n" capabilities_text += f" - Use for: {', '.join(cap.tags)}\n" # Add heuristic information if available if hasattr(cap, 'risk_level'): capabilities_text += f" - Risk Level: {cap.risk_level}\n" if hasattr(cap, 'approval_required'): capabilities_text += f" - Approval Required: {cap.approval_required}\n" if hasattr(cap, 'testing_level'): capabilities_text += f" - Testing Level: {cap.testing_level}\n" if cap.input_schema: required_args = cap.input_schema.get('required', []) if required_args: capabilities_text += f" - Required args: {', '.join(required_args)}\n" capabilities_text += "\n" # Build execution patterns section patterns_text = "## Execution Patterns:\n\n" for pattern in patterns: patterns_text += f"- **{pattern.name}**: {pattern.description}\n" patterns_text += f" - Strategy: {pattern.strategy.value}\n" patterns_text += f" - Use when: {', '.join(pattern.examples[:1]) if pattern.examples else 'Complex workflows'}\n\n" # Build system context section context_text = f"## System Context:\n\n" context_text 
+= f"- System Health: {context.get('health_score', 'Unknown')}/100\n" context_text += f"- Active Workflows: {context.get('active_workflows', 0)}\n" context_text += f"- Available Capabilities: {len(capabilities)}\n\n" # Combine into full system prompt with dynamic content self.system_prompt = f"""You are an intelligent MCP (Model Context Protocol) router with advanced reasoning capabilities. Your role is to analyze user requests and determine the optimal routing strategy using dynamic execution patterns. ## Core Responsibilities: 1. **Intent Analysis**: Understand user goals beyond literal tool names 2. **Capability Matching**: Map requests to the most appropriate capabilities 3. **Execution Strategy Selection**: Choose optimal execution pattern (direct, sequential, parallel, adaptive) 4. **Argument Enhancement**: Suggest argument improvements when beneficial 5. **Workflow Composition**: Identify opportunities for multi-capability workflows 6. **Error Prevention**: Catch potential issues before execution 7. **Safety-Aware Routing**: Apply heuristic safety checks for all operations {capabilities_text} {patterns_text} {context_text} ## Routing Strategy: ### 1. Direct Mapping: If the request explicitly names a capability, validate arguments and proceed with direct execution. ### 2. Intent Inference: Analyze the user's goal and suggest the best capability even if not explicitly named. ### 3. Execution Pattern Selection: Based on request complexity, choose appropriate execution strategy. ### 4. Workflow Detection: Identify requests that could benefit from multiple capabilities in sequence. ### 5. Argument Enhancement: Suggest improvements to arguments for better results. ### 6. Safety-First Routing: Always check heuristic safety levels before execution. High-risk operations require approval. ## Response Format: Provide a structured analysis with: - **analysis**: Brief reasoning about the request - **recommended_capability**: Primary capability to use - **execution_strategy**: DIRECT/SEQUENTIAL/PARALLEL/ADAPTIVE - **argument_suggestions**: Optional improvements to arguments - **workflow_suggestions**: Optional multi-capability workflow - **confidence**: High/Medium/Low confidence in routing decision - **safety_assessment**: Safe/NeedsApproval/Blocked based on heuristics ## Examples: User: "Check how the system is doing" โ†’ analysis: User wants system status information โ†’ recommended_capability: acp_inspect โ†’ execution_strategy: DIRECT โ†’ confidence: High โ†’ safety_assessment: Safe User: "Search for python tutorials and extract the main points" โ†’ analysis: User wants web research followed by content extraction โ†’ recommended_capability: web_search (first), then web_scrape โ†’ execution_strategy: SEQUENTIAL โ†’ workflow_suggestions: ["web_search", "web_scrape"] โ†’ confidence: High โ†’ safety_assessment: Safe User: "Compare performance of all capabilities" โ†’ analysis: User wants comprehensive performance analysis across multiple capabilities โ†’ recommended_capability: acp_performance_metrics โ†’ execution_strategy: PARALLEL โ†’ workflow_suggestions: ["acp_performance_metrics", "acp_feedback_summary"] โ†’ confidence: Medium โ†’ safety_assessment: Safe Remember: You are a router+executor with heuristic governance. 
Your job is intelligent routing, execution strategy selection, safety assessment, and guidance.""" return self.system_prompt async def load_model(self): """Load the tiny LLM model for routing.""" try: model_name = "Qwen/Qwen2-0.5B-Instruct" self.tokenizer = AutoTokenizer.from_pretrained(model_name) self.model = AutoModelForCausalLM.from_pretrained( model_name, torch_dtype=torch.float16, device_map="auto" ) self._model_loaded = True logger.info(f"Router model {model_name} loaded successfully") except Exception as e: logger.error(f"Failed to load model: {e}") self._model_loaded = False async def _route_with_llm(self, tool_name: str, arguments: Dict[str, Any]) -> str: """Use tiny LLM to route and enhance the call.""" if not self._model_loaded: await self.load_model() if not self._model_loaded or self.model is None or self.tokenizer is None: # Fallback to simple routing return await self._route_simple(tool_name, arguments) try: # Build dynamic system prompt if needed if self.system_prompt is None: self.system_prompt = await self._build_dynamic_system_prompt() # Create enhanced routing prompt user_request = f"Tool: {tool_name}\nArguments: {arguments}" # Combine system prompt with user request full_prompt = f"{self.system_prompt}\n\n## User Request:\n{user_request}\n\n## Routing Analysis:" # Generate routing decision with more tokens for detailed analysis inputs = self.tokenizer(full_prompt, return_tensors="pt", truncation=True, max_length=1024) # Move inputs to same device as model device = next(self.model.parameters()).device input_ids = inputs['input_ids'].to(device) attention_mask = inputs.get('attention_mask') if attention_mask is not None: attention_mask = attention_mask.to(device) with torch.no_grad(): outputs = self.model.generate( input_ids=input_ids, max_new_tokens=200, # Increased for detailed analysis temperature=0.3, # Slightly higher for creativity do_sample=True, pad_token_id=self.tokenizer.eos_token_id, attention_mask=attention_mask ) response = self.tokenizer.decode(outputs[0][input_ids.shape[1]:], skip_special_tokens=True) logger.info(f"LLM routing analysis: {response}") # Parse the LLM response to extract routing decision routing_decision = self._parse_routing_response(response, tool_name, arguments) # Apply heuristic safety checks safety_check = await self._apply_heuristic_safety_checks(tool_name, arguments) # Determine optimal execution strategy using heuristics execution_strategy = await self._determine_execution_strategy( routing_decision.get('recommended_capability', tool_name), routing_decision.get('argument_suggestions', arguments), routing_decision.get('execution_strategy', 'DIRECT') ) logger.info(f"LLM analysis: {routing_decision.get('analysis', 'No analysis')}") logger.info(f"LLM suggested: {routing_decision.get('recommended_capability', tool_name)}") logger.info(f"Heuristic safety check: {safety_check}") logger.info(f"Execution strategy: {execution_strategy}") # Apply routing decision with heuristic constraints final_tool = routing_decision.get('recommended_capability', tool_name) final_args = routing_decision.get('argument_suggestions', arguments) # Override with safer options if heuristics require it if not safety_check.get('safe_to_execute', True): logger.warning(f"Heuristic safety check failed for {final_tool}") # Fall back to safer capability or require approval if safety_check.get('safer_alternative'): final_tool = safety_check['safer_alternative'] logger.info(f"Using safer alternative: {final_tool}") else: return f"Execution blocked by heuristic safety 
checks: {safety_check.get('reason', 'Unknown safety concern')}" # Execute with heuristic-enhanced routing and strategy return await self._execute_with_strategy(final_tool, final_args, execution_strategy, routing_decision) except Exception as e: logger.error(f"LLM routing failed: {e}") return await self._route_simple(tool_name, arguments) async def list_capabilities(self) -> List[Dict]: """List all available capabilities using dynamic registry with heuristics.""" if not self._model_loaded: await self.load_model() # Get capabilities from registry with heuristics cap_registry, _, _ = await self._get_registries() capabilities = cap_registry.get_all_capabilities() # Convert to dict format for MCP compatibility return [cap.to_dict() for cap in capabilities] async def route_call(self, tool_name: str, arguments: Dict[str, Any]) -> str: """Route a call to the appropriate capability.""" if not self._model_loaded: await self.load_model() # Use LLM for intelligent routing return await self._route_with_llm(tool_name, arguments) async def _route_simple(self, tool_name: str, arguments: Dict[str, Any]) -> str: """Simple routing fallback.""" if tool_name == "web_search": return await self._handle_web_search(arguments) elif tool_name == "web_scrape": return await self._handle_web_scrape(arguments) elif tool_name.startswith("acp_"): return await self._handle_acp_call(tool_name, arguments) else: raise ValueError(f"Unknown capability: {tool_name}") async def _handle_web_search(self, arguments: Dict[str, Any]) -> str: """Handle web search calls.""" from ..capabilities.web_search import web_search query = arguments.get('query', '') max_results = arguments.get('max_results', 5) try: result = await web_search(query, max_results) return f"Web search results: {result}" except Exception as e: logger.error(f"Web search failed: {e}") return f"Web search error: {str(e)}" async def _handle_web_scrape(self, arguments: Dict[str, Any]) -> str: """Handle web scraping calls.""" from ..capabilities.web_scrape import web_scrape url = arguments.get('url', '') format = arguments.get('format', 'markdown') try: result = await web_scrape(url, format) return f"Web scraping results: {result}" except Exception as e: logger.error(f"Web scraping failed: {e}") return f"Web scraping error: {str(e)}" async def _handle_acp_call(self, tool_name: str, arguments: Dict[str, Any]) -> str: """Handle ACP capability calls.""" # Import here to avoid circular imports from ..acp import ACPController from ..acp.adaptive_learning import AdaptiveLearningEngine from ..acp.feedback import FeedbackCollector from ..acp.performance_tracker import PerformanceTracker acp_controller = ACPController() try: if tool_name == "acp_inspect": result = await acp_controller.inspect_capabilities() return f"System inspection: {result}" elif tool_name == "acp_propose": need = arguments.get("need", "") context = arguments.get("context", {}) proposal = await acp_controller.propose_capability(need, context) return f"Capability proposal: {proposal}" elif tool_name == "acp_compose": capabilities = arguments.get("capabilities", []) workflow_name = arguments.get("workflow_name", "") workflow = await acp_controller.compose_workflow(capabilities, workflow_name) return f"Composed workflow: {workflow}" elif tool_name == "acp_heal": issue = arguments.get("issue", {}) healing_plan = await acp_controller.heal_system(issue) return f"Healing plan: {healing_plan}" # Phase 2 feedback endpoints elif tool_name == "acp_feedback_submit": from ..acp.feedback import FeedbackCollector, 
FeedbackSubmission, FeedbackType, FeedbackSource from ..acp.adaptive_learning import AdaptiveLearningEngine capability_id = arguments.get("capability_id", "") rating = arguments.get("rating") comment = arguments.get("comment") learning_engine = AdaptiveLearningEngine(self.config) feedback_collector = FeedbackCollector(self.config, learning_engine) submission = FeedbackSubmission( capability_id=capability_id, execution_id=None, feedback_type=FeedbackType.USER_SATISFACTION, source=FeedbackSource.DIRECT_USER, rating=rating, comment=comment ) result = await feedback_collector.submit_feedback(submission) return f"Feedback submitted for {capability_id}: {result}" elif tool_name == "acp_feedback_summary": from ..acp.feedback import FeedbackCollector from ..acp.adaptive_learning import AdaptiveLearningEngine capability_id = arguments.get("capability_id") days_back = arguments.get("days_back", 30) learning_engine = AdaptiveLearningEngine(self.config) feedback_collector = FeedbackCollector(self.config, learning_engine) summary = await feedback_collector.get_feedback_summary( capability_id=capability_id, days_back=days_back ) return f"Feedback summary for {capability_id or 'all capabilities'}: {summary}" elif tool_name == "acp_performance_metrics": from ..acp.performance_tracker import PerformanceTracker capability_id = arguments.get("capability_id") days_back = arguments.get("days_back", 7) perf_tracker = PerformanceTracker(self.config) if capability_id: performance_data = await perf_tracker.get_capability_performance(capability_id) else: performance_data = await perf_tracker.get_performance_summary(days_back) return f"Performance metrics for {capability_id or 'all capabilities'}: {performance_data}" elif tool_name == "acp_learning_summary": from ..acp.adaptive_learning import AdaptiveLearningEngine learning_engine = AdaptiveLearningEngine(self.config) summary = await learning_engine.get_learning_summary() return f"Learning summary: {summary}" else: raise ValueError(f"Unknown ACP capability: {tool_name}") except Exception as e: return f"ACP operation failed: {str(e)}" async def format_error(self, error: Exception) -> str: """Format error message using LLM for better context.""" if not self._model_loaded or not self.tokenizer or not self.model: return f"Error: {str(error)}\n\nPlease check your inputs and try again." try: prompt = f"""Format this error message to be more helpful and actionable for a user: Error: {str(error)} Error Type: {type(error).__name__} Provide a clear explanation of what went wrong and suggest specific steps to fix it.""" inputs = self.tokenizer(prompt, return_tensors="pt", truncation=True, max_length=512) outputs = self.model.generate( inputs.input_ids, max_length=inputs.input_ids.shape[1] + 200, temperature=0.7, do_sample=True, pad_token_id=self.tokenizer.eos_token_id ) formatted_response = self.tokenizer.decode(outputs[0], skip_special_tokens=True) return formatted_response[len(prompt):].strip() except Exception as e: logger.warning(f"Failed to format error with LLM: {e}") return f"Error: {str(error)}\n\nPlease check your inputs and try again." 
    def _parse_routing_response(self, response: str, original_tool: str, original_args: Dict[str, Any]) -> Dict[str, Any]:
        """Parse LLM routing response to extract a structured decision."""
        routing_decision = {
            'analysis': '',
            'recommended_capability': original_tool,
            'execution_strategy': 'DIRECT',  # New field
            'argument_suggestions': original_args,
            'workflow_suggestions': [],
            'confidence': 'Medium'
        }

        valid_caps = [
            'web_search', 'web_scrape', 'acp_inspect', 'acp_propose',
            'acp_compose', 'acp_heal', 'acp_feedback_submit',
            'acp_feedback_summary', 'acp_performance_metrics', 'acp_learning_summary'
        ]

        try:
            # Extract key information from the response
            lines = response.strip().split('\n')
            for line in lines:
                line = line.strip()

                # Look for structured fields
                if line.lower().startswith('analysis:'):
                    routing_decision['analysis'] = line.split(':', 1)[1].strip()
                elif line.lower().startswith('recommended_capability:'):
                    cap = line.split(':', 1)[1].strip()
                    # Validate that the capability exists
                    if cap in valid_caps:
                        routing_decision['recommended_capability'] = cap
                elif line.lower().startswith('argument_suggestions:'):
                    # Try to parse JSON argument suggestions
                    args_str = line.split(':', 1)[1].strip()
                    if args_str.startswith('{') and args_str.endswith('}'):
                        try:
                            routing_decision['argument_suggestions'] = json.loads(args_str)
                        except json.JSONDecodeError:
                            pass  # Keep the original arguments
                elif line.lower().startswith('workflow_suggestions:'):
                    # Try to parse workflow suggestions
                    workflow_str = line.split(':', 1)[1].strip()
                    if workflow_str.startswith('[') and workflow_str.endswith(']'):
                        try:
                            routing_decision['workflow_suggestions'] = json.loads(workflow_str)
                        except json.JSONDecodeError:
                            pass  # Keep an empty workflow list
                elif line.lower().startswith('confidence:'):
                    conf = line.split(':', 1)[1].strip().lower()
                    if conf in ['high', 'medium', 'low']:
                        routing_decision['confidence'] = conf.capitalize()

            # If no structured format was found, try to extract a capability name from the text
            if routing_decision['recommended_capability'] == original_tool:
                # Look for capability names in the response
                for cap in valid_caps:
                    if cap in response.lower() and cap != original_tool:
                        routing_decision['recommended_capability'] = cap
                        break
        except Exception as e:
            logger.warning(f"Failed to parse routing response: {e}")

        return routing_decision

    async def _determine_execution_strategy(self, tool_name: str, arguments: Dict[str, Any], llm_suggestion: str = "DIRECT") -> str:
        """Determine the optimal execution strategy using heuristics."""
        try:
            # Get registries for strategy evaluation
            cap_registry, _, _ = await self._get_registries()

            # Get capability metadata
            capability = cap_registry.get_capability(tool_name)
            if not capability:
                return "DIRECT"  # Fallback for unknown capabilities

            # Base strategy from heuristics
            risk_level = capability.risk_level
            complexity = capability.complexity

            # Simple capabilities with low risk -> DIRECT
            if risk_level == "low" and complexity == "simple":
                return "DIRECT"

            # Medium complexity or risk -> SEQUENTIAL if a workflow is detected
            if complexity == "moderate" or risk_level == "medium":
                # Check whether this might be part of a workflow
                if any(keyword in tool_name.lower() for keyword in ["compose", "heal", "performance", "feedback"]):
                    return "SEQUENTIAL"
                return "DIRECT"

            # High complexity or multi-step operations -> ADAPTIVE
            if complexity in ["complex", "architectural"] or risk_level == "high":
                return "ADAPTIVE"

            # Analytical operations that might benefit from parallel processing
            if any(keyword in tool_name.lower() for keyword in ["performance", "feedback", "metrics"]):
                return "PARALLEL"

            # Web operations that might be chained
            if tool_name.startswith("web_"):
                # If this is a web operation and the arguments suggest multiple URLs/queries
                if isinstance(arguments.get('query'), list) or isinstance(arguments.get('urls'), list):
                    return "PARALLEL"
                return "DIRECT"

            # Respect the LLM suggestion if it makes sense
            valid_strategies = ["DIRECT", "SEQUENTIAL", "PARALLEL", "ADAPTIVE"]
            if llm_suggestion.upper() in valid_strategies:
                return llm_suggestion.upper()

            return "DIRECT"  # Default fallback
        except Exception as e:
            logger.warning(f"Failed to determine execution strategy: {e}")
            return "DIRECT"

    async def _apply_heuristic_safety_checks(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Apply heuristic safety checks to routing decisions."""
        try:
            # Get registries for safety evaluation
            cap_registry, _, _ = await self._get_registries()

            # Get capability metadata
            capability = cap_registry.get_capability(tool_name)
            if not capability:
                return {"safe_to_execute": False, "reason": f"Unknown capability: {tool_name}"}

            # Evaluate operation safety
            operation = {
                "type": "execute",
                "target": tool_name,
                "arguments": arguments,
                "dependencies": capability.dependencies or []
            }
            safety_evaluation = cap_registry.evaluate_operation_safety(operation)

            # Determine whether execution is safe
            safe_to_execute = (
                safety_evaluation["can_auto_approve"]
                and not safety_evaluation["needs_manual_approval"]
                and safety_evaluation["safety_score"] >= 0.5
            )

            result = {
                "safe_to_execute": safe_to_execute,
                "safety_score": safety_evaluation["safety_score"],
                "heuristic_tags": safety_evaluation["heuristic_tags"],
                "needs_approval": safety_evaluation["needs_manual_approval"],
                "needs_testing": safety_evaluation["needs_parallel_testing"]
            }

            # Suggest safer alternatives if needed
            if not safe_to_execute:
                safer_caps = cap_registry.get_safe_capabilities()
                if safer_caps:
                    result["safer_alternative"] = safer_caps[0].name
                    result["reason"] = f"Original operation requires manual approval. Consider using {safer_caps[0].name} instead."
                else:
                    result["reason"] = "Operation requires manual approval and no safer alternatives are available."

            return result
        except Exception as e:
            logger.error(f"Heuristic safety check failed: {e}")
            return {"safe_to_execute": False, "reason": f"Safety check error: {str(e)}"}

    async def _execute_with_strategy(self, tool_name: str, arguments: Dict[str, Any], strategy: str, routing_decision: Dict[str, Any]) -> str:
        """Execute a capability using the specified strategy."""
        try:
            if strategy == "DIRECT":
                # Simple direct execution
                return await self._route_simple(tool_name, arguments)

            elif strategy == "SEQUENTIAL":
                # Sequential execution for workflows
                workflow_suggestions = routing_decision.get('workflow_suggestions', [])
                if workflow_suggestions:
                    results = []
                    for step in workflow_suggestions:
                        if isinstance(step, str):
                            step_args = arguments if step == tool_name else {}
                            result = await self._route_simple(step, step_args)
                            results.append(f"{step}: {result}")
                    return "Sequential workflow results:\n" + "\n".join(results)
                else:
                    return await self._route_simple(tool_name, arguments)

            elif strategy == "PARALLEL":
                # Parallel execution for independent operations
                workflow_suggestions = routing_decision.get('workflow_suggestions', [tool_name])
                if len(workflow_suggestions) > 1:
                    # Execute in parallel; track step names alongside tasks so results align
                    tasks = []
                    step_names = []
                    for step in workflow_suggestions:
                        if isinstance(step, str):
                            step_args = arguments if step == tool_name else {}
                            tasks.append(asyncio.create_task(self._route_simple(step, step_args)))
                            step_names.append(step)

                    results = await asyncio.gather(*tasks, return_exceptions=True)

                    formatted_results = []
                    for step_name, result in zip(step_names, results):
                        if isinstance(result, Exception):
                            formatted_results.append(f"{step_name}: ERROR - {str(result)}")
                        else:
                            formatted_results.append(f"{step_name}: {result}")
                    return "Parallel execution results:\n" + "\n".join(formatted_results)
                else:
                    return await self._route_simple(tool_name, arguments)

            elif strategy == "ADAPTIVE":
                # Adaptive execution based on results: start with the primary capability
                primary_result = await self._route_simple(tool_name, arguments)

                # Analyze the result and decide on next steps
                workflow_suggestions = routing_decision.get('workflow_suggestions', [])
                if workflow_suggestions and "error" not in primary_result.lower():
                    # Continue with the workflow if the primary step succeeded
                    additional_results = []
                    for step in workflow_suggestions[1:]:  # Skip the first step (already executed)
                        if isinstance(step, str):
                            step_args = arguments if step == tool_name else {}
                            result = await self._route_simple(step, step_args)
                            additional_results.append(f"{step}: {result}")

                    if additional_results:
                        return f"Adaptive execution:\nPrimary: {primary_result}\nAdditional:\n" + "\n".join(additional_results)

                return primary_result

            else:
                # Unknown strategy, fall back to direct execution
                logger.warning(f"Unknown execution strategy: {strategy}, falling back to DIRECT")
                return await self._route_simple(tool_name, arguments)
        except Exception as e:
            logger.error(f"Strategy execution failed: {e}")
            # Fall back to simple execution
            return await self._route_simple(tool_name, arguments)
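For orientation, here is a minimal usage sketch (not part of the source file above). It assumes the module is importable as katamari_mcp.router.intelligent_router, consistent with the absolute imports in the file; the tool name and arguments are illustrative.

# Hypothetical usage sketch -- assumes the file lives at
# katamari_mcp/router/intelligent_router.py and is run with package context.
import asyncio

from katamari_mcp.router.intelligent_router import IntelligentRouter


async def main() -> None:
    router = IntelligentRouter()

    # Listing capabilities also lazy-loads the registries.
    capabilities = await router.list_capabilities()
    print(f"{len(capabilities)} capabilities available")

    # Route a call; the router falls back to _route_simple if the LLM cannot load.
    result = await router.route_call(
        "web_search",
        {"query": "model context protocol", "max_results": 3},
    )
    print(result)


if __name__ == "__main__":
    asyncio.run(main())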

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ciphernaut/katamari-mcp'
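The same record can be fetched from Python; a minimal sketch using only the standard library, assuming the endpoint returns JSON:

# Minimal sketch: fetch the server record (the response shape is assumed to be JSON).
import json
import urllib.request

url = "https://glama.ai/api/mcp/v1/servers/ciphernaut/katamari-mcp"
with urllib.request.urlopen(url) as response:
    server_info = json.load(response)

print(json.dumps(server_info, indent=2))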

If you have feedback or need assistance with the MCP directory API, please join our Discord server.