Skip to main content
Glama
ApoorvBrooklyn

PM Counter Monitoring MCP Server

query_router.py13.8 kB
""" Smart Query Router - Tries MCP first, falls back to RAG if MCP can't fetch proper data """ import re from typing import Dict, Any, Optional, Tuple from datetime import datetime, timedelta from dateutil import parser import logging logger = logging.getLogger(__name__) class QueryRouter: """Routes queries to MCP first, then RAG if needed""" def __init__(self, call_mcp_func, call_rag_func): """ Initialize router with MCP and RAG call functions Args: call_mcp_func: Function to call MCP server call_rag_func: Function to call RAG system """ self.call_mcp = call_mcp_func self.call_rag = call_rag_func def route_query(self, query: str) -> Dict[str, Any]: """ Route query: Try MCP first, fallback to RAG Returns: Dict with 'source' ('mcp' or 'rag'), 'response', and 'data' """ query_lower = query.lower() # Try to determine if this is a query MCP can handle mcp_method, mcp_params = self._determine_mcp_method(query, query_lower) if mcp_method: # Try MCP first logger.info(f"Trying MCP with method: {mcp_method}, params: {mcp_params}") mcp_result = self.call_mcp(mcp_method, mcp_params) # Check if MCP returned valid data if self._is_valid_mcp_response(mcp_result, query): return { "source": "mcp", "response": self._format_mcp_response(mcp_result, query, mcp_method), "data": mcp_result, "method": mcp_method } else: logger.info(f"MCP returned insufficient data, falling back to RAG") # Fallback to RAG logger.info("Using RAG system for query") try: rag_result = self.call_rag(query) return { "source": "rag", "response": rag_result.get("response", "No response from RAG"), "data": rag_result, "method": "rag_query" } except Exception as e: logger.error(f"RAG also failed: {e}") return { "source": "error", "response": f"Both MCP and RAG failed. Error: {str(e)}", "data": None, "method": None } def _determine_mcp_method(self, query: str, query_lower: str) -> Tuple[Optional[str], Dict[str, Any]]: """Determine which MCP method to use and extract parameters""" params = {} # Extract date/time information date_info = self._extract_date_time(query) if date_info and date_info.get("specific_datetime"): # Convert to datetime for MCP date = date_info.get("date") time_tuple = date_info.get("time", (0, 0)) if date: from datetime import datetime, timedelta hour, minute = time_tuple start_time = datetime.combine(date, datetime.min.time().replace(hour=hour, minute=minute)) end_time = start_time + timedelta(minutes=5) # 5-minute window params["start_time"] = start_time.isoformat() params["end_time"] = end_time.isoformat() # CPU queries if ("cpu" in query_lower and "utilization" in query_lower) or \ ("cpu" in query_lower and ("usage" in query_lower or "usage" in query_lower)): hours = self._extract_hours(query_lower, default=24) return "get_cpu_utilization", {"hours": hours} # Memory queries if ("memory" in query_lower and "utilization" in query_lower) or \ ("memory" in query_lower and ("usage" in query_lower or "usage" in query_lower)): hours = self._extract_hours(query_lower, default=24) return "get_memory_utilization", {"hours": hours} # Interface queries if "interface" in query_lower: interface_name = self._extract_interface_name(query_lower) counter_name = self._extract_counter_name(query_lower) hours = self._extract_hours(query_lower, default=24) params = {"hours": hours} if interface_name: params["interface_name"] = interface_name if counter_name: params["counter_name"] = counter_name return "get_interface_counters", params # System counter queries if "system" in query_lower and "counter" in query_lower: counter_name = self._extract_system_counter_name(query_lower) hours = self._extract_hours(query_lower, default=24) params = {"hours": hours} if counter_name: params["counter_name"] = counter_name return "get_system_counters", params # Latest/current metrics if "latest" in query_lower or "current" in query_lower or "now" in query_lower: return "get_latest_metrics", {} # If we have a specific date/time but MCP can't handle it well, return None to use RAG if date_info and date_info.get("specific_datetime"): # MCP works with hours, not specific dates - use RAG for precise date/time queries return None, {} return None, {} def _extract_date_time(self, query: str) -> Optional[Dict[str, Any]]: """Extract date and time from query""" date_patterns = [ r"(\d{4}-\d{2}-\d{2})", # YYYY-MM-DD r"(\d{1,2}/\d{1,2}/\d{4})", # MM/DD/YYYY r"(january|february|march|april|may|june|july|august|september|october|november|december)\s+\d{1,2},?\s+\d{4}", ] time_patterns = [ r"(\d{1,2}):(\d{2})\s*(am|pm)", # 2:10 pm r"(\d{1,2}):(\d{2})", # 14:10 ] date_found = None time_found = None specific_datetime = False # Try to parse dates for pattern in date_patterns: match = re.search(pattern, query, re.IGNORECASE) if match: try: date_str = match.group(1) if match.lastindex >= 1 else match.group(0) parsed_date = parser.parse(date_str, fuzzy=True) date_found = parsed_date.date() specific_datetime = True break except: pass # Try to parse time for pattern in time_patterns: match = re.search(pattern, query, re.IGNORECASE) if match: try: hour = int(match.group(1)) minute = int(match.group(2)) if match.lastindex >= 3: am_pm = match.group(3).lower() if am_pm == 'pm' and hour != 12: hour += 12 elif am_pm == 'am' and hour == 12: hour = 0 time_found = (hour, minute) specific_datetime = True break except: pass if date_found or time_found: result = {"specific_datetime": specific_datetime} if date_found: result["date"] = date_found if time_found: result["time"] = time_found return result return None def _extract_hours(self, text: str, default: int = 24) -> int: """Extract hours from query text""" patterns = [ r"last\s+(\d+)\s+hours?", r"(\d+)\s+hours?", r"past\s+(\d+)\s+hours?", r"(\d+)\s+hrs?" ] for pattern in patterns: match = re.search(pattern, text, re.IGNORECASE) if match: return int(match.group(1)) return default def _extract_interface_name(self, text: str) -> Optional[str]: """Extract interface name from query""" patterns = [ r"interface\s+([\w/]+)", r"([Gg]igabit[Ee]thernet[\w/]+)", r"([Tt]en[Gg]igabit[Ee]thernet[\w/]+)", r"(ge[\w/]+)", r"(te[\w/]+)" ] for pattern in patterns: match = re.search(pattern, text, re.IGNORECASE) if match: return match.group(1) return None def _extract_counter_name(self, text: str) -> Optional[str]: """Extract interface counter name from query""" counters = { "ifInOctets": ["in octets", "input octets", "rx octets"], "ifOutOctets": ["out octets", "output octets", "tx octets"], "ifInErrors": ["in errors", "input errors", "rx errors"], "ifOutErrors": ["out errors", "output errors", "tx errors"], "ifUtilizationIn": ["utilization in", "input utilization", "rx utilization"], "ifUtilizationOut": ["utilization out", "output utilization", "tx utilization"], "ifInUcastPkts": ["in packets", "input packets", "rx packets"], "ifOutUcastPkts": ["out packets", "output packets", "tx packets"] } text_lower = text.lower() for counter, keywords in counters.items(): for keyword in keywords: if keyword in text_lower: return counter return None def _extract_system_counter_name(self, text: str) -> Optional[str]: """Extract system counter name from query""" if "cpu" in text.lower() and "utilization" in text.lower(): return "cpuUtilization" elif "memory" in text.lower() and "utilization" in text.lower(): return "memoryUtilization" elif "temperature" in text.lower(): return "temperature" return None def _is_valid_mcp_response(self, mcp_result: Any, query: str) -> bool: """Check if MCP response is valid and has data""" if not mcp_result: return False if isinstance(mcp_result, dict): if "error" in mcp_result: return False # Check if it has actual data if "result" in mcp_result: result = mcp_result["result"] else: result = mcp_result # If it's a list, check if it has items if isinstance(result, list): return len(result) > 0 # If it's a dict, check if it has meaningful data if isinstance(result, dict): # Latest metrics should have timestamp and values if "timestamp" in result: return True # Other dicts should have data return len(result) > 0 return True if isinstance(mcp_result, list): return len(mcp_result) > 0 return True def _format_mcp_response(self, mcp_result: Any, query: str, method: str) -> str: """Format MCP response into a readable string""" if isinstance(mcp_result, dict) and "result" in mcp_result: data = mcp_result["result"] else: data = mcp_result if isinstance(data, list): if len(data) == 0: return "No data found matching your query." # Format based on method if method == "get_cpu_utilization": result = f"Found {len(data)} CPU utilization records:\n\n" for item in data[:10]: result += f"- {item.get('cpu_utilization', 'N/A')}% at {item.get('timestamp', 'N/A')}\n" return result elif method == "get_memory_utilization": result = f"Found {len(data)} memory utilization records:\n\n" for item in data[:10]: result += f"- {item.get('memory_utilization', 'N/A')}% at {item.get('timestamp', 'N/A')}\n" return result elif method == "get_interface_counters": result = f"Found {len(data)} interface counter records:\n\n" for item in data[:10]: result += f"- {item.get('interface_name', 'N/A')} - {item.get('counter_name', 'N/A')}: {item.get('value', 'N/A')} {item.get('unit', '')} at {item.get('timestamp', 'N/A')}\n" return result elif method == "get_system_counters": result = f"Found {len(data)} system counter records:\n\n" for item in data[:10]: result += f"- {item.get('counter_name', 'N/A')}: {item.get('value', 'N/A')} {item.get('unit', '')} at {item.get('timestamp', 'N/A')}\n" return result else: # Generic list formatting import json return f"Found {len(data)} records:\n\n{json.dumps(data[:10], indent=2)}" elif isinstance(data, dict): if method == "get_latest_metrics": return f"Latest Metrics (as of {data.get('timestamp', 'N/A')}):\n" \ f"- CPU Utilization: {data.get('cpu_utilization', 'N/A')}%\n" \ f"- Memory Utilization: {data.get('memory_utilization', 'N/A')}%" else: import json return json.dumps(data, indent=2) return str(data)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ApoorvBrooklyn/Networking-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server