Skip to main content
Glama

OpenShift OVN-Kubernetes Benchmark MCP Server

by liqcui
ovnk_benchmark_mcp_agent_stor2db.py49.7 kB
#!/usr/bin/env python3 """ OVN-Kubernetes Benchmark MCP Agent - Performance Data Collection with Storage AI agent using LangGraph to collect performance data via MCP server and store in DuckDB Updated to match ETCD analyzer pattern with comprehensive storage integration """ import asyncio import json import uuid import logging from datetime import datetime, timezone, timedelta from typing import Dict, List, Any, Optional, TypedDict import traceback import os from dotenv import load_dotenv # MCP Streamable HTTP client imports try: from mcp import ClientSession from mcp.client.streamable_http import streamablehttp_client MCP_CLIENT_AVAILABLE = True except ImportError: MCP_CLIENT_AVAILABLE = False print("Warning: MCP client not available. Install with: pip install mcp") # LangGraph imports try: from langgraph.graph import StateGraph, END, START from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, BaseMessage from langchain_openai import ChatOpenAI AI_AVAILABLE = True except ImportError: AI_AVAILABLE = False print("LangGraph/LangChain not available. Running in non-AI mode.") # Storage imports try: from storage.ovnk_benchmark_storage_ovnk import ovnkMetricStor STORAGE_AVAILABLE = True except ImportError: STORAGE_AVAILABLE = False print("Warning: Storage module not available.") # Set up logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) class AgentState(TypedDict): """State for the performance data collection agent""" messages: List[Any] current_step: str cluster_info: Optional[Dict[str, Any]] node_usage_data: Optional[Dict[str, Any]] basic_info_data: Optional[Dict[str, Any]] api_metrics_data: Optional[Dict[str, Any]] multus_metrics_data: Optional[Dict[str, Any]] ovnk_pods_data: Optional[Dict[str, Any]] ovnk_containers_data: Optional[Dict[str, Any]] ovs_metrics_data: Optional[Dict[str, Any]] latency_metrics_data: Optional[Dict[str, Any]] duration: str start_time: Optional[str] end_time: Optional[str] testing_id: str query_params: Dict[str, Any] results: Dict[str, Any] error: Optional[str] class PerformanceDataAgent: """AI agent for collecting OVN-K performance data via MCP server with DuckDB storage""" def __init__(self, mcp_server_url: str = "http://localhost:8000", openai_api_key: Optional[str] = None, duration: Optional[str] = "5m", start_time: Optional[str] = None, end_time: Optional[str] = None, use_ai: bool = True, db_path: str = "storage/ovnk_benchmark.db"): self.mcp_server_url = mcp_server_url.rstrip('/') self.duration = duration self.start_time = start_time self.end_time = end_time self.run_uuid = str(uuid.uuid4()) self.use_ai = use_ai and AI_AVAILABLE self.db_path = db_path self.storage: Optional[ovnkMetricStor] = None self._storage_initialized = False if self.use_ai: load_dotenv() api_key = openai_api_key or os.getenv("OPENAI_API_KEY") base_url = os.getenv("BASE_URL") if not api_key: logger.warning("OPENAI_API_KEY not provided. Running in non-AI mode.") self.use_ai = False else: self.llm = ChatOpenAI( model="gemini-1.5-flash", base_url=base_url, api_key=api_key, temperature=0.1, streaming=True ) if self.use_ai: self.graph = self._create_graph() else: logger.info("Running in non-AI mode - direct collection without LangGraph") def _create_graph(self) -> StateGraph: """Create the LangGraph StateGraph workflow""" workflow = StateGraph(AgentState) # Add nodes workflow.add_node("initialize", self._initialize_node) workflow.add_node("collect_cluster_info", self._collect_cluster_info_node) workflow.add_node("collect_node_usage", self._collect_node_usage_node) workflow.add_node("collect_basic_info", self._collect_basic_info_node) workflow.add_node("collect_api_metrics", self._collect_api_metrics_node) workflow.add_node("collect_multus_metrics", self._collect_multus_metrics_node) workflow.add_node("collect_ovnk_pods", self._collect_ovnk_pods_node) workflow.add_node("collect_ovnk_containers", self._collect_ovnk_containers_node) workflow.add_node("collect_ovs_metrics", self._collect_ovs_metrics_node) workflow.add_node("collect_latency_metrics", self._collect_latency_metrics_node) workflow.add_node("store_data", self._store_data_node) workflow.add_node("finalize", self._finalize_node) workflow.add_node("handle_error", self._handle_error_node) # Set entry point workflow.add_edge(START, "initialize") # Add conditional edges with error handling workflow.add_conditional_edges( "initialize", self._should_handle_error, { "error": "handle_error", "continue": "collect_cluster_info" } ) workflow.add_conditional_edges( "collect_cluster_info", self._should_handle_error, { "error": "handle_error", "continue": "collect_node_usage" } ) workflow.add_conditional_edges( "collect_node_usage", self._should_handle_error, { "error": "handle_error", "continue": "collect_basic_info" } ) workflow.add_conditional_edges( "collect_basic_info", self._should_handle_error, { "error": "handle_error", "continue": "collect_api_metrics" } ) workflow.add_conditional_edges( "collect_api_metrics", self._should_handle_error, { "error": "handle_error", "continue": "collect_multus_metrics" } ) workflow.add_conditional_edges( "collect_multus_metrics", self._should_handle_error, { "error": "handle_error", "continue": "collect_ovnk_pods" } ) workflow.add_conditional_edges( "collect_ovnk_pods", self._should_handle_error, { "error": "handle_error", "continue": "collect_ovnk_containers" } ) workflow.add_conditional_edges( "collect_ovnk_containers", self._should_handle_error, { "error": "handle_error", "continue": "collect_ovs_metrics" } ) workflow.add_conditional_edges( "collect_ovs_metrics", self._should_handle_error, { "error": "handle_error", "continue": "collect_latency_metrics" } ) workflow.add_conditional_edges( "collect_latency_metrics", self._should_handle_error, { "error": "handle_error", "continue": "store_data" } ) # Set workflow path workflow.add_edge("store_data", "finalize") workflow.add_edge("handle_error", END) workflow.add_edge("finalize", END) return workflow.compile() def _should_handle_error(self, state: AgentState) -> str: """Check if there's a critical error to handle""" return "error" if state.get("error") else "continue" def _get_time_params(self) -> Dict[str, Any]: """Get time parameters for MCP tool calls""" params = {} if self.duration: params["duration"] = self.duration if self.start_time: params["start_time"] = self.start_time if self.end_time: params["end_time"] = self.end_time return params async def _initialize_node(self, state: AgentState) -> AgentState: """Initialize the data collection process""" query_params = state.get("query_params", {}) testing_id = query_params.get("testing_id") or self.run_uuid duration = query_params.get("duration", self.duration) start_time = query_params.get("start_time", self.start_time) end_time = query_params.get("end_time", self.end_time) time_range = f"duration: {duration}" if duration else f"from {start_time} to {end_time}" logger.info(f"Starting performance data collection - Testing ID: {testing_id} ({time_range})") if self.use_ai: initialization_message = SystemMessage(content=f"""You are a performance data collection agent for OpenShift OVN-Kubernetes clusters. Your task is to collect comprehensive performance metrics from various components. Collection steps: 1. Collect OpenShift cluster information 2. Collect cluster node usage metrics 3. Collect Prometheus basic info 4. Collect API server metrics 5. Collect Multus CNI metrics 6. Collect OVN-K pods metrics 7. Collect OVN-K containers metrics 8. Collect OVS metrics 9. Collect OVN-K latency metrics Time range: {time_range} Be thorough and handle errors gracefully.""") start_message = HumanMessage(content=f"Start performance data collection for {time_range}") messages = [initialization_message, start_message] else: messages = [] return { **state, "testing_id": testing_id, "duration": duration, "start_time": start_time, "end_time": end_time, "results": {}, "messages": messages, "current_step": "initialize" } async def _initialize_storage(self) -> bool: """Initialize storage with retry logic for database locks""" if self._storage_initialized or not STORAGE_AVAILABLE: return self._storage_initialized try: self.storage = ovnkMetricStor(self.db_path) await self.storage.initialize() self._storage_initialized = True logger.info(f"✅ Storage initialized: {self.db_path}") return True except Exception as e: err_msg = str(e) # Detect lock conflict and attempt seamless fallback without noisy warning is_lock_conflict = ("Could not set lock on file" in err_msg) or ("Conflicting lock" in err_msg) if is_lock_conflict: logger.info("Database is locked by another process; switching to a per-run database file.") else: logger.warning(f"⚠️ Warning: Failed to initialize storage: {err_msg}") # Check if it's a lock error and retry with per-run database if is_lock_conflict: try: base_no_ext, _ = os.path.splitext(os.path.abspath(self.db_path)) alt_db_path = f"{base_no_ext}_{self.run_uuid}.db" logger.info(f"➡️ Retrying storage with per-run DB: {alt_db_path}") self.storage = ovnkMetricStor(alt_db_path) await self.storage.initialize() self.db_path = alt_db_path self._storage_initialized = True logger.info(f"✅ Storage initialized with per-run DB: {self.db_path}") return True except Exception as e2: logger.warning(f"⚠️ Warning: Fallback storage init failed: {e2}") self.storage = None return False else: self.storage = None return False async def _collect_cluster_info_node(self, state: AgentState) -> AgentState: """Collect OpenShift cluster information""" logger.info("Collecting OpenShift cluster information...") state["current_step"] = "collect_cluster_info" try: data = await self._call_mcp_tool("get_openshift_cluster_info", { "include_node_details": True, "include_resource_counts": True, "include_network_policies": True, "include_operator_status": True, "include_mcp_status": True }) total_nodes = data.get("total_nodes", 0) cluster_version = data.get("cluster_version", "unknown") msg = f"Successfully collected cluster information. Version: {cluster_version}, Nodes: {total_nodes}" logger.info(f" → {msg}") return { **state, "cluster_info": data, "messages": state["messages"] + [AIMessage(content=msg)] if self.use_ai else [] } except Exception as e: error_msg = f"Failed to collect cluster info: {str(e)}" logger.warning(f" ✗ {error_msg}") return { **state, "cluster_info": None, "messages": state["messages"] + [AIMessage(content=f"Error: {error_msg}")] if self.use_ai else [] } async def _collect_node_usage_node(self, state: AgentState) -> AgentState: """Collect cluster node usage metrics""" logger.info("Collecting cluster node usage metrics...") state["current_step"] = "collect_node_usage" try: params = self._get_time_params() data = await self._call_mcp_tool("query_cluster_node_usage", params) groups = data.get("groups", {}) node_count = sum(len(g.get("nodes", [])) for g in groups.values()) msg = f"Node usage metrics collected. Total nodes analyzed: {node_count}" logger.info(f" → {msg}") return { **state, "node_usage_data": data, "messages": state["messages"] + [AIMessage(content=msg)] if self.use_ai else [] } except Exception as e: error_msg = f"Failed to collect node usage: {str(e)}" logger.warning(f" ✗ {error_msg}") return { **state, "node_usage_data": None, "messages": state["messages"] + [AIMessage(content=f"Error: {error_msg}")] if self.use_ai else [] } async def _collect_basic_info_node(self, state: AgentState) -> AgentState: """Collect Prometheus basic OVN info""" logger.info("Collecting Prometheus basic OVN information...") state["current_step"] = "collect_basic_info" try: data = await self._call_mcp_tool("query_prometheus_basic_info", { "include_pod_status": True, "include_db_metrics": True }) msg = "Prometheus basic OVN info collected successfully" logger.info(f" → {msg}") return { **state, "basic_info_data": data, "messages": state["messages"] + [AIMessage(content=msg)] if self.use_ai else [] } except Exception as e: error_msg = f"Failed to collect basic info: {str(e)}" logger.warning(f" ✗ {error_msg}") return { **state, "basic_info_data": None, "messages": state["messages"] + [AIMessage(content=f"Error: {error_msg}")] if self.use_ai else [] } async def _collect_api_metrics_node(self, state: AgentState) -> AgentState: """Collect API server metrics""" logger.info("Collecting Kubernetes API server metrics...") state["current_step"] = "collect_api_metrics" try: params = self._get_time_params() data = await self._call_mcp_tool("query_kube_api_metrics", params) summary = data.get("summary", {}) health_score = summary.get("health_score", 0) status = summary.get("overall_status", "unknown") msg = f"API server metrics collected. Health score: {health_score}, Status: {status}" logger.info(f" → {msg}") return { **state, "api_metrics_data": data, "messages": state["messages"] + [AIMessage(content=msg)] if self.use_ai else [] } except Exception as e: error_msg = f"Failed to collect API server metrics: {str(e)}" logger.warning(f" ✗ {error_msg}") return { **state, "api_metrics_data": None, "messages": state["messages"] + [AIMessage(content=f"Error: {error_msg}")] if self.use_ai else [] } async def _collect_multus_metrics_node(self, state: AgentState) -> AgentState: """Collect Multus CNI metrics""" logger.info("Collecting Multus CNI metrics...") state["current_step"] = "collect_multus_metrics" try: params = { "pod_pattern": "multus.*|network-metrics.*", "namespace_pattern": "openshift-multus", **self._get_time_params() } data = await self._call_mcp_tool("query_multus_metrics", params) collection_timestamp = data.get("collection_timestamp", "unknown") msg = f"Multus metrics collected. Timestamp: {collection_timestamp}" logger.info(f" → {msg}") return { **state, "multus_metrics_data": data, "messages": state["messages"] + [AIMessage(content=msg)] if self.use_ai else [] } except Exception as e: error_msg = f"Failed to collect Multus metrics: {str(e)}" logger.warning(f" ✗ {error_msg}") return { **state, "multus_metrics_data": None, "messages": state["messages"] + [AIMessage(content=f"Error: {error_msg}")] if self.use_ai else [] } async def _collect_ovnk_pods_node(self, state: AgentState) -> AgentState: """Collect OVN-K pods metrics""" logger.info("Collecting OVN-K pods metrics...") state["current_step"] = "collect_ovnk_pods" try: params = { "pod_pattern": "ovnkube.*", "namespace_pattern": "openshift-ovn-kubernetes", **self._get_time_params() } data = await self._call_mcp_tool("query_ovnk_pods_metrics", params) collection_timestamp = data.get("collection_timestamp", "unknown") msg = f"OVN-K pods metrics collected. Timestamp: {collection_timestamp}" logger.info(f" → {msg}") return { **state, "ovnk_pods_data": data, "messages": state["messages"] + [AIMessage(content=msg)] if self.use_ai else [] } except Exception as e: error_msg = f"Failed to collect OVN-K pods metrics: {str(e)}" logger.warning(f" ✗ {error_msg}") return { **state, "ovnk_pods_data": None, "messages": state["messages"] + [AIMessage(content=f"Error: {error_msg}")] if self.use_ai else [] } async def _collect_ovnk_containers_node(self, state: AgentState) -> AgentState: """Collect OVN-K containers metrics""" logger.info("Collecting OVN-K containers metrics...") state["current_step"] = "collect_ovnk_containers" try: params = { "pod_pattern": "ovnkube.*", "container_pattern": "ovnkube-controller", "namespace_pattern": "openshift-ovn-kubernetes", **self._get_time_params() } data = await self._call_mcp_tool("query_ovnk_containers_metrics", params) collection_timestamp = data.get("collection_timestamp", "unknown") msg = f"OVN-K containers metrics collected. Timestamp: {collection_timestamp}" logger.info(f" → {msg}") return { **state, "ovnk_containers_data": data, "messages": state["messages"] + [AIMessage(content=msg)] if self.use_ai else [] } except Exception as e: error_msg = f"Failed to collect OVN-K containers metrics: {str(e)}" logger.warning(f" ✗ {error_msg}") return { **state, "ovnk_containers_data": None, "messages": state["messages"] + [AIMessage(content=f"Error: {error_msg}")] if self.use_ai else [] } async def _collect_ovs_metrics_node(self, state: AgentState) -> AgentState: """Collect OVS metrics""" logger.info("Collecting OVS metrics...") state["current_step"] = "collect_ovs_metrics" try: params = { "pod_pattern": "ovnkube.*", "namespace_pattern": "openshift-ovn-kubernetes", **self._get_time_params() } data = await self._call_mcp_tool("query_ovnk_ovs_metrics", params) msg = "OVS metrics collected successfully" logger.info(f" → {msg}") return { **state, "ovs_metrics_data": data, "messages": state["messages"] + [AIMessage(content=msg)] if self.use_ai else [] } except Exception as e: error_msg = f"Failed to collect OVS metrics: {str(e)}" logger.warning(f" ✗ {error_msg}") return { **state, "ovs_metrics_data": None, "messages": state["messages"] + [AIMessage(content=f"Error: {error_msg}")] if self.use_ai else [] } async def _collect_latency_metrics_node(self, state: AgentState) -> AgentState: """Collect OVN-K latency metrics""" logger.info("Collecting OVN-K latency metrics...") state["current_step"] = "collect_latency_metrics" try: params = { "include_controller_metrics": True, "include_node_metrics": True, "include_extended_metrics": True, **self._get_time_params() } data = await self._call_mcp_tool("get_ovnk_latency_metrics", params) summary = data.get("summary", {}) total_metrics = summary.get("total_metrics", 0) msg = f"OVN-K latency metrics collected. Total metrics: {total_metrics}" logger.info(f" → {msg}") return { **state, "latency_metrics_data": data, "messages": state["messages"] + [AIMessage(content=msg)] if self.use_ai else [] } except Exception as e: error_msg = f"Failed to collect latency metrics: {str(e)}" logger.warning(f" ✗ {error_msg}") return { **state, "latency_metrics_data": None, "messages": state["messages"] + [AIMessage(content=f"Error: {error_msg}")] if self.use_ai else [] } async def collect_performance_data(self) -> Dict[str, Any]: """Main method to collect performance data""" try: logger.info(f"\n{'='*60}") logger.info(f"Starting {'AI-driven' if self.use_ai else 'direct'} performance data collection...") logger.info(f"{'='*60}\n") # Initialize storage before starting collection await self._initialize_storage() if self.use_ai: # Use LangGraph workflow query_params = { "duration": self.duration, "start_time": self.start_time, "end_time": self.end_time, "testing_id": self.run_uuid } result = await self.analyze(query_params) else: # Direct collection without AI result = await self._direct_collection() # Close storage connection if self.storage: await self.storage.close() return result except Exception as e: logger.error(f"Critical error in performance data collection: {e}") logger.error(f"Traceback: {traceback.format_exc()}") if self.storage: try: await self.storage.close() except: pass return { "run_id": self.run_uuid, "timestamp": datetime.now(timezone.utc).isoformat(), "error": str(e), "success": False, "success_rate": 0, "mode": "ai" if self.use_ai else "direct", "saved_to_db": False } async def _store_data_node(self, state: AgentState) -> AgentState: """Store collected data in DuckDB""" try: logger.info("Storing data in DuckDB...") storage_results = {} if not self.storage: logger.warning("Storage not initialized, skipping data storage") return { **state, "results": { **state.get("results", {}), "storage_results": {"error": "Storage not initialized"} } } # Prepare collection result for storage collection_result = { "run_id": state["testing_id"], "timestamp": datetime.now(timezone.utc).isoformat(), "duration": state.get("duration"), "start_time": state.get("start_time"), "end_time": state.get("end_time"), "collected_data": {}, "errors": [], "success_count": 0, "total_steps": 9, "mode": "ai" if self.use_ai else "direct" } # Add collected data data_mapping = { "cluster_info": state.get("cluster_info"), "node_usage": state.get("node_usage_data"), "basic_info": state.get("basic_info_data"), "api_server": state.get("api_metrics_data"), "multus": state.get("multus_metrics_data"), "ovnk_pods": state.get("ovnk_pods_data"), "ovnk_containers": state.get("ovnk_containers_data"), "ovs_metrics": state.get("ovs_metrics_data"), "latency_metrics": state.get("latency_metrics_data") } for key, value in data_mapping.items(): if value is not None: collection_result["collected_data"][key] = value collection_result["success_count"] += 1 storage_results[key] = "success" else: storage_results[key] = "skipped" collection_result["success_rate"] = (collection_result["success_count"] / collection_result["total_steps"]) * 100 collection_result["success"] = collection_result["success_count"] > 0 # Save to storage try: run_id = await self.storage.save_collection_result(collection_result) storage_results["saved_to_db"] = True storage_results["db_run_id"] = run_id logger.info(f"✅ Results saved to database - Run ID: {run_id}") except Exception as e: logger.error(f"Failed to save to database: {e}") storage_results["saved_to_db"] = False storage_results["db_error"] = str(e) return { **state, "results": { **state.get("results", {}), "storage_results": storage_results }, "messages": state["messages"] + [AIMessage(content="Successfully stored data in DuckDB")] if self.use_ai else [] } except Exception as e: error_msg = f"Error storing data: {str(e)}" logger.error(error_msg) return { **state, "error": error_msg, "messages": state["messages"] + [AIMessage(content=error_msg)] if self.use_ai else [] } async def _finalize_node(self, state: AgentState) -> AgentState: """Finalize results with comprehensive summary""" try: logger.info("Finalizing performance data collection...") # Count successful collections successful_collections = sum([ 1 if state.get("cluster_info") else 0, 1 if state.get("node_usage_data") else 0, 1 if state.get("basic_info_data") else 0, 1 if state.get("api_metrics_data") else 0, 1 if state.get("multus_metrics_data") else 0, 1 if state.get("ovnk_pods_data") else 0, 1 if state.get("ovnk_containers_data") else 0, 1 if state.get("ovs_metrics_data") else 0, 1 if state.get("latency_metrics_data") else 0 ]) total_steps = 9 success_rate = (successful_collections / total_steps) * 100 results = { "testing_id": state["testing_id"], "timestamp": datetime.now(timezone.utc).isoformat(), "duration": state.get("duration"), "start_time": state.get("start_time"), "end_time": state.get("end_time"), "cluster_info": state.get("cluster_info"), "node_usage": state.get("node_usage_data"), "basic_info": state.get("basic_info_data"), "api_metrics": state.get("api_metrics_data"), "multus_metrics": state.get("multus_metrics_data"), "ovnk_pods": state.get("ovnk_pods_data"), "ovnk_containers": state.get("ovnk_containers_data"), "ovs_metrics": state.get("ovs_metrics_data"), "latency_metrics": state.get("latency_metrics_data"), "storage_results": state.get("results", {}).get("storage_results", {}), "success_count": successful_collections, "total_steps": total_steps, "success_rate": success_rate, "status": "success" if successful_collections > 0 else "failed" } logger.info(f"Analysis completed - Success rate: {success_rate:.1f}% ({successful_collections}/{total_steps})") return { **state, "results": results, "messages": state["messages"] + [AIMessage(content="Analysis completed successfully")] if self.use_ai else [] } except Exception as e: error_msg = f"Error finalizing results: {str(e)}" logger.error(error_msg) return { **state, "error": error_msg, "results": {"status": "error", "error": error_msg} } async def _handle_error_node(self, state: AgentState) -> AgentState: """Handle critical errors during collection""" error_msg = state.get("error", "Unknown error occurred") logger.error(f"Handling critical error: {error_msg}") return { **state, "results": { "testing_id": state.get("testing_id"), "timestamp": datetime.now(timezone.utc).isoformat(), "status": "error", "error": error_msg }, "messages": state["messages"] + [AIMessage(content=f"Analysis failed: {error_msg}")] if self.use_ai else [] } async def _call_mcp_tool(self, tool_name: str, params: Dict[str, Any]) -> Dict[str, Any]: """Call MCP server tool via Streamable HTTP""" if not MCP_CLIENT_AVAILABLE: raise Exception("MCP client not available. Install with: pip install mcp") try: url = f"{self.mcp_server_url}/mcp" async with streamablehttp_client(url) as ( read_stream, write_stream, get_session_id, ): async with ClientSession(read_stream, write_stream) as session: await session.initialize() _ = get_session_id() request_data = {"request": params or {}} result = await session.call_tool(tool_name, request_data) if result.content and len(result.content) > 0: text_content = result.content[0].text return json.loads(text_content) raise Exception(f"Empty response from tool {tool_name}") except json.JSONDecodeError as e: raise Exception(f"Failed to parse JSON response from {tool_name}: {str(e)}") except Exception as e: raise Exception(f"Error calling MCP tool {tool_name}: {str(e)}") async def analyze(self, query_params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """Run the complete analysis workflow""" initial_state = AgentState( messages=[HumanMessage(content="Start OVN-K performance data collection")], current_step="", cluster_info=None, node_usage_data=None, basic_info_data=None, api_metrics_data=None, multus_metrics_data=None, ovnk_pods_data=None, ovnk_containers_data=None, ovs_metrics_data=None, latency_metrics_data=None, duration=self.duration or "5m", start_time=self.start_time, end_time=self.end_time, testing_id="", query_params=query_params or {}, results={}, error=None ) try: final_state = await self.graph.ainvoke(initial_state) return final_state["results"] except Exception as e: logger.error(f"Analysis workflow failed: {str(e)}") return { "status": "error", "error": str(e), "timestamp": datetime.now(timezone.utc).isoformat() } async def collect_performance_data(self) -> Dict[str, Any]: """Main method to collect performance data""" try: logger.info(f"\n{'='*60}") logger.info(f"Starting {'AI-driven' if self.use_ai else 'direct'} performance data collection...") logger.info(f"{'='*60}\n") # Initialize storage if not already initialized if STORAGE_AVAILABLE and not self.storage: try: self.storage = ovnkMetricStor(self.db_path) await self.storage.initialize() logger.info(f"✅ Storage initialized: {self.db_path}") except Exception as e: err_msg = str(e) logger.warning(f"⚠️ Warning: Failed to initialize storage: {err_msg}") if "Could not set lock on file" in err_msg or "Conflicting lock" in err_msg: try: alt_db_path = f"{os.path.splitext(self.db_path)[0]}_{self.run_uuid}.db" logger.info(f"➡️ Retrying storage with per-run DB: {alt_db_path}") self.storage = ovnkMetricStor(alt_db_path) await self.storage.initialize() self.db_path = alt_db_path logger.info(f"✅ Storage initialized with per-run DB: {self.db_path}") except Exception as e2: logger.warning(f"⚠️ Warning: Fallback storage init failed: {e2}") self.storage = None else: self.storage = None if self.use_ai: # Use LangGraph workflow query_params = { "duration": self.duration, "start_time": self.start_time, "end_time": self.end_time, "testing_id": self.run_uuid } result = await self.analyze(query_params) else: # Direct collection without AI result = await self._direct_collection() # Close storage connection if self.storage: await self.storage.close() return result except Exception as e: logger.error(f"Critical error in performance data collection: {e}") logger.error(f"Traceback: {traceback.format_exc()}") if self.storage: try: await self.storage.close() except: pass return { "run_id": self.run_uuid, "timestamp": datetime.now(timezone.utc).isoformat(), "error": str(e), "success": False, "success_rate": 0, "mode": "ai" if self.use_ai else "direct", "saved_to_db": False } async def _direct_collection(self) -> Dict[str, Any]: """Direct collection without AI workflow""" state = { "testing_id": self.run_uuid, "timestamp": datetime.now(timezone.utc).isoformat(), "duration": self.duration, "start_time": self.start_time, "end_time": self.end_time, "cluster_info": None, "node_usage_data": None, "basic_info_data": None, "api_metrics_data": None, "multus_metrics_data": None, "ovnk_pods_data": None, "ovnk_containers_data": None, "ovs_metrics_data": None, "latency_metrics_data": None, "results": {}, "messages": [], "query_params": {}, "current_step": "", "error": None } # Initialize and collect state = await self._initialize_node(state) state = await self._collect_cluster_info_node(state) state = await self._collect_node_usage_node(state) state = await self._collect_basic_info_node(state) state = await self._collect_api_metrics_node(state) state = await self._collect_multus_metrics_node(state) state = await self._collect_ovnk_pods_node(state) state = await self._collect_ovnk_containers_node(state) state = await self._collect_ovs_metrics_node(state) state = await self._collect_latency_metrics_node(state) # Store and finalize state = await self._store_data_node(state) state = await self._finalize_node(state) return state.get("results", {}) async def query_by_duration(self, duration: str = "1h", query_stored_only: bool = False) -> Dict[str, Any]: """Query metrics data by duration. If query_stored_only is True, returns results from DuckDB without collecting new data. Otherwise, collects new data and stores it. """ if query_stored_only: if not self._storage_initialized: storage_ready = await self._initialize_storage() if not storage_ready: return {"status": "error", "error": "Failed to initialize storage"} try: # Query stored data end_time = datetime.now(timezone.utc) # Parse duration to get start time duration_seconds = self._parse_duration_to_seconds(duration) start_time = end_time - timedelta(seconds=duration_seconds) snapshots = await self.storage.get_performance_snapshots( start_time=start_time.isoformat(), end_time=end_time.isoformat(), limit=10 ) return { "status": "success", "query_mode": "stored_only", "duration": duration, "snapshots": snapshots, "timestamp": datetime.now(timezone.utc).isoformat() } except Exception as e: return {"status": "error", "error": str(e)} # Collect new data self.duration = duration self.start_time = None self.end_time = None return await self.collect_performance_data() async def query_by_time_range(self, start_time: str, end_time: str, query_stored_only: bool = False) -> Dict[str, Any]: """Query metrics data by time range (UTC timezone). If query_stored_only is True, returns results from DuckDB without collecting new data. Otherwise, collects new data and stores it. """ if query_stored_only: if not self._storage_initialized: storage_ready = await self._initialize_storage() if not storage_ready: return {"status": "error", "error": "Failed to initialize storage"} try: # Query stored data snapshots = await self.storage.get_performance_snapshots( start_time=start_time, end_time=end_time, limit=10 ) return { "status": "success", "query_mode": "stored_only", "start_time": start_time, "end_time": end_time, "snapshots": snapshots, "timestamp": datetime.now(timezone.utc).isoformat() } except Exception as e: return {"status": "error", "error": str(e)} # Collect new data self.duration = None self.start_time = start_time self.end_time = end_time return await self.collect_performance_data() def _parse_duration_to_seconds(self, duration: str) -> int: """Parse Prometheus duration string to seconds""" import re match = re.match(r'^(\d+)([smhd])$', duration.lower()) if not match: return 3600 # Default 1 hour value = int(match.group(1)) unit = match.group(2) multipliers = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400} return value * multipliers.get(unit, 60) async def main(): """Main function for standalone execution""" import argparse from datetime import timedelta parser = argparse.ArgumentParser(description="OVN-K Performance Data Collection Agent") parser.add_argument("--mcp-url", default="http://localhost:8000", help="MCP server URL (default: http://localhost:8000)") parser.add_argument("--duration", default="5m", help="Collection duration (e.g., 5m, 1h, 1d) - default: 5m") parser.add_argument("--start-time", default=None, help="Start time in ISO format (YYYY-MM-DDTHH:MM:SSZ)") parser.add_argument("--end-time", default=None, help="End time in ISO format (YYYY-MM-DDTHH:MM:SSZ)") parser.add_argument("--no-ai", action="store_true", help="Disable AI mode and use direct collection") parser.add_argument("--db-path", default="storage/ovnk_benchmark.db", help="Database path (default: storage/ovnk_benchmark.db)") parser.add_argument("--output", default=None, help="Output file for collected data (JSON)") parser.add_argument("--query-stored", action="store_true", help="Query stored data only without collecting new data") args = parser.parse_args() # Get configuration from environment or command line mcp_server_url = os.getenv("MCP_SERVER_URL", args.mcp_url) openai_api_key = os.getenv("OPENAI_API_KEY") # Determine time parameters if args.start_time and args.end_time: duration = None start_time = args.start_time end_time = args.end_time logger.info(f"Using time range: {start_time} to {end_time}") else: duration = args.duration start_time = None end_time = None logger.info(f"Using duration: {duration}") # Create agent agent = PerformanceDataAgent( mcp_server_url=mcp_server_url, openai_api_key=openai_api_key, duration=duration, start_time=start_time, end_time=end_time, use_ai=not args.no_ai, db_path=args.db_path ) # Run data collection or query if args.query_stored: logger.info("Querying stored data only...") if duration: result = await agent.query_by_duration(duration, query_stored_only=True) else: result = await agent.query_by_time_range(start_time, end_time, query_stored_only=True) else: result = await agent.collect_performance_data() # Print results logger.info("\n" + "="*60) logger.info("PERFORMANCE DATA COLLECTION RESULTS") logger.info("="*60) if args.query_stored: logger.info(f"Query Mode: STORED DATA ONLY") logger.info(f"Snapshots Found: {len(result.get('snapshots', []))}") logger.info(f"Duration: {result.get('duration', 'N/A')}") logger.info(f"Time Range: {result.get('start_time', 'N/A')} to {result.get('end_time', 'N/A')}") else: testing_id = result.get('testing_id') or result.get('run_id') logger.info(f"Testing ID: {testing_id}") logger.info(f"Mode: {result.get('mode', 'unknown').upper()}") logger.info(f"Success Rate: {result.get('success_rate', 0):.1f}%") logger.info(f"Successful: {result.get('success_count', 0)}/{result.get('total_steps', 9)}") if result.get('duration'): logger.info(f"Duration: {result['duration']}") else: logger.info(f"Time Range: {result.get('start_time')} to {result.get('end_time')}") storage_results = result.get('storage_results', {}) if storage_results.get('saved_to_db'): logger.info(f"✅ Saved to database: {args.db_path}") logger.info(f" DB Run ID: {storage_results.get('db_run_id')}") else: logger.info(f"⚠️ Not saved to database: {storage_results.get('db_error', 'Unknown error')}") logger.info(f"\nCollected Categories:") categories = ['cluster_info', 'node_usage', 'basic_info', 'api_metrics', 'multus_metrics', 'ovnk_pods', 'ovnk_containers', 'ovs_metrics', 'latency_metrics'] for category in categories: if result.get(category): logger.info(f" ✓ {category}") else: logger.info(f" ✗ {category}") # Save to file if requested if args.output: output_file = args.output with open(output_file, 'w') as f: json.dump(result, f, indent=2, default=str) logger.info(f"\n✓ Results saved to: {output_file}") # Return exit code based on success if args.query_stored: return 0 if result.get('status') == 'success' else 1 else: return 0 if result.get('success_rate', 0) > 50 else 1 if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/liqcui/ovnk-benchmark-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server