"""
Performance monitoring and metrics collection
Tracks system performance, query execution times, and resource usage
"""
import asyncio
import functools
import json
import logging
import threading
import time
import uuid
from collections import defaultdict, deque
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List, Optional

import psutil
logger = logging.getLogger(__name__)
@dataclass
class QueryMetrics:
    """Metrics for a single query execution"""
    # Caller-supplied unique identifier for this execution
    query_id: str
    query_type: str # sql, semantic, smart_search
    # Wall-clock duration in seconds (computed by PerformanceMonitor.end_query)
    execution_time: float
    # True when the query completed without error
    success: bool
    # Error category (e.g. an exception class name) when success is False
    error_type: Optional[str] = None
    # Number of result rows/items; 0 when unknown or not reported
    result_count: int = 0
    # Naive local time at which this record was created
    timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class SystemMetrics:
    """System-wide performance metrics"""
    # System CPU utilization in percent (from psutil.cpu_percent)
    cpu_percent: float
    # Virtual-memory usage in percent (from psutil.virtual_memory)
    memory_percent: float
    # Usage of the '/' partition in percent (from psutil.disk_usage)
    disk_usage_percent: float
    # Approximation: number of queries currently tracked as in-flight
    active_connections: int
    # Naive local time at which this sample was taken
    timestamp: datetime = field(default_factory=datetime.now)
