mcp-dbutils

"""Resource monitoring statistics module""" import statistics import sys from dataclasses import dataclass from datetime import datetime from typing import List, Optional, Tuple @dataclass class ResourceStats: """Resource statistics tracking""" # Connection stats active_connections: int = 0 total_connections: int = 0 connection_start_time: Optional[datetime] = None # Query stats query_count: int = 0 last_query_time: Optional[datetime] = None # Error stats error_count: int = 0 last_error_time: Optional[datetime] = None error_types: Optional[dict[str, int]] = None # Resource stats estimated_memory: int = 0 # Performance monitoring query_durations: Optional[List[float]] = None # 查询执行时间列表 (秒) query_types: Optional[dict[str, int]] = None # 查询类型统计 (SELECT, EXPLAIN等) slow_queries: Optional[List[Tuple[str, float]]] = None # 慢查询记录 (SQL, 时间) peak_memory: int = 0 # 峰值内存使用 def __post_init__(self): """Initialize mutable defaults""" if self.error_types is None: self.error_types = {} if self.query_durations is None: self.query_durations = [] if self.query_types is None: self.query_types = {} if self.slow_queries is None: self.slow_queries = [] def record_connection_start(self): """Record new connection start""" self.active_connections += 1 self.total_connections += 1 self.connection_start_time = datetime.now() def record_connection_end(self): """Record connection end""" self.active_connections = max(0, self.active_connections - 1) def record_query(self): """Record query execution""" self.query_count += 1 self.last_query_time = datetime.now() def record_query_duration(self, sql: str, duration: float): """Record query execution time and type Args: sql: SQL query text duration: Execution time in seconds """ self.query_durations.append(duration) # Record query type query_type = sql.strip().split()[0].upper() self.query_types[query_type] = self.query_types.get(query_type, 0) + 1 # Record slow queries (over 100ms) if duration > 0.1: # 100ms # Keep at most 10 slow queries if len(self.slow_queries) >= 10: self.slow_queries.pop(0) self.slow_queries.append((sql[:100], duration)) # Truncate SQL to avoid excessive length def record_error(self, error_type: str): """Record error occurrence Args: error_type: Type/class name of the error """ self.error_count += 1 self.last_error_time = datetime.now() self.error_types[error_type] = self.error_types.get(error_type, 0) + 1 def update_memory_usage(self, obj: object): """Update estimated memory usage Args: obj: Object to estimate size for """ current_memory = sys.getsizeof(obj) self.estimated_memory = current_memory self.peak_memory = max(self.peak_memory, current_memory) def get_query_time_stats(self) -> dict: """Get query time statistics Returns: Dictionary with min, max, avg, median query times """ if not self.query_durations: return { "min": 0, "max": 0, "avg": 0, "median": 0 } return { "min": min(self.query_durations), "max": max(self.query_durations), "avg": sum(self.query_durations) / len(self.query_durations), "median": statistics.median(self.query_durations) if len(self.query_durations) > 0 else 0 } def get_performance_stats(self) -> str: """Get formatted performance statistics Returns: Formatted string with performance statistics """ stats = [] stats.append("Database Performance Statistics") stats.append("-----------------------------") stats.append(f"Query Count: {self.query_count}") # Query time statistics if self.query_durations: time_stats = self.get_query_time_stats() stats.append(f"Query Times: avg={time_stats['avg']*1000:.2f}ms, min={time_stats['min']*1000:.2f}ms, max={time_stats['max']*1000:.2f}ms, median={time_stats['median']*1000:.2f}ms") # Query type distribution if self.query_types: stats.append("Query Types:") for qtype, count in self.query_types.items(): percentage = (count / self.query_count) * 100 if self.query_count else 0 stats.append(f" - {qtype}: {count} ({percentage:.1f}%)") # Slow queries if self.slow_queries: stats.append("Slow Queries:") for sql, duration in self.slow_queries: stats.append(f" - {duration*1000:.2f}ms: {sql}...") # Error statistics if self.error_count > 0: error_rate = (self.error_count / self.query_count) * 100 if self.query_count else 0 stats.append(f"Error Rate: {error_rate:.2f}% ({self.error_count} errors)") stats.append("Error Types:") for etype, count in self.error_types.items(): stats.append(f" - {etype}: {count}") # Resource usage stats.append(f"Memory Usage: current={self.estimated_memory/1024:.2f}KB, peak={self.peak_memory/1024:.2f}KB") stats.append(f"Connections: active={self.active_connections}, total={self.total_connections}") return "\n".join(stats) def to_dict(self) -> dict: """Convert stats to dictionary for logging Returns: Dictionary of current statistics """ now = datetime.now() connection_duration = None if self.connection_start_time: connection_duration = (now - self.connection_start_time).total_seconds() time_stats = self.get_query_time_stats() return { "active_connections": self.active_connections, "total_connections": self.total_connections, "connection_duration": connection_duration, "query_count": self.query_count, "query_times_ms": { "min": time_stats["min"] * 1000 if self.query_durations else 0, "max": time_stats["max"] * 1000 if self.query_durations else 0, "avg": time_stats["avg"] * 1000 if self.query_durations else 0, "median": time_stats["median"] * 1000 if self.query_durations else 0 }, "query_types": self.query_types, "error_count": self.error_count, "error_types": self.error_types, "estimated_memory_bytes": self.estimated_memory, "peak_memory_bytes": self.peak_memory }