Skip to main content
Glama

Katamari MCP Server

by ciphernaut
system_context.py12.5 kB
""" System context integration for intelligent ACP routing and execution. """ import logging from typing import Dict, List, Any, Optional from dataclasses import dataclass from datetime import datetime, timedelta import asyncio logger = logging.getLogger(__name__) @dataclass class SystemStatus: """Current system status information.""" acp_status: str # active, learning, healing, maintenance active_workflows: int recent_errors: int performance_score: float # 0.0 to 1.0 learning_rate: float last_update: datetime @dataclass class PerformanceMetrics: """Recent performance metrics.""" avg_response_time: float # milliseconds success_rate: float # 0.0 to 1.0 error_rate: float # 0.0 to 1.0 throughput: int # requests per minute resource_usage: Dict[str, float] # cpu, memory, etc. @dataclass class WorkflowContext: """Active workflow context.""" workflow_id: str name: str status: str # running, completed, failed, paused progress: float # 0.0 to 1.0 capabilities_used: List[str] start_time: datetime estimated_completion: Optional[datetime] class SystemContextManager: """Manages system context for intelligent routing with persistence.""" def __init__(self, persistence=None): self.persistence = persistence self._context_loaded = False self.status = SystemStatus( acp_status="active", active_workflows=0, recent_errors=0, performance_score=0.8, learning_rate=0.1, last_update=datetime.now() ) self.performance = PerformanceMetrics( avg_response_time=150.0, success_rate=0.95, error_rate=0.05, throughput=10, resource_usage={"cpu": 0.3, "memory": 0.4} ) self.active_workflows: List[WorkflowContext] = [] self._context_cache: Dict[str, Any] = {} self._cache_ttl = timedelta(minutes=5) async def update_system_status(self): """Update system status from actual system components.""" try: # Get real-time status from various components self.status.last_update = datetime.now() # Update performance metrics (would integrate with actual monitoring) self.performance.success_rate = min(0.99, self.performance.success_rate + 0.01) self.performance.avg_response_time = max(50.0, self.performance.avg_response_time - 5.0) # Update ACP status based on learning activity if self.status.learning_rate > 0.2: self.status.acp_status = "learning" elif self.status.recent_errors > 5: self.status.acp_status = "healing" else: self.status.acp_status = "active" except Exception as e: # Fallback to safe defaults self.status.acp_status = "active" self.status.performance_score = 0.7 def add_workflow(self, workflow: WorkflowContext): """Add an active workflow to context.""" self.active_workflows.append(workflow) self.status.active_workflows = len(self.active_workflows) def remove_workflow(self, workflow_id: str): """Remove a workflow from active context.""" self.active_workflows = [ wf for wf in self.active_workflows if wf.workflow_id != workflow_id ] self.status.active_workflows = len(self.active_workflows) def update_workflow_progress(self, workflow_id: str, progress: float): """Update workflow progress.""" for workflow in self.active_workflows: if workflow.workflow_id == workflow_id: workflow.progress = min(1.0, max(0.0, progress)) break def record_error(self, error_type: str, severity: str = "medium"): """Record an error in system context.""" self.status.recent_errors += 1 # Adjust performance score based on errors if severity == "high": self.status.performance_score *= 0.9 elif severity == "medium": self.status.performance_score *= 0.95 else: self.status.performance_score *= 0.98 def get_context_summary(self) -> Dict[str, Any]: """Get comprehensive system context for LLM.""" return { "system_status": { "acp_status": self.status.acp_status, "active_workflows": self.status.active_workflows, "recent_errors": self.status.recent_errors, "performance_score": self.status.performance_score, "learning_rate": self.status.learning_rate, "last_update": self.status.last_update.isoformat() }, "performance_metrics": { "avg_response_time": self.performance.avg_response_time, "success_rate": self.performance.success_rate, "error_rate": self.performance.error_rate, "throughput": self.performance.throughput, "resource_usage": self.performance.resource_usage }, "active_workflows": [ { "workflow_id": wf.workflow_id, "name": wf.name, "status": wf.status, "progress": wf.progress, "capabilities_used": wf.capabilities_used, "start_time": wf.start_time.isoformat(), "estimated_completion": wf.estimated_completion.isoformat() if wf.estimated_completion else None } for wf in self.active_workflows[-5:] # Last 5 workflows ], "system_health": self._calculate_health_score(), "recommendations": self._generate_recommendations() } def _calculate_health_score(self) -> Dict[str, Any]: """Calculate overall system health score.""" health_score = ( self.status.performance_score * 0.4 + self.performance.success_rate * 0.3 + (1.0 - self.performance.error_rate) * 0.2 + (1.0 - min(1.0, self.status.recent_errors / 10.0)) * 0.1 ) if health_score >= 0.9: status = "excellent" elif health_score >= 0.8: status = "good" elif health_score >= 0.7: status = "fair" else: status = "poor" return { "overall_score": round(health_score, 3), "status": status, "factors": { "performance": round(self.status.performance_score, 3), "success_rate": round(self.performance.success_rate, 3), "error_rate": round(self.performance.error_rate, 3), "error_impact": round(min(1.0, self.status.recent_errors / 10.0), 3) } } def _generate_recommendations(self) -> List[str]: """Generate system recommendations based on context.""" recommendations = [] if self.performance.success_rate < 0.9: recommendations.append("Consider reviewing error logs and optimizing frequently used capabilities") if self.performance.avg_response_time > 200: recommendations.append("System response times are elevated - consider resource optimization") if self.status.recent_errors > 5: recommendations.append("High error rate detected - system healing recommended") if self.status.active_workflows > 3: recommendations.append("Multiple concurrent workflows - consider load balancing") if self.status.learning_rate < 0.1: recommendations.append("Learning rate is low - consider increasing feedback collection") if not recommendations: recommendations.append("System operating normally - no immediate actions required") return recommendations def get_llm_context(self) -> str: """Get formatted context for LLM consumption.""" context = self.get_context_summary() llm_context = "## Current System Context:\n\n" # System Status status = context["system_status"] llm_context += f"### System Status\n" llm_context += f"- **ACP Status**: {status['acp_status']}\n" llm_context += f"- **Active Workflows**: {status['active_workflows']}\n" llm_context += f"- **Performance Score**: {status['performance_score']:.2f}\n" llm_context += f"- **Recent Errors**: {status['recent_errors']}\n\n" # Performance Metrics perf = context["performance_metrics"] llm_context += f"### Performance Metrics\n" llm_context += f"- **Response Time**: {perf['avg_response_time']:.0f}ms\n" llm_context += f"- **Success Rate**: {perf['success_rate']:.1%}\n" llm_context += f"- **Error Rate**: {perf['error_rate']:.1%}\n" llm_context += f"- **Throughput**: {perf['throughput']} req/min\n\n" # System Health health = context["system_health"] llm_context += f"### System Health\n" llm_context += f"- **Overall**: {health['status']} ({health['overall_score']:.2f})\n\n" # Recommendations if context["recommendations"]: llm_context += f"### Recommendations\n" for rec in context["recommendations"]: llm_context += f"- {rec}\n" llm_context += "\n" return llm_context async def load_context(self): """Load system context from persistence.""" if self.persistence and not self._context_loaded: try: loaded = await self.persistence.load_system_context() if loaded: # Restore system status if "system_status" in loaded: status_data = loaded["system_status"] self.status = SystemStatus( acp_status=status_data.get("acp_status", "active"), active_workflows=status_data.get("active_workflows", 0), recent_errors=status_data.get("recent_errors", 0), performance_score=status_data.get("performance_score", 0.8), learning_rate=status_data.get("learning_rate", 0.1), last_update=datetime.fromisoformat(status_data.get("last_update", datetime.now().isoformat())) ) # Restore performance metrics if "performance_metrics" in loaded: perf_data = loaded["performance_metrics"] self.performance = PerformanceMetrics( avg_response_time=perf_data.get("avg_response_time", 150.0), success_rate=perf_data.get("success_rate", 0.95), error_rate=perf_data.get("error_rate", 0.05), throughput=perf_data.get("throughput", 10), resource_usage=perf_data.get("resource_usage", {"cpu": 0.5, "memory": 0.6}) ) self._context_loaded = True except Exception as e: logger.warning(f"Failed to load system context from persistence: {e}") async def save_context(self): """Save system context to persistence.""" if self.persistence: try: context_data = self.get_context_summary() await self.persistence.save_system_context(context_data) except Exception as e: logger.warning(f"Failed to save system context to persistence: {e}") def get_system_overview(self) -> Dict[str, Any]: """Get simplified system overview for routing.""" return { "health_score": int(self.status.performance_score * 100), "active_workflows": self.status.active_workflows, "acp_status": self.status.acp_status, "recent_errors": self.status.recent_errors }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ciphernaut/katamari-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server