class PerformanceMonitor:
    """
    Comprehensive performance monitoring system.

    Tracks individual query executions (push-based via ``start_query`` /
    ``end_query``), samples system resources on an optional background
    daemon thread (``start_monitoring``), aggregates both into statistics
    and health reports, fires alert callbacks on threshold breaches, and
    can export everything to JSON.
    """

    def __init__(self, max_history: int = 10000):
        """
        Args:
            max_history: Maximum number of query and system samples kept;
                older entries are dropped automatically (deque maxlen).
        """
        self.max_history = max_history
        self.query_history: deque = deque(maxlen=max_history)
        self.system_history: deque = deque(maxlen=max_history)
        # Real-time state: query_id -> start timestamp for in-flight queries.
        self.active_queries: Dict[str, datetime] = {}
        self.query_counts: Dict[str, int] = defaultdict(int)
        self.error_counts: Dict[str, int] = defaultdict(int)
        # Alerting thresholds (tunable, see setup_performance_monitoring).
        self.slow_query_threshold = 5.0  # seconds
        self.high_cpu_threshold = 80.0  # percent
        self.high_memory_threshold = 85.0  # percent
        # Background monitoring thread state.
        self.monitoring_active = False
        self.monitor_thread: Optional[threading.Thread] = None
        self._monitor_interval = 30.0  # last interval passed to start_monitoring()
        # Registered alert sinks; each receives the dict built in _trigger_alert.
        self.alert_callbacks: List[Callable] = []

    def start_monitoring(self, interval: float = 30.0):
        """Start the background system-sampling thread (no-op if running).

        Args:
            interval: Seconds between system-metric samples.
        """
        if self.monitoring_active:
            return
        self.monitoring_active = True
        self._monitor_interval = interval
        self.monitor_thread = threading.Thread(
            target=self._monitor_system,
            args=(interval,),
            daemon=True  # never keep the process alive just for monitoring
        )
        self.monitor_thread.start()
        logger.info("Performance monitoring started")

    def stop_monitoring(self):
        """Stop background monitoring and wait for the thread to exit.

        The monitor loop only re-checks its flag after sleeping, so the
        join is bounded to a little over one interval instead of being
        potentially unbounded.
        """
        self.monitoring_active = False
        if self.monitor_thread:
            self.monitor_thread.join(timeout=self._monitor_interval + 1.0)
            self.monitor_thread = None
        logger.info("Performance monitoring stopped")

    def _monitor_system(self, interval: float):
        """Background loop: sample system metrics every `interval` seconds."""
        while self.monitoring_active:
            try:
                metrics = self._collect_system_metrics()
                self.system_history.append(metrics)
                # Check for alerts
                self._check_system_alerts(metrics)
                time.sleep(interval)
            except Exception as e:
                # Never let a transient sampling failure kill the thread.
                logger.error(f"System monitoring error: {e}")
                time.sleep(interval)

    def _collect_system_metrics(self) -> SystemMetrics:
        """Collect one snapshot of CPU, memory, disk and connection load."""
        # interval=1 makes psutil block ~1 second to measure CPU usage.
        cpu_percent = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()
        # NOTE(review): '/' assumes a POSIX-style root — confirm on Windows.
        disk = psutil.disk_usage('/')
        # Approximate "connections" as the number of in-flight tracked queries.
        active_connections = len(self.active_queries)
        return SystemMetrics(
            cpu_percent=cpu_percent,
            memory_percent=memory.percent,
            disk_usage_percent=disk.percent,
            active_connections=active_connections
        )

    def _check_system_alerts(self, metrics: SystemMetrics):
        """Fire an alert for each resource reading above its threshold."""
        alerts = []
        if metrics.cpu_percent > self.high_cpu_threshold:
            alerts.append(f"High CPU usage: {metrics.cpu_percent:.1f}%")
        if metrics.memory_percent > self.high_memory_threshold:
            alerts.append(f"High memory usage: {metrics.memory_percent:.1f}%")
        for alert in alerts:
            self._trigger_alert("system", alert, metrics)

    def start_query(self, query_id: str, query_type: str) -> str:
        """Begin tracking a query execution.

        Args:
            query_id: Caller-supplied unique identifier for this execution.
            query_type: Category label (e.g. sql, semantic, smart_search).

        Returns:
            The same query_id, for caller convenience.
        """
        self.active_queries[query_id] = datetime.now()
        self.query_counts[query_type] += 1
        logger.debug(f"Started tracking query {query_id} ({query_type})")
        return query_id

    def end_query(
        self,
        query_id: str,
        query_type: str,
        success: bool = True,
        error_type: Optional[str] = None,
        result_count: int = 0
    ):
        """Finish tracking a query and record its metrics.

        Unknown query_ids (never started, or already ended) are logged and
        ignored rather than raising. Queries slower than
        ``slow_query_threshold`` additionally trigger a "slow_query" alert.
        """
        if query_id not in self.active_queries:
            logger.warning(f"Query {query_id} not found in active queries")
            return
        start_time = self.active_queries.pop(query_id)
        execution_time = (datetime.now() - start_time).total_seconds()
        metrics = QueryMetrics(
            query_id=query_id,
            query_type=query_type,
            execution_time=execution_time,
            success=success,
            error_type=error_type,
            result_count=result_count
        )
        self.query_history.append(metrics)
        # Track errors by category for later summaries.
        if not success and error_type:
            self.error_counts[error_type] += 1
        if execution_time > self.slow_query_threshold:
            self._trigger_alert(
                "slow_query",
                f"Slow query detected: {execution_time:.2f}s",
                metrics
            )
        logger.debug(f"Query {query_id} completed in {execution_time:.3f}s")

    def _trigger_alert(self, alert_type: str, message: str, context: Any):
        """Deliver an alert dict to every registered callback.

        Callback exceptions are logged and swallowed so one bad callback
        cannot break monitoring or starve the remaining callbacks.
        """
        alert_data = {
            'type': alert_type,
            'message': message,
            'timestamp': datetime.now(),
            'context': context
        }
        for callback in self.alert_callbacks:
            try:
                callback(alert_data)
            except Exception as e:
                logger.error(f"Alert callback failed: {e}")

    def add_alert_callback(self, callback: Callable):
        """Register a callback invoked with an alert dict on every alert."""
        self.alert_callbacks.append(callback)

    def get_query_statistics(self, hours: int = 24) -> Dict[str, Any]:
        """Aggregate query metrics recorded within the last `hours` hours.

        Returns:
            Dict with totals, success rate (percent), execution-time stats,
            per-type counts, error counts and slow-query count. All keys
            are present even when no queries were recorded.
        """
        cutoff = datetime.now() - timedelta(hours=hours)
        # Snapshot first: iterating a deque while another thread appends
        # raises RuntimeError ("deque mutated during iteration").
        recent_queries = [
            q for q in list(self.query_history)
            if q.timestamp > cutoff
        ]
        if not recent_queries:
            # Same schema as the populated case so callers can index safely.
            return {
                'total_queries': 0,
                'success_rate': 0.0,
                'average_execution_time': 0.0,
                'min_execution_time': 0.0,
                'max_execution_time': 0.0,
                'query_types': {},
                'error_summary': {},
                'slow_queries': 0
            }
        # Calculate statistics
        total_queries = len(recent_queries)
        successful_queries = sum(1 for q in recent_queries if q.success)
        success_rate = (successful_queries / total_queries) * 100
        execution_times = [q.execution_time for q in recent_queries]
        avg_execution_time = sum(execution_times) / len(execution_times)
        # Query type breakdown
        query_types = defaultdict(int)
        for query in recent_queries:
            query_types[query.query_type] += 1
        # Error summary
        error_summary = defaultdict(int)
        for query in recent_queries:
            if not query.success and query.error_type:
                error_summary[query.error_type] += 1
        return {
            'total_queries': total_queries,
            'success_rate': success_rate,
            'average_execution_time': avg_execution_time,
            'min_execution_time': min(execution_times),
            'max_execution_time': max(execution_times),
            'query_types': dict(query_types),
            'error_summary': dict(error_summary),
            'slow_queries': len([q for q in recent_queries if q.execution_time > self.slow_query_threshold])
        }

    def get_system_statistics(self, hours: int = 24) -> Dict[str, Any]:
        """Aggregate system samples recorded within the last `hours` hours.

        Returns:
            Dict with sample count plus average/min/max/current for CPU,
            memory, disk and connections. The per-resource dicts are empty
            when 'samples' is 0.
        """
        cutoff = datetime.now() - timedelta(hours=hours)
        # Snapshot to avoid mutation-during-iteration from the monitor thread.
        recent_metrics = [
            m for m in list(self.system_history)
            if m.timestamp > cutoff
        ]
        if not recent_metrics:
            return {
                'samples': 0,
                'cpu': {},
                'memory': {},
                'disk': {},
                'connections': {}
            }

        def _summary(values):
            # average/min/max/current for one metric series (never empty here).
            return {
                'average': sum(values) / len(values),
                'min': min(values),
                'max': max(values),
                'current': values[-1]
            }

        return {
            'samples': len(recent_metrics),
            'cpu': _summary([m.cpu_percent for m in recent_metrics]),
            'memory': _summary([m.memory_percent for m in recent_metrics]),
            'disk': _summary([m.disk_usage_percent for m in recent_metrics]),
            'connections': _summary([m.active_connections for m in recent_metrics])
        }

    def get_performance_report(self) -> Dict[str, Any]:
        """Build a combined health report over the default (24h) window.

        Health checks only apply where data exists: with zero recorded
        queries the 0.0% success rate is not flagged as an issue, and with
        zero system samples the empty per-resource dicts are not indexed
        (indexing them would raise KeyError).
        """
        query_stats = self.get_query_statistics()
        system_stats = self.get_system_statistics()
        # Health indicators
        health_score = 100
        issues = []
        if query_stats['total_queries'] > 0:
            if query_stats['success_rate'] < 95:
                health_score -= 20
                issues.append(f"Low success rate: {query_stats['success_rate']:.1f}%")
            if query_stats['average_execution_time'] > self.slow_query_threshold:
                health_score -= 15
                issues.append(f"High average query time: {query_stats['average_execution_time']:.2f}s")
        if system_stats['samples'] > 0:
            if system_stats['cpu']['current'] > self.high_cpu_threshold:
                health_score -= 25
                issues.append(f"High CPU usage: {system_stats['cpu']['current']:.1f}%")
            if system_stats['memory']['current'] > self.high_memory_threshold:
                health_score -= 20
                issues.append(f"High memory usage: {system_stats['memory']['current']:.1f}%")
        return {
            'timestamp': datetime.now(),
            'health_score': max(0, health_score),
            'issues': issues,
            'query_statistics': query_stats,
            'system_statistics': system_stats,
            'active_queries': len(self.active_queries),
            'total_query_history': len(self.query_history),
            'total_system_history': len(self.system_history)
        }

    def export_metrics(self, file_path: str, hours: int = 24):
        """Write the performance report plus raw samples to a JSON file.

        Best-effort: failures are logged, never raised to the caller.
        """
        try:
            report = self.get_performance_report()
            # Add raw per-sample data for offline analysis.
            cutoff = datetime.now() - timedelta(hours=hours)
            raw_queries = [
                {
                    'query_id': q.query_id,
                    'query_type': q.query_type,
                    'execution_time': q.execution_time,
                    'success': q.success,
                    'error_type': q.error_type,
                    'result_count': q.result_count,
                    'timestamp': q.timestamp.isoformat()
                }
                for q in list(self.query_history)
                if q.timestamp > cutoff
            ]
            raw_system = [
                {
                    'cpu_percent': m.cpu_percent,
                    'memory_percent': m.memory_percent,
                    'disk_usage_percent': m.disk_usage_percent,
                    'active_connections': m.active_connections,
                    'timestamp': m.timestamp.isoformat()
                }
                for m in list(self.system_history)
                if m.timestamp > cutoff
            ]
            export_data = {
                'report': report,
                'raw_queries': raw_queries,
                'raw_system': raw_system
            }
            with open(file_path, 'w') as f:
                # default=str stringifies the datetime objects in the report.
                json.dump(export_data, f, indent=2, default=str)
            logger.info(f"Metrics exported to {file_path}")
        except Exception as e:
            logger.error(f"Failed to export metrics: {e}")
