"""
MCP Server for PM Data Queries
Simple server that exposes query engine functionality
"""
import logging
import os
from datetime import datetime
from typing import Any, Dict, Optional

from dateutil import parser as date_parser
from dotenv import load_dotenv

from query_engine import QueryEngine
# Load environment variables before any configuration below is read
load_dotenv()

# Configure logging: INFO and above, with timestamp, logger name and level
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module-level logger shared by everything in this file
logger = logging.getLogger(__name__)
class MCPServer:
    """MCP Server for PM data queries.

    Thin facade over ``QueryEngine``. Every public query method returns a
    plain dict with a ``"success"`` flag; exceptions raised while serving a
    query are logged and reported through an ``"error"`` string instead of
    propagating to the caller.
    """

    # Substring of an engine "error" message that marks a helpful
    # "no exact data" hint rather than a real failure.
    _NO_DATA_HINT = "Closest measurement interval"

    def __init__(self) -> None:
        """Read the database settings from the environment and build the engine.

        Settings come from ``POSTGRES_HOST``, ``POSTGRES_DB``,
        ``POSTGRES_USER``, ``POSTGRES_PASSWORD`` and ``POSTGRES_PORT``, with
        the defaults visible below.
        """
        # Database configuration
        self.db_host = os.getenv("POSTGRES_HOST", "postgres")
        self.db_database = os.getenv("POSTGRES_DB", "pm_data")
        self.db_user = os.getenv("POSTGRES_USER", "postgres")
        self.db_password = os.getenv("POSTGRES_PASSWORD", "postgres")
        self.db_port = int(os.getenv("POSTGRES_PORT", "5432"))
        self.query_engine = QueryEngine(
            host=self.db_host,
            database=self.db_database,
            user=self.db_user,
            password=self.db_password,
            port=self.db_port,
        )

    @staticmethod
    def _parse_time(value: str) -> datetime:
        """Parse a timestamp string into a ``datetime``.

        ISO-8601 input is handled by ``datetime.fromisoformat``; anything it
        rejects is handed to the more lenient dateutil parser.

        Raises:
            ValueError: if neither parser understands ``value`` (the dateutil
                parser's error is a ``ValueError`` subclass in current
                releases).
        """
        # fromisoformat() only understands a trailing 'Z' from Python 3.11
        # on, so spell the UTC designator out as an explicit offset. Only the
        # suffix is rewritten; a 'Z' elsewhere in the text is left untouched.
        iso_value = value[:-1] + "+00:00" if value.endswith("Z") else value
        try:
            return datetime.fromisoformat(iso_value)
        except ValueError:
            # Fallback to dateutil parser for free-form input
            return date_parser.parse(value)

    def connect(self) -> bool:
        """Connect to the database; returns the engine's ``connect()`` result."""
        return self.query_engine.connect()

    def disconnect(self) -> None:
        """Disconnect from the database."""
        self.query_engine.disconnect()

    def query_counter(self, counter_name: str, query_time: str,
                      interface_name: Optional[str] = None) -> Dict[str, Any]:
        """Query counter value at a specific time.

        Args:
            counter_name: Name of the counter to look up.
            query_time: Timestamp string, ISO-8601 (a trailing ``Z`` is
                accepted) or any format the dateutil parser understands.
            interface_name: Optional interface filter, passed to the engine
                unchanged.

        Returns:
            ``{"success": True, "results": ..., "count": len(results)}`` on
            success, or ``{"success": False, "error": ..., "results": []}``
            if parsing or the query raised.
        """
        try:
            results = self.query_engine.query_counter_value(
                counter_name=counter_name,
                query_time=self._parse_time(query_time),
                interface_name=interface_name,
            )
            return {
                "success": True,
                "results": results,
                "count": len(results),
            }
        except Exception as e:
            logger.error("Error in query_counter: %s", e)
            return {
                "success": False,
                "error": str(e),
                "results": [],
            }

    def query_natural_language(self, query: str) -> Dict[str, Any]:
        """Execute natural language query.

        Connects first if the engine has no live connection. The engine's
        result dict is returned with a ``"success"`` key added: ``True`` when
        it has no ``"error"`` key or when the error is only the
        "closest measurement interval" hint, ``False`` otherwise.
        """
        try:
            # Ensure database connection
            if not self.query_engine.conn and not self.query_engine.connect():
                return {
                    "success": False,
                    "error": "Failed to connect to database",
                    "query": query,
                    "results": [],
                }
            result = self.query_engine.execute_natural_language_query(query)
            # Mark as success if no error, or if it's a helpful "no data"
            # message rather than a real error
            result["success"] = (
                "error" not in result
                or self._NO_DATA_HINT in result.get("error", "")
            )
            return result
        except Exception as e:
            logger.error("Error in query_natural_language: %s", e, exc_info=True)
            return {
                "success": False,
                "error": str(e),
                "query": query,
                "results": [],
            }

    def list_interfaces(self) -> Dict[str, Any]:
        """List all interfaces.

        Returns:
            ``{"success": True, "interfaces": ..., "count": ...}`` or, if the
            engine raised, ``{"success": False, "error": ..., "interfaces": []}``.
        """
        try:
            interfaces = self.query_engine.list_interfaces()
            return {
                "success": True,
                "interfaces": interfaces,
                "count": len(interfaces),
            }
        except Exception as e:
            logger.error("Error in list_interfaces: %s", e)
            return {
                "success": False,
                "error": str(e),
                "interfaces": [],
            }

    def list_counters(self) -> Dict[str, Any]:
        """List all counters.

        Returns:
            ``{"success": True, "counters": ...}`` or, if the engine raised,
            ``{"success": False, "error": ..., "counters": {}}``.
        """
        try:
            counters = self.query_engine.list_counters()
            return {
                "success": True,
                "counters": counters,
            }
        except Exception as e:
            logger.error("Error in list_counters: %s", e)
            return {
                "success": False,
                "error": str(e),
                "counters": {},
            }

    def query_alerts(self, start_time: Optional[str] = None,
                     end_time: Optional[str] = None) -> Dict[str, Any]:
        """Query alerts for time range.

        Args:
            start_time: Optional lower bound, parsed like ``query_time`` in
                :meth:`query_counter`; a missing or empty value is passed to
                the engine as ``None``.
            end_time: Optional upper bound, handled the same way.

        Returns:
            ``{"success": True, "alerts": ..., "count": ...}`` or, if parsing
            or the query raised, ``{"success": False, "error": ..., "alerts": []}``.
        """
        try:
            start_dt = self._parse_time(start_time) if start_time else None
            end_dt = self._parse_time(end_time) if end_time else None
            alerts = self.query_engine.query_alerts(start_dt, end_dt)
            return {
                "success": True,
                "alerts": alerts,
                "count": len(alerts),
            }
        except Exception as e:
            logger.error("Error in query_alerts: %s", e)
            return {
                "success": False,
                "error": str(e),
                "alerts": [],
            }
