Skip to main content
Glama
metrics.py25.7 kB
"""Metrics collection and monitoring for Ultimate MCP Server.""" import json import os import time from collections import defaultdict from datetime import datetime, timedelta from pathlib import Path from typing import Any, Dict, Optional, Union from ultimate_mcp_server.utils import get_logger logger = get_logger(__name__) try: import aiofiles AIOFILES_AVAILABLE = True except ImportError: AIOFILES_AVAILABLE = False try: from prometheus_client import Counter, Histogram PROMETHEUS_AVAILABLE = True except ImportError: PROMETHEUS_AVAILABLE = False class MetricsTracker: """Comprehensive metrics tracking and monitoring system for Ultimate MCP Server. The MetricsTracker is a singleton class that collects, processes, and persists operational metrics related to LLM usage, costs, performance, and errors. It provides the data foundation for analytics reporting and monitoring tools. Key features: - Singleton design pattern ensures consistent metrics across application - Persistent storage with automatic serialization to JSON - Tracks usage by provider, model, and time periods (hourly, daily) - Records request counts, token usage, costs, errors, and performance metrics - Cache efficiency monitoring (hits, misses, cost savings) - Optional Prometheus integration for external monitoring systems - Asynchronous persistence to minimize performance impact - Automatic data retention policies to prevent memory bloat The metrics are automatically persisted to disk and can be loaded on startup, providing continuity across server restarts. Time-series data is maintained for historical analysis and trend visualization. Usage: # Get the singleton instance metrics = get_metrics_tracker() # Record a request metrics.record_request( provider="anthropic", model="claude-3-opus", input_tokens=150, output_tokens=500, cost=0.0325, duration=2.5 ) # Record cache operations metrics.record_cache_hit(cost_saved=0.015) metrics.record_cache_miss() # Get current statistics stats = metrics.get_stats() # Manually trigger persistence (usually automatic) metrics.save_metrics() """ _instance = None def __new__(cls, *args, **kwargs): """Create a singleton instance.""" if cls._instance is None: cls._instance = super(MetricsTracker, cls).__new__(cls) cls._instance._initialized = False return cls._instance def __init__( self, metrics_dir: Optional[Union[str, Path]] = None, enable_prometheus: bool = False, reset_on_start: bool = False ): """Initialize the metrics tracker. Args: metrics_dir: Directory for metrics storage enable_prometheus: Whether to enable Prometheus metrics reset_on_start: Whether to reset metrics on startup """ # Only initialize once for singleton if self._initialized: return # Set metrics directory if metrics_dir: self.metrics_dir = Path(metrics_dir) else: self.metrics_dir = Path.home() / ".ultimate" / "metrics" # Create metrics directory if it doesn't exist self.metrics_dir.mkdir(parents=True, exist_ok=True) # Prometheus settings self.enable_prometheus = enable_prometheus and PROMETHEUS_AVAILABLE # Initialize metrics data if reset_on_start: self._reset_metrics() else: self._load_metrics() # Initialize Prometheus metrics if enabled if self.enable_prometheus: self._init_prometheus_metrics() self._initialized = True logger.info( f"Metrics tracker initialized (dir: {self.metrics_dir}, prometheus: {self.enable_prometheus})", emoji_key="analytics" ) def _reset_metrics(self): """Reset all metrics data.""" # General stats self.start_time = time.time() self.requests_total = 0 self.tokens_total = 0 self.cost_total = 0.0 # Provider-specific stats self.provider_requests = defaultdict(int) self.provider_tokens = defaultdict(int) self.provider_costs = defaultdict(float) # Model-specific stats self.model_requests = defaultdict(int) self.model_tokens = defaultdict(int) self.model_costs = defaultdict(float) # Request timing stats self.request_times = [] self.request_times_by_provider = defaultdict(list) self.request_times_by_model = defaultdict(list) # Error stats self.errors_total = 0 self.errors_by_provider = defaultdict(int) self.errors_by_model = defaultdict(int) # Token usage by time period self.hourly_tokens = defaultdict(int) self.daily_tokens = defaultdict(int) # Request counts by time period self.hourly_requests = defaultdict(int) self.daily_requests = defaultdict(int) # Cost by time period self.hourly_costs = defaultdict(float) self.daily_costs = defaultdict(float) # Cache stats self.cache_hits = 0 self.cache_misses = 0 self.cache_saved_cost = 0.0 def _load_metrics(self): """Load metrics from disk.""" metrics_file = self.metrics_dir / "metrics.json" if metrics_file.exists(): try: with open(metrics_file, "r") as f: data = json.load(f) # Load general stats self.start_time = data.get("start_time", time.time()) self.requests_total = data.get("requests_total", 0) self.tokens_total = data.get("tokens_total", 0) self.cost_total = data.get("cost_total", 0.0) # Load provider stats self.provider_requests = defaultdict(int, data.get("provider_requests", {})) self.provider_tokens = defaultdict(int, data.get("provider_tokens", {})) self.provider_costs = defaultdict(float, data.get("provider_costs", {})) # Load model stats self.model_requests = defaultdict(int, data.get("model_requests", {})) self.model_tokens = defaultdict(int, data.get("model_tokens", {})) self.model_costs = defaultdict(float, data.get("model_costs", {})) # Load timing stats (limited to last 1000 for memory) self.request_times = data.get("request_times", [])[-1000:] self.request_times_by_provider = defaultdict(list) for provider, times in data.get("request_times_by_provider", {}).items(): self.request_times_by_provider[provider] = times[-1000:] self.request_times_by_model = defaultdict(list) for model, times in data.get("request_times_by_model", {}).items(): self.request_times_by_model[model] = times[-1000:] # Load error stats self.errors_total = data.get("errors_total", 0) self.errors_by_provider = defaultdict(int, data.get("errors_by_provider", {})) self.errors_by_model = defaultdict(int, data.get("errors_by_model", {})) # Load time period stats self.hourly_tokens = defaultdict(int, data.get("hourly_tokens", {})) self.daily_tokens = defaultdict(int, data.get("daily_tokens", {})) self.hourly_costs = defaultdict(float, data.get("hourly_costs", {})) self.daily_costs = defaultdict(float, data.get("daily_costs", {})) self.hourly_requests = defaultdict(int, data.get("hourly_requests", {})) self.daily_requests = defaultdict(int, data.get("daily_requests", {})) # Load cache stats self.cache_hits = data.get("cache_hits", 0) self.cache_misses = data.get("cache_misses", 0) self.cache_saved_cost = data.get("cache_saved_cost", 0.0) logger.info( f"Loaded metrics from {metrics_file}", emoji_key="analytics" ) except Exception as e: logger.error( f"Failed to load metrics: {str(e)}", emoji_key="error" ) self._reset_metrics() else: self._reset_metrics() def _init_prometheus_metrics(self): """Initialize Prometheus metrics.""" if not PROMETHEUS_AVAILABLE: return # Request metrics self.prom_requests_total = Counter( "ultimate_requests_total", "Total number of requests", ["provider", "model"] ) # Token metrics self.prom_tokens_total = Counter( "ultimate_tokens_total", "Total number of tokens", ["provider", "model", "type"] # type: input or output ) # Cost metrics self.prom_cost_total = Counter( "ultimate_cost_total", "Total cost in USD", ["provider", "model"] ) # Timing metrics self.prom_request_duration = Histogram( "ultimate_request_duration_seconds", "Request duration in seconds", ["provider", "model"], buckets=(0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0) ) # Error metrics self.prom_errors_total = Counter( "ultimate_errors_total", "Total number of errors", ["provider", "model"] ) # Cache metrics self.prom_cache_hits = Counter( "ultimate_cache_hits_total", "Total number of cache hits" ) self.prom_cache_misses = Counter( "ultimate_cache_misses_total", "Total number of cache misses" ) self.prom_cache_saved_cost = Counter( "ultimate_cache_saved_cost_total", "Total cost saved by cache in USD" ) async def _save_metrics_async(self): """Save metrics to disk asynchronously.""" if not AIOFILES_AVAILABLE: return metrics_file = self.metrics_dir / "metrics.json" temp_file = metrics_file.with_suffix(".tmp") try: # Prepare data for storage data = { "start_time": self.start_time, "requests_total": self.requests_total, "tokens_total": self.tokens_total, "cost_total": self.cost_total, "provider_requests": dict(self.provider_requests), "provider_tokens": dict(self.provider_tokens), "provider_costs": dict(self.provider_costs), "model_requests": dict(self.model_requests), "model_tokens": dict(self.model_tokens), "model_costs": dict(self.model_costs), "request_times": self.request_times, "request_times_by_provider": {k: v for k, v in self.request_times_by_provider.items()}, "request_times_by_model": {k: v for k, v in self.request_times_by_model.items()}, "errors_total": self.errors_total, "errors_by_provider": dict(self.errors_by_provider), "errors_by_model": dict(self.errors_by_model), "hourly_tokens": dict(self.hourly_tokens), "daily_tokens": dict(self.daily_tokens), "hourly_costs": dict(self.hourly_costs), "daily_costs": dict(self.daily_costs), "hourly_requests": dict(self.hourly_requests), "daily_requests": dict(self.daily_requests), "cache_hits": self.cache_hits, "cache_misses": self.cache_misses, "cache_saved_cost": self.cache_saved_cost, "last_updated": time.time() } # Save to temp file async with aiofiles.open(temp_file, "w") as f: await f.write(json.dumps(data, indent=2)) # Rename temp file to actual file os.replace(temp_file, metrics_file) except Exception as e: logger.error( f"Failed to save metrics: {str(e)}", emoji_key="error" ) def save_metrics(self): """Save metrics to disk synchronously.""" metrics_file = self.metrics_dir / "metrics.json" temp_file = metrics_file.with_suffix(".tmp") try: # Prepare data for storage data = { "start_time": self.start_time, "requests_total": self.requests_total, "tokens_total": self.tokens_total, "cost_total": self.cost_total, "provider_requests": dict(self.provider_requests), "provider_tokens": dict(self.provider_tokens), "provider_costs": dict(self.provider_costs), "model_requests": dict(self.model_requests), "model_tokens": dict(self.model_tokens), "model_costs": dict(self.model_costs), "request_times": self.request_times, "request_times_by_provider": {k: v for k, v in self.request_times_by_provider.items()}, "request_times_by_model": {k: v for k, v in self.request_times_by_model.items()}, "errors_total": self.errors_total, "errors_by_provider": dict(self.errors_by_provider), "errors_by_model": dict(self.errors_by_model), "hourly_tokens": dict(self.hourly_tokens), "daily_tokens": dict(self.daily_tokens), "hourly_costs": dict(self.hourly_costs), "daily_costs": dict(self.daily_costs), "hourly_requests": dict(self.hourly_requests), "daily_requests": dict(self.daily_requests), "cache_hits": self.cache_hits, "cache_misses": self.cache_misses, "cache_saved_cost": self.cache_saved_cost, "last_updated": time.time() } # Save to temp file with open(temp_file, "w") as f: json.dump(data, f, indent=2) # Rename temp file to actual file os.replace(temp_file, metrics_file) except Exception as e: logger.error( f"Failed to save metrics: {str(e)}", emoji_key="error" ) def record_request( self, provider: str, model: str, input_tokens: int, output_tokens: int, cost: float, duration: float, success: bool = True ): """Record metrics for a request. Args: provider: Provider name model: Model name input_tokens: Number of input tokens output_tokens: Number of output tokens cost: Cost of the request duration: Duration of the request in seconds success: Whether the request was successful """ # Update general stats self.requests_total += 1 total_tokens = input_tokens + output_tokens self.tokens_total += total_tokens self.cost_total += cost # Update provider stats self.provider_requests[provider] += 1 self.provider_tokens[provider] += total_tokens self.provider_costs[provider] += cost # Update model stats self.model_requests[model] += 1 self.model_tokens[model] += total_tokens self.model_costs[model] += cost # Update timing stats self.request_times.append(duration) if len(self.request_times) > 1000: self.request_times = self.request_times[-1000:] self.request_times_by_provider[provider].append(duration) if len(self.request_times_by_provider[provider]) > 1000: self.request_times_by_provider[provider] = self.request_times_by_provider[provider][-1000:] self.request_times_by_model[model].append(duration) if len(self.request_times_by_model[model]) > 1000: self.request_times_by_model[model] = self.request_times_by_model[model][-1000:] # Update error stats if request failed if not success: self.errors_total += 1 self.errors_by_provider[provider] += 1 self.errors_by_model[model] += 1 # Update time period stats current_time = time.time() hour_key = datetime.fromtimestamp(current_time).strftime("%Y-%m-%d-%H") day_key = datetime.fromtimestamp(current_time).strftime("%Y-%m-%d") self.hourly_tokens[hour_key] += total_tokens self.daily_tokens[day_key] += total_tokens self.hourly_costs[hour_key] += cost self.daily_costs[day_key] += cost self.hourly_requests[hour_key] += 1 self.daily_requests[day_key] += 1 # Update Prometheus metrics if enabled if self.enable_prometheus: self.prom_requests_total.labels(provider=provider, model=model).inc() self.prom_tokens_total.labels(provider=provider, model=model, type="input").inc(input_tokens) self.prom_tokens_total.labels(provider=provider, model=model, type="output").inc(output_tokens) self.prom_cost_total.labels(provider=provider, model=model).inc(cost) self.prom_request_duration.labels(provider=provider, model=model).observe(duration) if not success: self.prom_errors_total.labels(provider=provider, model=model).inc() # Schedule metrics saving try: import asyncio asyncio.create_task(self._save_metrics_async()) except (ImportError, RuntimeError): # Fall back to synchronous saving if asyncio not available self.save_metrics() def record_cache_hit(self, cost_saved: float = 0.0): """Record a cache hit. Args: cost_saved: Cost saved by the cache hit """ self.cache_hits += 1 self.cache_saved_cost += cost_saved # Update Prometheus metrics if enabled if self.enable_prometheus: self.prom_cache_hits.inc() self.prom_cache_saved_cost.inc(cost_saved) def record_cache_miss(self): """Record a cache miss.""" self.cache_misses += 1 # Update Prometheus metrics if enabled if self.enable_prometheus: self.prom_cache_misses.inc() def get_stats(self) -> Dict[str, Any]: """Get current metrics. Returns: Dictionary of metrics """ # Calculate uptime uptime = time.time() - self.start_time # Calculate request rate (per minute) request_rate = self.requests_total / (uptime / 60) if uptime > 0 else 0 # Calculate average response time avg_response_time = sum(self.request_times) / len(self.request_times) if self.request_times else 0 # Calculate cache hit ratio cache_total = self.cache_hits + self.cache_misses cache_hit_ratio = self.cache_hits / cache_total if cache_total > 0 else 0 # Get top providers by usage top_providers = sorted( [(provider, tokens) for provider, tokens in self.provider_tokens.items()], key=lambda x: x[1], reverse=True )[:5] # Get top models by usage top_models = sorted( [(model, tokens) for model, tokens in self.model_tokens.items()], key=lambda x: x[1], reverse=True )[:5] # Get daily token usage for last 7 days today = datetime.now().strftime("%Y-%m-%d") daily_usage = [] for i in range(7): day = (datetime.now() - timedelta(days=i)).strftime("%Y-%m-%d") daily_usage.append(( day, self.daily_tokens.get(day, 0), self.daily_costs.get(day, 0.0), self.daily_requests.get(day, 0) )) # Compile stats return { "general": { "uptime": uptime, "uptime_human": self._format_duration(uptime), "requests_total": self.requests_total, "tokens_total": self.tokens_total, "cost_total": self.cost_total, "request_rate": request_rate, "avg_response_time": avg_response_time, "errors_total": self.errors_total, "error_rate": self.errors_total / self.requests_total if self.requests_total > 0 else 0, }, "providers": { provider: { "requests": count, "tokens": self.provider_tokens.get(provider, 0), "cost": self.provider_costs.get(provider, 0.0), "avg_response_time": sum(self.request_times_by_provider.get(provider, [])) / len(self.request_times_by_provider.get(provider, [])) if self.request_times_by_provider.get(provider, []) else 0, "errors": self.errors_by_provider.get(provider, 0), } for provider, count in self.provider_requests.items() }, "models": { model: { "requests": count, "tokens": self.model_tokens.get(model, 0), "cost": self.model_costs.get(model, 0.0), "avg_response_time": sum(self.request_times_by_model.get(model, [])) / len(self.request_times_by_model.get(model, [])) if self.request_times_by_model.get(model, []) else 0, "errors": self.errors_by_model.get(model, 0), } for model, count in self.model_requests.items() }, "cache": { "hits": self.cache_hits, "misses": self.cache_misses, "hit_ratio": cache_hit_ratio, "saved_cost": self.cache_saved_cost, }, "top_providers": [ { "provider": provider, "tokens": tokens, "percentage": tokens / self.tokens_total if self.tokens_total > 0 else 0, } for provider, tokens in top_providers ], "top_models": [ { "model": model, "tokens": tokens, "percentage": tokens / self.tokens_total if self.tokens_total > 0 else 0, } for model, tokens in top_models ], "daily_usage": [ { "date": date, "tokens": tokens, "cost": cost, "requests": requests } for date, tokens, cost, requests in daily_usage ], "today": { "tokens": self.daily_tokens.get(today, 0), "cost": self.daily_costs.get(today, 0.0), "requests": self.daily_requests.get(today, 0) } } def _format_duration(self, seconds: float) -> str: """Format duration in a human-readable format. Args: seconds: Duration in seconds Returns: Formatted duration """ if seconds < 60: return f"{seconds:.1f} seconds" elif seconds < 3600: minutes = seconds / 60 return f"{minutes:.1f} minutes" elif seconds < 86400: hours = seconds / 3600 return f"{hours:.1f} hours" else: days = seconds / 86400 return f"{days:.1f} days" def reset(self): """Reset all metrics.""" self._reset_metrics() logger.info( "Metrics reset", emoji_key="analytics" ) # Singleton instance getter def get_metrics_tracker( metrics_dir: Optional[Union[str, Path]] = None, enable_prometheus: bool = False, reset_on_start: bool = False ) -> MetricsTracker: """Get the metrics tracker singleton instance. Args: metrics_dir: Directory for metrics storage enable_prometheus: Whether to enable Prometheus metrics reset_on_start: Whether to reset metrics on startup Returns: MetricsTracker singleton instance """ return MetricsTracker(metrics_dir, enable_prometheus, reset_on_start)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Kappasig920/Ultimate-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server