Skip to main content
Glama

AnyDocs MCP Server

by funky1688
metrics.py • 13.2 kB
#!/usr/bin/env python3
"""
Prometheus Metrics for AnyDocs MCP Server

Comprehensive monitoring and metrics collection for production deployment.
"""

import inspect
import time
from functools import wraps
from typing import Any, Dict, Optional

from prometheus_client import (
    CONTENT_TYPE_LATEST,
    CollectorRegistry,
    Counter,
    Gauge,
    Histogram,
    Info,
    generate_latest,
    start_http_server,
)

from .logging import get_logger

logger = get_logger(__name__)


class PrometheusMetrics:
    """Prometheus metrics collector for AnyDocs MCP Server.

    Owns a ``CollectorRegistry`` and every metric object the server records
    to.  All recording helpers are thin wrappers over the underlying
    prometheus_client primitives so call sites stay one-liners.
    """

    def __init__(self, registry: Optional[CollectorRegistry] = None):
        """Initialize Prometheus metrics.

        Args:
            registry: Existing registry to attach metrics to.  When omitted,
                a private registry is created (useful for isolated tests).
        """
        self.registry = registry or CollectorRegistry()
        self._setup_metrics()

    def _setup_metrics(self) -> None:
        """Set up all Prometheus metrics."""
        # Application Info
        self.app_info = Info(
            'anydocs_mcp_info',
            'AnyDocs MCP Server information',
            registry=self.registry
        )

        # Request Metrics
        self.request_count = Counter(
            'anydocs_mcp_requests_total',
            'Total number of requests',
            ['method', 'endpoint', 'status'],
            registry=self.registry
        )

        self.request_duration = Histogram(
            'anydocs_mcp_request_duration_seconds',
            'Request duration in seconds',
            ['method', 'endpoint'],
            registry=self.registry
        )

        # MCP Tool Metrics
        self.tool_calls = Counter(
            'anydocs_mcp_tool_calls_total',
            'Total number of MCP tool calls',
            ['tool_name', 'status'],
            registry=self.registry
        )

        self.tool_duration = Histogram(
            'anydocs_mcp_tool_duration_seconds',
            'MCP tool execution duration',
            ['tool_name'],
            registry=self.registry
        )

        # Document Metrics
        self.documents_indexed = Gauge(
            'anydocs_mcp_documents_indexed_total',
            'Total number of indexed documents',
            ['source'],
            registry=self.registry
        )

        self.document_searches = Counter(
            'anydocs_mcp_document_searches_total',
            'Total number of document searches',
            ['source'],
            registry=self.registry
        )

        self.document_retrievals = Counter(
            'anydocs_mcp_document_retrievals_total',
            'Total number of document retrievals',
            ['source'],
            registry=self.registry
        )

        # Database Metrics
        self.db_connections = Gauge(
            'anydocs_mcp_db_connections_active',
            'Active database connections',
            registry=self.registry
        )

        self.db_query_duration = Histogram(
            'anydocs_mcp_db_query_duration_seconds',
            'Database query duration',
            ['operation'],
            registry=self.registry
        )

        self.db_operations = Counter(
            'anydocs_mcp_db_operations_total',
            'Total database operations',
            ['operation', 'status'],
            registry=self.registry
        )

        # Adapter Metrics
        self.adapter_operations = Counter(
            'anydocs_mcp_adapter_operations_total',
            'Total adapter operations',
            ['adapter_type', 'operation', 'status'],
            registry=self.registry
        )

        self.adapter_sync_duration = Histogram(
            'anydocs_mcp_adapter_sync_duration_seconds',
            'Adapter sync duration',
            ['adapter_type'],
            registry=self.registry
        )

        # System Metrics
        self.memory_usage = Gauge(
            'anydocs_mcp_memory_usage_bytes',
            'Memory usage in bytes',
            registry=self.registry
        )

        self.cpu_usage = Gauge(
            'anydocs_mcp_cpu_usage_percent',
            'CPU usage percentage',
            registry=self.registry
        )

        # Error Metrics
        self.errors_total = Counter(
            'anydocs_mcp_errors_total',
            'Total number of errors',
            ['component', 'error_type'],
            registry=self.registry
        )

        # Cache Metrics
        self.cache_hits = Counter(
            'anydocs_mcp_cache_hits_total',
            'Total cache hits',
            ['cache_type'],
            registry=self.registry
        )

        self.cache_misses = Counter(
            'anydocs_mcp_cache_misses_total',
            'Total cache misses',
            ['cache_type'],
            registry=self.registry
        )

        # Initialize app info
        self.app_info.info({
            'version': '2.0.0',
            'name': 'anydocs-mcp',
            'description': 'Model Context Protocol server for documentation'
        })

    def record_request(self, method: str, endpoint: str, status: str, duration: float) -> None:
        """Record HTTP request metrics (count and latency)."""
        self.request_count.labels(method=method, endpoint=endpoint, status=status).inc()
        self.request_duration.labels(method=method, endpoint=endpoint).observe(duration)

    def record_tool_call(self, tool_name: str, status: str, duration: float) -> None:
        """Record MCP tool call metrics (count and latency)."""
        self.tool_calls.labels(tool_name=tool_name, status=status).inc()
        self.tool_duration.labels(tool_name=tool_name).observe(duration)

    def record_document_search(self, source: Optional[str] = None) -> None:
        """Record a document search; ``None`` source is labeled 'all'."""
        self.document_searches.labels(source=source or 'all').inc()

    def record_document_retrieval(self, source: str) -> None:
        """Record document retrieval operation."""
        self.document_retrievals.labels(source=source).inc()

    def update_documents_indexed(self, source: str, count: int) -> None:
        """Set the indexed-documents gauge for *source* to *count*."""
        self.documents_indexed.labels(source=source).set(count)

    def record_db_operation(self, operation: str, status: str, duration: float) -> None:
        """Record database operation metrics (count and latency)."""
        self.db_operations.labels(operation=operation, status=status).inc()
        self.db_query_duration.labels(operation=operation).observe(duration)

    def update_db_connections(self, count: int) -> None:
        """Set the active-database-connections gauge."""
        self.db_connections.set(count)

    def record_adapter_operation(self, adapter_type: str, operation: str, status: str) -> None:
        """Record adapter operation."""
        self.adapter_operations.labels(
            adapter_type=adapter_type, operation=operation, status=status
        ).inc()

    def record_adapter_sync(self, adapter_type: str, duration: float) -> None:
        """Record adapter sync duration."""
        self.adapter_sync_duration.labels(adapter_type=adapter_type).observe(duration)

    def update_system_metrics(self, memory_bytes: float, cpu_percent: float) -> None:
        """Update system resource gauges (memory in bytes, CPU in percent)."""
        self.memory_usage.set(memory_bytes)
        self.cpu_usage.set(cpu_percent)

    def record_error(self, component: str, error_type: str) -> None:
        """Record an error occurrence for *component* of *error_type*."""
        self.errors_total.labels(component=component, error_type=error_type).inc()

    def record_cache_hit(self, cache_type: str) -> None:
        """Record cache hit."""
        self.cache_hits.labels(cache_type=cache_type).inc()

    def record_cache_miss(self, cache_type: str) -> None:
        """Record cache miss."""
        self.cache_misses.labels(cache_type=cache_type).inc()

    def get_metrics(self) -> str:
        """Return all metrics rendered in the Prometheus text exposition format."""
        return generate_latest(self.registry).decode('utf-8')

    def get_content_type(self) -> str:
        """Return the Content-Type header value for Prometheus scrapes."""
        return CONTENT_TYPE_LATEST


