"""
Smart Query Router - Tries MCP first, falls back to RAG if MCP can't fetch proper data
"""
import re
from typing import Dict, Any, Optional, Tuple
from datetime import datetime, timedelta
from dateutil import parser
import logging
# Module-level logger shared by every routing decision below.
logger = logging.getLogger(__name__)


class QueryRouter:
    """Route a natural-language query to MCP first, then to RAG as a fallback.

    The router inspects the query for known keywords (CPU, memory, interface,
    system counters, latest metrics). When one matches, the corresponding MCP
    method is called; if that call fails or returns no usable data, the query
    is handed to the RAG callable instead.
    """

    # Maximum number of list records rendered into a formatted response.
    _MAX_DISPLAY_RECORDS = 10

    # Date shapes recognised in a query: YYYY-MM-DD, MM/DD/YYYY, "Month D, YYYY".
    _DATE_PATTERNS = (
        re.compile(r"(\d{4}-\d{2}-\d{2})", re.IGNORECASE),
        re.compile(r"(\d{1,2}/\d{1,2}/\d{4})", re.IGNORECASE),
        re.compile(
            r"(january|february|march|april|may|june|july|august|september|october|november|december)\s+\d{1,2},?\s+\d{4}",
            re.IGNORECASE,
        ),
    )

    # Time shapes: 12-hour clock with am/pm first, then plain HH:MM.
    _TIME_PATTERNS = (
        re.compile(r"(\d{1,2}):(\d{2})\s*(am|pm)", re.IGNORECASE),
        re.compile(r"(\d{1,2}):(\d{2})", re.IGNORECASE),
    )

    # Tried in order; the first pattern that matches anywhere in the text wins.
    _HOUR_PATTERNS = (
        re.compile(r"last\s+(\d+)\s+hours?", re.IGNORECASE),
        re.compile(r"(\d+)\s+hours?", re.IGNORECASE),
        re.compile(r"past\s+(\d+)\s+hours?", re.IGNORECASE),
        re.compile(r"(\d+)\s+hrs?", re.IGNORECASE),
    )

    # Interface identifiers. Word boundaries stop matches that start inside
    # another word ("terfaces" out of "interfaces", "gigabitethernet1/1" out of
    # "tengigabitethernet1/1"). The generic and abbreviated forms require a
    # digit so ordinary words ("errors", "get", "ten") are not taken as names.
    _INTERFACE_NAME_PATTERNS = (
        re.compile(r"\binterface\s+([\w/]*\d[\w/]*)", re.IGNORECASE),
        re.compile(r"\b(gigabitethernet[\w/]+)", re.IGNORECASE),
        re.compile(r"\b(tengigabitethernet[\w/]+)", re.IGNORECASE),
        re.compile(r"\b(ge[\w/]*\d[\w/]*)", re.IGNORECASE),
        re.compile(r"\b(te[\w/]*\d[\w/]*)", re.IGNORECASE),
    )

    # Counter name -> phrases that select it (checked in insertion order).
    _INTERFACE_COUNTER_KEYWORDS = {
        "ifInOctets": ["in octets", "input octets", "rx octets"],
        "ifOutOctets": ["out octets", "output octets", "tx octets"],
        "ifInErrors": ["in errors", "input errors", "rx errors"],
        "ifOutErrors": ["out errors", "output errors", "tx errors"],
        "ifUtilizationIn": ["utilization in", "input utilization", "rx utilization"],
        "ifUtilizationOut": ["utilization out", "output utilization", "tx utilization"],
        "ifInUcastPkts": ["in packets", "input packets", "rx packets"],
        "ifOutUcastPkts": ["out packets", "output packets", "tx packets"],
    }

    def __init__(self, call_mcp_func, call_rag_func):
        """Store the two backends the router dispatches to.

        Args:
            call_mcp_func: Callable invoked as ``call_mcp_func(method, params)``.
            call_rag_func: Callable invoked as ``call_rag_func(query)``; its
                result is read with ``.get("response", ...)``.
        """
        self.call_mcp = call_mcp_func
        self.call_rag = call_rag_func

    def route_query(self, query: str) -> Dict[str, Any]:
        """Answer ``query`` via MCP when possible, otherwise via RAG.

        Returns:
            Dict with keys ``source`` ('mcp', 'rag' or 'error'), ``response``
            (display text), ``data`` (raw backend result or None) and
            ``method`` (MCP method name, 'rag_query', or None on error).
        """
        query_lower = query.lower()
        mcp_method, mcp_params = self._determine_mcp_method(query, query_lower)

        if mcp_method:
            mcp_answer = self._try_mcp(query, mcp_method, mcp_params)
            if mcp_answer is not None:
                return mcp_answer

        logger.info("Using RAG system for query")
        try:
            rag_result = self.call_rag(query)
            return {
                "source": "rag",
                "response": rag_result.get("response", "No response from RAG"),
                "data": rag_result,
                "method": "rag_query",
            }
        except Exception as e:
            # Last line of defence: report the failure instead of raising.
            logger.error("RAG also failed: %s", e)
            return {
                "source": "error",
                "response": f"Both MCP and RAG failed. Error: {str(e)}",
                "data": None,
                "method": None,
            }

    def _try_mcp(self, query: str, method: str, params: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Call MCP and build the result dict, or return None to request RAG.

        Any exception raised by the MCP callable, by validation or by
        formatting is logged and treated as "MCP could not answer", so the
        caller can still fall back to RAG.
        """
        logger.info("Trying MCP with method: %s, params: %s", method, params)
        try:
            mcp_result = self.call_mcp(method, params)
            if not self._is_valid_mcp_response(mcp_result, query):
                logger.info("MCP returned insufficient data, falling back to RAG")
                return None
            return {
                "source": "mcp",
                "response": self._format_mcp_response(mcp_result, query, method),
                "data": mcp_result,
                "method": method,
            }
        except Exception:
            logger.exception("MCP call failed, falling back to RAG")
            return None

    def _determine_mcp_method(self, query: str, query_lower: str) -> Tuple[Optional[str], Dict[str, Any]]:
        """Pick the MCP method for a query and extract its parameters.

        Args:
            query: Original query text (currently unused by the matching).
            query_lower: Lower-cased query used for keyword matching.

        Returns:
            ``(method_name, params)`` for a recognised query, else ``(None, {})``.
        """
        # NOTE(review): every MCP method here is parameterised by a look-back
        # in hours only. A specific date/time in the query is not forwarded, so
        # such queries are still answered from the look-back window. Confirm
        # whether they should be routed to RAG instead.
        mentions_load = "utilization" in query_lower or "usage" in query_lower

        if "cpu" in query_lower and mentions_load:
            return "get_cpu_utilization", {"hours": self._extract_hours(query_lower, default=24)}

        if "memory" in query_lower and mentions_load:
            return "get_memory_utilization", {"hours": self._extract_hours(query_lower, default=24)}

        if "interface" in query_lower:
            interface_name = self._extract_interface_name(query_lower)
            counter_name = self._extract_counter_name(query_lower)
            params = {"hours": self._extract_hours(query_lower, default=24)}
            if interface_name:
                params["interface_name"] = interface_name
            if counter_name:
                params["counter_name"] = counter_name
            return "get_interface_counters", params

        if "system" in query_lower and "counter" in query_lower:
            counter_name = self._extract_system_counter_name(query_lower)
            params = {"hours": self._extract_hours(query_lower, default=24)}
            if counter_name:
                params["counter_name"] = counter_name
            return "get_system_counters", params

        # "now" must be a whole word; as a substring it also fires on
        # "know", "unknown", "snow", ...
        if (
            "latest" in query_lower
            or "current" in query_lower
            or re.search(r"\bnow\b", query_lower)
        ):
            return "get_latest_metrics", {}

        return None, {}

    def _extract_date_time(self, query: str) -> Optional[Dict[str, Any]]:
        """Extract an explicit calendar date and/or clock time from the query.

        Returns:
            None when neither is present; otherwise a dict with
            ``specific_datetime`` set to True plus ``date`` (a ``date``) and/or
            ``time`` (an ``(hour, minute)`` tuple on the 24-hour clock).
        """
        date_found = None
        time_found = None

        for pattern in self._DATE_PATTERNS:
            match = pattern.search(query)
            if not match:
                continue
            try:
                # Parse the whole match: group(1) of the month-name pattern is
                # only the month word and would drop the day and the year.
                date_found = parser.parse(match.group(0), fuzzy=True).date()
                break
            except (ValueError, OverflowError) as exc:
                # Looked like a date but is not one (e.g. 13/45/2024).
                logger.debug("Ignoring unparseable date %r: %s", match.group(0), exc)

        for pattern in self._TIME_PATTERNS:
            match = pattern.search(query)
            if not match:
                continue
            hour = int(match.group(1))
            minute = int(match.group(2))
            if match.lastindex >= 3:
                if not 1 <= hour <= 12:
                    # "14:10 pm" is not a 12-hour time; let the 24-hour
                    # pattern have a go instead.
                    continue
                am_pm = match.group(3).lower()
                if am_pm == "pm" and hour != 12:
                    hour += 12
                elif am_pm == "am" and hour == 12:
                    hour = 0
            if hour > 23 or minute > 59:
                continue
            time_found = (hour, minute)
            break

        if not (date_found or time_found):
            return None

        result = {"specific_datetime": True}
        if date_found:
            result["date"] = date_found
        if time_found:
            result["time"] = time_found
        return result

    def _extract_hours(self, text: str, default: int = 24) -> int:
        """Return the look-back window in hours mentioned in ``text``.

        Recognises "last N hours", "N hours", "past N hours" and "N hrs";
        returns ``default`` when none of them appear.
        """
        for pattern in self._HOUR_PATTERNS:
            match = pattern.search(text)
            if match:
                return int(match.group(1))
        return default

    def _extract_interface_name(self, text: str) -> Optional[str]:
        """Return the interface identifier mentioned in ``text``, if any.

        The identifier is returned exactly as it appears in ``text``. A word
        following "interface" is only accepted when it contains a digit, so
        "interface errors" does not yield "errors".
        """
        for pattern in self._INTERFACE_NAME_PATTERNS:
            match = pattern.search(text)
            if match:
                return match.group(1)
        return None

    def _extract_counter_name(self, text: str) -> Optional[str]:
        """Map phrases such as "rx errors" to an interface counter name."""
        text_lower = text.lower()
        for counter, keywords in self._INTERFACE_COUNTER_KEYWORDS.items():
            if any(keyword in text_lower for keyword in keywords):
                return counter
        return None

    def _extract_system_counter_name(self, text: str) -> Optional[str]:
        """Map CPU / memory / temperature wording to a system counter name."""
        text_lower = text.lower()
        if "cpu" in text_lower and "utilization" in text_lower:
            return "cpuUtilization"
        if "memory" in text_lower and "utilization" in text_lower:
            return "memoryUtilization"
        if "temperature" in text_lower:
            return "temperature"
        return None

    def _is_valid_mcp_response(self, mcp_result: Any, query: str) -> bool:
        """Return True when the MCP result carries usable data.

        A result is rejected when it is falsy, is a dict containing an
        ``error`` key, or when its payload (the ``result`` entry if present,
        else the value itself) is None or an empty list/dict/string.
        ``query`` is accepted for interface compatibility and is not used.
        """
        if not mcp_result:
            return False

        if isinstance(mcp_result, dict):
            if "error" in mcp_result:
                return False
            payload = mcp_result["result"] if "result" in mcp_result else mcp_result
        else:
            payload = mcp_result

        if payload is None:
            # {"result": None} means the server had nothing to return.
            return False
        if isinstance(payload, (list, dict, str)):
            return len(payload) > 0
        return True

    @staticmethod
    def _render_cpu_record(item: Dict[str, Any]) -> str:
        """Render one CPU utilization record as a bullet line."""
        return f"- {item.get('cpu_utilization', 'N/A')}% at {item.get('timestamp', 'N/A')}"

    @staticmethod
    def _render_memory_record(item: Dict[str, Any]) -> str:
        """Render one memory utilization record as a bullet line."""
        return f"- {item.get('memory_utilization', 'N/A')}% at {item.get('timestamp', 'N/A')}"

    @staticmethod
    def _render_interface_record(item: Dict[str, Any]) -> str:
        """Render one interface counter record as a bullet line."""
        return (
            f"- {item.get('interface_name', 'N/A')} - {item.get('counter_name', 'N/A')}: "
            f"{item.get('value', 'N/A')} {item.get('unit', '')} at {item.get('timestamp', 'N/A')}"
        )

    @staticmethod
    def _render_system_record(item: Dict[str, Any]) -> str:
        """Render one system counter record as a bullet line."""
        return (
            f"- {item.get('counter_name', 'N/A')}: {item.get('value', 'N/A')} "
            f"{item.get('unit', '')} at {item.get('timestamp', 'N/A')}"
        )

    def _format_mcp_response(self, mcp_result: Any, query: str, method: str) -> str:
        """Format an MCP result into a human-readable string.

        Lists are summarised with a count and at most the first
        ``_MAX_DISPLAY_RECORDS`` records; dicts are rendered as the latest
        metrics block or as JSON; anything else is passed through ``str``.
        ``query`` is accepted for interface compatibility and is not used.
        """
        import json

        if isinstance(mcp_result, dict) and "result" in mcp_result:
            data = mcp_result["result"]
        else:
            data = mcp_result

        if isinstance(data, list):
            if not data:
                return "No data found matching your query."

            shown = data[: self._MAX_DISPLAY_RECORDS]
            renderers = {
                "get_cpu_utilization": ("CPU utilization", self._render_cpu_record),
                "get_memory_utilization": ("memory utilization", self._render_memory_record),
                "get_interface_counters": ("interface counter", self._render_interface_record),
                "get_system_counters": ("system counter", self._render_system_record),
            }
            spec = renderers.get(method)
            # Records that are not dicts cannot be rendered field by field;
            # show them through the generic JSON view rather than crash.
            if spec is not None and all(isinstance(item, dict) for item in shown):
                label, render = spec
                lines = "".join(render(item) + "\n" for item in shown)
                return f"Found {len(data)} {label} records:\n\n{lines}"

            # default=str keeps non-JSON values (e.g. datetimes) displayable.
            return f"Found {len(data)} records:\n\n{json.dumps(shown, indent=2, default=str)}"

        if isinstance(data, dict):
            if method == "get_latest_metrics":
                return (
                    f"Latest Metrics (as of {data.get('timestamp', 'N/A')}):\n"
                    f"- CPU Utilization: {data.get('cpu_utilization', 'N/A')}%\n"
                    f"- Memory Utilization: {data.get('memory_utilization', 'N/A')}%"
                )
            return json.dumps(data, indent=2, default=str)

        return str(data)