Skip to main content
Glama

MaverickMCP

by wshobson
MIT License
165
  • Apple
database_monitoring.py16.3 kB
""" Database and Redis monitoring utilities for MaverickMCP. This module provides comprehensive monitoring for: - SQLAlchemy database connection pools - Database query performance - Redis connection pools and operations - Cache hit rates and performance metrics """ import asyncio import time from contextlib import asynccontextmanager, contextmanager from typing import Any from sqlalchemy.event import listen from sqlalchemy.pool import Pool from maverick_mcp.utils.logging import get_logger from maverick_mcp.utils.monitoring import ( redis_connections, redis_memory_usage, track_cache_operation, track_database_connection_event, track_database_query, track_redis_operation, update_database_metrics, update_redis_metrics, ) from maverick_mcp.utils.tracing import trace_cache_operation, trace_database_query logger = get_logger(__name__) class DatabaseMonitor: """Monitor for SQLAlchemy database operations and connection pools.""" def __init__(self, engine=None): self.engine = engine self.query_stats = {} self._setup_event_listeners() def _setup_event_listeners(self): """Set up SQLAlchemy event listeners for monitoring.""" if not self.engine: return # Connection pool events listen(Pool, "connect", self._on_connection_created) listen(Pool, "checkout", self._on_connection_checkout) listen(Pool, "checkin", self._on_connection_checkin) listen(Pool, "close", self._on_connection_closed) # Query execution events listen(self.engine, "before_cursor_execute", self._on_before_query) listen(self.engine, "after_cursor_execute", self._on_after_query) def _on_connection_created(self, dbapi_connection, connection_record): """Handle new database connection creation.""" track_database_connection_event("created") logger.debug("Database connection created") def _on_connection_checkout( self, dbapi_connection, connection_record, connection_proxy ): """Handle connection checkout from pool.""" # Update connection metrics pool = self.engine.pool self._update_pool_metrics(pool) def _on_connection_checkin(self, dbapi_connection, connection_record): """Handle connection checkin to pool.""" # Update connection metrics pool = self.engine.pool self._update_pool_metrics(pool) def _on_connection_closed(self, dbapi_connection, connection_record): """Handle connection closure.""" track_database_connection_event("closed", "normal") logger.debug("Database connection closed") def _on_before_query( self, conn, cursor, statement, parameters, context, executemany ): """Handle query execution start.""" context._query_start_time = time.time() context._query_statement = statement def _on_after_query( self, conn, cursor, statement, parameters, context, executemany ): """Handle query execution completion.""" if hasattr(context, "_query_start_time"): duration = time.time() - context._query_start_time query_type = self._extract_query_type(statement) table = self._extract_table_name(statement) # Track metrics track_database_query(query_type, table, duration, "success") # Log slow queries if duration > 1.0: # Queries over 1 second logger.warning( "Slow database query detected", extra={ "query_type": query_type, "table": table, "duration_seconds": duration, "statement": statement[:200] + "..." if len(statement) > 200 else statement, }, ) def _update_pool_metrics(self, pool): """Update connection pool metrics.""" try: pool_size = pool.size() checked_out = pool.checkedout() checked_in = pool.checkedin() update_database_metrics( pool_size=pool_size, active_connections=checked_out, idle_connections=checked_in, ) except Exception as e: logger.warning(f"Failed to update pool metrics: {e}") def _extract_query_type(self, statement: str) -> str: """Extract query type from SQL statement.""" statement_upper = statement.strip().upper() if statement_upper.startswith("SELECT"): return "SELECT" elif statement_upper.startswith("INSERT"): return "INSERT" elif statement_upper.startswith("UPDATE"): return "UPDATE" elif statement_upper.startswith("DELETE"): return "DELETE" elif statement_upper.startswith("CREATE"): return "CREATE" elif statement_upper.startswith("DROP"): return "DROP" elif statement_upper.startswith("ALTER"): return "ALTER" else: return "OTHER" def _extract_table_name(self, statement: str) -> str | None: """Extract table name from SQL statement.""" import re # Simple regex to extract table names patterns = [ r"FROM\s+([a-zA-Z_][a-zA-Z0-9_]*)", # SELECT FROM table r"INTO\s+([a-zA-Z_][a-zA-Z0-9_]*)", # INSERT INTO table r"UPDATE\s+([a-zA-Z_][a-zA-Z0-9_]*)", # UPDATE table r"DELETE\s+FROM\s+([a-zA-Z_][a-zA-Z0-9_]*)", # DELETE FROM table ] for pattern in patterns: match = re.search(pattern, statement.upper()) if match: return match.group(1).lower() return "unknown" @contextmanager def trace_query(self, query_type: str, table: str | None = None): """Context manager for tracing database queries.""" with trace_database_query(query_type, table) as span: start_time = time.time() try: yield span duration = time.time() - start_time track_database_query( query_type, table or "unknown", duration, "success" ) except Exception: duration = time.time() - start_time track_database_query(query_type, table or "unknown", duration, "error") raise def get_pool_status(self) -> dict[str, Any]: """Get current database pool status.""" if not self.engine: return {} try: pool = self.engine.pool return { "pool_size": pool.size(), "checked_out": pool.checkedout(), "checked_in": pool.checkedin(), "overflow": pool.overflow(), "invalid": pool.invalid(), } except Exception as e: logger.error(f"Failed to get pool status: {e}") return {} class RedisMonitor: """Monitor for Redis operations and connection pools.""" def __init__(self, redis_client=None): self.redis_client = redis_client self.operation_stats = {} @asynccontextmanager async def trace_operation(self, operation: str, key: str | None = None): """Context manager for tracing Redis operations.""" with trace_cache_operation(operation, "redis") as span: start_time = time.time() if span and key: span.set_attribute("redis.key", key) try: yield span duration = time.time() - start_time track_redis_operation(operation, duration, "success") except Exception as e: duration = time.time() - start_time track_redis_operation(operation, duration, "error") if span: span.record_exception(e) logger.error( f"Redis operation failed: {operation}", extra={ "operation": operation, "key": key, "duration_seconds": duration, "error": str(e), }, ) raise async def monitor_get(self, key: str): """Monitor Redis GET operation.""" async with self.trace_operation("get", key): try: result = await self.redis_client.get(key) hit = result is not None track_cache_operation("redis", "get", hit, self._get_key_prefix(key)) return result except Exception: track_cache_operation("redis", "get", False, self._get_key_prefix(key)) raise async def monitor_set(self, key: str, value: Any, **kwargs): """Monitor Redis SET operation.""" async with self.trace_operation("set", key): return await self.redis_client.set(key, value, **kwargs) async def monitor_delete(self, key: str): """Monitor Redis DELETE operation.""" async with self.trace_operation("delete", key): return await self.redis_client.delete(key) async def monitor_exists(self, key: str): """Monitor Redis EXISTS operation.""" async with self.trace_operation("exists", key): return await self.redis_client.exists(key) async def update_redis_metrics(self): """Update Redis metrics from server info.""" if not self.redis_client: return try: info = await self.redis_client.info() # Connection metrics connected_clients = info.get("connected_clients", 0) redis_connections.set(connected_clients) # Memory metrics used_memory = info.get("used_memory", 0) redis_memory_usage.set(used_memory) # Keyspace metrics keyspace_hits = info.get("keyspace_hits", 0) keyspace_misses = info.get("keyspace_misses", 0) # Update counters (these are cumulative, so we track the difference) update_redis_metrics( connections=connected_clients, memory_bytes=used_memory, hits=0, # Will be updated by individual operations misses=0, # Will be updated by individual operations ) logger.debug( "Redis metrics updated", extra={ "connected_clients": connected_clients, "used_memory_mb": used_memory / 1024 / 1024, "keyspace_hits": keyspace_hits, "keyspace_misses": keyspace_misses, }, ) except Exception as e: logger.error(f"Failed to update Redis metrics: {e}") def _get_key_prefix(self, key: str) -> str: """Extract key prefix for metrics grouping.""" if ":" in key: return key.split(":")[0] return "other" async def get_redis_info(self) -> dict[str, Any]: """Get Redis server information.""" if not self.redis_client: return {} try: info = await self.redis_client.info() return { "connected_clients": info.get("connected_clients", 0), "used_memory": info.get("used_memory", 0), "used_memory_human": info.get("used_memory_human", "0B"), "keyspace_hits": info.get("keyspace_hits", 0), "keyspace_misses": info.get("keyspace_misses", 0), "total_commands_processed": info.get("total_commands_processed", 0), "uptime_in_seconds": info.get("uptime_in_seconds", 0), } except Exception as e: logger.error(f"Failed to get Redis info: {e}") return {} class CacheMonitor: """High-level cache monitoring that supports multiple backends.""" def __init__(self, redis_monitor: RedisMonitor | None = None): self.redis_monitor = redis_monitor @contextmanager def monitor_operation(self, cache_type: str, operation: str, key: str): """Monitor cache operation across different backends.""" start_time = time.time() hit = False try: yield hit = True # If no exception, assume it was a hit for GET operations except Exception as e: logger.error( f"Cache operation failed: {cache_type} {operation}", extra={ "cache_type": cache_type, "operation": operation, "key": key, "error": str(e), }, ) raise finally: duration = time.time() - start_time # Track metrics based on operation if operation in ["get", "exists"]: track_cache_operation( cache_type, operation, hit, self._get_key_prefix(key) ) # Log slow cache operations if duration > 0.1: # Operations over 100ms logger.warning( f"Slow cache operation: {cache_type} {operation}", extra={ "cache_type": cache_type, "operation": operation, "key": key, "duration_seconds": duration, }, ) def _get_key_prefix(self, key: str) -> str: """Extract key prefix for metrics grouping.""" if ":" in key: return key.split(":")[0] return "other" async def update_all_metrics(self): """Update metrics for all monitored cache backends.""" tasks = [] if self.redis_monitor: tasks.append(self.redis_monitor.update_redis_metrics()) if tasks: try: await asyncio.gather(*tasks, return_exceptions=True) except Exception as e: logger.error(f"Failed to update cache metrics: {e}") # Global monitor instances _database_monitor: DatabaseMonitor | None = None _redis_monitor: RedisMonitor | None = None _cache_monitor: CacheMonitor | None = None def get_database_monitor(engine=None) -> DatabaseMonitor: """Get or create the global database monitor.""" global _database_monitor if _database_monitor is None: _database_monitor = DatabaseMonitor(engine) return _database_monitor def get_redis_monitor(redis_client=None) -> RedisMonitor: """Get or create the global Redis monitor.""" global _redis_monitor if _redis_monitor is None: _redis_monitor = RedisMonitor(redis_client) return _redis_monitor def get_cache_monitor() -> CacheMonitor: """Get or create the global cache monitor.""" global _cache_monitor if _cache_monitor is None: redis_monitor = get_redis_monitor() _cache_monitor = CacheMonitor(redis_monitor) return _cache_monitor def initialize_database_monitoring(engine): """Initialize database monitoring with the given engine.""" logger.info("Initializing database monitoring...") monitor = get_database_monitor(engine) logger.info("Database monitoring initialized") return monitor def initialize_redis_monitoring(redis_client): """Initialize Redis monitoring with the given client.""" logger.info("Initializing Redis monitoring...") monitor = get_redis_monitor(redis_client) logger.info("Redis monitoring initialized") return monitor async def start_periodic_metrics_collection(interval: int = 30): """Start periodic collection of database and cache metrics.""" logger.info(f"Starting periodic metrics collection (interval: {interval}s)") cache_monitor = get_cache_monitor() while True: try: await cache_monitor.update_all_metrics() except Exception as e: logger.error(f"Error in periodic metrics collection: {e}") await asyncio.sleep(interval)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wshobson/maverick-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server