Skip to main content
Glama

Katamari MCP Server

by ciphernaut
performance_tracker.pyโ€ข25.6 kB
""" Performance Tracking System for ACP Capabilities Phase 2: Tracks capability performance, success metrics, and provides analytics for adaptive learning and heuristic optimization. """ import asyncio import json import logging import os import psutil import time from datetime import datetime, timedelta from pathlib import Path from typing import Dict, List, Optional, Any, Callable from dataclasses import dataclass, asdict, field from contextlib import asynccontextmanager from collections import defaultdict, deque from ..utils.config import Config from ..utils.error_handler import ErrorHandler logger = logging.getLogger(__name__) @dataclass class PerformanceSnapshot: """Snapshot of performance metrics at a point in time""" timestamp: datetime cpu_percent: float memory_mb: int memory_percent: float disk_io_read: int disk_io_write: int network_io_sent: int network_io_recv: int open_files: int thread_count: int @dataclass class ExecutionMetrics: """Detailed metrics for a single capability execution""" capability_id: str execution_id: str start_time: datetime end_time: datetime duration: float success: bool error_type: Optional[str] = None error_message: Optional[str] = None # Resource usage peak_memory_mb: int = 0 avg_cpu_percent: float = 0.0 disk_operations: int = 0 network_calls: int = 0 # Performance snapshots snapshots: List[PerformanceSnapshot] = field(default_factory=list) # Custom metrics custom_metrics: Dict[str, Any] = field(default_factory=dict) @dataclass class CapabilityPerformance: """Aggregated performance data for a capability""" capability_id: str total_executions: int successful_executions: int failed_executions: int success_rate: float # Timing metrics avg_duration: float min_duration: float max_duration: float recent_durations: deque # Last 20 durations # Resource metrics avg_peak_memory_mb: float max_peak_memory_mb: int avg_cpu_percent: float # Error analysis error_frequency: Dict[str, int] common_errors: List[str] # Top 5 error types # Performance trend performance_trend: List[float] # Success rate over time last_execution: datetime # Health indicators health_score: float # 0-100 overall health performance_grade: str # A, B, C, D, F class PerformanceTracker: """Tracks and analyzes capability performance""" def __init__(self, config: Config): self.config = config self.error_handler = ErrorHandler() # Storage paths workspace_root = config.get('workspace_root', os.getcwd()) self.metrics_dir = Path(workspace_root) / ".katamari" / "acp" / "performance" self.executions_file = self.metrics_dir / "executions.jsonl" self.performance_file = self.metrics_dir / "performance.json" # Create directories self.metrics_dir.mkdir(parents=True, exist_ok=True) # In-memory storage self._active_executions: Dict[str, ExecutionMetrics] = {} self._capability_performance: Dict[str, CapabilityPerformance] = {} self._performance_history: Dict[str, deque] = defaultdict(lambda: deque(maxlen=100)) # Monitoring settings self.snapshot_interval = 1.0 # seconds self.max_snapshots_per_execution = 60 self.performance_window = 50 # executions for trend analysis # System monitoring self._monitoring_active = False self._monitoring_tasks: Dict[str, asyncio.Task] = {} logger.info("Performance tracker initialized") @asynccontextmanager async def track_execution(self, capability_id: str, execution_id: str): """Context manager for tracking capability execution""" # Start tracking start_time = datetime.now() metrics = ExecutionMetrics( capability_id=capability_id, execution_id=execution_id, start_time=start_time, end_time=start_time, duration=0.0, success=False ) self._active_executions[execution_id] = metrics # Start monitoring monitor_task = asyncio.create_task( self._monitor_execution(execution_id, capability_id) ) self._monitoring_tasks[execution_id] = monitor_task try: yield metrics except Exception as e: # Record error metrics.success = False metrics.error_type = type(e).__name__ metrics.error_message = str(e) raise finally: # Stop monitoring if execution_id in self._monitoring_tasks: self._monitoring_tasks[execution_id].cancel() del self._monitoring_tasks[execution_id] # Finalize metrics metrics.end_time = datetime.now() metrics.duration = (metrics.end_time - metrics.start_time).total_seconds() # Store and analyze await self._store_execution_metrics(metrics) await self._update_capability_performance(metrics.capability_id) # Remove from active if execution_id in self._active_executions: del self._active_executions[execution_id] async def get_capability_performance(self, capability_id: str) -> Optional[CapabilityPerformance]: """Get performance data for a capability""" if capability_id in self._capability_performance: return self._capability_performance[capability_id] # Load from disk await self._load_performance_data() return self._capability_performance.get(capability_id) async def get_performance_summary(self, days_back: int = 30) -> Dict[str, Any]: """Get overall performance summary""" try: cutoff_date = datetime.now() - timedelta(days=days_back) # Load recent executions recent_executions = await self._load_recent_executions(cutoff_date) if not recent_executions: return {"total_executions": 0} # Calculate summary statistics total_executions = len(recent_executions) successful_executions = sum(1 for e in recent_executions if e.success) success_rate = successful_executions / total_executions durations = [e.duration for e in recent_executions] avg_duration = sum(durations) / len(durations) # Capability breakdown capability_stats: Dict[str, Dict[str, int]] = defaultdict(lambda: {"total": 0, "successful": 0}) for execution in recent_executions: cap = execution.capability_id capability_stats[cap]["total"] += 1 if execution.success: capability_stats[cap]["successful"] += 1 # Error analysis error_counts = defaultdict(int) for execution in recent_executions: if execution.error_type: error_counts[execution.error_type] += 1 return { "period_days": days_back, "total_executions": total_executions, "successful_executions": successful_executions, "success_rate": success_rate, "average_duration": avg_duration, "capability_breakdown": dict(capability_stats), "common_errors": dict(sorted(error_counts.items(), key=lambda x: x[1], reverse=True)[:10]), "capabilities_tracked": len(capability_stats) } except Exception as e: logger.error(f"Failed to get performance summary: {e}") return {"error": str(e)} async def get_performance_trends(self, capability_id: str, days_back: int = 7) -> Dict[str, Any]: """Get performance trends for a specific capability""" try: performance = await self.get_capability_performance(capability_id) if not performance: return {"error": "Capability not found"} # Get recent executions for trend analysis cutoff_date = datetime.now() - timedelta(days=days_back) recent_executions = await self._load_capability_executions(capability_id, cutoff_date) if not recent_executions: return {"error": "No recent executions"} # Calculate trends daily_stats: Dict[str, Dict[str, Any]] = defaultdict(lambda: {"total": 0, "successful": 0, "durations": []}) for execution in recent_executions: day_key = execution.start_time.strftime("%Y-%m-%d") daily_stats[day_key]["total"] += 1 if execution.success: daily_stats[day_key]["successful"] += 1 daily_stats[day_key]["durations"].append(execution.duration) # Build trend data trend_data = [] for date in sorted(daily_stats.keys()): stats = daily_stats[date] success_rate = stats["successful"] / stats["total"] if stats["total"] > 0 else 0 durations = stats["durations"] avg_duration = sum(durations) / len(durations) if durations else 0 trend_data.append({ "date": date, "executions": stats["total"], "success_rate": success_rate, "avg_duration": avg_duration }) return { "capability_id": capability_id, "period_days": days_back, "trend_data": trend_data, "current_health_score": performance.health_score, "performance_grade": performance.performance_grade } except Exception as e: logger.error(f"Failed to get performance trends: {e}") return {"error": str(e)} async def add_custom_metric(self, execution_id: str, metric_name: str, value: Any) -> bool: """Add custom metric to active execution""" try: if execution_id in self._active_executions: self._active_executions[execution_id].custom_metrics[metric_name] = value return True return False except Exception as e: logger.error(f"Failed to add custom metric: {e}") return False async def _monitor_execution(self, execution_id: str, capability_id: str) -> None: """Monitor execution and collect performance snapshots""" try: process = psutil.Process() while execution_id in self._active_executions: # Collect system metrics snapshot = PerformanceSnapshot( timestamp=datetime.now(), cpu_percent=process.cpu_percent(), memory_mb=process.memory_info().rss // 1024 // 1024, memory_percent=process.memory_percent(), disk_io_read=process.io_counters().read_bytes if process.io_counters() else 0, disk_io_write=process.io_counters().write_bytes if process.io_counters() else 0, network_io_sent=process.io_counters().write_bytes if process.io_counters() else 0, network_io_recv=process.io_counters().read_bytes if process.io_counters() else 0, open_files=len(process.open_files()), thread_count=process.num_threads() ) # Add to execution metrics if execution_id in self._active_executions: metrics = self._active_executions[execution_id] metrics.snapshots.append(snapshot) # Update peak metrics metrics.peak_memory_mb = max(metrics.peak_memory_mb, snapshot.memory_mb) # Limit snapshots if len(metrics.snapshots) > self.max_snapshots_per_execution: metrics.snapshots = metrics.snapshots[-self.max_snapshots_per_execution:] # Wait for next snapshot await asyncio.sleep(self.snapshot_interval) except asyncio.CancelledError: pass except Exception as e: logger.error(f"Error monitoring execution {execution_id}: {e}") async def _store_execution_metrics(self, metrics: ExecutionMetrics) -> None: """Store execution metrics to disk""" try: # Calculate derived metrics if metrics.snapshots: metrics.avg_cpu_percent = sum(s.cpu_percent for s in metrics.snapshots) / len(metrics.snapshots) # Store to file with open(self.executions_file, 'a') as f: metrics_dict = asdict(metrics) metrics_dict['start_time'] = metrics.start_time.isoformat() metrics_dict['end_time'] = metrics.end_time.isoformat() metrics_dict['snapshots'] = [asdict(s) for s in metrics.snapshots] for snapshot in metrics_dict['snapshots']: snapshot['timestamp'] = snapshot['timestamp'].isoformat() f.write(json.dumps(metrics_dict) + '\n') except Exception as e: logger.error(f"Failed to store execution metrics: {e}") async def _update_capability_performance(self, capability_id: str) -> None: """Update aggregated performance data for a capability""" try: # Load all executions for this capability executions = await self._load_capability_executions(capability_id) if not executions: return # Calculate aggregated metrics total_executions = len(executions) successful_executions = sum(1 for e in executions if e.success) failed_executions = total_executions - successful_executions success_rate = successful_executions / total_executions durations = [e.duration for e in executions] avg_duration = sum(durations) / len(durations) min_duration = min(durations) max_duration = max(durations) recent_durations = deque([e.duration for e in executions[-20:]], maxlen=20) # Resource metrics peak_memories = [e.peak_memory_mb for e in executions if e.peak_memory_mb > 0] avg_peak_memory = sum(peak_memories) / len(peak_memories) if peak_memories else 0 max_peak_memory = max(peak_memories) if peak_memories else 0 avg_cpus = [e.avg_cpu_percent for e in executions if e.avg_cpu_percent > 0] avg_cpu = sum(avg_cpus) / len(avg_cpus) if avg_cpus else 0 # Error analysis error_frequency = defaultdict(int) for execution in executions: if execution.error_type: error_frequency[execution.error_type] += 1 common_errors = sorted(error_frequency.items(), key=lambda x: x[1], reverse=True)[:5] common_errors = [error[0] for error in common_errors] # Performance trend performance_trend = [1.0 if e.success else 0.0 for e in executions[-self.performance_window:]] # Health score calculation health_score = self._calculate_health_score( success_rate, avg_duration, avg_peak_memory, len(error_frequency) ) performance_grade = self._get_performance_grade(health_score) # Create performance object performance = CapabilityPerformance( capability_id=capability_id, total_executions=total_executions, successful_executions=successful_executions, failed_executions=failed_executions, success_rate=success_rate, avg_duration=avg_duration, min_duration=min_duration, max_duration=max_duration, recent_durations=recent_durations, avg_peak_memory_mb=avg_peak_memory, max_peak_memory_mb=max_peak_memory, avg_cpu_percent=avg_cpu, error_frequency=dict(error_frequency), common_errors=common_errors, performance_trend=performance_trend, last_execution=max(e.start_time for e in executions), health_score=health_score, performance_grade=performance_grade ) # Cache and store self._capability_performance[capability_id] = performance await self._save_performance_data() except Exception as e: logger.error(f"Failed to update capability performance: {e}") async def _load_performance_data(self) -> None: """Load performance data from disk""" try: if not self.performance_file.exists(): return with open(self.performance_file, 'r') as f: data = json.load(f) for capability_id, perf_data in data.items(): # Convert deques back to deques recent_durations = deque(perf_data['recent_durations'], maxlen=20) performance_trend = deque(perf_data['performance_trend'], maxlen=self.performance_window) performance = CapabilityPerformance( capability_id=perf_data['capability_id'], total_executions=perf_data['total_executions'], successful_executions=perf_data['successful_executions'], failed_executions=perf_data['failed_executions'], success_rate=perf_data['success_rate'], avg_duration=perf_data['avg_duration'], min_duration=perf_data['min_duration'], max_duration=perf_data['max_duration'], recent_durations=recent_durations, avg_peak_memory_mb=perf_data['avg_peak_memory_mb'], max_peak_memory_mb=perf_data['max_peak_memory_mb'], avg_cpu_percent=perf_data['avg_cpu_percent'], error_frequency=perf_data['error_frequency'], common_errors=perf_data['common_errors'], performance_trend=list(performance_trend), last_execution=datetime.fromisoformat(perf_data['last_execution']), health_score=perf_data['health_score'], performance_grade=perf_data['performance_grade'] ) self._capability_performance[capability_id] = performance except Exception as e: logger.error(f"Failed to load performance data: {e}") async def _save_performance_data(self) -> None: """Save performance data to disk""" try: data = {} for capability_id, performance in self._capability_performance.items(): perf_data = asdict(performance) perf_data['last_execution'] = performance.last_execution.isoformat() data[capability_id] = perf_data with open(self.performance_file, 'w') as f: json.dump(data, f, indent=2) except Exception as e: logger.error(f"Failed to save performance data: {e}") async def _load_recent_executions(self, since: datetime) -> List[ExecutionMetrics]: """Load recent executions from disk""" executions = [] if not self.executions_file.exists(): return executions try: with open(self.executions_file, 'r') as f: for line in f: try: data = json.loads(line.strip()) start_time = datetime.fromisoformat(data['start_time']) if start_time >= since: # Reconstruct ExecutionMetrics snapshots = [] for snap_data in data['snapshots']: snapshot = PerformanceSnapshot( timestamp=datetime.fromisoformat(snap_data['timestamp']), cpu_percent=snap_data['cpu_percent'], memory_mb=snap_data['memory_mb'], memory_percent=snap_data['memory_percent'], disk_io_read=snap_data['disk_io_read'], disk_io_write=snap_data['disk_io_write'], network_io_sent=snap_data['network_io_sent'], network_io_recv=snap_data['network_io_recv'], open_files=snap_data['open_files'], thread_count=snap_data['thread_count'] ) snapshots.append(snapshot) execution = ExecutionMetrics( capability_id=data['capability_id'], execution_id=data['execution_id'], start_time=start_time, end_time=datetime.fromisoformat(data['end_time']), duration=data['duration'], success=data['success'], error_type=data.get('error_type'), error_message=data.get('error_message'), peak_memory_mb=data.get('peak_memory_mb', 0), avg_cpu_percent=data.get('avg_cpu_percent', 0.0), disk_operations=data.get('disk_operations', 0), network_calls=data.get('network_calls', 0), snapshots=snapshots, custom_metrics=data.get('custom_metrics', {}) ) executions.append(execution) except (json.JSONDecodeError, KeyError, ValueError) as e: logger.warning(f"Skipping invalid execution line: {e}") continue except Exception as e: logger.error(f"Failed to load recent executions: {e}") return executions async def _load_capability_executions(self, capability_id: str, since: Optional[datetime] = None) -> List[ExecutionMetrics]: """Load executions for a specific capability""" all_executions = await self._load_recent_executions(since or datetime.min) return [e for e in all_executions if e.capability_id == capability_id] def _calculate_health_score( self, success_rate: float, avg_duration: float, avg_memory_mb: float, error_types: int ) -> float: """Calculate overall health score (0-100)""" try: # Success rate weight: 40% success_score = success_rate * 40 # Duration score (faster is better): 25% # Normalize duration (assuming 10 seconds as baseline) duration_score = max(0, 25 - (avg_duration / 10) * 25) # Memory score (lower is better): 20% # Normalize memory (assuming 500MB as baseline) memory_score = max(0, 20 - (avg_memory_mb / 500) * 20) # Error diversity score (fewer error types is better): 15% error_score = max(0, 15 - error_types * 3) total_score = success_score + duration_score + memory_score + error_score return min(100, max(0, total_score)) except Exception as e: logger.error(f"Failed to calculate health score: {e}") return 50.0 # Default to middle score def _get_performance_grade(self, health_score: float) -> str: """Get performance grade from health score""" if health_score >= 90: return "A" elif health_score >= 80: return "B" elif health_score >= 70: return "C" elif health_score >= 60: return "D" else: return "F"

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ciphernaut/katamari-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server