"""
MCP (Model Context Protocol) server for PM counter data access
"""
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from database import get_db, MeasurementInterval, InterfaceCounter, SystemCounter
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any
from pydantic import BaseModel
from config import Config
import json
# FastAPI application instance; the route decorators in this module register on it.
app = FastAPI(title="PM Counter MCP Server", version="1.0.0")
class MCPRequest(BaseModel):
    """Request envelope for the MCP endpoint: a method name plus arguments."""

    # Name of the MCP method the caller wants to invoke.
    method: str
    # Optional keyword-style arguments for the method; may be omitted or null.
    params: Optional[Dict[str, Any]] = None
class MCPResponse(BaseModel):
    """Response envelope for the MCP endpoint: a result or an error message."""

    # Payload produced by the invoked method; left as None when not set.
    result: Optional[Any] = None
    # Human-readable failure description; left as None when not set.
    error: Optional[str] = None
@app.post("/mcp")
async def mcp_endpoint(request: MCPRequest, db: Session = Depends(get_db)):
    """Dispatch an MCP request to the matching PM-counter query helper.

    Recognised ``request.params`` keys (all optional):

    * ``interface_name`` / ``counter_name`` -- forwarded as filters.
    * ``hours`` -- look-back window, defaults to 24 when missing or null.
    * ``start_time`` / ``end_time`` -- ISO 8601 strings; a trailing ``Z``
      is accepted and read as ``+00:00``. Missing or null means "not set".

    Args:
        request: Envelope holding the method name and its params.
        db: SQLAlchemy session injected by FastAPI via ``get_db``.

    Returns:
        MCPResponse with ``result`` set on success, or with ``error`` set
        when the method is unknown or handling the request raised.
    """
    # NOTE(review): the helpers below run synchronous SQLAlchemy queries inside
    # this ``async def`` endpoint; consider a plain ``def`` so FastAPI executes
    # it in its threadpool instead of on the event loop.

    def parse_timestamp(arguments, key):
        """Parse arguments[key] as ISO 8601, or return None when absent/null."""
        raw = arguments.get(key)
        if raw is None:
            return None
        # datetime.fromisoformat() on older Python versions rejects a bare "Z".
        return datetime.fromisoformat(raw.replace("Z", "+00:00"))

    try:
        params = request.params or {}

        hours = params.get("hours")
        if hours is None:
            # An explicit null would otherwise reach timedelta(hours=None).
            hours = 24

        window = {
            "hours": hours,
            "start_time": parse_timestamp(params, "start_time"),
            "end_time": parse_timestamp(params, "end_time"),
        }

        method = request.method
        if method == "get_interface_counters":
            result = get_interface_counters_mcp(
                db=db,
                interface_name=params.get("interface_name"),
                counter_name=params.get("counter_name"),
                **window
            )
        elif method == "get_system_counters":
            result = get_system_counters_mcp(
                db=db,
                counter_name=params.get("counter_name"),
                **window
            )
        elif method == "get_cpu_utilization":
            result = get_cpu_utilization_mcp(db=db, **window)
        elif method == "get_memory_utilization":
            result = get_memory_utilization_mcp(db=db, **window)
        elif method == "get_latest_metrics":
            result = get_latest_metrics_mcp(db=db)
        else:
            return MCPResponse(error=f"Unknown method: {request.method}")

        return MCPResponse(result=result)
    except Exception as e:
        # Keep the stack trace in the server log rather than sending it to the
        # client, where it would expose file paths and internals.
        import logging
        logging.getLogger(__name__).exception(
            "MCP method %r failed", request.method
        )
        # Include the exception type so the message is never an empty string.
        return MCPResponse(error=f"{type(e).__name__}: {e}")
