"""OpenTelemetry MCP Server - Main server implementation."""
import asyncio
import json
import logging
from typing import Any, Optional

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

from .config import config
from .backends.prometheus import PrometheusClient
from .backends.loki import LokiClient
from .tools import prometheus_tools, loki_tools
# Configure logging once at import time. The level name is read from config and
# resolved against the logging module's constants; a name that logging does not
# define raises AttributeError here.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=getattr(logging, config.log_level.upper()),
)
logger = logging.getLogger(__name__)

# Backend clients, created once and shared by every tool call.
prometheus_client = PrometheusClient(config.prometheus)
loki_client = LokiClient(config.loki)

# MCP server instance that the list_tools/call_tool handlers register against.
app = Server(config.server_name)
# Tool catalogue advertised to MCP clients. list_tools() returns this list as-is and
# call_tool() dispatches on each entry's `name`. Each inputSchema is a JSON-Schema-style
# object: call_tool() treats the keys listed under "required" as mandatory and reads
# every other property with .get().
# NOTE(review): the defaults quoted in the descriptions for `step`, `limit` and
# `direction` are applied in call_tool(); the time-range defaults ("1h ago", "now")
# are not set anywhere in this file - presumably the tools/backends modules apply
# them; confirm there.
TOOLS = [
# --- Prometheus tools ---
# Instant PromQL query: `query` is mandatory, `time` is optional.
Tool(
name="query_prometheus",
description="Execute raw PromQL instant query against Prometheus. Returns metric values at a specific point in time.",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "PromQL query string (e.g., 'up{job=\"api-server\"}' or 'rate(http_requests_total[5m])')"
},
"time": {
"type": "string",
"description": "Optional evaluation timestamp. Can be: 'now', relative like '5m', RFC3339, or Unix timestamp"
}
},
"required": ["query"]
}
),
# Range PromQL query: `query` is mandatory; `start`, `end` and `step` are optional.
Tool(
name="query_prometheus_range",
description="Execute PromQL query over a time range. Returns time series data. Use this for getting metrics over time.",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "PromQL query string"
},
"start": {
"type": "string",
"description": "Start time. Can be: relative like '1h' (last hour), '30m', '7d', or absolute RFC3339/Unix timestamp. Default: 1h ago"
},
"end": {
"type": "string",
"description": "End time. Can be: 'now', relative, or absolute. Default: now"
},
"step": {
"type": "string",
"description": "Query resolution step (e.g., '15s', '1m', '5m'). Default: 15s"
}
},
"required": ["query"]
}
),
# Metric discovery: no mandatory arguments, optional `prefix` filter.
Tool(
name="list_metrics",
description="List all available metrics in Prometheus. Useful for discovering what metrics are being collected.",
inputSchema={
"type": "object",
"properties": {
"prefix": {
"type": "string",
"description": "Optional prefix to filter metrics (e.g., 'http_', 'cpu_')"
}
}
}
),
# Label value discovery: `label` is mandatory, `metric` is optional.
Tool(
name="list_label_values",
description="Get all values for a specific label. Common labels: 'service', 'job', 'namespace', 'instance'. Use this to discover what services/namespaces are monitored.",
inputSchema={
"type": "object",
"properties": {
"label": {
"type": "string",
"description": "Label name to get values for (e.g., 'service', 'job', 'namespace')"
},
"metric": {
"type": "string",
"description": "Optional metric name to filter label values"
}
},
"required": ["label"]
}
),
# Label name discovery: no mandatory arguments, optional `metric` filter.
Tool(
name="list_labels",
description="Get all label names available in Prometheus. Use this to discover what labels you can filter by.",
inputSchema={
"type": "object",
"properties": {
"metric": {
"type": "string",
"description": "Optional metric name to get labels for"
}
}
}
),
# --- Loki tools ---
# Raw LogQL query: `query` is mandatory; the time range, `limit` and `direction` are optional.
Tool(
name="query_loki",
description="Execute raw LogQL query against Loki. Use this to search and filter logs.",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "LogQL query string (e.g., '{namespace=\"prod-services\"}', '{job=\"api-server\"} |= \"error\"')"
},
"start": {
"type": "string",
"description": "Start time. Can be: relative like '1h', '30m', or absolute. Default: 1h ago"
},
"end": {
"type": "string",
"description": "End time. Can be: 'now', relative, or absolute. Default: now"
},
"limit": {
"type": "integer",
"description": "Maximum number of log entries to return. Default: 100"
},
"direction": {
"type": "string",
"description": "Query direction: 'forward' (oldest first) or 'backward' (newest first). Default: backward"
}
},
"required": ["query"]
}
),
# Filter-based log search: the schema declares no mandatory arguments, every filter is optional.
Tool(
name="search_logs",
description="Search logs with simple filters. Easier than writing raw LogQL queries.",
inputSchema={
"type": "object",
"properties": {
"service": {
"type": "string",
"description": "Service/job name to filter logs"
},
"namespace": {
"type": "string",
"description": "Namespace to filter logs"
},
"search_text": {
"type": "string",
"description": "Text to search for in log messages"
},
"level": {
"type": "string",
"description": "Log level filter: 'error', 'warn', 'info', 'debug'"
},
"start": {
"type": "string",
"description": "Start time (relative like '1h' or absolute). Default: 1h ago"
},
"end": {
"type": "string",
"description": "End time. Default: now"
},
"limit": {
"type": "integer",
"description": "Maximum number of log entries. Default: 100"
}
}
}
),
# Log label name discovery: no mandatory arguments, optional time window.
Tool(
name="list_log_labels",
description="List all log stream labels in Loki. Use this to discover what labels are available for filtering.",
inputSchema={
"type": "object",
"properties": {
"start": {
"type": "string",
"description": "Start time for label discovery"
},
"end": {
"type": "string",
"description": "End time for label discovery"
}
}
}
),
# Log label value discovery: `label` is mandatory, the time window is optional.
Tool(
name="list_log_label_values",
description="Get all values for a specific log label. Common labels: 'namespace', 'job', 'app'. Use this to discover what namespaces/services have logs.",
inputSchema={
"type": "object",
"properties": {
"label": {
"type": "string",
"description": "Label name to get values for (e.g., 'namespace', 'job', 'app')"
},
"start": {
"type": "string",
"description": "Start time for value discovery"
},
"end": {
"type": "string",
"description": "End time for value discovery"
}
},
"required": ["label"]
}
),
]
@app.list_tools()
async def list_tools() -> list[Tool]:
    """Return the module-level ``TOOLS`` catalogue.

    Registered through ``app.list_tools()``. The same list object is handed
    back on every call; no copy is made.
    """
    return TOOLS
