Skip to main content
Glama

Podman MCP Server

by kunwarmahen
main_b.py•28.9 kB
#!/usr/bin/env python3
"""
Podman MCP Server - Manage containers through MCP protocol
Compatible with MCP Discovery Hub
"""

import os
import subprocess
import logging
import json
import socket
import struct
import asyncio
import uuid
from typing import List, Dict, Any, Optional, AsyncGenerator, Tuple, Union
from dataclasses import dataclass

from fastapi import FastAPI, Request, HTTPException, Header
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from dotenv import load_dotenv

# --- Load env ---
load_dotenv()

# Configure logging
logging.basicConfig(
    filename="podman_mcp.log",
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
logger = logging.getLogger("podman_mcp")

# --- Configuration ---
SERVER_PORT = int(os.getenv("MCP_PORT", "3000"))
SERVER_NAME = os.getenv("MCP_SERVER_NAME", "Podman MCP Server")
ENABLE_BROADCAST = os.getenv("MCP_ENABLE_BROADCAST", "true").lower() == "true"
BROADCAST_INTERVAL = int(os.getenv("MCP_BROADCAST_INTERVAL", "30"))
DEBUG = os.getenv("DEBUG_LOGGING", "false").lower() == "true"

# SSDP/Multicast settings
SSDP_ADDR = "239.255.255.250"
MCP_DISCOVERY_PORT = 5353

# Upper bound on responses buffered per session for SSE delivery. Every POST
# response is queued, so without a bound a client that never opens an SSE
# stream would make the queue grow for the lifetime of the session.
MAX_QUEUED_MESSAGES = 1000

# Seconds an SSE stream waits for a message before emitting a keepalive comment.
SSE_KEEPALIVE_SECONDS = 30.0


def debug_print(*args, **kwargs):
    """Print (and debug-log) the given values when DEBUG_LOGGING=true."""
    if DEBUG:
        print("[DEBUG]", *args, **kwargs)
        logger.debug(" ".join(map(str, args)))


# --- Helper to run podman commands ---
def run_podman(args: List[str]) -> Dict[str, Any]:
    """Run a podman command and capture its output.

    This call blocks for up to 30 seconds; coroutines should use
    ``run_podman_async`` instead.

    Args:
        args: Arguments appended after the ``podman`` executable.

    Returns:
        A dict with ``success`` (bool), ``stdout``, ``stderr`` (both stripped)
        and ``returncode``. Failures to launch and timeouts are reported with
        ``success=False`` and ``returncode=-1`` rather than raised.
    """
    try:
        cmd = ["podman"] + args
        logger.info("Running command: %s", " ".join(cmd))
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=30,
        )
        return {
            "success": result.returncode == 0,
            "stdout": result.stdout.strip(),
            "stderr": result.stderr.strip(),
            "returncode": result.returncode,
        }
    except subprocess.TimeoutExpired:
        logger.error("Command timed out")
        return {"success": False, "stdout": "", "stderr": "Command timed out", "returncode": -1}
    except Exception as e:
        logger.error(f"Command error: {e}")
        return {"success": False, "stdout": "", "stderr": str(e), "returncode": -1}


