Skip to main content
Glama
metrics.py10.5 kB
""" OpenTelemetry metrics collection for MCP server operations Provides business-specific metrics for monitoring MCP tool usage, performance, and operational health. """ import time from typing import Optional, Dict, Any from src.logging import get_logger logger = get_logger('TELEMETRY_METRICS') # Global metrics state _meter = None _metrics_enabled = False # Metric instruments _tool_invocation_counter = None _tool_duration_histogram = None _api_request_counter = None _api_duration_histogram = None _database_query_counter = None _database_duration_histogram = None _error_counter = None def initialize_metrics(): """Initialize OpenTelemetry metrics instruments.""" global _meter, _metrics_enabled global _tool_invocation_counter, _tool_duration_histogram global _api_request_counter, _api_duration_histogram global _database_query_counter, _database_duration_histogram global _error_counter try: from src.telemetry.config import get_meter _meter = get_meter() if not _meter: logger.debug("metrics not available | meter not initialized") return False # MCP Tool metrics _tool_invocation_counter = _meter.create_counter( name="mcp_tool_invocations_total", description="Total number of MCP tool invocations", unit="1" ) _tool_duration_histogram = _meter.create_histogram( name="mcp_tool_duration_seconds", description="Duration of MCP tool executions", unit="s" ) # Observe API metrics _api_request_counter = _meter.create_counter( name="observe_api_requests_total", description="Total number of Observe API requests", unit="1" ) _api_duration_histogram = _meter.create_histogram( name="observe_api_duration_seconds", description="Duration of Observe API requests", unit="s" ) # Database metrics _database_query_counter = _meter.create_counter( name="database_queries_total", description="Total number of database queries", unit="1" ) _database_duration_histogram = _meter.create_histogram( name="database_query_duration_seconds", description="Duration of database queries", unit="s" ) # Error metrics _error_counter = _meter.create_counter( name="mcp_errors_total", description="Total number of errors by type", unit="1" ) _metrics_enabled = True logger.info("metrics initialization complete") return True except ImportError: logger.debug("metrics not available | opentelemetry not installed") return False except Exception as e: logger.error(f"metrics initialization failed | error: {e}") return False def record_tool_invocation(tool_name: str, duration: float, success: bool, **attributes): """ Record metrics for MCP tool invocations. Args: tool_name: Name of the MCP tool duration: Execution duration in seconds success: Whether the invocation was successful **attributes: Additional attributes to record """ if not _metrics_enabled or not _tool_invocation_counter: return try: # Basic attributes metric_attributes = { "tool_name": tool_name, "status": "success" if success else "error" } # Add optional attributes if attributes: for key, value in attributes.items(): if isinstance(value, (str, int, float, bool)): metric_attributes[f"tool.{key}"] = str(value) # Record metrics _tool_invocation_counter.add(1, metric_attributes) _tool_duration_histogram.record(duration, metric_attributes) logger.debug(f"recorded tool metrics | tool:{tool_name} | duration:{duration:.3f}s | success:{success}") except Exception as e: logger.debug(f"failed to record tool metrics | error: {e}") def record_api_request(endpoint: str, method: str, status_code: int, duration: float, **attributes): """ Record metrics for Observe API requests. Args: endpoint: API endpoint method: HTTP method status_code: HTTP status code duration: Request duration in seconds **attributes: Additional attributes """ if not _metrics_enabled or not _api_request_counter: return try: metric_attributes = { "endpoint": endpoint, "method": method, "status_code": str(status_code), "status": "success" if status_code < 400 else "error" } # Add optional attributes for key, value in attributes.items(): if isinstance(value, (str, int, float, bool)): metric_attributes[f"api.{key}"] = str(value) _api_request_counter.add(1, metric_attributes) _api_duration_histogram.record(duration, metric_attributes) logger.debug(f"recorded API metrics | endpoint:{endpoint} | status:{status_code} | duration:{duration:.3f}s") except Exception as e: logger.debug(f"failed to record API metrics | error: {e}") def record_database_query(operation: str, table: str, duration: float, success: bool, row_count: Optional[int] = None): """ Record metrics for database queries. Args: operation: Database operation type table: Table name duration: Query duration in seconds success: Whether the query was successful row_count: Number of rows returned/affected """ if not _metrics_enabled or not _database_query_counter: return try: metric_attributes = { "operation": operation, "table": table, "status": "success" if success else "error" } if row_count is not None: metric_attributes["rows"] = str(row_count) _database_query_counter.add(1, metric_attributes) _database_duration_histogram.record(duration, metric_attributes) logger.debug(f"recorded DB metrics | operation:{operation} | table:{table} | duration:{duration:.3f}s | rows:{row_count}") except Exception as e: logger.debug(f"failed to record DB metrics | error: {e}") def record_error(error_type: str, operation: str, **attributes): """ Record error occurrences. Args: error_type: Type/category of error operation: Operation where error occurred **attributes: Additional error context """ if not _metrics_enabled or not _error_counter: return try: metric_attributes = { "error_type": error_type, "operation": operation } for key, value in attributes.items(): if isinstance(value, (str, int, float, bool)): metric_attributes[f"error.{key}"] = str(value) _error_counter.add(1, metric_attributes) logger.debug(f"recorded error metric | type:{error_type} | operation:{operation}") except Exception as e: logger.debug(f"failed to record error metric | error: {e}") class MetricsTimer: """Context manager for timing operations and recording metrics.""" def __init__(self, metric_type: str, operation: str, **attributes): """ Initialize metrics timer. Args: metric_type: Type of metric ("tool", "api", "database") operation: Operation being timed **attributes: Additional attributes to record """ self.metric_type = metric_type self.operation = operation self.attributes = attributes self.start_time = None self.success = True def __enter__(self): """Start timing.""" self.start_time = time.time() return self def __exit__(self, exc_type, exc_val, exc_tb): """Stop timing and record metrics.""" if self.start_time is None: return duration = time.time() - self.start_time # Mark as failed if exception occurred if exc_type is not None: self.success = False # Record appropriate metric type try: if self.metric_type == "tool": record_tool_invocation(self.operation, duration, self.success, **self.attributes) elif self.metric_type == "api": endpoint = self.attributes.get("endpoint", self.operation) method = self.attributes.get("method", "GET") status_code = self.attributes.get("status_code", 500 if not self.success else 200) record_api_request(endpoint, method, status_code, duration, **self.attributes) elif self.metric_type == "database": table = self.attributes.get("table", "unknown") row_count = self.attributes.get("row_count") record_database_query(self.operation, table, duration, self.success, row_count) if not self.success and exc_type: record_error(exc_type.__name__, self.operation, **self.attributes) except Exception as e: logger.debug(f"failed to record metrics in timer | error: {e}") def set_attribute(self, key: str, value: Any): """Set an attribute to be recorded with metrics.""" self.attributes[key] = value def mark_success(self, success: bool = True): """Manually mark operation as successful or failed.""" self.success = success def get_metrics_status() -> Dict[str, Any]: """ Get the current metrics system status. Returns: Dictionary with metrics status information """ return { "enabled": _metrics_enabled, "meter_available": _meter is not None, "instruments": { "tool_invocation_counter": _tool_invocation_counter is not None, "tool_duration_histogram": _tool_duration_histogram is not None, "api_request_counter": _api_request_counter is not None, "api_duration_histogram": _api_duration_histogram is not None, "database_query_counter": _database_query_counter is not None, "database_duration_histogram": _database_duration_histogram is not None, "error_counter": _error_counter is not None } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rustomax/observe-experimental-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server