def _require_argument(arguments: dict[str, Any], key: str, tool: str) -> Any:
    """Return ``arguments[key]`` or fail with a message that names the gap.

    Args:
        arguments: Tool arguments supplied by the MCP client.
        key: Name of the mandatory argument.
        tool: Tool name, used only to build the error message.

    Returns:
        The value stored under *key* (passed through untouched, even if None).

    Raises:
        ValueError: If *key* is absent from *arguments*.
    """
    try:
        return arguments[key]
    except KeyError:
        # str(KeyError("query")) is just "'query'", which tells the client
        # nothing useful; raise a self-explanatory error instead.
        raise ValueError(
            f"Missing required argument '{key}' for tool '{tool}'"
        ) from None


def _text_response(payload: Any) -> list[TextContent]:
    """Wrap *payload*, serialized as indented JSON, in a single TextContent.

    Raises:
        TypeError: Propagated from ``json.dumps`` if *payload* is not
            JSON-serializable.
    """
    return [TextContent(type="text", text=json.dumps(payload, indent=2))]


async def _dispatch_tool(name: str, arguments: dict[str, Any]) -> Any:
    """Route a tool call to the matching Prometheus or Loki helper.

    Args:
        name: Tool name as advertised in ``TOOLS``.
        arguments: Tool arguments; mandatory keys are checked via
            ``_require_argument``, optional keys are read with ``.get()``.

    Returns:
        Whatever the selected helper returns (presumably a JSON-serializable
        dict - confirm in the tools modules), or a ``{"success": False, ...}``
        dict when *name* matches no known tool.

    Raises:
        ValueError: If a mandatory argument is missing.
    """
    # Prometheus tools
    if name == "query_prometheus":
        return await prometheus_tools.query_prometheus(
            prometheus_client,
            _require_argument(arguments, "query", name),
            arguments.get("time"),
        )
    if name == "query_prometheus_range":
        return await prometheus_tools.query_prometheus_range(
            prometheus_client,
            _require_argument(arguments, "query", name),
            arguments.get("start"),
            arguments.get("end"),
            arguments.get("step", "15s"),
        )
    if name == "list_metrics":
        return await prometheus_tools.list_metrics(
            prometheus_client,
            arguments.get("prefix"),
        )
    if name == "list_label_values":
        return await prometheus_tools.list_label_values(
            prometheus_client,
            _require_argument(arguments, "label", name),
            arguments.get("metric"),
        )
    if name == "list_labels":
        return await prometheus_tools.list_labels(
            prometheus_client,
            arguments.get("metric"),
        )

    # Loki tools
    if name == "query_loki":
        return await loki_tools.query_loki(
            loki_client,
            _require_argument(arguments, "query", name),
            arguments.get("start"),
            arguments.get("end"),
            arguments.get("limit", 100),
            arguments.get("direction", "backward"),
        )
    if name == "search_logs":
        return await loki_tools.search_logs(
            loki_client,
            arguments.get("service"),
            arguments.get("namespace"),
            arguments.get("search_text"),
            arguments.get("level"),
            arguments.get("start"),
            arguments.get("end"),
            arguments.get("limit", 100),
        )
    if name == "list_log_labels":
        return await loki_tools.list_log_labels(
            loki_client,
            arguments.get("start"),
            arguments.get("end"),
        )
    if name == "list_log_label_values":
        return await loki_tools.list_log_label_values(
            loki_client,
            _require_argument(arguments, "label", name),
            arguments.get("start"),
            arguments.get("end"),
        )

    return {"success": False, "error": f"Unknown tool: {name}"}


