# metrics.py • 13.2 kB
#!/usr/bin/env python3
"""
Prometheus Metrics for AnyDocs MCP Server
Comprehensive monitoring and metrics collection for production deployment.
"""
import time
from typing import Dict, Optional, Any
from functools import wraps
from prometheus_client import (
Counter, Histogram, Gauge, Info, CollectorRegistry,
generate_latest, CONTENT_TYPE_LATEST, start_http_server
)
from .logging import get_logger
# Module-level logger from the package's own logging helper; used by the
# metrics collector and the metrics HTTP server further down this module.
logger = get_logger(__name__)
class PrometheusMetrics:
    """Prometheus metrics collector for AnyDocs MCP Server.

    Every metric is registered on ``self.registry``. Unless a registry is
    passed in, this is a freshly created ``CollectorRegistry`` rather than the
    prometheus_client global default registry.
    """

    def __init__(
        self,
        registry: Optional[CollectorRegistry] = None,
        *,
        app_version: str = '2.0.0',
    ):
        """Initialize Prometheus metrics.

        Args:
            registry: Registry on which every metric is registered; a new
                ``CollectorRegistry`` is created when omitted.
            app_version: Value published as ``version`` in the application
                info metric. Defaults to the previously hard-coded ``'2.0.0'``.
        """
        # Explicit None check so the fallback never depends on the
        # registry object's truthiness.
        self.registry = registry if registry is not None else CollectorRegistry()
        self._app_version = app_version
        self._setup_metrics()

    def _setup_metrics(self):
        """Create all metrics on ``self.registry`` and publish the app info."""
        # Application Info
        # NOTE(review): prometheus_client's Info appends an ``_info`` suffix to
        # its sample name, so this is likely exposed as
        # ``anydocs_mcp_info_info``; the name is left unchanged so existing
        # queries and dashboards keep working.
        self.app_info = Info(
            'anydocs_mcp_info',
            'AnyDocs MCP Server information',
            registry=self.registry,
        )

        # Request Metrics
        self.request_count = Counter(
            'anydocs_mcp_requests_total',
            'Total number of requests',
            ['method', 'endpoint', 'status'],
            registry=self.registry,
        )
        self.request_duration = Histogram(
            'anydocs_mcp_request_duration_seconds',
            'Request duration in seconds',
            ['method', 'endpoint'],
            registry=self.registry,
        )

        # MCP Tool Metrics
        self.tool_calls = Counter(
            'anydocs_mcp_tool_calls_total',
            'Total number of MCP tool calls',
            ['tool_name', 'status'],
            registry=self.registry,
        )
        self.tool_duration = Histogram(
            'anydocs_mcp_tool_duration_seconds',
            'MCP tool execution duration',
            ['tool_name'],
            registry=self.registry,
        )

        # Document Metrics
        self.documents_indexed = Gauge(
            'anydocs_mcp_documents_indexed_total',
            'Total number of indexed documents',
            ['source'],
            registry=self.registry,
        )
        self.document_searches = Counter(
            'anydocs_mcp_document_searches_total',
            'Total number of document searches',
            ['source'],
            registry=self.registry,
        )
        self.document_retrievals = Counter(
            'anydocs_mcp_document_retrievals_total',
            'Total number of document retrievals',
            ['source'],
            registry=self.registry,
        )

        # Database Metrics
        self.db_connections = Gauge(
            'anydocs_mcp_db_connections_active',
            'Active database connections',
            registry=self.registry,
        )
        self.db_query_duration = Histogram(
            'anydocs_mcp_db_query_duration_seconds',
            'Database query duration',
            ['operation'],
            registry=self.registry,
        )
        self.db_operations = Counter(
            'anydocs_mcp_db_operations_total',
            'Total database operations',
            ['operation', 'status'],
            registry=self.registry,
        )

        # Adapter Metrics
        self.adapter_operations = Counter(
            'anydocs_mcp_adapter_operations_total',
            'Total adapter operations',
            ['adapter_type', 'operation', 'status'],
            registry=self.registry,
        )
        self.adapter_sync_duration = Histogram(
            'anydocs_mcp_adapter_sync_duration_seconds',
            'Adapter sync duration',
            ['adapter_type'],
            registry=self.registry,
        )

        # System Metrics
        self.memory_usage = Gauge(
            'anydocs_mcp_memory_usage_bytes',
            'Memory usage in bytes',
            registry=self.registry,
        )
        self.cpu_usage = Gauge(
            'anydocs_mcp_cpu_usage_percent',
            'CPU usage percentage',
            registry=self.registry,
        )

        # Error Metrics
        self.errors_total = Counter(
            'anydocs_mcp_errors_total',
            'Total number of errors',
            ['component', 'error_type'],
            registry=self.registry,
        )

        # Cache Metrics
        self.cache_hits = Counter(
            'anydocs_mcp_cache_hits_total',
            'Total cache hits',
            ['cache_type'],
            registry=self.registry,
        )
        self.cache_misses = Counter(
            'anydocs_mcp_cache_misses_total',
            'Total cache misses',
            ['cache_type'],
            registry=self.registry,
        )

        # Initialize app info
        self.app_info.info({
            'version': self._app_version,
            'name': 'anydocs-mcp',
            'description': 'Model Context Protocol server for documentation',
        })

    def record_request(self, method: str, endpoint: str, status: str, duration: float):
        """Record HTTP request metrics.

        Increments the request counter for (method, endpoint, status) and
        observes ``duration`` (seconds) in the per-(method, endpoint) histogram.
        """
        self.request_count.labels(method=method, endpoint=endpoint, status=status).inc()
        self.request_duration.labels(method=method, endpoint=endpoint).observe(duration)

    def record_tool_call(self, tool_name: str, status: str, duration: float):
        """Record MCP tool call metrics: one call count plus its duration in seconds."""
        self.tool_calls.labels(tool_name=tool_name, status=status).inc()
        self.tool_duration.labels(tool_name=tool_name).observe(duration)

    def record_document_search(self, source: Optional[str] = None):
        """Record document search operation.

        A falsy ``source`` (``None`` or ``''``) is recorded under the label
        value ``'all'``.
        """
        self.document_searches.labels(source=source or 'all').inc()

    def record_document_retrieval(self, source: str):
        """Record document retrieval operation for ``source``."""
        self.document_retrievals.labels(source=source).inc()

    def update_documents_indexed(self, source: str, count: int):
        """Set the indexed-documents gauge for ``source`` to ``count``."""
        self.documents_indexed.labels(source=source).set(count)

    def record_db_operation(self, operation: str, status: str, duration: float):
        """Record database operation metrics: one operation count plus its duration."""
        self.db_operations.labels(operation=operation, status=status).inc()
        self.db_query_duration.labels(operation=operation).observe(duration)

    def update_db_connections(self, count: int):
        """Set the active-database-connections gauge to ``count``."""
        self.db_connections.set(count)

    def record_adapter_operation(self, adapter_type: str, operation: str, status: str):
        """Increment the adapter operation counter for the given labels."""
        self.adapter_operations.labels(
            adapter_type=adapter_type,
            operation=operation,
            status=status,
        ).inc()

    def record_adapter_sync(self, adapter_type: str, duration: float):
        """Observe an adapter sync ``duration`` (seconds) for ``adapter_type``."""
        self.adapter_sync_duration.labels(adapter_type=adapter_type).observe(duration)

    def update_system_metrics(self, memory_bytes: float, cpu_percent: float):
        """Set the memory (bytes) and CPU (percent) gauges."""
        self.memory_usage.set(memory_bytes)
        self.cpu_usage.set(cpu_percent)

    def record_error(self, component: str, error_type: str):
        """Increment the error counter for (component, error_type)."""
        self.errors_total.labels(component=component, error_type=error_type).inc()

    def record_cache_hit(self, cache_type: str):
        """Increment the cache-hit counter for ``cache_type``."""
        self.cache_hits.labels(cache_type=cache_type).inc()

    def record_cache_miss(self, cache_type: str):
        """Increment the cache-miss counter for ``cache_type``."""
        self.cache_misses.labels(cache_type=cache_type).inc()

    def get_metrics(self) -> str:
        """Return this instance's registry rendered by ``generate_latest``, decoded as UTF-8."""
        return generate_latest(self.registry).decode('utf-8')

    def get_content_type(self) -> str:
        """Return ``CONTENT_TYPE_LATEST``, the content type matching ``get_metrics()`` output."""
        return CONTENT_TYPE_LATEST
