Skip to main content
Glama
metrics.py9.9 kB
"""Metrics collection for Agent MCP Gateway.""" import asyncio from dataclasses import dataclass, field from typing import Dict, List @dataclass class OperationMetrics: """Metrics for a specific operation. Attributes: count: Total number of operations recorded total_latency_ms: Cumulative latency in milliseconds latencies: List of individual latency measurements errors: Number of operations that resulted in errors """ count: int = 0 total_latency_ms: float = 0.0 latencies: List[float] = field(default_factory=list) errors: int = 0 def record(self, latency_ms: float, is_error: bool = False): """Record a single operation. Args: latency_ms: Operation latency in milliseconds is_error: Whether the operation resulted in an error """ self.count += 1 self.total_latency_ms += latency_ms self.latencies.append(latency_ms) if is_error: self.errors += 1 def get_summary(self) -> dict: """Generate summary statistics for this operation. Returns: Dictionary containing count, avg, percentiles, and error_rate """ if self.count == 0: return { "count": 0, "avg_latency_ms": 0.0, "p50_latency_ms": 0.0, "p95_latency_ms": 0.0, "p99_latency_ms": 0.0, "error_rate": 0.0 } avg_latency = self.total_latency_ms / self.count error_rate = self.errors / self.count # Calculate percentiles sorted_latencies = sorted(self.latencies) p50 = self._percentile(sorted_latencies, 50) p95 = self._percentile(sorted_latencies, 95) p99 = self._percentile(sorted_latencies, 99) return { "count": self.count, "avg_latency_ms": round(avg_latency, 2), "p50_latency_ms": round(p50, 2), "p95_latency_ms": round(p95, 2), "p99_latency_ms": round(p99, 2), "error_rate": round(error_rate, 4) } @staticmethod def _percentile(sorted_values: List[float], percentile: int) -> float: """Calculate percentile from sorted values. Args: sorted_values: List of values sorted in ascending order percentile: Percentile to calculate (0-100) Returns: Value at the specified percentile """ if not sorted_values: return 0.0 if len(sorted_values) == 1: return sorted_values[0] # Use linear interpolation method k = (len(sorted_values) - 1) * (percentile / 100.0) f = int(k) c = f + 1 if c >= len(sorted_values): return sorted_values[-1] # Interpolate between floor and ceiling d0 = sorted_values[f] * (c - k) d1 = sorted_values[c] * (k - f) return d0 + d1 class MetricsCollector: """Collects and aggregates metrics for gateway operations. Thread-safe metrics collection with per-agent and per-operation tracking. Metrics are stored in memory and can be aggregated for monitoring. """ def __init__(self): """Initialize metrics collector with empty storage.""" # Overall metrics: operation -> OperationMetrics self._metrics: Dict[str, OperationMetrics] = {} # Per-agent metrics: agent_id -> operation -> OperationMetrics self._agent_metrics: Dict[str, Dict[str, OperationMetrics]] = {} # Lock for thread-safe operations self._lock = asyncio.Lock() async def record( self, agent_id: str, operation: str, latency_ms: float, is_error: bool = False ): """Record a single operation metric. Args: agent_id: Agent identifier operation: Operation name (list_servers, execute_tool, etc.) latency_ms: Operation latency in milliseconds is_error: Whether the operation resulted in an error """ async with self._lock: # Record overall metrics if operation not in self._metrics: self._metrics[operation] = OperationMetrics() self._metrics[operation].record(latency_ms, is_error) # Record per-agent metrics if agent_id not in self._agent_metrics: self._agent_metrics[agent_id] = {} if operation not in self._agent_metrics[agent_id]: self._agent_metrics[agent_id][operation] = OperationMetrics() self._agent_metrics[agent_id][operation].record(latency_ms, is_error) def record_sync( self, agent_id: str, operation: str, latency_ms: float, is_error: bool = False ): """Record a single operation metric (synchronous version). Note: This is not thread-safe. Use async record() for concurrent access. Args: agent_id: Agent identifier operation: Operation name (list_servers, execute_tool, etc.) latency_ms: Operation latency in milliseconds is_error: Whether the operation resulted in an error """ # Record overall metrics if operation not in self._metrics: self._metrics[operation] = OperationMetrics() self._metrics[operation].record(latency_ms, is_error) # Record per-agent metrics if agent_id not in self._agent_metrics: self._agent_metrics[agent_id] = {} if operation not in self._agent_metrics[agent_id]: self._agent_metrics[agent_id][operation] = OperationMetrics() self._agent_metrics[agent_id][operation].record(latency_ms, is_error) async def get_summary(self) -> dict: """Get overall summary of all operations. Returns: Dictionary mapping operation names to their summary statistics """ async with self._lock: return self._get_summary_internal() def get_summary_sync(self) -> dict: """Get overall summary of all operations (synchronous version). Returns: Dictionary mapping operation names to their summary statistics """ return self._get_summary_internal() def _get_summary_internal(self) -> dict: """Internal method to get summary without locking.""" return { operation: metrics.get_summary() for operation, metrics in self._metrics.items() } async def get_agent_summary(self, agent_id: str) -> dict: """Get summary for a specific agent. Args: agent_id: Agent identifier Returns: Dictionary mapping operation names to summary statistics for this agent, or empty dict if agent has no recorded metrics """ async with self._lock: return self._get_agent_summary_internal(agent_id) def get_agent_summary_sync(self, agent_id: str) -> dict: """Get summary for a specific agent (synchronous version). Args: agent_id: Agent identifier Returns: Dictionary mapping operation names to summary statistics for this agent, or empty dict if agent has no recorded metrics """ return self._get_agent_summary_internal(agent_id) def _get_agent_summary_internal(self, agent_id: str) -> dict: """Internal method to get agent summary without locking.""" if agent_id not in self._agent_metrics: return {} return { operation: metrics.get_summary() for operation, metrics in self._agent_metrics[agent_id].items() } async def get_operation_summary(self, operation: str) -> dict: """Get summary for a specific operation. Args: operation: Operation name Returns: Summary statistics for this operation, or empty metrics if not found """ async with self._lock: return self._get_operation_summary_internal(operation) def get_operation_summary_sync(self, operation: str) -> dict: """Get summary for a specific operation (synchronous version). Args: operation: Operation name Returns: Summary statistics for this operation, or empty metrics if not found """ return self._get_operation_summary_internal(operation) def _get_operation_summary_internal(self, operation: str) -> dict: """Internal method to get operation summary without locking.""" if operation not in self._metrics: return { "count": 0, "avg_latency_ms": 0.0, "p50_latency_ms": 0.0, "p95_latency_ms": 0.0, "p99_latency_ms": 0.0, "error_rate": 0.0 } return self._metrics[operation].get_summary() async def get_all_agents(self) -> List[str]: """Get list of all agents with recorded metrics. Returns: List of agent identifiers """ async with self._lock: return list(self._agent_metrics.keys()) def get_all_agents_sync(self) -> List[str]: """Get list of all agents with recorded metrics (synchronous version). Returns: List of agent identifiers """ return list(self._agent_metrics.keys()) async def reset(self): """Reset all metrics (useful for testing).""" async with self._lock: self._metrics.clear() self._agent_metrics.clear() def reset_sync(self): """Reset all metrics (synchronous version, useful for testing).""" self._metrics.clear() self._agent_metrics.clear()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/roddutra/agent-mcp-gateway'

If you have feedback or need assistance with the MCP directory API, please join our Discord server