def get_interface_counters_mcp(
    db: Session,
    interface_name: Optional[str] = None,
    counter_name: Optional[str] = None,
    hours: int = 24,
    start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None
) -> List[Dict[str, Any]]:
    """Get interface counters via MCP, newest measurement interval first.

    Args:
        db: SQLAlchemy session used to run the query.
        interface_name: Optional substring matched case-insensitively
            against the interface name.
        counter_name: Optional substring matched case-insensitively
            against the counter name.
        hours: Look-back window in hours; used unless BOTH ``start_time``
            and ``end_time`` are given.
        start_time: Lower bound on the interval's ``start_time``.
        end_time: Upper bound on the interval's ``end_time``.

    Returns:
        At most 100 dicts with the keys ``interface_name``,
        ``counter_name``, ``value``, ``unit`` and ``timestamp`` (the
        interval's ``start_time`` in ISO format).
    """
    if start_time and end_time:
        # Use specific time range.
        # and_ is imported locally, as the sibling query helpers do; without
        # this import the explicit-range branch raised NameError.
        from sqlalchemy import and_
        time_filter = and_(
            MeasurementInterval.start_time >= start_time,
            MeasurementInterval.end_time <= end_time
        )
    else:
        # Use hours-based filter (also taken when only one bound is supplied).
        time_start = datetime.utcnow() - timedelta(hours=hours)
        time_filter = MeasurementInterval.start_time >= time_start

    query = db.query(
        InterfaceCounter.interface_name,
        InterfaceCounter.counter_name,
        InterfaceCounter.value,
        InterfaceCounter.unit,
        MeasurementInterval.start_time
    ).join(MeasurementInterval).filter(time_filter)

    if interface_name:
        query = query.filter(InterfaceCounter.interface_name.ilike(f"%{interface_name}%"))
    if counter_name:
        query = query.filter(InterfaceCounter.counter_name.ilike(f"%{counter_name}%"))

    # Cap the response size; rows are ordered newest first.
    results = query.order_by(MeasurementInterval.start_time.desc()).limit(100).all()

    return [
        {
            "interface_name": r.interface_name,
            "counter_name": r.counter_name,
            "value": r.value,
            "unit": r.unit,
            "timestamp": r.start_time.isoformat()
        }
        for r in results
    ]
def get_system_counters_mcp(
    db: Session,
    counter_name: Optional[str] = None,
    hours: int = 24,
    start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None
) -> List[Dict[str, Any]]:
    """Fetch system counter samples for MCP callers, newest first.

    The time window is the explicit ``start_time``/``end_time`` range when
    both are supplied; otherwise it is the last ``hours`` hours. An optional
    ``counter_name`` narrows the rows by case-insensitive substring match.
    At most 100 rows are returned, each as a dict with ``counter_name``,
    ``value``, ``unit`` and ``timestamp`` (interval start in ISO format).
    """
    if not (start_time and end_time):
        # Relative window counted back from the current UTC time.
        cutoff = datetime.utcnow() - timedelta(hours=hours)
        window = MeasurementInterval.start_time >= cutoff
    else:
        # Explicit window: the interval must lie inside [start_time, end_time].
        from sqlalchemy import and_
        window = and_(
            MeasurementInterval.start_time >= start_time,
            MeasurementInterval.end_time <= end_time
        )

    selected_columns = (
        SystemCounter.counter_name,
        SystemCounter.value,
        SystemCounter.unit,
        MeasurementInterval.start_time,
    )
    stmt = db.query(*selected_columns).join(MeasurementInterval).filter(window)

    if counter_name:
        name_pattern = f"%{counter_name}%"
        stmt = stmt.filter(SystemCounter.counter_name.ilike(name_pattern))

    newest_first = MeasurementInterval.start_time.desc()
    rows = stmt.order_by(newest_first).limit(100).all()

    payload = []
    for row in rows:
        payload.append({
            "counter_name": row.counter_name,
            "value": row.value,
            "unit": row.unit,
            "timestamp": row.start_time.isoformat(),
        })
    return payload
def get_cpu_utilization_mcp(
    db: Session,
    hours: int = 24,
    start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None
) -> List[Dict[str, Any]]:
    """Fetch ``cpuUtilization`` samples for MCP callers, newest first.

    Uses the ``start_time``/``end_time`` range when both are supplied and
    the last ``hours`` hours otherwise. Returns at most 100 dicts with the
    keys ``cpu_utilization`` and ``timestamp`` (interval start, ISO format).
    """
    if start_time and end_time:
        from sqlalchemy import and_
        window = and_(
            MeasurementInterval.start_time >= start_time,
            MeasurementInterval.end_time <= end_time
        )
    else:
        cutoff = datetime.utcnow() - timedelta(hours=hours)
        window = MeasurementInterval.start_time >= cutoff

    is_cpu_sample = SystemCounter.counter_name == 'cpuUtilization'
    samples = (
        db.query(SystemCounter.value, MeasurementInterval.start_time)
        .join(MeasurementInterval)
        .filter(is_cpu_sample, window)
        .order_by(MeasurementInterval.start_time.desc())
        .limit(100)
        .all()
    )

    return [
        dict(cpu_utilization=sample.value, timestamp=sample.start_time.isoformat())
        for sample in samples
    ]