# Global metrics instance: stays None until get_metrics() lazily creates the
# shared PrometheusMetrics on its first call.
_metrics_instance: Optional[PrometheusMetrics] = None
def get_metrics() -> PrometheusMetrics:
    """Return the module-wide PrometheusMetrics instance.

    The instance is constructed on the first call, stored in the module
    global ``_metrics_instance``, and returned unchanged by every later call.
    """
    global _metrics_instance
    shared = _metrics_instance
    if shared is None:
        shared = PrometheusMetrics()
        _metrics_instance = shared
    return shared
def _is_tool_name(name: str) -> bool:
    """Return True when ``name`` follows the private MCP-tool naming convention.

    A callable counts as an MCP tool when its name starts with ``_`` and
    contains ``tool`` anywhere (this already covers the ``*_tool`` suffix).
    """
    return name.startswith('_') and 'tool' in name


def _tool_label(name: str) -> str:
    """Derive the ``tool_name`` label by removing every ``_`` and every ``tool``.

    Kept identical to the historical derivation so existing label values
    (e.g. ``_search_tool`` -> ``search``) do not change.
    """
    return name.replace('_', '').replace('tool', '')


def _record_call_metrics(
    metrics: "PrometheusMetrics",
    func,
    status: str,
    duration: float,
    error: Optional[BaseException] = None,
) -> None:
    """Record one finished call of ``func`` on ``metrics``.

    Does nothing for callables without a ``__name__``. When ``error`` is
    given, an error sample (component = function name, error_type = exception
    class name) is recorded before the call sample.
    """
    name = getattr(func, '__name__', None)
    if name is None:
        return
    if error is not None:
        metrics.record_error(name, type(error).__name__)
    if _is_tool_name(name):
        metrics.record_tool_call(_tool_label(name), status, duration)
    else:
        metrics.record_request('CALL', name, status, duration)


