OpenShift OVN-Kubernetes Benchmark MCP Server

by liqcui
ovnk_benchmark_mcp_api.py (36 kB)
#!/usr/bin/env python3
"""
OpenShift OVN-Kubernetes Benchmark MCP API Wrapper
Exposes MCP tools as FastAPI endpoints using lifespan management
"""

import asyncio
import os
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from fastapi import FastAPI, HTTPException, status
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field, ConfigDict
import uvicorn

# Import all the components from the original MCP server
from tools.ovnk_benchmark_openshift_general_info import OpenShiftGeneralInfo
from tools.ovnk_benchmark_prometheus_basequery import PrometheusBaseQuery
from tools.ovnk_benchmark_prometheus_kubeapi import kubeAPICollector
from tools.ovnk_benchmark_prometheus_pods_usage import PodsUsageCollector, collect_ovn_duration_usage
from tools.ovnk_benchmark_prometheus_ovnk_latency import OVNLatencyCollector
from tools.ovnk_benchmark_prometheus_ovnk_ovs import OVSUsageCollector
from tools.ovnk_benchmark_prometheus_nodes_usage import NodeUsageQuery
from ocauth.ovnk_benchmark_auth import OpenShiftAuth
from config.ovnk_benchmark_config import Config
from elt.ovnk_benchmark_elt_duckdb import PerformanceELT
from storage.ovnk_benchmark_storage_ovnk import PrometheusStorage

# Import analysis modules
try:
    from analysis.ovnk_benchmark_performance_unified import UnifiedPerformanceAnalyzer
    UNIFIED_ANALYZER_AVAILABLE = True
except ImportError:
    UNIFIED_ANALYZER_AVAILABLE = False
    print("Warning: Unified analyzer not available")

# Configure timezone
os.environ['TZ'] = 'UTC'


# Pydantic models (reusing from original MCP server)
class MetricsRequest(BaseModel):
    """Request model for basic metrics queries"""
    duration: str = Field(
        default="5m",
        description="Query duration (e.g., 5m, 1h, 1d, 7d)"
    )
    start_time: Optional[str] = Field(
        default=None,
        description="Start time in ISO format (YYYY-MM-DDTHH:MM:SSZ)"
    )
    end_time: Optional[str] = Field(
        default=None,
        description="End time in ISO format (YYYY-MM-DDTHH:MM:SSZ)"
    )
    model_config = ConfigDict(arbitrary_types_allowed=True, extra="allow")


class PODsRequest(BaseModel):
    """Request model for pod-specific metrics queries"""
    pod_pattern: str = Field(
        default="ovnkube.*",
        description="Regex pattern for pod names (e.g., 'ovnkube.*', 'multus.*')"
    )
    container_pattern: str = Field(
        default=".*",
        description="Regex pattern for container names (e.g., 'ovnkube-controller', 'kube-rbac-proxy.*')"
    )
    label_selector: str = Field(
        default=".*",
        description="Regex pattern for pod label selectors"
    )
    namespace_pattern: str = Field(
        default="openshift-ovn-kubernetes",
        description="Regex pattern for namespace (e.g., 'openshift-ovn-kubernetes', 'openshift-multus')"
    )
    top_n: int = Field(
        default=10,
        description="Number of top results to return (1-50)"
    )
    duration: str = Field(
        default="1h",
        description="Query duration for range queries (e.g., 5m, 1h, 1d)"
    )
    start_time: Optional[str] = Field(
        default=None,
        description="Start time in ISO format"
    )
    end_time: Optional[str] = Field(
        default=None,
        description="End time in ISO format"
    )
    model_config = ConfigDict(arbitrary_types_allowed=True, extra="allow")


class GeneralInfoRequest(BaseModel):
    """Request model for OpenShift general information queries"""
    namespace: Optional[str] = Field(
        default=None,
        description="Specific namespace to query (optional, queries all if not specified)"
    )
    model_config = ConfigDict(arbitrary_types_allowed=True, extra="allow")

class PerformanceDataRequest(BaseModel):
    """Request model for performance data storage"""
    metrics_data: Dict[str, Any] = Field(
        description="Metrics data dictionary to store in database"
    )
    timestamp: Optional[str] = Field(
        default=None,
        description="ISO timestamp for the data (defaults to current time)"
    )
    model_config = ConfigDict(arbitrary_types_allowed=True, extra="allow")


class AnalysisRequest(BaseModel):
    """Request model for performance analysis operations"""
    component: str = Field(
        description="Component to analyze: 'pods', 'ovs', 'sync_duration', 'multus', or 'all'"
    )
    metrics_data: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Metrics data to analyze (if not provided, will be collected automatically)"
    )
    duration: str = Field(
        default="1h",
        description="Duration for data collection if metrics_data not provided"
    )
    save_reports: bool = Field(
        default=True,
        description="Whether to save analysis reports to files"
    )
    output_dir: str = Field(
        default=".",
        description="Directory to save analysis reports"
    )
    model_config = ConfigDict(arbitrary_types_allowed=True, extra="allow")


class PerformanceHistoryRequest(BaseModel):
    """Request model for performance history queries"""
    days: int = Field(
        default=7,
        description="Number of days of historical data to retrieve (1-365)"
    )
    metric_type: Optional[str] = Field(
        default=None,
        description="Filter by specific metric type (e.g., 'pods', 'ovs', 'sync_duration', 'multus')"
    )
    model_config = ConfigDict(arbitrary_types_allowed=True, extra="allow")


# Global components
auth_manager: Optional[OpenShiftAuth] = None
config: Optional[Config] = None
prometheus_client: Optional[PrometheusBaseQuery] = None
storage: Optional[PrometheusStorage] = None
unified_analyzer: Optional[UnifiedPerformanceAnalyzer] = None


async def initialize_components():
    """Initialize global components"""
    global auth_manager, config, prometheus_client, storage, unified_analyzer
    try:
        config = Config()
        auth_manager = OpenShiftAuth(config.kubeconfig_path)
        await auth_manager.initialize()

        prometheus_client = PrometheusBaseQuery(
            auth_manager.prometheus_url,
            auth_manager.prometheus_token
        )

        storage = PrometheusStorage()
        await storage.initialize()

        if UNIFIED_ANALYZER_AVAILABLE:
            unified_analyzer = UnifiedPerformanceAnalyzer()

        print("MCP components initialized successfully")
    except Exception as e:
        print(f"Error initializing components: {e}")
        raise


async def cleanup_components():
    """Cleanup global components"""
    global auth_manager, storage
    try:
        if storage:
            await storage.close()
        if auth_manager:
            await auth_manager.cleanup()
        print("MCP components cleaned up successfully")
    except Exception as e:
        print(f"Error cleaning up components: {e}")


@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan management for proper startup and shutdown"""
    # Startup
    print("Starting OCP Benchmark MCP API...")
    await initialize_components()
    yield
    # Shutdown
    print("Shutting down OCP Benchmark MCP API...")
    await cleanup_components()


# Create FastAPI app with lifespan
app = FastAPI(
    title="OpenShift OVN-Kubernetes Benchmark API",
    description="REST API wrapper for OCP Benchmark MCP Server tools",
    version="1.0.0",
    lifespan=lifespan
)


@app.get("/", summary="Health Check")
async def root():
    """Health check endpoint"""
    return {
        "status": "healthy",
        "service": "OCP Benchmark MCP API",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "unified_analyzer_available": UNIFIED_ANALYZER_AVAILABLE
    }


@app.post("/api/v1/openshift/general-info", summary="Get OpenShift General Information")
async def get_openshift_general_info(request: GeneralInfoRequest = GeneralInfoRequest()):
    """Get comprehensive OpenShift cluster information including NetworkPolicy counts,
    AdminNetworkPolicy counts, EgressFirewall counts, node status, and resource utilization."""
    try:
        general_info = OpenShiftGeneralInfo()
        await general_info.initialize()
        cluster_info = await general_info.collect_cluster_info()
        return general_info.to_dict(cluster_info)
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail={"error": str(e), "timestamp": datetime.now(timezone.utc).isoformat()}
        )


@app.post("/api/v1/cluster/node-usage", summary="Get Cluster Node Usage Metrics")
async def query_cluster_node_usage(request: MetricsRequest = MetricsRequest()):
    """Get cluster node resource usage metrics including CPU utilization, memory consumption,
    disk usage, and network utilization."""
    global prometheus_client
    try:
        if not prometheus_client:
            await initialize_components()
        cluster_nodes_usage = NodeUsageQuery(prometheus_client)
        cluster_info = await cluster_nodes_usage.query_node_usage(request.duration)
        return cluster_info
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail={"error": str(e), "timestamp": datetime.now(timezone.utc).isoformat()}
        )


@app.post("/api/v1/kube-api/metrics", summary="Query Kubernetes API Metrics")
async def query_kube_api_metrics(request: MetricsRequest = MetricsRequest()):
    """Query Kubernetes API server performance metrics including request rates,
    response times, error rates, and resource consumption."""
    global prometheus_client
    try:
        if not prometheus_client:
            await initialize_components()
        kube_api_metrics = kubeAPICollector(prometheus_client)
        return await kube_api_metrics.get_metrics(request.duration, request.start_time, request.end_time)
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail={"error": str(e), "timestamp": datetime.now(timezone.utc).isoformat()}
        )


@app.post("/api/v1/ovnk/pods-metrics", summary="Query OVN-K Pod Metrics")
async def query_ovnk_pods_metrics(request: PODsRequest = PODsRequest()):
    """Query OVN-Kubernetes pod resource usage metrics including CPU and memory consumption."""
    global prometheus_client
    try:
        if not prometheus_client:
            await initialize_components()
        pods_duration_summary = await collect_ovn_duration_usage(
            prometheus_client,
            duration=request.duration
        )
        return pods_duration_summary
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail={"error": str(e), "timestamp": datetime.now(timezone.utc).isoformat()}
        )


@app.post("/api/v1/ovnk/containers-metrics", summary="Query OVN-K Container Metrics")
async def query_ovnk_containers_metrics(request: PODsRequest = PODsRequest()):
    """Query detailed container-level metrics within OVN-Kubernetes pods including
    per-container CPU and memory usage."""
    global prometheus_client
    try:
        if not prometheus_client:
            await initialize_components()
        collector = PodsUsageCollector(prometheus_client)
        return await collector.collect_duration_usage(
            request.duration,
            request.pod_pattern,
            request.container_pattern,
            request.namespace_pattern
        )
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail={"error": str(e), "timestamp": datetime.now(timezone.utc).isoformat()}
        )


@app.post("/api/v1/ovnk/ovs-metrics", summary="Query OVS Metrics")
async def query_ovnk_ovs_metrics(request: PODsRequest = PODsRequest()):
    """Query Open vSwitch (OVS) performance metrics including CPU usage, memory consumption,
    flow table statistics."""
    global prometheus_client, auth_manager
    try:
        if not prometheus_client or not auth_manager:
            await initialize_components()
        collector = OVSUsageCollector(prometheus_client, auth_manager)
        range_results = await collector.collect_all_ovs_metrics(request.duration)
        return range_results
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail={"error": str(e), "timestamp": datetime.now(timezone.utc).isoformat()}
        )


@app.post("/api/v1/multus/metrics", summary="Query Multus Metrics")
async def query_multus_metrics(request: PODsRequest = PODsRequest(pod_pattern="multus.*", namespace_pattern="openshift-multus")):
    """Query Multus CNI metrics including network attachment processing times, resource usage."""
    global prometheus_client
    try:
        if not prometheus_client:
            await initialize_components()
        collector = PodsUsageCollector(prometheus_client)
        return await collector.collect_duration_usage(
            request.duration,
            request.pod_pattern,
            request.container_pattern,
            request.namespace_pattern
        )
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail={"error": str(e), "timestamp": datetime.now(timezone.utc).isoformat()}
        )


@app.post("/api/v1/ovnk/sync-duration-metrics", summary="Query OVN-K Sync Duration Metrics")
async def query_ovnk_sync_duration_seconds_metrics(request: MetricsRequest = MetricsRequest()):
    """Query OVN-Kubernetes synchronization duration metrics including controller sync times,
    resource reconciliation durations."""
    global prometheus_client
    try:
        if not prometheus_client:
            await initialize_components()
        collector = OVNLatencyCollector(prometheus_client)
        return await collector.collect_sync_duration_seconds_metrics(request.duration)
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail={"error": str(e), "timestamp": datetime.now(timezone.utc).isoformat()}
        )


@app.post("/api/v1/analysis/pods", summary="Analyze Pod Performance")
async def analyze_pods_performance(request: AnalysisRequest):
    """Perform comprehensive performance analysis of OVN-Kubernetes pods."""
    global prometheus_client, unified_analyzer
    try:
        if not UNIFIED_ANALYZER_AVAILABLE:
            raise HTTPException(
                status_code=status.HTTP_501_NOT_IMPLEMENTED,
                detail={"error": "Unified analyzer not available", "timestamp": datetime.now(timezone.utc).isoformat()}
            )
        if not prometheus_client:
            await initialize_components()

        # Get metrics data if not provided
        metrics_data = request.metrics_data
        if not metrics_data:
            metrics_data = await collect_ovn_duration_usage(
                prometheus_client,
                duration=request.duration
            )

        # Perform analysis
        if not unified_analyzer:
            unified_analyzer = UnifiedPerformanceAnalyzer()
        analysis_result = unified_analyzer.analyze_component("pods", metrics_data)

        # Save reports if requested
        if request.save_reports:
            try:
                report_files = unified_analyzer.generate_comprehensive_report(
                    {"pods": analysis_result},
                    save_json=True,
                    save_text=True,
                    output_dir=request.output_dir
                )
                analysis_result["saved_reports"] = report_files
            except Exception as e:
                analysis_result["report_save_error"] = str(e)

        return {
            "analysis_result": analysis_result,
            "metrics_data_summary": {
                "collection_type": metrics_data.get("collection_type", "unknown"),
                "total_pods": len(metrics_data.get("top_10_pods", [])),
                "timestamp": metrics_data.get("collection_timestamp", datetime.now(timezone.utc).isoformat())
            }
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail={"error": str(e), "timestamp": datetime.now(timezone.utc).isoformat()}
        )


@app.post("/api/v1/analysis/ovs", summary="Analyze OVS Performance")
async def analyze_ovs_performance(request: AnalysisRequest):
    """Perform comprehensive performance analysis of Open vSwitch (OVS) components."""
    global prometheus_client, auth_manager, unified_analyzer
    try:
        if not UNIFIED_ANALYZER_AVAILABLE:
            raise HTTPException(
                status_code=status.HTTP_501_NOT_IMPLEMENTED,
                detail={"error": "Unified analyzer not available", "timestamp": datetime.now(timezone.utc).isoformat()}
            )
        if not prometheus_client or not auth_manager:
            await initialize_components()

        # Get metrics data if not provided
        metrics_data = request.metrics_data
        if not metrics_data:
            collector = OVSUsageCollector(prometheus_client, auth_manager)
            metrics_data = await collector.collect_all_ovs_metrics(request.duration)

        # Perform analysis
        if not unified_analyzer:
            unified_analyzer = UnifiedPerformanceAnalyzer()
        analysis_result = unified_analyzer.analyze_component("ovs", metrics_data)

        # Save reports if requested
        if request.save_reports:
            try:
                report_files = unified_analyzer.generate_comprehensive_report(
                    {"ovs": analysis_result},
                    save_json=True,
                    save_text=True,
                    output_dir=request.output_dir
                )
                analysis_result["saved_reports"] = report_files
            except Exception as e:
                analysis_result["report_save_error"] = str(e)

        return {
            "analysis_result": analysis_result,
            "metrics_data_summary": {
                "collection_type": metrics_data.get("collection_type", "unknown"),
                "timestamp": metrics_data.get("timestamp", datetime.now(timezone.utc).isoformat())
            }
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail={"error": str(e), "timestamp": datetime.now(timezone.utc).isoformat()}
        )


@app.post("/api/v1/analysis/sync-duration", summary="Analyze Sync Duration Performance")
async def analyze_sync_duration_performance(request: AnalysisRequest):
    """Perform comprehensive analysis of OVN-Kubernetes synchronization duration metrics."""
    global prometheus_client, unified_analyzer
    try:
        if not UNIFIED_ANALYZER_AVAILABLE:
            raise HTTPException(
                status_code=status.HTTP_501_NOT_IMPLEMENTED,
                detail={"error": "Unified analyzer not available", "timestamp": datetime.now(timezone.utc).isoformat()}
            )
        if not prometheus_client:
            await initialize_components()

        # Get metrics data if not provided
        metrics_data = request.metrics_data
        if not metrics_data:
            collector = OVNLatencyCollector(prometheus_client)
            metrics_data = await collector.collect_sync_duration_seconds_metrics(request.duration)

        # Perform analysis
        if not unified_analyzer:
            unified_analyzer = UnifiedPerformanceAnalyzer()
        analysis_result = unified_analyzer.analyze_component("sync_duration", metrics_data)

        # Save reports if requested
        if request.save_reports:
            try:
                report_files = unified_analyzer.generate_comprehensive_report(
                    {"sync_duration": analysis_result},
                    save_json=True,
                    save_text=True,
                    output_dir=request.output_dir
                )
                analysis_result["saved_reports"] = report_files
            except Exception as e:
                analysis_result["report_save_error"] = str(e)

        return {
            "analysis_result": analysis_result,
            "metrics_data_summary": {
                "collection_type": metrics_data.get("collection_type", "unknown"),
                "duration": metrics_data.get("duration", request.duration),
                "timestamp": metrics_data.get("timestamp", datetime.now(timezone.utc).isoformat())
            }
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail={"error": str(e), "timestamp": datetime.now(timezone.utc).isoformat()}
        )

@app.post("/api/v1/analysis/multus", summary="Analyze Multus Performance") async def analyze_multus_performance(request: AnalysisRequest): """Perform comprehensive performance analysis of Multus CNI components.""" global prometheus_client, unified_analyzer try: if not UNIFIED_ANALYZER_AVAILABLE: raise HTTPException( status_code=status.HTTP_501_NOT_IMPLEMENTED, detail={"error": "Unified analyzer not available", "timestamp": datetime.now(timezone.utc).isoformat()} ) if not prometheus_client: await initialize_components() # Get metrics data if not provided metrics_data = request.metrics_data if not metrics_data: # Use pod collector with Multus-specific patterns collector = PodsUsageCollector(prometheus_client) metrics_data = await collector.collect_duration_usage( request.duration, pod_pattern="multus.*|network-metrics.*", namespace_pattern="openshift-multus" ) # Perform analysis if not unified_analyzer: unified_analyzer = UnifiedPerformanceAnalyzer() analysis_result = unified_analyzer.analyze_component("multus", metrics_data) # Save reports if requested if request.save_reports: try: report_files = unified_analyzer.generate_comprehensive_report( {"multus": analysis_result}, save_json=True, save_text=True, output_dir=request.output_dir ) analysis_result["saved_reports"] = report_files except Exception as e: analysis_result["report_save_error"] = str(e) return { "analysis_result": analysis_result, "metrics_data_summary": { "collection_type": metrics_data.get("collection_type", "unknown"), "total_pods": len(metrics_data.get("top_10_pods", [])), "timestamp": metrics_data.get("collection_timestamp", datetime.now(timezone.utc).isoformat()) } } except HTTPException: raise except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail={"error": str(e), "timestamp": datetime.now(timezone.utc).isoformat()} ) @app.post("/api/v1/analysis/unified", summary="Analyze Unified Performance") async def analyze_unified_performance(request: AnalysisRequest): """Perform comprehensive unified performance analysis across all OVNK components.""" global prometheus_client, auth_manager, unified_analyzer try: if not UNIFIED_ANALYZER_AVAILABLE: raise HTTPException( status_code=status.HTTP_501_NOT_IMPLEMENTED, detail={"error": "Unified analyzer not available", "timestamp": datetime.now(timezone.utc).isoformat()} ) if not prometheus_client or not auth_manager: await initialize_components() if not unified_analyzer: unified_analyzer = UnifiedPerformanceAnalyzer() # Collect data for all components if not provided components_data = request.metrics_data or {} if not components_data: print("Collecting metrics for all components...") # Collect pods metrics try: components_data["pods"] = await collect_ovn_duration_usage( prometheus_client, duration=request.duration ) except Exception as e: components_data["pods"] = {"error": str(e)} # Collect OVS metrics try: ovs_collector = OVSUsageCollector(prometheus_client, auth_manager) components_data["ovs"] = await ovs_collector.collect_all_ovs_metrics(request.duration) except Exception as e: components_data["ovs"] = {"error": str(e)} # Collect sync duration metrics try: sync_collector = OVNLatencyCollector(prometheus_client) components_data["sync_duration"] = await sync_collector.collect_sync_duration_seconds_metrics(request.duration) except Exception as e: components_data["sync_duration"] = {"error": str(e)} # Collect Multus metrics try: multus_collector = PodsUsageCollector(prometheus_client) components_data["multus"] = await 
multus_collector.collect_duration_usage( request.duration, pod_pattern="multus.*|network-metrics.*", namespace_pattern="openshift-multus" ) except Exception as e: components_data["multus"] = {"error": str(e)} # Perform unified analysis analysis_results = unified_analyzer.analyze_all_components(components_data) # Generate comprehensive reports if request.save_reports: try: report_files = unified_analyzer.generate_comprehensive_report( analysis_results, save_json=True, save_text=True, output_dir=request.output_dir ) return { "analysis_results": analysis_results, "saved_reports": report_files, "executive_summary": unified_analyzer._generate_executive_summary(analysis_results), "cross_component_insights": unified_analyzer._generate_cross_component_insights(analysis_results) } except Exception as e: return { "analysis_results": analysis_results, "report_save_error": str(e), "executive_summary": unified_analyzer._generate_executive_summary(analysis_results) } return { "analysis_results": analysis_results, "executive_summary": unified_analyzer._generate_executive_summary(analysis_results), "cross_component_insights": unified_analyzer._generate_cross_component_insights(analysis_results) } except HTTPException: raise except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail={"error": str(e), "timestamp": datetime.now(timezone.utc).isoformat()} ) @app.post("/api/v1/health-check", summary="Performance Health Check") async def get_performance_health_check(request: PODsRequest = PODsRequest()): """Perform a quick health check across all OVNK components providing a high-level status overview.""" global prometheus_client, auth_manager, unified_analyzer try: if not UNIFIED_ANALYZER_AVAILABLE: raise HTTPException( status_code=status.HTTP_501_NOT_IMPLEMENTED, detail={"error": "Unified analyzer not available", "timestamp": datetime.now(timezone.utc).isoformat()} ) if not prometheus_client or not auth_manager: await initialize_components() if not unified_analyzer: unified_analyzer = UnifiedPerformanceAnalyzer() # Quick data collection (shorter duration for health check) health_duration = "5m" # Quick health check components_data = {} # Collect minimal data for quick assessment try: components_data["pods"] = await collect_ovn_duration_usage( prometheus_client, duration=health_duration ) except Exception as e: components_data["pods"] = {"error": str(e)} try: ovs_collector = OVSUsageCollector(prometheus_client, auth_manager) components_data["ovs"] = await ovs_collector.collect_all_ovs_metrics(health_duration) except Exception as e: components_data["ovs"] = {"error": str(e)} # Quick analysis analysis_results = unified_analyzer.analyze_all_components(components_data) executive_summary = unified_analyzer._generate_executive_summary(analysis_results) # Generate health status health_status = {} for component, result in analysis_results.items(): if "error" in result: health_status[component] = "ERROR" else: health_score = unified_analyzer._extract_health_score(component, result) if health_score is None: health_status[component] = "UNKNOWN" elif health_score >= 85: health_status[component] = "EXCELLENT" elif health_score >= 70: health_status[component] = "GOOD" elif health_score >= 50: health_status[component] = "MODERATE" elif health_score >= 30: health_status[component] = "POOR" else: health_status[component] = "CRITICAL" return { "health_check_timestamp": datetime.now(timezone.utc).isoformat(), "overall_cluster_health": executive_summary.get("overall_cluster_health", 
"unknown"), "component_health_status": health_status, "critical_issues": executive_summary.get("critical_issues_count", 0), "warning_issues": executive_summary.get("warning_issues_count", 0), "immediate_action_required": executive_summary.get("immediate_action_required", False), "key_findings": executive_summary.get("key_findings", []), "components_analyzed": list(health_status.keys()), "health_check_duration": health_duration } except HTTPException: raise except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail={"error": str(e), "timestamp": datetime.now(timezone.utc).isoformat()} ) @app.post("/api/v1/storage/performance-data", summary="Store Performance Data") async def store_performance_data(request: PerformanceDataRequest): """Store performance metrics data in the DuckDB database for historical analysis and trend monitoring.""" global storage try: if not storage: await initialize_components() elt = PerformanceELT(storage.db_path) timestamp = request.timestamp or datetime.now(timezone.utc).isoformat() await elt.store_performance_data(request.metrics_data, timestamp) return { "status": "success", "message": "Performance data stored successfully", "timestamp": timestamp } except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail={"error": str(e), "timestamp": datetime.now(timezone.utc).isoformat()} ) @app.post("/api/v1/storage/performance-history", summary="Get Performance History") async def get_performance_history(request: PerformanceHistoryRequest = PerformanceHistoryRequest()): """Retrieve historical performance data from the database for trend analysis and performance tracking over time.""" global storage try: if not storage: await initialize_components() elt = PerformanceELT(storage.db_path) result = await elt.get_performance_history(request.days, request.metric_type) return result except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail={"error": str(e), "timestamp": datetime.now(timezone.utc).isoformat()} ) # Additional API endpoints for convenience @app.get("/api/v1/components/status", summary="Get Component Status") async def get_component_status(): """Get status of all initialized components""" return { "auth_manager": auth_manager is not None, "config": config is not None, "prometheus_client": prometheus_client is not None, "storage": storage is not None, "unified_analyzer": unified_analyzer is not None, "unified_analyzer_available": UNIFIED_ANALYZER_AVAILABLE, "timestamp": datetime.now(timezone.utc).isoformat() } @app.get("/api/v1/metrics", summary="List Available Metrics") async def list_available_metrics(): """List all available metric endpoints and their descriptions""" return { "metrics_endpoints": { "/api/v1/openshift/general-info": "OpenShift cluster general information", "/api/v1/cluster/node-usage": "Cluster node resource usage metrics", "/api/v1/kube-api/metrics": "Kubernetes API server performance metrics", "/api/v1/ovnk/pods-metrics": "OVN-K pod resource usage metrics", "/api/v1/ovnk/containers-metrics": "OVN-K container-level metrics", "/api/v1/ovnk/ovs-metrics": "Open vSwitch performance metrics", "/api/v1/multus/metrics": "Multus CNI metrics", "/api/v1/ovnk/sync-duration-metrics": "OVN-K synchronization duration metrics" }, "analysis_endpoints": { "/api/v1/analysis/pods": "Pod performance analysis", "/api/v1/analysis/ovs": "OVS performance analysis", "/api/v1/analysis/sync-duration": "Sync duration performance analysis", 
"/api/v1/analysis/multus": "Multus performance analysis", "/api/v1/analysis/unified": "Unified cross-component analysis", "/api/v1/health-check": "Quick health check across all components" }, "storage_endpoints": { "/api/v1/storage/performance-data": "Store performance data", "/api/v1/storage/performance-history": "Retrieve historical performance data" } } def create_app(): """Factory function to create the FastAPI app (useful for testing)""" return app if __name__ == "__main__": # Run the server uvicorn.run( "ocp_benchmark_mcp_api:app", host="0.0.0.0", port=8080, reload=True, log_level="info" )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/liqcui/ovnk-benchmark-mcp'
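
The same lookup from Python, as a sketch (assumes the requests package is installed; the URL is the one shown above):

import requests

# Fetch this server's directory entry from the Glama MCP API
resp = requests.get("https://glama.ai/api/mcp/v1/servers/liqcui/ovnk-benchmark-mcp")
resp.raise_for_status()  # raise on HTTP errors
print(resp.json())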