@app.call_tool()
async def call_tool(name: str, arguments: Any) -> list[TextContent]:
    """Handle an MCP tool call and return its result as JSON text.

    Args:
        name: Name of the tool to run.
        arguments: Tool arguments as a dict-like object; ``None`` is treated
            as "no arguments" so tools without mandatory inputs still work.

    Returns:
        A one-element list holding a ``TextContent`` whose text is the
        JSON-encoded tool result. Any exception raised while running the tool
        or serializing its result is logged with a traceback and reported as
        ``{"success": false, "error": "<message>"}`` instead of propagating.
    """
    try:
        logger.info(f"Tool called: {name} with arguments: {arguments}")
        # Normalise a missing arguments object so .get()/lookup work uniformly.
        tool_args = {} if arguments is None else arguments
        result = await _dispatch_tool(name, tool_args)
        # Serialization stays inside the try so a non-JSON-serializable result
        # is reported to the client as an error payload as well.
        return _text_response(result)
    except Exception as e:
        logger.exception(f"Error executing tool {name}: {e}")
        return _text_response({"success": False, "error": str(e)})
async def main():
    """Log the configured endpoints, then serve MCP requests over stdio.

    Awaits ``app.run`` inside the ``stdio_server()`` context, so the coroutine
    finishes only once the server loop returns and the context has exited.
    """
    logger.info(f"Starting {config.server_name} server...")
    logger.info(f"Prometheus URL: {config.prometheus_url}")
    logger.info(f"Loki URL: {config.loki_url} (tenant: {config.loki_tenant_id})")

    async with stdio_server() as (incoming, outgoing):
        init_options = app.create_initialization_options()
        await app.run(incoming, outgoing, init_options)


# Start the server only when this module is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())