def get_memory_utilization_mcp(
    db: Session,
    hours: int = 24,
    start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None
) -> List[Dict[str, Any]]:
    """Fetch ``memoryUtilization`` samples for MCP callers, newest first.

    Uses the ``start_time``/``end_time`` range when both are supplied and
    the last ``hours`` hours otherwise. Returns at most 100 dicts with the
    keys ``memory_utilization`` and ``timestamp`` (interval start, ISO format).
    """
    if not (start_time and end_time):
        earliest = datetime.utcnow() - timedelta(hours=hours)
        in_window = MeasurementInterval.start_time >= earliest
    else:
        from sqlalchemy import and_
        in_window = and_(
            MeasurementInterval.start_time >= start_time,
            MeasurementInterval.end_time <= end_time
        )

    base_query = db.query(
        SystemCounter.value,
        MeasurementInterval.start_time
    ).join(MeasurementInterval)
    filtered = base_query.filter(
        SystemCounter.counter_name == 'memoryUtilization',
        in_window
    )
    rows = filtered.order_by(MeasurementInterval.start_time.desc()).limit(100).all()

    readings = []
    for row in rows:
        readings.append({
            "memory_utilization": row.value,
            "timestamp": row.start_time.isoformat(),
        })
    return readings
def get_latest_metrics_mcp(db: Session) -> Dict[str, Any]:
    """Summarise CPU and memory utilization for the newest measurement interval.

    Returns ``{"error": "No data available"}`` when no interval exists.
    Otherwise returns a dict with ``timestamp`` (interval start, ISO format),
    ``cpu_utilization`` and ``memory_utilization``; a utilization entry is
    None when the interval has no matching counter row.
    """
    newest = (
        db.query(MeasurementInterval)
        .order_by(MeasurementInterval.start_time.desc())
        .first()
    )
    if not newest:
        return {"error": "No data available"}

    def counter_in_newest(name):
        """Return the first system counter row called *name* in the newest interval."""
        return (
            db.query(SystemCounter)
            .join(MeasurementInterval)
            .filter(
                SystemCounter.counter_name == name,
                MeasurementInterval.id == newest.id
            )
            .first()
        )

    cpu_row = counter_in_newest('cpuUtilization')
    memory_row = counter_in_newest('memoryUtilization')

    summary = {"timestamp": newest.start_time.isoformat()}
    summary["cpu_utilization"] = cpu_row.value if cpu_row else None
    summary["memory_utilization"] = memory_row.value if memory_row else None
    return summary
@app.get("/mcp/methods")
def list_methods():
    """List available MCP methods and the params each one accepts.

    The time-range params are advertised for every method to which the
    ``/mcp`` endpoint forwards ``start_time``/``end_time``; the range is
    only applied when both bounds are supplied, otherwise ``hours`` is used.

    Returns:
        A dict with one key, ``methods``: a list of dicts with ``name``,
        ``description`` and ``params`` (human-readable param descriptions).
    """
    # Shared by all range-capable methods so the wording stays consistent.
    time_range_params = [
        "hours (default: 24)",
        "start_time (optional, ISO 8601; used only together with end_time)",
        "end_time (optional, ISO 8601; used only together with start_time)",
    ]
    return {
        "methods": [
            {
                "name": "get_interface_counters",
                "description": "Get interface performance counters",
                "params": [
                    "interface_name (optional)",
                    "counter_name (optional)",
                    *time_range_params,
                ]
            },
            {
                "name": "get_system_counters",
                "description": "Get system performance counters",
                "params": ["counter_name (optional)", *time_range_params]
            },
            {
                "name": "get_cpu_utilization",
                "description": "Get CPU utilization metrics",
                "params": list(time_range_params)
            },
            {
                "name": "get_memory_utilization",
                "description": "Get memory utilization metrics",
                "params": list(time_range_params)
            },
            {
                "name": "get_latest_metrics",
                "description": "Get latest metrics summary",
                "params": []
            }
        ]
    }
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on the configured host/port.
    import uvicorn

    server_options = {"host": Config.MCP_HOST, "port": Config.MCP_PORT}
    uvicorn.run(app, **server_options)