"""Base metrics framework."""
import time
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from collections import defaultdict, deque
from ..utils.logging import get_logger
# Module-level logger named after this module. The classes below do not use
# it; each creates its own `_logger` in `__init__`.
logger = get_logger(__name__)
class BaseMetrics(ABC):
    """Abstract interface that every metrics backend implements.

    Subclasses must provide all six coroutine methods declared below. The
    ``enabled`` flag starts out ``True``; ``MetricsCollector`` skips
    implementations whose flag is falsy.
    """

    def __init__(self, name: str) -> None:
        """Store the backend name and create a logger scoped to it.

        Args:
            name: Metrics implementation name
        """
        self._logger = get_logger(f"{__name__}.{name}")
        self.enabled = True
        self.name = name

    @abstractmethod
    async def counter_increment(self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 1) -> None:
        """Add ``value`` to the counter called ``name``.

        Args:
            name: Metric name
            labels: Optional labels attached to the sample
            value: Amount to increment by (defaults to 1)
        """

    @abstractmethod
    async def gauge_set(self, name: str, value: float, labels: Optional[Dict[str, str]] = None) -> None:
        """Set the gauge called ``name`` to ``value``.

        Args:
            name: Metric name
            value: New gauge value
            labels: Optional labels attached to the sample
        """

    @abstractmethod
    async def histogram_observe(self, name: str, value: float, labels: Optional[Dict[str, str]] = None) -> None:
        """Record one observation for the histogram called ``name``.

        Args:
            name: Metric name
            value: Observed value
            labels: Optional labels attached to the sample
        """

    @abstractmethod
    async def summary_observe(self, name: str, value: float, labels: Optional[Dict[str, str]] = None) -> None:
        """Record one observation for the summary called ``name``.

        Args:
            name: Metric name
            value: Observed value
            labels: Optional labels attached to the sample
        """

    @abstractmethod
    async def get_metrics(self) -> Dict[str, Any]:
        """Return all metrics data held by this implementation.

        Returns:
            Metrics data
        """

    @abstractmethod
    async def reset_metrics(self) -> None:
        """Reset all metrics held by this implementation."""