async def run_podman_async(args: List[str]) -> Dict[str, Any]:
    """Run ``run_podman`` in the default executor so the event loop stays responsive.

    ``run_podman`` blocks on ``subprocess.run``; calling it directly from a
    coroutine would stall SSE keepalives and discovery broadcasts meanwhile.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, run_podman, args)


class InvalidToolArguments(ValueError):
    """Raised when a tools/call request has missing or malformed arguments."""


def _required_name(args: Dict[str, Any], key: str) -> str:
    """Return ``args[key]`` as a validated container/image reference.

    The value ends up as a positional podman argument, so it must be a
    non-empty string and must not start with ``-`` (which podman would parse
    as an option instead of a name).
    """
    value = args.get(key)
    if not isinstance(value, str) or not value.strip():
        raise InvalidToolArguments(f"'{key}' must be a non-empty string")
    if value.startswith("-"):
        raise InvalidToolArguments(f"'{key}' must not start with '-'")
    return value


def _string_list(args: Dict[str, Any], key: str) -> List[str]:
    """Return ``args[key]`` as a list of strings (empty when absent or null).

    Rejects a bare string, which would otherwise be iterated character by
    character when building the podman command line.
    """
    value = args.get(key)
    if value is None:
        return []
    if not isinstance(value, list) or not all(isinstance(item, str) for item in value):
        raise InvalidToolArguments(f"'{key}' must be an array of strings")
    return value


def _tool_output(result: Dict[str, Any], success_text: Optional[str] = None) -> Dict[str, Any]:
    """Convert a ``run_podman`` result into the ``{"output": ...}`` tool payload.

    On success the payload carries ``success_text`` when given, otherwise the
    command's stdout; on failure it carries ``"Error: <stderr>"``.
    """
    if result["success"]:
        return {"output": result["stdout"] if success_text is None else success_text}
    return {"output": f"Error: {result['stderr']}"}


def _container_property() -> Dict[str, Any]:
    """Return a fresh schema fragment for the common ``container`` argument."""
    return {"type": "string", "description": "Container name or ID"}


@dataclass
class Tool:
    """Description of one MCP tool as advertised by ``tools/list``."""

    name: str
    description: str
    inputSchema: Dict[str, Any]


class MCPBroadcaster:
    """Handles SSDP-style broadcasting for MCP server discovery"""

    def __init__(self, server_name: str, port: int):
        self.server_name = server_name
        self.port = port
        self.uuid = str(uuid.uuid4())
        self.running = False
        self.sock = None
        # Reference to the background broadcast task. asyncio only keeps weak
        # references to tasks, so the owner must hold one to keep it alive.
        self.task: Optional["asyncio.Task"] = None

    def get_local_ip(self):
        """Get the local IP address, falling back to 127.0.0.1 on failure."""
        try:
            # Connecting a UDP socket sends no packets; it only makes the OS
            # pick the outbound interface whose address we then read back.
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
                s.connect(('8.8.8.8', 80))
                ip = s.getsockname()[0]
            debug_print(f"Local IP address determined: {ip}")
            return ip
        except OSError:
            return '127.0.0.1'

    def create_announcement(self):
        """Create the UTF-8 encoded JSON MCP discovery announcement."""
        local_ip = self.get_local_ip()
        announcement = {
            "type": "mcp-announcement",
            "protocol": "MCP-DISCOVERY-v1",
            "uuid": self.uuid,
            "name": self.server_name,
            "host": local_ip,
            "port": self.port,
            "endpoint": "/mcp",
            "protocol_type": "MCP-HTTP",
            "version": "1.0.0",
            # NOTE(review): loop.time() is the event loop's monotonic clock,
            # not wall-clock time - confirm that is what consumers expect.
            "timestamp": asyncio.get_event_loop().time(),
        }
        return json.dumps(announcement).encode('utf-8')

    async def start_broadcasting(self):
        """Broadcast server presence until ``stop_broadcasting`` is called.

        Returns immediately when broadcasting is disabled or the multicast
        socket cannot be set up.
        """
        if not ENABLE_BROADCAST:
            logger.info("Broadcasting disabled via MCP_ENABLE_BROADCAST=false")
            return

        # Create UDP socket for multicast
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        except OSError as e:
            logger.error(f"Broadcast error: {e}")
            return
        try:
            sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
        except OSError as e:
            sock.close()
            logger.error(f"Broadcast error: {e}")
            return

        self.sock = sock
        self.running = True

        logger.info(f"Starting MCP broadcaster on {SSDP_ADDR}:{MCP_DISCOVERY_PORT}")
        logger.info(f"Broadcasting every {BROADCAST_INTERVAL} seconds")
        print(f"🔊 Broadcasting enabled - {self.server_name} at {self.get_local_ip()}:{self.port}")

        while self.running:
            try:
                message = self.create_announcement()
                debug_print(f"Broadcasting MCP announcement: {message.decode('utf-8')}")
                sock.sendto(message, (SSDP_ADDR, MCP_DISCOVERY_PORT))
                await asyncio.sleep(BROADCAST_INTERVAL)
            except Exception as e:
                # Best effort: log and retry shortly instead of killing the task.
                logger.error(f"Broadcast error: {e}")
                await asyncio.sleep(5)

    def stop_broadcasting(self):
        """Stop broadcasting, cancel the background task and close the socket."""
        self.running = False
        if self.task is not None and not self.task.done():
            # Wake the loop out of its sleep instead of waiting a full interval.
            self.task.cancel()
        self.task = None
        if self.sock:
            self.sock.close()
            self.sock = None
            logger.info("Broadcasting stopped")


class PodmanMCPServer:
    """MCP Server for Podman container management"""

    def __init__(self, name: str, version: str):
        self.name = name
        self.version = version
        self.tools: List[Tool] = []
        # session id -> session metadata recorded by handle_initialize
        self.sessions: Dict[str, Dict[str, Any]] = {}
        # session id -> queue of messages awaiting delivery over SSE
        self.message_queues: Dict[str, asyncio.Queue] = {}
        self._register_tools()

    def _register_tools(self):
        """Register all Podman tools"""
        self.tools = [
            Tool(
                name="list_containers",
                description="List containers (running by default, all if requested)",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "all": {
                            "type": "boolean",
                            "description": "Show all containers, not just running",
                            "default": False,
                        }
                    },
                },
            ),
            Tool(
                name="container_info",
                description="Inspect a container by name or ID",
                inputSchema={
                    "type": "object",
                    "properties": {"container": _container_property()},
                    "required": ["container"],
                },
            ),
            Tool(
                name="start_container",
                description="Start a stopped container",
                inputSchema={
                    "type": "object",
                    "properties": {"container": _container_property()},
                    "required": ["container"],
                },
            ),
            Tool(
                name="stop_container",
                description="Stop a running container",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "container": _container_property(),
                        "timeout": {
                            "type": "integer",
                            "description": "Seconds to wait before killing container",
                            "default": 10,
                        },
                    },
                    "required": ["container"],
                },
            ),
            Tool(
                name="restart_container",
                description="Restart a container",
                inputSchema={
                    "type": "object",
                    "properties": {"container": _container_property()},
                    "required": ["container"],
                },
            ),
            Tool(
                name="container_logs",
                description="Get logs from a container",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "container": _container_property(),
                        "tail": {
                            "type": "integer",
                            "description": "Number of lines to show from end of log",
                            "default": 100,
                        },
                    },
                    "required": ["container"],
                },
            ),
            Tool(
                name="run_container",
                description="Run a new container",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "image": {
                            "type": "string",
                            "description": "Container image",
                        },
                        "name": {
                            "type": "string",
                            "description": "Optional container name",
                        },
                        "detach": {
                            "type": "boolean",
                            "description": "Run in background",
                            "default": True,
                        },
                        "ports": {
                            "type": "array",
                            "items": {"type": "string"},
                            "description": "Port mappings (e.g., ['8080:80'])",
                            "default": [],
                        },
                        "env": {
                            "type": "array",
                            "items": {"type": "string"},
                            "description": "Environment variables (KEY=VAL)",
                            "default": [],
                        },
                        "volumes": {
                            "type": "array",
                            "items": {"type": "string"},
                            "description": "Volumes (e.g., ['/host:/container'])",
                            "default": [],
                        },
                    },
                    "required": ["image"],
                },
            ),
            Tool(
                name="remove_container",
                description="Remove a container",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "container": _container_property(),
                        "force": {
                            "type": "boolean",
                            "description": "Force remove running container",
                            "default": False,
                        },
                    },
                    "required": ["container"],
                },
            ),
            Tool(
                name="exec_container",
                description="Execute a command inside a container",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "container": _container_property(),
                        "command": {
                            "type": "array",
                            "items": {"type": "string"},
                            "description": "Command to execute",
                        },
                    },
                    "required": ["container", "command"],
                },
            ),
            Tool(
                name="list_images",
                description="List container images",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "all": {
                            "type": "boolean",
                            "description": "Show all images including intermediate",
                            "default": False,
                        }
                    },
                },
            ),
            Tool(
                name="pull_image",
                description="Pull a container image from a registry",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "image": {
                            "type": "string",
                            "description": "Image name with optional tag",
                        }
                    },
                    "required": ["image"],
                },
            ),
            Tool(
                name="container_stats",
                description="Get resource usage statistics for containers",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "container": {
                            "type": "string",
                            "description": "Container name or ID (all containers if not specified)",
                        },
                        "no_stream": {
                            "type": "boolean",
                            "description": "Disable streaming stats and only pull the first result",
                            "default": True,
                        },
                    },
                },
            ),
        ]

    async def handle_message(
        self, message: Dict[str, Any], session_id: Optional[str] = None
    ) -> Union[Dict[str, Any], Tuple[Dict[str, Any], str], None]:
        """Dispatch one JSON-RPC 2.0 message to its handler.

        Returns:
            A JSON-RPC response dict; for ``initialize`` a
            ``(response, session_id)`` tuple; ``None`` for the
            ``notifications/initialized`` notification.
        """
        jsonrpc = message.get("jsonrpc")
        if jsonrpc != "2.0":
            return self.error_response(None, -32600, "Invalid Request")

        method = message.get("method")
        msg_id = message.get("id")
        # An explicit JSON null for params is treated like an omitted member.
        params = message.get("params") or {}
        if not isinstance(params, dict):
            return self.error_response(msg_id, -32602, "Invalid params: expected an object")

        logger.info(f"Handling method: {method}")

        if method == "initialize":
            return await self.handle_initialize(msg_id, params, session_id)
        elif method == "tools/list":
            return await self.handle_tools_list(msg_id)
        elif method == "tools/call":
            return await self.handle_tools_call(msg_id, params)
        elif method == "ping":
            return self.success_response(msg_id, {})
        elif method == "notifications/initialized":
            return None
        else:
            return self.error_response(msg_id, -32601, f"Method not found: {method}")

    async def handle_initialize(
        self, msg_id: Any, params: Dict[str, Any], session_id: Optional[str]
    ) -> Tuple[Dict[str, Any], str]:
        """Create (or reset) a session and return ``(response, session_id)``.

        A new UUID is generated when the caller supplied no session id.
        """
        if not session_id:
            session_id = str(uuid.uuid4())

        self.sessions[session_id] = {
            "initialized": True,
            "client_info": params.get("clientInfo", {}),
        }
        self.message_queues[session_id] = asyncio.Queue(maxsize=MAX_QUEUED_MESSAGES)

        logger.info(f"Initialized session: {session_id}")

        return self.success_response(msg_id, {
            "protocolVersion": "2024-11-05",
            "serverInfo": {
                "name": self.name,
                "version": self.version,
            },
            "capabilities": {
                "tools": {},
            },
        }), session_id

    async def handle_tools_list(self, msg_id: Any) -> Dict[str, Any]:
        """Return the registered tools with their input schemas."""
        tools_list = [
            {
                "name": tool.name,
                "description": tool.description,
                "inputSchema": tool.inputSchema,
            }
            for tool in self.tools
        ]
        return self.success_response(msg_id, {"tools": tools_list})

    async def handle_tools_call(self, msg_id: Any, params: Dict[str, Any]) -> Dict[str, Any]:
        """Run the named tool and wrap its result as MCP text content.

        Unknown tools and invalid arguments yield a -32602 error; any other
        exception raised by the tool yields -32603.
        """
        tool_name = params.get("name")
        # An explicit JSON null for arguments is treated like an omitted member.
        arguments = params.get("arguments") or {}

        logger.info(f"Calling tool: {tool_name} with args: {arguments}")

        # Map tool names to handler functions
        tool_handlers = {
            "list_containers": self.list_containers,
            "container_info": self.container_info,
            "start_container": self.start_container,
            "stop_container": self.stop_container,
            "restart_container": self.restart_container,
            "container_logs": self.container_logs,
            "run_container": self.run_container,
            "remove_container": self.remove_container,
            "exec_container": self.exec_container,
            "list_images": self.list_images,
            "pull_image": self.pull_image,
            "container_stats": self.container_stats,
        }

        tool_func = tool_handlers.get(tool_name)
        if not tool_func:
            return self.error_response(msg_id, -32602, f"Unknown tool: {tool_name}")
        if not isinstance(arguments, dict):
            return self.error_response(msg_id, -32602, "Invalid params: 'arguments' must be an object")

        try:
            result = await tool_func(arguments)
        except InvalidToolArguments as e:
            logger.error(f"Invalid tool arguments: {e}")
            return self.error_response(msg_id, -32602, f"Invalid params: {e}")
        except Exception as e:
            logger.error(f"Tool execution error: {e}")
            return self.error_response(msg_id, -32603, f"Tool execution error: {str(e)}")

        return self.success_response(msg_id, {
            "content": [
                {
                    "type": "text",
                    "text": json.dumps(result, indent=2),
                }
            ]
        })

    # Tool implementations
    async def list_containers(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Run ``podman ps --format json``, adding ``--all`` when requested."""
        cmd_args = ["ps", "--format", "json"]
        if args.get("all", False):
            cmd_args.append("--all")
        return _tool_output(await run_podman_async(cmd_args))

    async def container_info(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Run ``podman inspect`` on the given container."""
        container = _required_name(args, "container")
        return _tool_output(await run_podman_async(["inspect", container]))

    async def start_container(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Run ``podman start`` on the given container."""
        container = _required_name(args, "container")
        result = await run_podman_async(["start", container])
        return _tool_output(result, f"Started container: {container}")

    async def stop_container(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Run ``podman stop -t <timeout>`` on the given container."""
        container = _required_name(args, "container")
        timeout = args.get("timeout", 10)
        result = await run_podman_async(["stop", "-t", str(timeout), container])
        return _tool_output(result, f"Stopped container: {container}")

    async def restart_container(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Run ``podman restart`` on the given container."""
        container = _required_name(args, "container")
        result = await run_podman_async(["restart", container])
        return _tool_output(result, f"Restarted container: {container}")

    async def container_logs(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Run ``podman logs --tail <tail>`` on the given container."""
        container = _required_name(args, "container")
        tail = args.get("tail", 100)
        return _tool_output(await run_podman_async(["logs", "--tail", str(tail), container]))

    async def run_container(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Run ``podman run`` for ``image`` with optional name, ports, env and volumes."""
        image = _required_name(args, "image")
        name = args.get("name")
        if name is not None and not isinstance(name, str):
            raise InvalidToolArguments("'name' must be a string")
        ports = _string_list(args, "ports")
        env = _string_list(args, "env")
        volumes = _string_list(args, "volumes")

        cmd_args = ["run"]
        if args.get("detach", True):
            cmd_args.append("-d")
        if name:
            cmd_args.extend(["--name", name])
        for flag, values in (("-p", ports), ("-e", env), ("-v", volumes)):
            for value in values:
                cmd_args.extend([flag, value])
        cmd_args.append(image)

        result = await run_podman_async(cmd_args)
        return _tool_output(result, f"Started container: {result['stdout']}")

    async def remove_container(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Run ``podman rm`` (with ``-f`` when forced) on the given container."""
        container = _required_name(args, "container")
        cmd_args = ["rm"]
        if args.get("force", False):
            cmd_args.append("-f")
        cmd_args.append(container)
        result = await run_podman_async(cmd_args)
        return _tool_output(result, f"Removed container: {container}")

    async def exec_container(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Run ``podman exec <container> <command...>`` and return its stdout."""
        container = _required_name(args, "container")
        command = _string_list(args, "command")
        return _tool_output(await run_podman_async(["exec", container] + command))

    async def list_images(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Run ``podman images --format json``, adding ``--all`` when requested."""
        cmd_args = ["images", "--format", "json"]
        if args.get("all", False):
            cmd_args.append("--all")
        return _tool_output(await run_podman_async(cmd_args))

    async def pull_image(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Run ``podman pull`` for the given image."""
        image = _required_name(args, "image")
        result = await run_podman_async(["pull", image])
        return _tool_output(result, f"Pulled image: {image}")

    async def container_stats(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Run ``podman stats --format json`` for one container or all of them.

        ``--no-stream`` is added unless the caller sets ``no_stream`` to false;
        any run is still subject to the 30-second timeout in ``run_podman``.
        """
        cmd_args = ["stats", "--format", "json"]
        if args.get("no_stream", True):
            cmd_args.append("--no-stream")
        if args.get("container"):
            cmd_args.append(_required_name(args, "container"))
        return _tool_output(await run_podman_async(cmd_args))

    def success_response(self, msg_id: Any, result: Dict[str, Any]) -> Dict[str, Any]:
        """Build a JSON-RPC 2.0 success response."""
        return {
            "jsonrpc": "2.0",
            "id": msg_id,
            "result": result,
        }

    def error_response(self, msg_id: Any, code: int, message: str) -> Dict[str, Any]:
        """Build a JSON-RPC 2.0 error response."""
        return {
            "jsonrpc": "2.0",
            "id": msg_id,
            "error": {
                "code": code,
                "message": message,
            },
        }

    def enqueue_message(self, session_id: Optional[str], message: Dict[str, Any]) -> bool:
        """Queue ``message`` for SSE delivery to ``session_id``.

        When the session's queue is full the oldest message is discarded so
        that a session without an SSE reader cannot grow without bound.

        Returns:
            True if the message was queued, False if the session has no queue.
        """
        queue = self.message_queues.get(session_id) if session_id else None
        if queue is None:
            return False
        if queue.full():
            try:
                queue.get_nowait()
            except asyncio.QueueEmpty:
                pass
        queue.put_nowait(message)
        return True

    def cleanup_session(self, session_id: str):
        """Forget a session and its message queue; unknown ids are ignored."""
        self.sessions.pop(session_id, None)
        self.message_queues.pop(session_id, None)


# Create instances
app = FastAPI(title=SERVER_NAME)
mcp_server = PodmanMCPServer(SERVER_NAME, "1.0.0")
broadcaster = MCPBroadcaster(SERVER_NAME, SERVER_PORT)

# NOTE(review): wildcard origins combined with allow_credentials=True lets any
# web page call this API from a browser - confirm this is intended for a
# server that can start, stop and exec into containers.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.on_event("startup")
async def startup_event():
    """Start broadcaster on server startup"""
    if ENABLE_BROADCAST:
        # Keep a reference: the event loop holds tasks only weakly, so an
        # unreferenced task may be garbage-collected while still running.
        broadcaster.task = asyncio.create_task(broadcaster.start_broadcasting())


@app.on_event("shutdown")
async def shutdown_event():
    """Stop broadcaster on shutdown"""
    broadcaster.stop_broadcasting()


@app.get("/")
async def root():
    """Describe the server and its broadcast configuration."""
    return {
        "name": mcp_server.name,
        "version": mcp_server.version,
        "protocol": "MCP Streamable HTTP",
        "endpoint": "/mcp",
        "broadcasting": ENABLE_BROADCAST,
        "broadcast_interval": BROADCAST_INTERVAL if ENABLE_BROADCAST else None,
    }


async def _session_event_stream(
    request: Request,
    session_id: str,
    first_message: Optional[Dict[str, Any]] = None,
) -> AsyncGenerator[str, None]:
    """Yield SSE frames for a session until the client disconnects.

    Emits ``first_message`` (if any), then every message queued for the
    session, with a keepalive comment whenever the queue stays empty for
    ``SSE_KEEPALIVE_SECONDS``. The session is cleaned up when the stream ends,
    including when the client goes away before the first frame is consumed.
    """
    queue = mcp_server.message_queues.get(session_id)
    try:
        if first_message is not None:
            yield f"data: {json.dumps(first_message)}\n\n"
        if queue is None:
            return
        while True:
            if await request.is_disconnected():
                break
            try:
                message = await asyncio.wait_for(queue.get(), timeout=SSE_KEEPALIVE_SECONDS)
            except asyncio.TimeoutError:
                yield ": keepalive\n\n"
            else:
                yield f"data: {json.dumps(message)}\n\n"
    finally:
        if queue is not None:
            mcp_server.cleanup_session(session_id)


def _sse_headers(session_id: Optional[str] = None) -> Dict[str, str]:
    """Return the response headers used for SSE streams."""
    headers = {}
    if session_id is not None:
        headers["Mcp-Session-Id"] = session_id
    headers.update({
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",
    })
    return headers


@app.post("/mcp")
@app.get("/mcp")
async def mcp_endpoint(
    request: Request,
    mcp_session_id: Optional[str] = Header(None, alias="Mcp-Session-Id"),
):
    """MCP endpoint: GET opens an SSE stream, POST handles one JSON-RPC message.

    Every request except ``initialize`` must carry a known ``Mcp-Session-Id``.
    """
    accept_header = request.headers.get("accept", "")
    wants_sse = "text/event-stream" in accept_header
    wants_json = "application/json" in accept_header

    if request.method == "GET":
        if not wants_sse:
            raise HTTPException(status_code=405, detail="GET requires text/event-stream")
        if not mcp_session_id or mcp_session_id not in mcp_server.sessions:
            raise HTTPException(status_code=400, detail="Invalid session")

        return StreamingResponse(
            _session_event_stream(request, mcp_session_id),
            media_type="text/event-stream",
            headers=_sse_headers(),
        )

    try:
        body = await request.json()
    except json.JSONDecodeError:
        error_response = mcp_server.error_response(None, -32700, "Parse error")
        return JSONResponse(content=error_response, status_code=400)

    # Valid JSON that is not an object (array, string, number, ...) cannot be
    # dispatched; reject it instead of failing later on body.get().
    if not isinstance(body, dict):
        error_response = mcp_server.error_response(None, -32600, "Invalid Request")
        return JSONResponse(content=error_response, status_code=400)

    is_initialize = body.get("method") == "initialize"

    if not is_initialize:
        if not mcp_session_id or mcp_session_id not in mcp_server.sessions:
            raise HTTPException(status_code=400, detail="Invalid session")

    result = await mcp_server.handle_message(body, mcp_session_id)

    if is_initialize and isinstance(result, tuple):
        response, new_session_id = result

        if wants_sse and not wants_json:
            return StreamingResponse(
                _session_event_stream(request, new_session_id, first_message=response),
                media_type="text/event-stream",
                headers=_sse_headers(new_session_id),
            )
        return JSONResponse(
            content=response,
            headers={"Mcp-Session-Id": new_session_id},
        )

    if result is None:
        return JSONResponse(content={"status": "ok"})

    # The response is returned directly and also queued for any SSE stream
    # attached to the session.
    mcp_server.enqueue_message(mcp_session_id, result)

    return JSONResponse(content=result)


@app.delete("/mcp")
async def mcp_delete(
    mcp_session_id: Optional[str] = Header(None, alias="Mcp-Session-Id"),
):
    """Terminate the session named by ``Mcp-Session-Id`` (if any)."""
    if mcp_session_id:
        mcp_server.cleanup_session(mcp_session_id)
    return JSONResponse(content={"status": "ok"})


if __name__ == "__main__":
    import uvicorn

    print("=" * 60)
    print(f"🚀 {SERVER_NAME}")
    print("=" * 60)
    print(f"Port: {SERVER_PORT}")
    print(f"Endpoint: http://localhost:{SERVER_PORT}/mcp")
    print(f"Broadcasting: {'ENABLED' if ENABLE_BROADCAST else 'DISABLED'}")
    if ENABLE_BROADCAST:
        print(f"Broadcast Interval: {BROADCAST_INTERVAL}s")
        print(f"Multicast: {SSDP_ADDR}:{MCP_DISCOVERY_PORT}")
    print("=" * 60)

    # NOTE(review): binds to all interfaces with no authentication on /mcp.
    uvicorn.run(app, host="0.0.0.0", port=SERVER_PORT)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kunwarmahen/podman-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.