def metrics_middleware(func):
    """Decorator for automatic metrics collection.

    Wraps ``func`` (plain or ``async def``) so that every call records its
    elapsed time and outcome on the global metrics instance:

    * names starting with ``_`` and containing ``tool`` are recorded as MCP
      tool calls, labelled with ``_tool_label(name)``;
    * any other name is recorded as ``record_request('CALL', name, ...)``.

    Status is ``'success'`` or ``'error'``. On error, ``record_error`` is also
    called and the original exception is re-raised unchanged.

    Returns:
        An ``async def`` wrapper when ``func`` is a coroutine function,
        otherwise a plain wrapper; both carry ``func``'s metadata via
        ``functools.wraps``.
    """
    # Local import: the module header does not import inspect.
    import inspect

    # inspect.iscoroutinefunction also unwraps functools.partial objects,
    # which have no __code__ and were previously given the sync wrapper
    # (timing only coroutine creation, not its execution).
    if inspect.iscoroutinefunction(func):
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            metrics = get_metrics()
            # perf_counter is monotonic: durations cannot go negative if the
            # wall clock is adjusted mid-call, unlike time.time().
            started = time.perf_counter()
            try:
                result = await func(*args, **kwargs)
            except Exception as exc:
                _record_call_metrics(metrics, func, 'error', time.perf_counter() - started, exc)
                raise
            # Recorded outside the try so a metrics failure is not misreported
            # as an error raised by func itself.
            _record_call_metrics(metrics, func, 'success', time.perf_counter() - started)
            return result

        return async_wrapper

    @wraps(func)
    def sync_wrapper(*args, **kwargs):
        metrics = get_metrics()
        started = time.perf_counter()
        try:
            result = func(*args, **kwargs)
        except Exception as exc:
            _record_call_metrics(metrics, func, 'error', time.perf_counter() - started, exc)
            raise
        _record_call_metrics(metrics, func, 'success', time.perf_counter() - started)
        return result

    return sync_wrapper
class MetricsCollector:
    """Background collector that samples host memory and CPU into a PrometheusMetrics.

    ``start_collection`` is a long-running coroutine; ``stop_collection``
    clears the flag that its loop checks.
    """

    def __init__(self, metrics: PrometheusMetrics):
        """Bind the collector to ``metrics``; collection is not started here."""
        self.metrics = metrics
        self._running = False

    async def start_collection(self, interval: int = 30):
        """Sample system metrics every ``interval`` seconds until stopped.

        Each cycle reads ``psutil.virtual_memory().used`` and a one-second
        ``psutil.cpu_percent`` sample, pushes them via
        ``update_system_metrics``, then sleeps ``interval`` seconds. The
        effective period is therefore roughly ``interval + 1`` seconds.
        Collection errors are logged and the loop continues.

        Args:
            interval: Seconds to sleep between samples.
        """
        import asyncio
        import psutil

        self._running = True
        logger.info(f"Starting metrics collection with {interval}s interval")
        loop = asyncio.get_running_loop()

        while self._running:
            try:
                memory = psutil.virtual_memory()
                # cpu_percent(interval=1) blocks for one second while it
                # measures; run it in the default executor so the event loop
                # keeps serving other tasks during the sample.
                cpu_percent = await loop.run_in_executor(None, psutil.cpu_percent, 1)
                self.metrics.update_system_metrics(
                    memory_bytes=memory.used,
                    cpu_percent=cpu_percent,
                )
            except Exception as e:
                logger.error(f"Error collecting system metrics: {e}")
            # Single sleep after both the success and the error path; kept
            # outside the try so a cancellation during the wait propagates.
            await asyncio.sleep(interval)

    def stop_collection(self):
        """Stop background metrics collection.

        Only clears the running flag: an in-progress ``start_collection`` loop
        exits the next time it re-checks the flag, i.e. after its current
        sleep finishes.
        """
        self._running = False
        logger.info("Stopped metrics collection")
def start_metrics_server(port: int = 8001, host: str = '0.0.0.0'):
    """Serve the global metrics registry over HTTP via prometheus_client.

    Args:
        port: TCP port to listen on.
        host: Address to bind. The default ``'0.0.0.0'`` listens on all
            interfaces; pass ``'127.0.0.1'`` to keep metrics local.

    Raises:
        Exception: any error raised while obtaining the registry or starting
            the server is logged and then re-raised unchanged.
    """
    try:
        target_registry = get_metrics().registry
        start_http_server(port, addr=host, registry=target_registry)
        logger.info(f"Prometheus metrics server started on {host}:{port}")
    except Exception as e:
        logger.error(f"Failed to start metrics server: {e}")
        raise