class MetricsCollector:
    """Collector that forwards metric updates to every registered implementation.

    Implementations are stored in ``implementations`` keyed by their ``name``
    attribute. Each forwarding method skips implementations whose ``enabled``
    flag is falsy, and logs (instead of propagating) any ``Exception`` raised
    by an implementation so that one failing backend does not stop the rest.
    """

    def __init__(self) -> None:
        """Initialize metrics collector."""
        self.implementations: Dict[str, "BaseMetrics"] = {}
        self._logger = get_logger(__name__)
        # Standard MCP server metrics
        self._setup_standard_metrics()

    def _setup_standard_metrics(self) -> None:
        """Populate ``standard_metrics`` with standard MCP metric names and descriptions."""
        self.standard_metrics = {
            # Tool metrics
            "mcp_tool_calls_total": "Counter for total tool calls",
            "mcp_tool_call_duration_seconds": "Histogram for tool call duration",
            "mcp_tool_call_errors_total": "Counter for tool call errors",
            # Resource metrics
            "mcp_resource_reads_total": "Counter for total resource reads",
            "mcp_resource_read_duration_seconds": "Histogram for resource read duration",
            "mcp_resource_read_errors_total": "Counter for resource read errors",
            "mcp_resource_cache_hits_total": "Counter for resource cache hits",
            "mcp_resource_cache_misses_total": "Counter for resource cache misses",
            # Prompt metrics
            "mcp_prompt_gets_total": "Counter for total prompt gets",
            "mcp_prompt_get_duration_seconds": "Histogram for prompt get duration",
            "mcp_prompt_get_errors_total": "Counter for prompt get errors",
            # Server metrics
            "mcp_server_requests_total": "Counter for total server requests",
            "mcp_server_active_connections": "Gauge for active connections",
            "mcp_server_uptime_seconds": "Gauge for server uptime",
            "mcp_server_memory_usage_bytes": "Gauge for memory usage",
            # Plugin metrics
            "mcp_plugins_loaded_total": "Gauge for loaded plugins",
            "mcp_plugins_enabled_total": "Gauge for enabled plugins",
            # Middleware metrics
            "mcp_middleware_execution_duration_seconds": "Histogram for middleware execution time",
        }

    def add_implementation(self, impl: "BaseMetrics") -> None:
        """Add metrics implementation.

        An existing implementation registered under the same name is replaced.

        Args:
            impl: Metrics implementation to add
        """
        self.implementations[impl.name] = impl
        self._logger.info(f"Added metrics implementation: {impl.name}")

    def remove_implementation(self, name: str) -> None:
        """Remove metrics implementation.

        Unknown names are ignored silently (nothing is logged).

        Args:
            name: Implementation name to remove
        """
        if name in self.implementations:
            del self.implementations[name]
            self._logger.info(f"Removed metrics implementation: {name}")

    def get_implementation(self, name: str) -> Optional["BaseMetrics"]:
        """Get metrics implementation by name.

        Args:
            name: Implementation name

        Returns:
            Metrics implementation or None
        """
        return self.implementations.get(name)

    async def _broadcast(self, method_name: str, action: str, *args: Any) -> None:
        """Await ``method_name(*args)`` on every enabled implementation.

        Args:
            method_name: Name of the coroutine method to call on each implementation
            action: Verb phrase used in the error log ("Error <action> in <impl>")
            *args: Positional arguments forwarded unchanged to the method
        """
        # Iterate over a snapshot of the registry. Each call below awaits, and
        # if an implementation (or another task) adds or removes an entry while
        # we are suspended, iterating the live dict raises RuntimeError outside
        # the try block and aborts the whole fan-out.
        for impl in list(self.implementations.values()):
            if not impl.enabled:
                continue
            try:
                await getattr(impl, method_name)(*args)
            except Exception as e:
                self._logger.error(f"Error {action} in {impl.name}: {e}")

    async def counter_increment(self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 1) -> None:
        """Increment counter across all implementations.

        Args:
            name: Metric name
            labels: Optional labels
            value: Value to increment by
        """
        await self._broadcast("counter_increment", "incrementing counter", name, labels, value)

    async def gauge_set(self, name: str, value: float, labels: Optional[Dict[str, str]] = None) -> None:
        """Set gauge across all implementations.

        Args:
            name: Metric name
            value: Value to set
            labels: Optional labels
        """
        await self._broadcast("gauge_set", "setting gauge", name, value, labels)

    async def histogram_observe(self, name: str, value: float, labels: Optional[Dict[str, str]] = None) -> None:
        """Observe histogram across all implementations.

        Args:
            name: Metric name
            value: Value to observe
            labels: Optional labels
        """
        await self._broadcast("histogram_observe", "observing histogram", name, value, labels)

    async def summary_observe(self, name: str, value: float, labels: Optional[Dict[str, str]] = None) -> None:
        """Observe summary across all implementations.

        Args:
            name: Metric name
            value: Value to observe
            labels: Optional labels
        """
        await self._broadcast("summary_observe", "observing summary", name, value, labels)

    async def get_all_metrics(self) -> Dict[str, Dict[str, Any]]:
        """Get metrics from all enabled implementations.

        An implementation whose ``get_metrics`` raises is reported as
        ``{"error": "<message>"}`` instead of aborting the collection.

        Returns:
            Dictionary mapping implementation names to their metrics
        """
        all_metrics: Dict[str, Dict[str, Any]] = {}
        # Snapshot for the same reason as in _broadcast: the registry may be
        # mutated while we are suspended in an await.
        for name, impl in list(self.implementations.items()):
            if not impl.enabled:
                continue
            try:
                all_metrics[name] = await impl.get_metrics()
            except Exception as e:
                self._logger.error(f"Error getting metrics from {name}: {e}")
                all_metrics[name] = {"error": str(e)}
        return all_metrics

    async def reset_all_metrics(self) -> None:
        """Reset metrics in all enabled implementations."""
        await self._broadcast("reset_metrics", "resetting metrics")
        self._logger.info("Reset all metrics")

    # Convenience methods for standard MCP metrics

    async def record_tool_call(self, tool_name: str, duration: float, success: bool = True) -> None:
        """Record tool call metrics.

        Increments the call counter and observes the duration histogram with
        ``tool``/``status`` labels; on failure also increments the error counter.

        Args:
            tool_name: Value used for the ``tool`` label
            duration: Value observed in the duration histogram
            success: Whether the call succeeded
        """
        labels = {"tool": tool_name, "status": "success" if success else "error"}
        await self.counter_increment("mcp_tool_calls_total", labels)
        await self.histogram_observe("mcp_tool_call_duration_seconds", duration, labels)
        if not success:
            await self.counter_increment("mcp_tool_call_errors_total", {"tool": tool_name})

    async def record_resource_read(
        self, resource_uri: str, duration: float, cache_hit: bool = False, success: bool = True
    ) -> None:
        """Record resource read metrics.

        Increments the read counter and observes the duration histogram with
        ``resource``/``status`` labels, then increments either the cache-hit
        or the cache-miss counter; on failure also increments the error counter.

        Args:
            resource_uri: Value used for the ``resource`` label
            duration: Value observed in the duration histogram
            cache_hit: Whether the read was served from cache
            success: Whether the read succeeded
        """
        labels = {"resource": resource_uri, "status": "success" if success else "error"}
        await self.counter_increment("mcp_resource_reads_total", labels)
        await self.histogram_observe("mcp_resource_read_duration_seconds", duration, labels)
        if cache_hit:
            await self.counter_increment("mcp_resource_cache_hits_total", {"resource": resource_uri})
        else:
            await self.counter_increment("mcp_resource_cache_misses_total", {"resource": resource_uri})
        if not success:
            await self.counter_increment("mcp_resource_read_errors_total", {"resource": resource_uri})

    async def record_prompt_get(self, prompt_name: str, duration: float, success: bool = True) -> None:
        """Record prompt get metrics.

        Increments the get counter and observes the duration histogram with
        ``prompt``/``status`` labels; on failure also increments the error counter.

        Args:
            prompt_name: Value used for the ``prompt`` label
            duration: Value observed in the duration histogram
            success: Whether the get succeeded
        """
        labels = {"prompt": prompt_name, "status": "success" if success else "error"}
        await self.counter_increment("mcp_prompt_gets_total", labels)
        await self.histogram_observe("mcp_prompt_get_duration_seconds", duration, labels)
        if not success:
            await self.counter_increment("mcp_prompt_get_errors_total", {"prompt": prompt_name})

    async def update_server_stats(
        self, active_connections: int, uptime: float, memory_usage: Optional[int] = None
    ) -> None:
        """Update server statistics.

        Args:
            active_connections: Value for the active-connections gauge
            uptime: Value for the uptime gauge
            memory_usage: Value for the memory-usage gauge; the gauge is left
                untouched when this is None
        """
        await self.gauge_set("mcp_server_active_connections", active_connections)
        await self.gauge_set("mcp_server_uptime_seconds", uptime)
        if memory_usage is not None:
            await self.gauge_set("mcp_server_memory_usage_bytes", memory_usage)

    def get_standard_metrics(self) -> Dict[str, str]:
        """Get list of standard metrics and their descriptions.

        Returns:
            A copy of the dictionary mapping metric names to descriptions
        """
        return self.standard_metrics.copy()