Skip to main content
Glama
liqcui

OpenShift OVN-Kubernetes Benchmark MCP Server

by liqcui
ovnk_benchmark_storage_ovnk.py39.5 kB
""" OVN-Kubernetes Metrics Storage Module Handles DuckDB storage operations for performance data """ import asyncio import duckdb import json import pandas as pd from datetime import datetime, timezone, timedelta from typing import Dict, List, Any, Optional, Union from pathlib import Path import os class ovnkMetricStor: """Storage handler for OVN-Kubernetes metrics using DuckDB""" def __init__(self, db_path: str = "storage/ovnk_benchmark.db"): self.db_path = db_path self.connection: Optional[duckdb.DuckDBPyConnection] = None self._ensure_storage_directory() def _ensure_storage_directory(self) -> None: """Ensure storage directory exists""" Path(self.db_path).parent.mkdir(parents=True, exist_ok=True) async def initialize(self) -> None: """Initialize database and create tables""" try: self.connection = duckdb.connect(self.db_path) await self._create_tables() print(f"✅ Storage initialized: {self.db_path}") except Exception as e: print(f"❌ Failed to initialize storage: {e}") raise async def _create_tables(self) -> None: """Create necessary tables for storing metrics""" # Main metrics table metrics_table = """ CREATE TABLE IF NOT EXISTS metrics ( id INTEGER PRIMARY KEY, timestamp TIMESTAMP, metric_category VARCHAR, metric_name VARCHAR, metric_value DOUBLE, metric_unit VARCHAR, labels JSON, duration VARCHAR, start_time TIMESTAMP, end_time TIMESTAMP, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """ # Summary table for aggregated data summary_table = """ CREATE TABLE IF NOT EXISTS metric_summaries ( id INTEGER PRIMARY KEY, timestamp TIMESTAMP, category VARCHAR, summary_data JSON, health_score INTEGER, overall_status VARCHAR, alerts JSON, duration VARCHAR, start_time TIMESTAMP, end_time TIMESTAMP, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """ # Performance snapshots table snapshots_table = """ CREATE TABLE IF NOT EXISTS performance_snapshots ( id INTEGER PRIMARY KEY, run_id VARCHAR, timestamp TIMESTAMP, snapshot_data JSON, cluster_info JSON, total_metrics INTEGER, duration VARCHAR, start_time TIMESTAMP, end_time TIMESTAMP, success_count INTEGER, total_steps INTEGER, success_rate DOUBLE, errors JSON, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """ # Benchmark runs table benchmark_runs_table = """ CREATE TABLE IF NOT EXISTS benchmark_runs ( id INTEGER PRIMARY KEY, run_id VARCHAR UNIQUE, start_time TIMESTAMP, end_time TIMESTAMP, duration VARCHAR, run_type VARCHAR, configuration JSON, status VARCHAR, total_collections INTEGER, successful_collections INTEGER, success_rate DOUBLE, errors JSON, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """ # Alerts history table alerts_table = """ CREATE TABLE IF NOT EXISTS alerts_history ( id INTEGER PRIMARY KEY, timestamp TIMESTAMP, alert_type VARCHAR, alert_message VARCHAR, severity VARCHAR, category VARCHAR, metric_name VARCHAR, value DOUBLE, threshold DOUBLE, resolved_at TIMESTAMP, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """ # Collection categories table collection_categories_table = """ CREATE TABLE IF NOT EXISTS collection_categories ( id INTEGER PRIMARY KEY, run_id VARCHAR, timestamp TIMESTAMP, category VARCHAR, category_data JSON, status VARCHAR, error_message VARCHAR, duration VARCHAR, start_time TIMESTAMP, end_time TIMESTAMP, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """ tables = [ metrics_table, summary_table, snapshots_table, benchmark_runs_table, alerts_table, collection_categories_table ] for table_sql in tables: self.connection.execute(table_sql) # Create indexes for better performance indexes = [ "CREATE INDEX IF NOT EXISTS idx_metrics_timestamp ON metrics(timestamp)", "CREATE INDEX IF NOT EXISTS idx_metrics_category ON metrics(metric_category)", "CREATE INDEX IF NOT EXISTS idx_metrics_name ON metrics(metric_name)", "CREATE INDEX IF NOT EXISTS idx_summaries_timestamp ON metric_summaries(timestamp)", "CREATE INDEX IF NOT EXISTS idx_summaries_category ON metric_summaries(category)", "CREATE INDEX IF NOT EXISTS idx_snapshots_timestamp ON performance_snapshots(timestamp)", "CREATE INDEX IF NOT EXISTS idx_snapshots_run_id ON performance_snapshots(run_id)", "CREATE INDEX IF NOT EXISTS idx_benchmark_runs_start_time ON benchmark_runs(start_time)", "CREATE INDEX IF NOT EXISTS idx_benchmark_runs_run_id ON benchmark_runs(run_id)", "CREATE INDEX IF NOT EXISTS idx_alerts_timestamp ON alerts_history(timestamp)", "CREATE INDEX IF NOT EXISTS idx_collection_categories_run_id ON collection_categories(run_id)", "CREATE INDEX IF NOT EXISTS idx_collection_categories_category ON collection_categories(category)", ] for index_sql in indexes: self.connection.execute(index_sql) # ========== Common Save Functions ========== async def save_collection_result(self, collection_result: Dict[str, Any]) -> str: """ Common function to save complete collection result to DuckDB Handles all aspects: run metadata, categories, metrics, and snapshots Args: collection_result: Complete result from agent collection Returns: run_id of the saved collection """ run_id = collection_result.get('run_id') timestamp = collection_result.get('timestamp') if not run_id or not timestamp: raise ValueError("Collection result must contain run_id and timestamp") try: # 1. Save benchmark run metadata await self._save_benchmark_run(collection_result) # 2. Save individual category data await self._save_collection_categories(collection_result) # 3. Save performance snapshot await self._save_performance_snapshot(collection_result) # 4. Extract and save metrics from categories await self._save_metrics_from_categories(collection_result) # 5. Extract and save alerts if present await self._save_alerts_from_collection(collection_result) print(f"✅ Collection saved successfully - Run ID: {run_id}") return run_id except Exception as e: print(f"❌ Error saving collection result: {e}") raise async def _save_benchmark_run(self, collection_result: Dict[str, Any]) -> None: """Save benchmark run metadata""" run_id = collection_result.get('run_id') timestamp_str = collection_result.get('timestamp') duration = collection_result.get('duration') start_time_str = collection_result.get('start_time') end_time_str = collection_result.get('end_time') timestamp_dt = self._parse_timestamp(timestamp_str) start_time_dt = self._parse_timestamp(start_time_str) if start_time_str else None end_time_dt = self._parse_timestamp(end_time_str) if end_time_str else None self.connection.execute(""" INSERT INTO benchmark_runs (run_id, start_time, end_time, duration, run_type, configuration, status, total_collections, successful_collections, success_rate, errors) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, [ run_id, start_time_dt or timestamp_dt, end_time_dt or timestamp_dt, duration, collection_result.get('mode', 'unknown'), json.dumps(collection_result.get('config', {})), 'completed' if collection_result.get('success') else 'failed', collection_result.get('total_steps', 0), collection_result.get('success_count', 0), collection_result.get('success_rate', 0.0), json.dumps(collection_result.get('errors', [])) ]) async def _save_collection_categories(self, collection_result: Dict[str, Any]) -> None: """Save individual category data""" run_id = collection_result.get('run_id') timestamp_str = collection_result.get('timestamp') timestamp_dt = self._parse_timestamp(timestamp_str) duration = collection_result.get('duration') start_time_str = collection_result.get('start_time') end_time_str = collection_result.get('end_time') start_time_dt = self._parse_timestamp(start_time_str) if start_time_str else None end_time_dt = self._parse_timestamp(end_time_str) if end_time_str else None collected_data = collection_result.get('collected_data', {}) for category, category_data in collected_data.items(): status = 'success' if not category_data.get('error') else 'failed' error_message = category_data.get('error', '') self.connection.execute(""" INSERT INTO collection_categories (run_id, timestamp, category, category_data, status, error_message, duration, start_time, end_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, [ run_id, timestamp_dt, category, json.dumps(category_data), status, error_message, duration, start_time_dt, end_time_dt ]) async def _save_performance_snapshot(self, collection_result: Dict[str, Any]) -> None: """Save complete performance snapshot""" run_id = collection_result.get('run_id') timestamp_str = collection_result.get('timestamp') timestamp_dt = self._parse_timestamp(timestamp_str) duration = collection_result.get('duration') start_time_str = collection_result.get('start_time') end_time_str = collection_result.get('end_time') start_time_dt = self._parse_timestamp(start_time_str) if start_time_str else None end_time_dt = self._parse_timestamp(end_time_str) if end_time_str else None collected_data = collection_result.get('collected_data', {}) cluster_info = collected_data.get('cluster_info', {}) self.connection.execute(""" INSERT INTO performance_snapshots (run_id, timestamp, snapshot_data, cluster_info, total_metrics, duration, start_time, end_time, success_count, total_steps, success_rate, errors) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, [ run_id, timestamp_dt, json.dumps(collected_data), json.dumps(cluster_info), len(collected_data), duration, start_time_dt, end_time_dt, collection_result.get('success_count', 0), collection_result.get('total_steps', 0), collection_result.get('success_rate', 0.0), json.dumps(collection_result.get('errors', [])) ]) async def _save_metrics_from_categories(self, collection_result: Dict[str, Any]) -> None: """Extract and save individual metrics from category data""" run_id = collection_result.get('run_id') timestamp_str = collection_result.get('timestamp') timestamp_dt = self._parse_timestamp(timestamp_str) duration = collection_result.get('duration') start_time_str = collection_result.get('start_time') end_time_str = collection_result.get('end_time') start_time_dt = self._parse_timestamp(start_time_str) if start_time_str else None end_time_dt = self._parse_timestamp(end_time_str) if end_time_str else None collected_data = collection_result.get('collected_data', {}) metrics_to_insert = [] for category, category_data in collected_data.items(): if 'error' in category_data: continue # Extract metrics based on category structure metrics_to_insert.extend( self._extract_metrics_from_category( category, category_data, timestamp_dt, duration, start_time_dt, end_time_dt ) ) # Bulk insert metrics if metrics_to_insert: df = pd.DataFrame(metrics_to_insert) self.connection.register('metrics_df', df) self.connection.execute(""" INSERT INTO metrics (timestamp, metric_category, metric_name, metric_value, metric_unit, labels, duration, start_time, end_time) SELECT timestamp, metric_category, metric_name, metric_value, metric_unit, labels, duration, start_time, end_time FROM metrics_df """) self.connection.unregister('metrics_df') def _extract_metrics_from_category(self, category: str, category_data: Dict[str, Any], timestamp_dt: datetime, duration: Optional[str], start_time_dt: Optional[datetime], end_time_dt: Optional[datetime]) -> List[Dict[str, Any]]: """Extract metrics from category data structure""" metrics = [] # Handle different category structures if category == 'cluster_info': metrics.extend(self._extract_cluster_info_metrics( category_data, timestamp_dt, duration, start_time_dt, end_time_dt )) elif category == 'node_usage': metrics.extend(self._extract_node_usage_metrics( category_data, timestamp_dt, duration, start_time_dt, end_time_dt )) elif category == 'api_server': metrics.extend(self._extract_api_metrics( category_data, timestamp_dt, duration, start_time_dt, end_time_dt )) elif category in ['ovnk_pods', 'ovnk_containers', 'multus']: metrics.extend(self._extract_pod_metrics( category, category_data, timestamp_dt, duration, start_time_dt, end_time_dt )) elif category == 'latency_metrics': metrics.extend(self._extract_latency_metrics( category_data, timestamp_dt, duration, start_time_dt, end_time_dt )) elif category == 'ovs_metrics': metrics.extend(self._extract_ovs_metrics( category_data, timestamp_dt, duration, start_time_dt, end_time_dt )) return metrics def _extract_cluster_info_metrics(self, data: Dict[str, Any], timestamp_dt: datetime, duration: Optional[str], start_time_dt: Optional[datetime], end_time_dt: Optional[datetime]) -> List[Dict[str, Any]]: """Extract metrics from cluster info""" metrics = [] # Total nodes if 'total_nodes' in data: metrics.append({ 'timestamp': timestamp_dt, 'metric_category': 'cluster_info', 'metric_name': 'total_nodes', 'metric_value': float(data['total_nodes']), 'metric_unit': 'count', 'labels': json.dumps({'cluster_version': data.get('cluster_version', 'unknown')}), 'duration': duration, 'start_time': start_time_dt, 'end_time': end_time_dt }) # Network policies if 'networkpolicies_count' in data: metrics.append({ 'timestamp': timestamp_dt, 'metric_category': 'cluster_info', 'metric_name': 'network_policies_count', 'metric_value': float(data['networkpolicies_count']), 'metric_unit': 'count', 'labels': json.dumps({}), 'duration': duration, 'start_time': start_time_dt, 'end_time': end_time_dt }) # Resource counts for resource_type in ['pods', 'services', 'secrets', 'configmaps']: key = f'{resource_type}_count' if key in data: metrics.append({ 'timestamp': timestamp_dt, 'metric_category': 'cluster_info', 'metric_name': key, 'metric_value': float(data[key]), 'metric_unit': 'count', 'labels': json.dumps({'resource_type': resource_type}), 'duration': duration, 'start_time': start_time_dt, 'end_time': end_time_dt }) return metrics def _extract_node_usage_metrics(self, data: Dict[str, Any], timestamp_dt: datetime, duration: Optional[str], start_time_dt: Optional[datetime], end_time_dt: Optional[datetime]) -> List[Dict[str, Any]]: """Extract metrics from node usage data""" metrics = [] groups = data.get('groups', {}) for role, group_data in groups.items(): nodes = group_data.get('nodes', []) for node in nodes: node_name = node.get('node', 'unknown') # CPU metrics cpu = node.get('cpu', {}) for stat in ['min', 'avg', 'max']: if stat in cpu: metrics.append({ 'timestamp': timestamp_dt, 'metric_category': 'node_usage', 'metric_name': f'cpu_{stat}', 'metric_value': float(cpu[stat]), 'metric_unit': 'percent', 'labels': json.dumps({'node': node_name, 'role': role}), 'duration': duration, 'start_time': start_time_dt, 'end_time': end_time_dt }) # Memory metrics memory = node.get('memory', {}) for stat in ['min', 'avg', 'max']: if stat in memory: metrics.append({ 'timestamp': timestamp_dt, 'metric_category': 'node_usage', 'metric_name': f'memory_{stat}', 'metric_value': float(memory[stat]), 'metric_unit': 'bytes', 'labels': json.dumps({'node': node_name, 'role': role}), 'duration': duration, 'start_time': start_time_dt, 'end_time': end_time_dt }) return metrics def _extract_api_metrics(self, data: Dict[str, Any], timestamp_dt: datetime, duration: Optional[str], start_time_dt: Optional[datetime], end_time_dt: Optional[datetime]) -> List[Dict[str, Any]]: """Extract metrics from API server data""" metrics = [] summary = data.get('summary', {}) # Health score if 'health_score' in summary: metrics.append({ 'timestamp': timestamp_dt, 'metric_category': 'api_server', 'metric_name': 'health_score', 'metric_value': float(summary['health_score']), 'metric_unit': 'score', 'labels': json.dumps({'status': summary.get('overall_status', 'unknown')}), 'duration': duration, 'start_time': start_time_dt, 'end_time': end_time_dt }) return metrics def _extract_pod_metrics(self, category: str, data: Dict[str, Any], timestamp_dt: datetime, duration: Optional[str], start_time_dt: Optional[datetime], end_time_dt: Optional[datetime]) -> List[Dict[str, Any]]: """Extract metrics from pod/container data""" metrics = [] top_pods = data.get('top_10_pods', []) for pod_data in top_pods: pod_name = pod_data.get('pod', 'unknown') node = pod_data.get('node', 'unknown') # CPU metrics cpu = pod_data.get('cpu', {}) for stat in ['min', 'avg', 'max']: if stat in cpu: metrics.append({ 'timestamp': timestamp_dt, 'metric_category': category, 'metric_name': f'pod_cpu_{stat}', 'metric_value': float(cpu[stat]), 'metric_unit': 'percent', 'labels': json.dumps({'pod': pod_name, 'node': node}), 'duration': duration, 'start_time': start_time_dt, 'end_time': end_time_dt }) # Memory metrics memory = pod_data.get('memory', {}) for stat in ['min', 'avg', 'max']: if stat in memory: metrics.append({ 'timestamp': timestamp_dt, 'metric_category': category, 'metric_name': f'pod_memory_{stat}', 'metric_value': float(memory[stat]), 'metric_unit': 'bytes', 'labels': json.dumps({'pod': pod_name, 'node': node}), 'duration': duration, 'start_time': start_time_dt, 'end_time': end_time_dt }) return metrics def _extract_latency_metrics(self, data: Dict[str, Any], timestamp_dt: datetime, duration: Optional[str], start_time_dt: Optional[datetime], end_time_dt: Optional[datetime]) -> List[Dict[str, Any]]: """Extract metrics from latency data""" metrics = [] latency_categories = data.get('latency_metrics', {}) for metric_name, metric_data in latency_categories.items(): if 'error' in metric_data: continue # Extract max value max_val = metric_data.get('max') if max_val is not None: metrics.append({ 'timestamp': timestamp_dt, 'metric_category': 'latency', 'metric_name': metric_name, 'metric_value': float(max_val), 'metric_unit': 'seconds', 'labels': json.dumps({}), 'duration': duration, 'start_time': start_time_dt, 'end_time': end_time_dt }) return metrics def _extract_ovs_metrics(self, data: Dict[str, Any], timestamp_dt: datetime, duration: Optional[str], start_time_dt: Optional[datetime], end_time_dt: Optional[datetime]) -> List[Dict[str, Any]]: """Extract metrics from OVS data""" metrics = [] # Extract from various OVS metric categories for category_key in ['ovs_vswitchd_cpu', 'ovs_vswitchd_memory', 'dataplane_flows']: category_data = data.get(category_key, {}) if 'top_6' in category_data: for item in category_data['top_6']: node = item.get('node', 'unknown') value_key = 'max' if 'max' in item else 'avg' value = item.get(value_key) if value is not None: metrics.append({ 'timestamp': timestamp_dt, 'metric_category': 'ovs', 'metric_name': f'{category_key}_{value_key}', 'metric_value': float(value), 'metric_unit': self._get_ovs_unit(category_key), 'labels': json.dumps({'node': node}), 'duration': duration, 'start_time': start_time_dt, 'end_time': end_time_dt }) return metrics def _get_ovs_unit(self, metric_name: str) -> str: """Get unit for OVS metric""" if 'cpu' in metric_name: return 'percent' elif 'memory' in metric_name: return 'bytes' elif 'flows' in metric_name: return 'count' return 'unknown' async def _save_alerts_from_collection(self, collection_result: Dict[str, Any]) -> None: """Extract and save alerts from collection data""" timestamp_str = collection_result.get('timestamp') timestamp_dt = self._parse_timestamp(timestamp_str) collected_data = collection_result.get('collected_data', {}) for category, category_data in collected_data.items(): if 'error' in category_data: continue # Check for alerts in summary summary = category_data.get('summary', {}) alerts = summary.get('alerts', []) for alert in alerts: self.connection.execute(""" INSERT INTO alerts_history (timestamp, alert_type, alert_message, severity, category, metric_name, value, threshold) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, [ timestamp_dt, alert.get('type', 'unknown'), alert.get('message', ''), alert.get('severity', 'info'), category, alert.get('metric_name', ''), alert.get('value'), alert.get('threshold') ]) def _parse_timestamp(self, timestamp_str: Optional[str]) -> Optional[datetime]: """Parse timestamp string to datetime object""" if not timestamp_str: return None try: return datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')) except Exception: return None # ========== Query Functions ========== async def get_metrics_history(self, category: Optional[str] = None, metric_name: Optional[str] = None, start_time: Optional[str] = None, end_time: Optional[str] = None, limit: int = 1000) -> List[Dict[str, Any]]: """Get metrics history""" query = "SELECT * FROM metrics WHERE 1=1" params = [] if category: query += " AND metric_category = ?" params.append(category) if metric_name: query += " AND metric_name = ?" params.append(metric_name) if start_time: query += " AND timestamp >= ?" params.append(self._parse_timestamp(start_time)) if end_time: query += " AND timestamp <= ?" params.append(self._parse_timestamp(end_time)) query += " ORDER BY timestamp DESC LIMIT ?" params.append(limit) result = self.connection.execute(query, params).fetchall() columns = ['id', 'timestamp', 'metric_category', 'metric_name', 'metric_value', 'metric_unit', 'labels', 'duration', 'start_time', 'end_time', 'created_at'] return [dict(zip(columns, row)) for row in result] async def get_collection_by_run_id(self, run_id: str) -> Optional[Dict[str, Any]]: """Get complete collection data by run_id""" # Get benchmark run run_result = self.connection.execute(""" SELECT * FROM benchmark_runs WHERE run_id = ? """, [run_id]).fetchone() if not run_result: return None run_columns = ['id', 'run_id', 'start_time', 'end_time', 'duration', 'run_type', 'configuration', 'status', 'total_collections', 'successful_collections', 'success_rate', 'errors', 'created_at'] run_data = dict(zip(run_columns, run_result)) # Parse JSON fields if run_data['configuration']: run_data['configuration'] = json.loads(run_data['configuration']) if run_data['errors']: run_data['errors'] = json.loads(run_data['errors']) # Get collection categories categories_result = self.connection.execute(""" SELECT * FROM collection_categories WHERE run_id = ? ORDER BY timestamp """, [run_id]).fetchall() category_columns = ['id', 'run_id', 'timestamp', 'category', 'category_data', 'status', 'error_message', 'duration', 'start_time', 'end_time', 'created_at'] categories = [] for row in categories_result: cat_data = dict(zip(category_columns, row)) if cat_data['category_data']: cat_data['category_data'] = json.loads(cat_data['category_data']) categories.append(cat_data) run_data['categories'] = categories return run_data async def get_performance_snapshots(self, start_time: Optional[str] = None, end_time: Optional[str] = None, limit: int = 50) -> List[Dict[str, Any]]: """Get performance snapshots""" query = "SELECT * FROM performance_snapshots WHERE 1=1" params = [] if start_time: query += " AND timestamp >= ?" params.append(self._parse_timestamp(start_time)) if end_time: query += " AND timestamp <= ?" params.append(self._parse_timestamp(end_time)) query += " ORDER BY timestamp DESC LIMIT ?" params.append(limit) result = self.connection.execute(query, params).fetchall() columns = ['id', 'run_id', 'timestamp', 'snapshot_data', 'cluster_info', 'total_metrics', 'duration', 'start_time', 'end_time', 'success_count', 'total_steps', 'success_rate', 'errors', 'created_at'] snapshots = [] for row in result: row_dict = dict(zip(columns, row)) # Parse JSON fields if row_dict['snapshot_data']: row_dict['snapshot_data'] = json.loads(row_dict['snapshot_data']) if row_dict['cluster_info']: row_dict['cluster_info'] = json.loads(row_dict['cluster_info']) if row_dict['errors']: row_dict['errors'] = json.loads(row_dict['errors']) snapshots.append(row_dict) return snapshots async def get_benchmark_runs(self, limit: int = 50) -> List[Dict[str, Any]]: """Get benchmark runs""" result = self.connection.execute(""" SELECT * FROM benchmark_runs ORDER BY start_time DESC LIMIT ? """, [limit]).fetchall() columns = ['id', 'run_id', 'start_time', 'end_time', 'duration', 'run_type', 'configuration', 'status', 'total_collections', 'successful_collections', 'success_rate', 'errors', 'created_at'] runs = [] for row in result: row_dict = dict(zip(columns, row)) if row_dict['configuration']: row_dict['configuration'] = json.loads(row_dict['configuration']) if row_dict['errors']: row_dict['errors'] = json.loads(row_dict['errors']) runs.append(row_dict) return runs async def get_recent_alerts(self, hours: int = 24, limit: int = 100) -> List[Dict[str, Any]]: """Get recent alerts""" since = datetime.now(timezone.utc) - timedelta(hours=hours) result = self.connection.execute(""" SELECT * FROM alerts_history WHERE timestamp >= ? ORDER BY timestamp DESC LIMIT ? """, [since, limit]).fetchall() columns = ['id', 'timestamp', 'alert_type', 'alert_message', 'severity', 'category', 'metric_name', 'value', 'threshold', 'resolved_at', 'created_at'] return [dict(zip(columns, row)) for row in result] async def get_health_trend(self, category: str, days: int = 7) -> List[Dict[str, Any]]: """Get health score trend for a category""" since = datetime.now(timezone.utc) - timedelta(days=days) result = self.connection.execute(""" SELECT DATE(timestamp) as date, AVG(health_score) as avg_health_score, MIN(health_score) as min_health_score, MAX(health_score) as max_health_score, COUNT(*) as measurements FROM metric_summaries WHERE category = ? AND timestamp >= ? GROUP BY DATE(timestamp) ORDER BY date DESC """, [category, since]).fetchall() columns = ['date', 'avg_health_score', 'min_health_score', 'max_health_score', 'measurements'] return [dict(zip(columns, row)) for row in result] async def get_database_stats(self) -> Dict[str, Any]: """Get database statistics""" stats = {} # Table counts stats['metrics_count'] = self.connection.execute("SELECT COUNT(*) FROM metrics").fetchone()[0] stats['summaries_count'] = self.connection.execute("SELECT COUNT(*) FROM metric_summaries").fetchone()[0] stats['snapshots_count'] = self.connection.execute("SELECT COUNT(*) FROM performance_snapshots").fetchone()[0] stats['benchmark_runs_count'] = self.connection.execute("SELECT COUNT(*) FROM benchmark_runs").fetchone()[0] stats['alerts_count'] = self.connection.execute("SELECT COUNT(*) FROM alerts_history").fetchone()[0] stats['categories_count'] = self.connection.execute("SELECT COUNT(*) FROM collection_categories").fetchone()[0] # Date ranges try: oldest_metric = self.connection.execute("SELECT MIN(timestamp) FROM metrics").fetchone()[0] newest_metric = self.connection.execute("SELECT MAX(timestamp) FROM metrics").fetchone()[0] stats['data_range'] = { 'oldest': oldest_metric.isoformat() if oldest_metric else None, 'newest': newest_metric.isoformat() if newest_metric else None } except: stats['data_range'] = {'oldest': None, 'newest': None} # Database file size try: file_size = os.path.getsize(self.db_path) stats['database_size_mb'] = round(file_size / (1024 * 1024), 2) except: stats['database_size_mb'] = 0 return stats async def cleanup_old_data(self, days: int = 30) -> Dict[str, int]: """Clean up old data beyond retention period""" cutoff = datetime.now(timezone.utc) - timedelta(days=days) # Count records before deletion metrics_count = self.connection.execute("SELECT COUNT(*) FROM metrics WHERE timestamp < ?", [cutoff]).fetchone()[0] summaries_count = self.connection.execute("SELECT COUNT(*) FROM metric_summaries WHERE timestamp < ?", [cutoff]).fetchone()[0] snapshots_count = self.connection.execute("SELECT COUNT(*) FROM performance_snapshots WHERE timestamp < ?", [cutoff]).fetchone()[0] alerts_count = self.connection.execute("SELECT COUNT(*) FROM alerts_history WHERE timestamp < ?", [cutoff]).fetchone()[0] categories_count = self.connection.execute("SELECT COUNT(*) FROM collection_categories WHERE timestamp < ?", [cutoff]).fetchone()[0] # Delete old records self.connection.execute("DELETE FROM metrics WHERE timestamp < ?", [cutoff]) self.connection.execute("DELETE FROM metric_summaries WHERE timestamp < ?", [cutoff]) self.connection.execute("DELETE FROM performance_snapshots WHERE timestamp < ?", [cutoff]) self.connection.execute("DELETE FROM alerts_history WHERE timestamp < ?", [cutoff]) self.connection.execute("DELETE FROM collection_categories WHERE timestamp < ?", [cutoff]) # Vacuum to reclaim space self.connection.execute("VACUUM") return { 'metrics_deleted': metrics_count, 'summaries_deleted': summaries_count, 'snapshots_deleted': snapshots_count, 'alerts_deleted': alerts_count, 'categories_deleted': categories_count, 'cutoff_date': cutoff.isoformat() } async def close(self) -> None: """Close database connection""" if self.connection: self.connection.close() self.connection = None print("✅ Database connection closed") # Alias for backward compatibility PrometheusStorage = ovnkMetricStor

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/liqcui/ovnk-benchmark-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server