# Global metrics instance (lazily created by get_metrics()).
_metrics_instance: Optional[PrometheusMetrics] = None


def get_metrics() -> PrometheusMetrics:
    """Get the process-wide metrics instance, creating it on first use."""
    global _metrics_instance
    if _metrics_instance is None:
        _metrics_instance = PrometheusMetrics()
    return _metrics_instance


def _is_tool_handler(name: str) -> bool:
    """Heuristic: private functions whose name mentions 'tool' are MCP tool handlers."""
    return name.startswith('_') and ('tool' in name or name.endswith('_tool'))


def _tool_label(name: str) -> str:
    """Normalize a handler name into a tool label.

    NOTE(review): this strips every underscore and every occurrence of the
    substring 'tool' (e.g. ``_search_tool`` -> ``search``); kept as-is for
    label-name compatibility with existing dashboards.
    """
    return name.replace('_', '').replace('tool', '')


def _record_outcome(metrics: PrometheusMetrics, func, status: str, duration: float) -> None:
    """Route a wrapped call's outcome to tool or generic request metrics."""
    if not hasattr(func, '__name__'):
        return
    if _is_tool_handler(func.__name__):
        metrics.record_tool_call(_tool_label(func.__name__), status, duration)
    else:
        metrics.record_request('CALL', func.__name__, status, duration)