# Singleton instance, created lazily by get_server()
_server_instance: Optional[MCPServer] = None


def get_server() -> MCPServer:
    """Get or create MCP server instance.

    The first call builds the shared ``MCPServer`` and tries to connect it.
    Later calls reuse it, reconnecting first if the engine reports no live
    connection. A failed connect or reconnect is logged but the instance is
    still returned, so callers always get a server object.
    """
    global _server_instance
    if _server_instance is None:
        _server_instance = MCPServer()
        if not _server_instance.connect():
            logger.error("Failed to connect MCP server to database")
    elif not _server_instance.query_engine.conn:
        # Reconnect if connection was lost
        logger.warning("MCP server connection lost, reconnecting...")
        if not _server_instance.connect():
            # Report the failure instead of silently dropping it
            logger.error("Failed to reconnect MCP server to database")
    return _server_instance
if __name__ == "__main__":
    # Manual smoke test: connect, run one sample natural-language query,
    # print whatever comes back, then disconnect.
    smoke_server = MCPServer()
    connected = smoke_server.connect()
    if connected:
        logger.info("MCP Server connected")
        sample_question = "What is the ifUtilizationIn value on 2024-01-16 at 2:10 pm?"
        print(smoke_server.query_natural_language(sample_question))
        smoke_server.disconnect()