"""
Query Engine for PM Data - Natural language to SQL translation
"""
import logging
import re
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Any, Tuple

import psycopg2
from dateutil import parser as date_parser
from dateutil.relativedelta import relativedelta
logger = logging.getLogger(__name__)


class QueryEngine:
    """Answer simple natural-language questions about PM counters via SQL.

    A question such as ``"ifInErrors for interface Gi0/1 on 2024-01-16 at
    2:10 pm"`` is parsed with regular expressions into a counter name, a
    timestamp and an optional interface name. The counter is then looked up
    in the measurement interval whose start/end times contain that timestamp.

    The engine owns a single psycopg2 connection, opened by ``connect()`` and
    released by ``disconnect()``. Every query method raises
    ``ConnectionError`` when no connection is open.
    """

    # Counter names recognised verbatim, spelled the way they are looked up.
    _KNOWN_COUNTERS = (
        "ifUtilizationIn", "ifUtilizationOut", "ifInOctets", "ifOutOctets",
        "ifInUcastPkts", "ifOutUcastPkts", "ifInErrors", "ifOutErrors",
        "cpuUtilization", "memoryUtilization", "temperature",
        "ipInReceives", "tcpActiveOpens", "bgpPeerState",
    )
    # lower-case spelling -> listed spelling, used to undo case-insensitive matching.
    _CANONICAL_COUNTERS = {name.lower(): name for name in _KNOWN_COUNTERS}

    _KNOWN_COUNTER_RE = re.compile(
        r"\b(" + "|".join(_KNOWN_COUNTERS) + r")\b", re.IGNORECASE
    )
    # Fallback: the word preceding "value", "utilization" or "counter".
    _GENERIC_COUNTER_RE = re.compile(
        r"(\w+)\s+(value|utilization|counter)", re.IGNORECASE
    )
    # The three date/time patterns are applied to the lower-cased query.
    _DATE_TIME_RE = re.compile(
        r"on\s+(\d{4}-\d{2}-\d{2})\s+at\s+(\d{1,2}):?(\d{2})?\s*(am|pm)"
    )
    _DATE_RE = re.compile(r"on\s+(\d{4}-\d{2}-\d{2})")
    _TIME_RE = re.compile(r"at\s+(\d{1,2}):?(\d{2})?\s*(am|pm)")
    _ISO_DATE_RE = re.compile(r"(\d{4}-\d{2}-\d{2})")
    _CLOCK_RE = re.compile(r"(\d{1,2}):?(\d{2})?\s*(am|pm)")
    _INTERFACE_RE = re.compile(
        r"(GigabitEthernet|TenGigabitEthernet|interface)\s+([\w/]+)",
        re.IGNORECASE,
    )

    # Tables searched, in this order, when no interface counter matches.
    _FALLBACK_COUNTER_TABLES = ("system_counters", "ip_counters", "tcp_counters")
    # Category reported by list_counters() -> table holding its counters.
    _COUNTER_TABLES = {
        "interface": "interface_counters",
        "ip": "ip_counters",
        "tcp": "tcp_counters",
        "system": "system_counters",
        "bgp": "bgp_counters",
    }

    # Interval containing a timestamp; both boundaries are inclusive, and the
    # latest-starting interval wins when two intervals share a boundary.
    _CONTAINING_INTERVAL_SQL = """
        SELECT id, start_time, end_time
        FROM measurement_intervals
        WHERE start_time <= %s AND end_time >= %s
        ORDER BY start_time DESC
        LIMIT 1
    """
    # Interval whose start_time is nearest to a timestamp (diagnostics only).
    _CLOSEST_INTERVAL_SQL = """
        SELECT start_time, end_time,
               ABS(EXTRACT(EPOCH FROM (start_time - %s))) AS distance_seconds
        FROM measurement_intervals
        ORDER BY distance_seconds
        LIMIT 1
    """

    _DISPLAY_FORMAT = "%Y-%m-%d %I:%M %p"

    def __init__(self, host: str, database: str, user: str, password: str, port: int = 5432):
        """Store the connection settings; nothing is opened until ``connect()``."""
        self.host = host
        self.database = database
        self.user = user
        self.password = password
        self.port = port
        self.conn: Optional[psycopg2.extensions.connection] = None

    # ------------------------------------------------------------------
    # Connection handling
    # ------------------------------------------------------------------
    def connect(self) -> bool:
        """Open the database connection.

        Returns:
            True on success. False when the connection attempt raised; the
            failure is logged rather than propagated.
        """
        try:
            self.conn = psycopg2.connect(
                host=self.host,
                database=self.database,
                user=self.user,
                password=self.password,
                port=self.port,
            )
        except Exception as e:
            logger.error(f"Failed to connect to database: {e}")
            return False
        logger.info(f"Query engine connected to database {self.database}")
        return True

    def disconnect(self):
        """Close the database connection, if any, and forget it."""
        if self.conn:
            try:
                self.conn.close()
            finally:
                # Drop the reference so later calls fail the "not connected"
                # check instead of using a closed connection.
                self.conn = None

    def _require_connection(self):
        """Return the open connection or raise ``ConnectionError``."""
        if not self.conn:
            raise ConnectionError("Not connected to database")
        return self.conn

    def _rollback_quietly(self) -> None:
        """Roll back after a failed statement so the connection stays usable.

        psycopg2 keeps a connection in its failed transaction until
        ``rollback()`` is called; without this every later query would fail.
        """
        if self.conn is None:
            return
        try:
            self.conn.rollback()
        except Exception as e:
            logger.debug(f"Rollback failed: {e}")

    @staticmethod
    def _to_float(value: Any) -> Optional[float]:
        """Convert a numeric column to ``float``, mapping only NULL to ``None``.

        A truthiness test would also turn a real reading of ``0`` into
        ``None``, so the check is explicitly against ``None``.
        """
        return float(value) if value is not None else None

    # ------------------------------------------------------------------
    # Natural-language parsing
    # ------------------------------------------------------------------
    def parse_natural_language_query(self, query: str) -> Dict[str, Any]:
        """Extract the lookup parameters from a free-text question.

        Returns:
            A dict with the keys ``counter_name``, ``datetime`` (timezone-aware
            ``datetime``), ``interface_name`` and ``query_type``. Anything that
            could not be extracted is ``None``; ``query_type`` is always
            ``"counter_value"``.
        """
        parsed: Dict[str, Any] = {
            "counter_name": None,
            "datetime": None,
            "interface_name": None,
            "query_type": "counter_value",
        }

        known = self._KNOWN_COUNTER_RE.search(query)
        if known:
            # Matching ignores case but the SQL lookups compare counter_name
            # with "=", so hand back the listed spelling (presumably the one
            # stored in the database) rather than the user's casing.
            parsed["counter_name"] = self._CANONICAL_COUNTERS[known.group(1).lower()]
        else:
            generic = self._GENERIC_COUNTER_RE.search(query)
            if generic:
                parsed["counter_name"] = generic.group(1)

        parsed["datetime"] = self._extract_datetime(query.lower())

        interface = self._INTERFACE_RE.search(query)
        if interface:
            parsed["interface_name"] = interface.group(2)

        return parsed

    def _extract_datetime(self, query_lower: str) -> Optional[datetime]:
        """Find a timestamp in a lower-cased query.

        Tried in order: ``on <date> at <time> am|pm``, then ``on <date>``,
        then ``at <time> am|pm`` (combined with today's UTC date).
        """
        full = self._DATE_TIME_RE.search(query_lower)
        if full:
            date_part, hour, minute, am_pm = full.groups()
            found = self._parse_datetime_string(
                f"{date_part} {hour}:{minute or '00'} {am_pm}"
            )
            if found:
                return found

        date_only = self._DATE_RE.search(query_lower)
        if date_only:
            found = self._parse_datetime_string(date_only.group(1))
            if found:
                return found

        time_only = self._TIME_RE.search(query_lower)
        if time_only:
            hour, minute, am_pm = time_only.groups()
            today = datetime.now(timezone.utc).date()
            return self._parse_datetime_string(
                f"{today} {hour}:{minute or '00'} {am_pm}"
            )

        return None

    def _parse_datetime_string(self, dt_str: str) -> Optional[datetime]:
        """Parse a date/time string into a timezone-aware ``datetime``.

        dateutil is tried first. If it fails, the string is parsed by hand as
        ``YYYY-MM-DD h[:mm] am|pm``, then ``YYYY-MM-DD HH:MM``, then
        ``YYYY-MM-DD``. Values without timezone information are taken as UTC.

        Returns:
            The parsed ``datetime``, or ``None`` when nothing matched. This
            method never raises.
        """
        try:
            parsed = date_parser.parse(dt_str, fuzzy=True)
        except Exception as e:
            logger.debug(f"dateutil parse failed for '{dt_str}': {e}")
        else:
            if parsed.tzinfo is None:
                parsed = parsed.replace(tzinfo=timezone.utc)
            return parsed

        lowered = dt_str.lower()
        try:
            if "pm" in lowered or "am" in lowered:
                date_match = self._ISO_DATE_RE.search(dt_str)
                time_match = self._CLOCK_RE.search(lowered)
                if date_match and time_match:
                    hour = int(time_match.group(1))
                    minute = int(time_match.group(2) or 0)
                    am_pm = time_match.group(3)
                    # 12-hour -> 24-hour: 12 am is 00:xx, 12 pm stays 12:xx.
                    if am_pm == "pm" and hour != 12:
                        hour += 12
                    elif am_pm == "am" and hour == 12:
                        hour = 0
                    manual = datetime.strptime(
                        f"{date_match.group(1)} {hour:02d}:{minute:02d}",
                        "%Y-%m-%d %H:%M",
                    )
                    return manual.replace(tzinfo=timezone.utc)
            else:
                manual = datetime.strptime(dt_str, "%Y-%m-%d %H:%M")
                return manual.replace(tzinfo=timezone.utc)
        except ValueError as e:
            logger.debug(f"Manual parse failed for '{dt_str}': {e}")
            try:
                manual = datetime.strptime(dt_str, "%Y-%m-%d")
                return manual.replace(tzinfo=timezone.utc)
            except ValueError as e2:
                logger.debug(f"Date-only parse failed for '{dt_str}': {e2}")

        return None

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------
    def query_counter_value(self, counter_name: str, query_time: datetime,
                            interface_name: Optional[str] = None) -> List[Dict[str, Any]]:
        """Return the counter's value(s) in the interval containing ``query_time``.

        With ``interface_name`` only that interface's counter is searched.
        Without it, all interface counters of the interval are searched first,
        then the system, IP and TCP counter tables, stopping at the first
        table that yields rows.

        Returns:
            One dict per matching row with ``counter_name``, ``value``
            (``float`` or ``None`` for NULL), ``unit``, ``start_time``,
            ``end_time`` and ``query_time`` (ISO string). Interface rows also
            carry ``interface_name``. The list is empty when no interval
            contains ``query_time`` or the counter is absent.

        Raises:
            ConnectionError: when no connection is open.
            Exception: database errors are logged, rolled back and re-raised.
        """
        results: List[Dict[str, Any]] = []
        conn = self._require_connection()
        asked_at = query_time.isoformat()

        try:
            with conn.cursor() as cur:
                cur.execute(self._CONTAINING_INTERVAL_SQL, (query_time, query_time))
                interval_row = cur.fetchone()

                if not interval_row:
                    # Log the nearest interval so an empty result can be diagnosed.
                    cur.execute(self._CLOSEST_INTERVAL_SQL, (query_time,))
                    closest = cur.fetchone()
                    if closest:
                        logger.warning(
                            f"No measurement interval found for time {query_time}. "
                            f"Closest interval: {closest[0]} to {closest[1]}"
                        )
                    else:
                        logger.warning("No measurement intervals found in database")
                    return results

                interval_id = interval_row[0]

                sql = """
                    SELECT i.name, ic.counter_name, ic.value, ic.unit, mi.start_time, mi.end_time
                    FROM interface_counters ic
                    JOIN interfaces i ON ic.interface_id = i.id
                    JOIN measurement_intervals mi ON i.interval_id = mi.id
                    WHERE i.interval_id = %s
                      AND ic.counter_name = %s
                """
                params = [interval_id, counter_name]
                if interface_name:
                    sql += "  AND i.name = %s\n"
                    params.append(interface_name)
                cur.execute(sql, tuple(params))
                results = [
                    {
                        "interface_name": name,
                        "counter_name": counter,
                        "value": self._to_float(value),
                        "unit": unit,
                        "start_time": start,
                        "end_time": end,
                        "query_time": asked_at,
                    }
                    for name, counter, value, unit, start, end in cur.fetchall()
                ]

                # The non-interface tables are only consulted when the caller
                # did not ask for a specific interface.
                if not results and not interface_name:
                    for table in self._FALLBACK_COUNTER_TABLES:
                        # `table` comes from a fixed class constant, never from
                        # user input, so interpolating it is safe.
                        cur.execute(f"""
                            SELECT sc.counter_name, sc.value, sc.unit, mi.start_time, mi.end_time
                            FROM {table} sc
                            JOIN measurement_intervals mi ON sc.interval_id = mi.id
                            WHERE sc.interval_id = %s
                              AND sc.counter_name = %s
                        """, (interval_id, counter_name))
                        results = [
                            {
                                "counter_name": counter,
                                "value": self._to_float(value),
                                "unit": unit,
                                "start_time": start,
                                "end_time": end,
                                "query_time": asked_at,
                            }
                            for counter, value, unit, start, end in cur.fetchall()
                        ]
                        if results:
                            break
        except Exception as e:
            logger.error(f"Error querying counter value: {e}")
            self._rollback_quietly()
            raise

        return results

    def list_interfaces(self) -> List[Dict[str, Any]]:
        """Return the distinct interfaces as ``name``/``if_index``/``if_type`` dicts.

        Raises:
            ConnectionError: when no connection is open.
            Exception: database errors are logged, rolled back and re-raised.
        """
        conn = self._require_connection()
        try:
            with conn.cursor() as cur:
                cur.execute("""
                    SELECT DISTINCT name, if_index, if_type
                    FROM interfaces
                    ORDER BY name
                """)
                return [
                    {"name": name, "if_index": if_index, "if_type": if_type}
                    for name, if_index, if_type in cur.fetchall()
                ]
        except Exception as e:
            logger.error(f"Error listing interfaces: {e}")
            self._rollback_quietly()
            raise

    def list_counters(self) -> Dict[str, List[str]]:
        """Return the distinct counter names of every counter table.

        Returns:
            A dict keyed by ``interface``, ``ip``, ``tcp``, ``system`` and
            ``bgp``, each holding that table's counter names sorted by name.

        Raises:
            ConnectionError: when no connection is open.
            Exception: database errors are logged, rolled back and re-raised.
        """
        counters: Dict[str, List[str]] = {
            category: [] for category in self._COUNTER_TABLES
        }
        conn = self._require_connection()
        try:
            with conn.cursor() as cur:
                for category, table in self._COUNTER_TABLES.items():
                    # Table names come from a fixed class constant.
                    cur.execute(
                        f"SELECT DISTINCT counter_name FROM {table} ORDER BY counter_name"
                    )
                    counters[category] = [row[0] for row in cur.fetchall()]
        except Exception as e:
            logger.error(f"Error listing counters: {e}")
            self._rollback_quietly()
            raise
        return counters

    def query_alerts(self, start_time: Optional[datetime] = None,
                     end_time: Optional[datetime] = None) -> List[Dict[str, Any]]:
        """Return threshold alerts, newest first.

        When both ``start_time`` and ``end_time`` are given, all alerts in
        that inclusive range are returned. Otherwise (including when only one
        bound is given) the 100 most recent alerts are returned.

        Raises:
            ConnectionError: when no connection is open.
            Exception: database errors are logged, rolled back and re-raised.
        """
        conn = self._require_connection()
        try:
            with conn.cursor() as cur:
                if start_time and end_time:
                    cur.execute("""
                        SELECT severity, timestamp, parameter, value, threshold, description
                        FROM threshold_alerts
                        WHERE timestamp >= %s AND timestamp <= %s
                        ORDER BY timestamp DESC
                    """, (start_time, end_time))
                else:
                    cur.execute("""
                        SELECT severity, timestamp, parameter, value, threshold, description
                        FROM threshold_alerts
                        ORDER BY timestamp DESC
                        LIMIT 100
                    """)
                return [
                    {
                        "severity": severity,
                        "timestamp": stamp.isoformat() if stamp else None,
                        "parameter": parameter,
                        "value": self._to_float(value),
                        "threshold": self._to_float(threshold),
                        "description": description,
                    }
                    for severity, stamp, parameter, value, threshold, description
                    in cur.fetchall()
                ]
        except Exception as e:
            logger.error(f"Error querying alerts: {e}")
            self._rollback_quietly()
            raise

    # ------------------------------------------------------------------
    # End-to-end entry point
    # ------------------------------------------------------------------
    def _explain_no_results(self, counter_name: str, query_time: datetime,
                            interface_name: Optional[str]) -> Optional[str]:
        """Build a user-facing explanation for an empty counter lookup.

        Distinguishes "an interval covers the time but lacks the counter" from
        "no interval covers the time". Best effort: returns ``None`` (after
        rolling back) if the diagnostic queries themselves fail.
        """
        try:
            wanted = query_time.strftime(self._DISPLAY_FORMAT)
            with self.conn.cursor() as cur:
                cur.execute(self._CONTAINING_INTERVAL_SQL, (query_time, query_time))
                covering = cur.fetchone()
                if covering:
                    target = f"counter '{counter_name}'"
                    if interface_name:
                        target += f" on interface '{interface_name}'"
                    return (
                        f"No data found for {target} at {wanted}. "
                        f"The measurement interval {covering[1].strftime(self._DISPLAY_FORMAT)} "
                        f"to {covering[2].strftime('%I:%M %p')} covers that time "
                        f"but has no matching counter."
                    )

                cur.execute(self._CLOSEST_INTERVAL_SQL, (query_time,))
                closest = cur.fetchone()

            if not closest:
                return "No measurement intervals found in database."

            closest_start, closest_end, distance_seconds = closest
            distance_minutes = int(distance_seconds / 60)
            if distance_minutes > 0:
                return (
                    f"No data found for {wanted}. "
                    f"Closest measurement interval: {closest_start.strftime(self._DISPLAY_FORMAT)} "
                    f"to {closest_end.strftime('%I:%M %p')} "
                    f"({distance_minutes} minutes away)."
                )
            return (
                f"No data found for {wanted}. "
                f"Available measurement intervals start at "
                f"{closest_start.strftime(self._DISPLAY_FORMAT)}."
            )
        except Exception as e:
            logger.debug(f"Error getting closest interval: {e}")
            self._rollback_quietly()
            return None

    def execute_natural_language_query(self, query: str) -> Dict[str, Any]:
        """Parse a free-text question, run the lookup and package the outcome.

        Returns:
            A dict that always has ``query`` and ``results``. On success it
            also has ``parsed`` (with the timestamp as an ISO string) and
            ``count``. When parsing fails, nothing is found, or an exception
            occurs, it has an ``error`` message instead. This method does not
            raise.
        """
        try:
            parsed = self.parse_natural_language_query(query)

            if not parsed["counter_name"]:
                return {
                    "query": query,
                    "parsed": parsed,
                    "error": "Could not extract counter name from query. Please specify a counter like 'ifUtilizationIn', 'cpuUtilization', etc.",
                    "results": []
                }

            if not parsed["datetime"]:
                return {
                    "query": query,
                    "parsed": parsed,
                    "error": "Could not extract datetime from query. Please specify a date and time like 'on 2024-01-16 at 2:10 pm'.",
                    "results": []
                }

            counter_name = parsed["counter_name"]
            query_time = parsed["datetime"]
            interface_name = parsed.get("interface_name")

            results = self.query_counter_value(
                counter_name=counter_name,
                query_time=query_time,
                interface_name=interface_name,
            )

            summary = {
                "counter_name": counter_name,
                "datetime": query_time.isoformat(),
                "interface_name": interface_name,
            }

            if not results:
                error_msg = self._explain_no_results(
                    counter_name, query_time, interface_name
                )
                if error_msg is not None:
                    return {
                        "query": query,
                        "parsed": summary,
                        "error": error_msg,
                        "results": [],
                        "count": 0
                    }

            return {
                "query": query,
                "parsed": summary,
                "results": results,
                "count": len(results)
            }

        except Exception as e:
            logger.error(f"Error executing natural language query: {e}", exc_info=True)
            return {
                "query": query,
                "error": f"Query execution failed: {str(e)}",
                "results": []
            }