Skip to main content
Glama
ntk148v

alertmanager-mcp-server

server.py14.2 kB
#!/usr/bin/env python import os from dataclasses import dataclass from typing import Any, Dict, Optional, List from mcp.server import Server from mcp.server.fastmcp import FastMCP from mcp.server.sse import SseServerTransport from mcp.server.streamable_http import StreamableHTTPServerTransport from requests.compat import urljoin from starlette.applications import Starlette from starlette.requests import Request from starlette.routing import Mount, Route import dotenv import requests import uvicorn dotenv.load_dotenv() mcp = FastMCP("Alertmanager MCP") @dataclass class AlertmanagerConfig: url: str # Optional credentials username: Optional[str] = None password: Optional[str] = None config = AlertmanagerConfig( url=os.environ.get("ALERTMANAGER_URL", ""), username=os.environ.get("ALERTMANAGER_USERNAME", ""), password=os.environ.get("ALERTMANAGER_PASSWORD", ""), ) def make_request(method="GET", route="/", **kwargs): """Make HTTP request and return a requests.Response object. Parameters ---------- method : str HTTP method to use for the request. route : str (Default value = "/") This is the url we are making our request to. **kwargs : dict Arbitrary keyword arguments. Returns ------- dict: The response from the Alertmanager API. This is a dictionary containing the response data. """ route = urljoin(config.url, route) auth = ( requests.auth.HTTPBasicAuth(config.username, config.password) if config.username and config.password else None ) response = requests.request( method=method.upper(), url=route, auth=auth, timeout=60, **kwargs ) response.raise_for_status() return response.json() @mcp.tool(description="Get current status of an Alertmanager instance and its cluster") async def get_status(): """Get current status of an Alertmanager instance and its cluster Returns ------- dict: The response from the Alertmanager API. This is a dictionary containing the response data. """ return make_request(method="GET", route="/api/v2/status") @mcp.tool(description="Get list of all receivers (name of notification integrations)") async def get_receivers(): """Get list of all receivers (name of notification integrations) Returns ------- list: Return a list of Receiver objects from Alertmanager instance. """ return make_request(method="GET", route="/api/v2/receivers") @mcp.tool(description="Get list of all silences") async def get_silences(filter: Optional[str] = None): """Get list of all silences Parameters ---------- filter Filtering query (e.g. alertname=~'.*CPU.*')"), Returns ------- list: Return a list of Silence objects from Alertmanager instance. """ params = None if filter: params = {"filter": filter} return make_request(method="GET", route="/api/v2/silences", params=params) @mcp.tool(description="Post a new silence or update an existing one") async def post_silence(silence: Dict[str, Any]): """Post a new silence or update an existing one Parameters ---------- silence : dict A dict representing the silence to be posted. This dict should contain the following keys: - matchers: list of matchers to match alerts to silence - startsAt: start time of the silence - endsAt: end time of the silence - createdBy: name of the user creating the silence - comment: comment for the silence Returns ------- dict: Create / update silence response from Alertmanager API. """ return make_request(method="POST", route="/api/v2/silences", json=silence) @mcp.tool(description="Get a silence by its ID") async def get_silence(silence_id: str): """Get a silence by its ID Parameters ---------- silence_id : str The ID of the silence to be retrieved. Returns ------- dict: The Silence object from Alertmanager instance. """ return make_request(method="GET", route=urljoin("/api/v2/silences/", silence_id)) @mcp.tool(description="Delete a silence by its ID") async def delete_silence(silence_id: str): """Delete a silence by its ID Parameters ---------- silence_id : str The ID of the silence to be deleted. Returns ------- dict: The response from the Alertmanager API. """ return make_request( method="DELETE", route=urljoin("/api/v2/silences/", silence_id) ) @mcp.tool(description="Get a list of alerts") async def get_alerts(filter: Optional[str] = None, silenced: Optional[bool] = None, inhibited: Optional[bool] = None, active: Optional[bool] = None): """Get a list of alerts currently in Alertmanager. Params ------ filter Filtering query (e.g. alertname=~'.*CPU.*')"), silenced If true, include silenced alerts. inhibited If true, include inhibited alerts. active If true, include active alerts. Returns ------- list Return a list of Alert objects from Alertmanager instance. """ params = {"active": True} if filter: params = {"filter": filter} if silenced is not None: params["silenced"] = silenced if inhibited is not None: params["inhibited"] = inhibited if active is not None: params["active"] = active return make_request(method="GET", route="/api/v2/alerts", params=params) @mcp.tool(description="Create new alerts") async def post_alerts(alerts: List[Dict]): """Create new alerts Parameters ---------- alerts A list of Alert object. [ { "startsAt": datetime, "endsAt": datetime, "annotations": labelSet } ] Returns ------- dict: Create alert response from Alertmanager API. """ return make_request(method="POST", route="/api/v2/alerts", json=alerts) @mcp.tool(description="Get a list of alert groups") async def get_alert_groups(silenced: Optional[bool] = None, inhibited: Optional[bool] = None, active: Optional[bool] = None): """Get a list of alert groups Params ------ silenced If true, include silenced alerts. inhibited If true, include inhibited alerts. active If true, include active alerts. Returns ------- list Return a list of AlertGroup objects from Alertmanager instance. """ params = {"active": True} if silenced is not None: params["silenced"] = silenced if inhibited is not None: params["inhibited"] = inhibited if active is not None: params["active"] = active return make_request(method="GET", route="/api/v2/alerts/groups", params=params) def setup_environment(): if dotenv.load_dotenv(): print("Loaded environment variables from .env file") else: print("No .env file found or could not load it - using environment variables") if not config.url: print("ERROR: ALERTMANAGER_URL environment variable is not set") print("Please set it to your Alertmanager server URL") print("Example: http://your-alertmanager:9093") return False print("Alertmanager configuration:") print(f" Server URL: {config.url}") if config.username and config.password: print(" Authentication: Using basic auth") else: print(" Authentication: None (no credentials provided)") return True def create_starlette_app(mcp_server: Server, *, debug: bool = False) -> Starlette: """Create a Starlette application that can serve the provided MCP server with SSE. Sets up a Starlette web application with routes for SSE (Server-Sent Events) communication with the MCP server. Args: mcp_server: The MCP server instance to connect debug: Whether to enable debug mode for the Starlette app Returns: A configured Starlette application """ # Create an SSE transport with a base path for messages sse = SseServerTransport("/messages/") async def handle_sse(request: Request) -> None: """Handler for SSE connections. Establishes an SSE connection and connects it to the MCP server. Args: request: The incoming HTTP request """ # Connect the SSE transport to the request async with sse.connect_sse( request.scope, request.receive, request._send, # noqa: SLF001 ) as (read_stream, write_stream): # Run the MCP server with the SSE streams await mcp_server.run( read_stream, write_stream, mcp_server.create_initialization_options(), ) # Create and return the Starlette application with routes return Starlette( debug=debug, routes=[ Route("/sse", endpoint=handle_sse), # Endpoint for SSE connections # Endpoint for posting messages Mount("/messages/", app=sse.handle_post_message), ], ) def create_streamable_app(mcp_server: Server, *, debug: bool = False) -> Starlette: """Create a Starlette application that serves the Streamable HTTP transport. This starts the MCP server inside an application startup task using the transport.connect() context manager so the transport's in-memory streams are connected to the MCP server. The transport's ASGI handler is mounted at the '/mcp' path for GET/POST/DELETE requests. """ transport = StreamableHTTPServerTransport(None) routes = [ Mount("/mcp", app=transport.handle_request), ] app = Starlette(debug=debug, routes=routes) async def _startup() -> None: # Run the MCP server in a background asyncio task so the lifespan # event doesn't block. Store the task on app.state so shutdown can # cancel it. import asyncio async def _run_mcp() -> None: # Create the transport-backed streams and run the MCP server async with transport.connect() as (read_stream, write_stream): await mcp_server.run( read_stream, write_stream, mcp_server.create_initialization_options() ) app.state._mcp_task = asyncio.create_task(_run_mcp()) async def _shutdown() -> None: task = getattr(app.state, "_mcp_task", None) if task: task.cancel() try: await task except Exception: # Task cancelled or errored during shutdown is fine pass # Attempt to terminate the transport cleanly try: await transport.terminate() except Exception: pass app.add_event_handler("startup", _startup) app.add_event_handler("shutdown", _shutdown) return app def run_server(): """Main entry point for the Prometheus Alertmanager MCP Server""" setup_environment() # Get the underlying MCP server from the FastMCP instance mcp_server = mcp._mcp_server # noqa: WPS437 import argparse # Set up command-line argument parsing parser = argparse.ArgumentParser( description='Run MCP server with configurable transport') # Allow configuring defaults from environment variables. CLI arguments # (when provided) will override these environment values. env_transport = os.environ.get("MCP_TRANSPORT") env_host = os.environ.get("MCP_HOST") env_port = os.environ.get("MCP_PORT") transport_default = env_transport if env_transport is not None else 'stdio' host_default = env_host if env_host is not None else '0.0.0.0' try: port_default = int(env_port) if env_port is not None else 8000 except (TypeError, ValueError): print(f"Invalid MCP_PORT value '{env_port}', falling back to 8000") port_default = 8000 # Allow choosing between stdio and SSE transport modes parser.add_argument('--transport', choices=['stdio', 'http', 'sse'], default=transport_default, help='Transport mode (stdio, http or sse) — can also be set via $MCP_TRANSPORT') # Host configuration for SSE mode parser.add_argument('--host', default=host_default, help='Host to bind to (for SSE mode) — can also be set via $MCP_HOST') # Port configuration for SSE mode parser.add_argument('--port', type=int, default=port_default, help='Port to listen on (for SSE mode) — can also be set via $MCP_PORT') args = parser.parse_args() print("\nStarting Prometheus Alertmanager MCP Server...") # Launch the server with the selected transport mode if args.transport == 'sse': print("Running server with SSE transport (web-based)") # Run with SSE transport (web-based) # Create a Starlette app to serve the MCP server starlette_app = create_starlette_app(mcp_server, debug=True) # Start the web server with the configured host and port uvicorn.run(starlette_app, host=args.host, port=args.port) elif args.transport == 'http': print("Running server with http transport (streamable HTTP)") # Run with streamable-http transport served by uvicorn so host/port # CLI/env variables control the listening socket (same pattern as SSE). starlette_app = create_streamable_app(mcp_server, debug=True) uvicorn.run(starlette_app, host=args.host, port=args.port) else: print("Running server with stdio transport (default)") # Run with stdio transport (default) # This mode communicates through standard input/output mcp.run(transport='stdio') if __name__ == "__main__": run_server()

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ntk148v/alertmanager-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server