def metrics_middleware(func):
    """Decorator for automatic metrics collection.

    Wraps *func* (sync or async) so its duration and success/error outcome
    are recorded via the global metrics instance; exceptions are re-raised
    after being counted.
    """

    @wraps(func)
    async def async_wrapper(*args, **kwargs):
        start_time = time.time()
        metrics = get_metrics()
        try:
            result = await func(*args, **kwargs)
        except Exception as e:
            duration = time.time() - start_time
            if hasattr(func, '__name__'):
                metrics.record_error(func.__name__, type(e).__name__)
            _record_outcome(metrics, func, 'error', duration)
            raise
        _record_outcome(metrics, func, 'success', time.time() - start_time)
        return result

    @wraps(func)
    def sync_wrapper(*args, **kwargs):
        start_time = time.time()
        metrics = get_metrics()
        try:
            result = func(*args, **kwargs)
        except Exception as e:
            duration = time.time() - start_time
            if hasattr(func, '__name__'):
                metrics.record_error(func.__name__, type(e).__name__)
            _record_outcome(metrics, func, 'error', duration)
            raise
        _record_outcome(metrics, func, 'success', time.time() - start_time)
        return result

    # BUGFIX: the original tested `func.__code__.co_flags & 0x80` directly,
    # which raises AttributeError-adjacent surprises for callables without
    # __code__ and misses functools-wrapped coroutines; inspect handles both.
    if inspect.iscoroutinefunction(func):
        return async_wrapper
    return sync_wrapper


class MetricsCollector:
    """Background metrics collector for system resources."""

    def __init__(self, metrics: PrometheusMetrics):
        self.metrics = metrics
        # Loop flag flipped by stop_collection() to end the polling loop.
        self._running = False

    async def start_collection(self, interval: int = 30) -> None:
        """Start background metrics collection.

        Polls psutil every *interval* seconds and pushes memory/CPU gauges.
        Runs until stop_collection() is called; collection errors are
        logged and the loop continues.
        """
        import asyncio
        import psutil  # local import: optional runtime dependency

        self._running = True
        logger.info(f"Starting metrics collection with {interval}s interval")

        # Prime cpu_percent so the first non-blocking reading is meaningful.
        psutil.cpu_percent(interval=None)

        while self._running:
            try:
                memory = psutil.virtual_memory()
                # BUGFIX: interval=1 blocked the event loop for a full second
                # every cycle; interval=None returns usage since the last call
                # without blocking.
                cpu_percent = psutil.cpu_percent(interval=None)

                self.metrics.update_system_metrics(
                    memory_bytes=memory.used,
                    cpu_percent=cpu_percent,
                )
            except Exception as e:
                logger.error(f"Error collecting system metrics: {e}")
            await asyncio.sleep(interval)

    def stop_collection(self) -> None:
        """Stop background metrics collection after the current cycle."""
        self._running = False
        logger.info("Stopped metrics collection")


def start_metrics_server(port: int = 8001, host: str = '0.0.0.0') -> None:
    """Start the Prometheus metrics HTTP server on *host*:*port*.

    Exposes the global registry; failures are logged and re-raised so the
    caller can decide whether metrics are mandatory.
    """
    try:
        start_http_server(port, addr=host, registry=get_metrics().registry)
        logger.info(f"Prometheus metrics server started on {host}:{port}")
    except Exception as e:
        logger.error(f"Failed to start metrics server: {e}")
        raise

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/funky1688/AnyDocs-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server