Skip to main content
Glama
agarwalvivek29

OpenTelemetry MCP Server

server_http.py (6.76 kB)
"""HTTP server for MCP with OAuth proxy support.""" import logging from typing import Any, Optional from fastapi import FastAPI, HTTPException, Header, Request from fastapi.responses import JSONResponse from pydantic import BaseModel from .config import config from .backends.prometheus import PrometheusClient from .backends.loki import LokiClient from .tools import prometheus_tools, loki_tools # Setup logging logging.basicConfig( level=getattr(logging, config.log_level.upper()), format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Initialize clients prometheus_client = PrometheusClient(config.prometheus) loki_client = LokiClient(config.loki) # Create FastAPI app app = FastAPI( title=config.server_name, description="MCP server for Prometheus and Loki with OAuth proxy support", version="1.0.0" ) class ToolCallRequest(BaseModel): """MCP tool call request.""" tool: str arguments: dict[str, Any] class ToolCallResponse(BaseModel): """MCP tool call response.""" success: bool data: Optional[Any] = None error: Optional[str] = None @app.get("/health") async def health_check(): """Health check endpoint for K8s.""" return {"status": "healthy", "server": config.server_name} @app.get("/tools") async def list_tools(): """List available MCP tools.""" tools = [ # Prometheus tools { "name": "query_prometheus", "description": "Execute PromQL instant query", "parameters": ["query", "time (optional)"] }, { "name": "query_prometheus_range", "description": "Execute PromQL range query", "parameters": ["query", "start (optional)", "end (optional)", "step (optional)"] }, { "name": "list_metrics", "description": "List available metrics", "parameters": ["prefix (optional)"] }, { "name": "list_label_values", "description": "Get label values", "parameters": ["label", "metric (optional)"] }, { "name": "list_labels", "description": "List all labels", "parameters": ["metric (optional)"] }, # Loki tools { "name": "query_loki", "description": 
"Execute LogQL query", "parameters": ["query", "start (optional)", "end (optional)", "limit (optional)", "direction (optional)"] }, { "name": "search_logs", "description": "Search logs with filters", "parameters": ["service (optional)", "namespace (optional)", "search_text (optional)", "level (optional)", "start (optional)", "end (optional)", "limit (optional)"] }, { "name": "list_log_labels", "description": "List log labels", "parameters": ["start (optional)", "end (optional)"] }, { "name": "list_log_label_values", "description": "Get log label values", "parameters": ["label", "start (optional)", "end (optional)"] }, ] return {"tools": tools} @app.post("/call", response_model=ToolCallResponse) async def call_tool(request: ToolCallRequest): """Execute MCP tool call.""" try: tool_name = request.tool arguments = request.arguments logger.info(f"Tool called: {tool_name}") # Route to appropriate tool result = None # Prometheus tools if tool_name == "query_prometheus": result = await prometheus_tools.query_prometheus( prometheus_client, arguments["query"], arguments.get("time") ) elif tool_name == "query_prometheus_range": result = await prometheus_tools.query_prometheus_range( prometheus_client, arguments["query"], arguments.get("start"), arguments.get("end"), arguments.get("step", "15s") ) elif tool_name == "list_metrics": result = await prometheus_tools.list_metrics( prometheus_client, arguments.get("prefix") ) elif tool_name == "list_label_values": result = await prometheus_tools.list_label_values( prometheus_client, arguments["label"], arguments.get("metric") ) elif tool_name == "list_labels": result = await prometheus_tools.list_labels( prometheus_client, arguments.get("metric") ) # Loki tools elif tool_name == "query_loki": result = await loki_tools.query_loki( loki_client, arguments["query"], arguments.get("start"), arguments.get("end"), arguments.get("limit", 100), arguments.get("direction", "backward") ) elif tool_name == "search_logs": result = await 
loki_tools.search_logs( loki_client, arguments.get("service"), arguments.get("namespace"), arguments.get("search_text"), arguments.get("level"), arguments.get("start"), arguments.get("end"), arguments.get("limit", 100) ) elif tool_name == "list_log_labels": result = await loki_tools.list_log_labels( loki_client, arguments.get("start"), arguments.get("end") ) elif tool_name == "list_log_label_values": result = await loki_tools.list_log_label_values( loki_client, arguments["label"], arguments.get("start"), arguments.get("end") ) else: raise HTTPException(status_code=404, detail=f"Tool not found: {tool_name}") return ToolCallResponse(success=True, data=result) except Exception as e: logger.error(f"Error executing tool {request.tool}: {e}", exc_info=True) return ToolCallResponse(success=False, error=str(e)) @app.on_event("startup") async def startup_event(): """Log startup information.""" logger.info(f"Starting {config.server_name} HTTP server...") logger.info(f"Prometheus URL: {config.prometheus_url}") logger.info(f"Loki URL: {config.loki_url}") if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/agarwalvivek29/opentelemetry-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.