# Global performance monitor instance
_performance_monitor: Optional[PerformanceMonitor] = None


def get_performance_monitor() -> PerformanceMonitor:
    """Return the process-wide PerformanceMonitor, creating it lazily."""
    global _performance_monitor
    monitor = _performance_monitor
    if monitor is None:
        monitor = PerformanceMonitor()
        _performance_monitor = monitor
    return monitor
def setup_performance_monitoring(config: Optional[Dict[str, Any]] = None):
    """Configure and start the global performance monitor.

    Args:
        config: Optional threshold overrides; recognised keys are
            'slow_query_threshold' (seconds), 'high_cpu_threshold' and
            'high_memory_threshold' (percent). Missing keys fall back to
            the documented defaults.

    Returns:
        The started global PerformanceMonitor instance.
    """
    monitor = get_performance_monitor()
    if config:
        monitor.slow_query_threshold = config.get('slow_query_threshold', 5.0)
        monitor.high_cpu_threshold = config.get('high_cpu_threshold', 80.0)
        monitor.high_memory_threshold = config.get('high_memory_threshold', 85.0)

    # Default alert sink: surface every alert in the application log.
    # NOTE(review): calling setup twice registers this callback twice —
    # confirm callers only invoke setup once per process.
    def log_alert(alert_data):
        logger.warning(f"Performance Alert [{alert_data['type']}]: {alert_data['message']}")

    monitor.add_alert_callback(log_alert)
    # Start monitoring
    monitor.start_monitoring()
    return monitor
# Decorator for automatic query monitoring
def monitor_query(query_type: str):
"""Decorator to automatically monitor query execution"""
def decorator(func):
async def async_wrapper(*args, **kwargs):
monitor = get_performance_monitor()
query_id = f"{query_type}_{int(time.time() * 1000)}"
monitor.start_query(query_id, query_type)
try:
result = await func(*args, **kwargs)
# Extract result count if possible
result_count = 0
if isinstance(result, dict):
if 'data' in result and isinstance(result['data'], list):
result_count = len(result['data'])
elif 'results' in result and isinstance(result['results'], list):
result_count = len(result['results'])
monitor.end_query(query_id, query_type, True, None, result_count)
return result
except Exception as e:
error_type = type(e).__name__
monitor.end_query(query_id, query_type, False, error_type)
raise
def sync_wrapper(*args, **kwargs):
monitor = get_performance_monitor()
query_id = f"{query_type}_{int(time.time() * 1000)}"
monitor.start_query(query_id, query_type)
try:
result = func(*args, **kwargs)
monitor.end_query(query_id, query_type, True)
return result
except Exception as e:
error_type = type(e).__name__
monitor.end_query(query_id, query_type, False, error_type)
raise
return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
return decorator