"""HTTP server for MCP with OAuth proxy support."""
import logging
from typing import Any, Optional
from fastapi import FastAPI, HTTPException, Header, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from .config import config
from .backends.prometheus import PrometheusClient
from .backends.loki import LokiClient
from .tools import prometheus_tools, loki_tools
# Setup logging
logging.basicConfig(
level=getattr(logging, config.log_level.upper()),
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Initialize clients
prometheus_client = PrometheusClient(config.prometheus)
loki_client = LokiClient(config.loki)
# Create FastAPI app
app = FastAPI(
title=config.server_name,
description="MCP server for Prometheus and Loki with OAuth proxy support",
version="1.0.0"
)
class ToolCallRequest(BaseModel):
"""MCP tool call request."""
tool: str
arguments: dict[str, Any]
class ToolCallResponse(BaseModel):
"""MCP tool call response."""
success: bool
data: Optional[Any] = None
error: Optional[str] = None
@app.get("/health")
async def health_check():
"""Health check endpoint for K8s."""
return {"status": "healthy", "server": config.server_name}
@app.get("/tools")
async def list_tools():
"""List available MCP tools."""
tools = [
# Prometheus tools
{
"name": "query_prometheus",
"description": "Execute PromQL instant query",
"parameters": ["query", "time (optional)"]
},
{
"name": "query_prometheus_range",
"description": "Execute PromQL range query",
"parameters": ["query", "start (optional)", "end (optional)", "step (optional)"]
},
{
"name": "list_metrics",
"description": "List available metrics",
"parameters": ["prefix (optional)"]
},
{
"name": "list_label_values",
"description": "Get label values",
"parameters": ["label", "metric (optional)"]
},
{
"name": "list_labels",
"description": "List all labels",
"parameters": ["metric (optional)"]
},
# Loki tools
{
"name": "query_loki",
"description": "Execute LogQL query",
"parameters": ["query", "start (optional)", "end (optional)", "limit (optional)", "direction (optional)"]
},
{
"name": "search_logs",
"description": "Search logs with filters",
"parameters": ["service (optional)", "namespace (optional)", "search_text (optional)", "level (optional)", "start (optional)", "end (optional)", "limit (optional)"]
},
{
"name": "list_log_labels",
"description": "List log labels",
"parameters": ["start (optional)", "end (optional)"]
},
{
"name": "list_log_label_values",
"description": "Get log label values",
"parameters": ["label", "start (optional)", "end (optional)"]
},
]
return {"tools": tools}
@app.post("/call", response_model=ToolCallResponse)
async def call_tool(request: ToolCallRequest):
"""Execute MCP tool call."""
try:
tool_name = request.tool
arguments = request.arguments
logger.info(f"Tool called: {tool_name}")
# Route to appropriate tool
result = None
# Prometheus tools
if tool_name == "query_prometheus":
result = await prometheus_tools.query_prometheus(
prometheus_client,
arguments["query"],
arguments.get("time")
)
elif tool_name == "query_prometheus_range":
result = await prometheus_tools.query_prometheus_range(
prometheus_client,
arguments["query"],
arguments.get("start"),
arguments.get("end"),
arguments.get("step", "15s")
)
elif tool_name == "list_metrics":
result = await prometheus_tools.list_metrics(
prometheus_client,
arguments.get("prefix")
)
elif tool_name == "list_label_values":
result = await prometheus_tools.list_label_values(
prometheus_client,
arguments["label"],
arguments.get("metric")
)
elif tool_name == "list_labels":
result = await prometheus_tools.list_labels(
prometheus_client,
arguments.get("metric")
)
# Loki tools
elif tool_name == "query_loki":
result = await loki_tools.query_loki(
loki_client,
arguments["query"],
arguments.get("start"),
arguments.get("end"),
arguments.get("limit", 100),
arguments.get("direction", "backward")
)
elif tool_name == "search_logs":
result = await loki_tools.search_logs(
loki_client,
arguments.get("service"),
arguments.get("namespace"),
arguments.get("search_text"),
arguments.get("level"),
arguments.get("start"),
arguments.get("end"),
arguments.get("limit", 100)
)
elif tool_name == "list_log_labels":
result = await loki_tools.list_log_labels(
loki_client,
arguments.get("start"),
arguments.get("end")
)
elif tool_name == "list_log_label_values":
result = await loki_tools.list_log_label_values(
loki_client,
arguments["label"],
arguments.get("start"),
arguments.get("end")
)
else:
raise HTTPException(status_code=404, detail=f"Tool not found: {tool_name}")
return ToolCallResponse(success=True, data=result)
except Exception as e:
logger.error(f"Error executing tool {request.tool}: {e}", exc_info=True)
return ToolCallResponse(success=False, error=str(e))
@app.on_event("startup")
async def startup_event():
"""Log startup information."""
logger.info(f"Starting {config.server_name} HTTP server...")
logger.info(f"Prometheus URL: {config.prometheus_url}")
logger.info(f"Loki URL: {config.loki_url}")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)