# main_b.py • 28.9 kB
#!/usr/bin/env python3
"""
Podman MCP Server - Manage containers through MCP protocol
Compatible with MCP Discovery Hub
"""
import asyncio
import json
import logging
import os
import socket
import struct
import subprocess
import time
import uuid
from dataclasses import dataclass
from typing import List, Dict, Any, Optional, AsyncGenerator

from dotenv import load_dotenv
from fastapi import FastAPI, Request, HTTPException, Header
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, JSONResponse
# --- Load env ---
load_dotenv()

# --- Configuration ---
# Read before logging is configured so DEBUG can select the log level.
SERVER_PORT = int(os.getenv("MCP_PORT", "3000"))
SERVER_NAME = os.getenv("MCP_SERVER_NAME", "Podman MCP Server")
ENABLE_BROADCAST = os.getenv("MCP_ENABLE_BROADCAST", "true").lower() == "true"
BROADCAST_INTERVAL = int(os.getenv("MCP_BROADCAST_INTERVAL", "30"))
DEBUG = os.getenv("DEBUG_LOGGING", "false").lower() == "true"

# SSDP/Multicast settings
SSDP_ADDR = "239.255.255.250"
MCP_DISCOVERY_PORT = 5353

# Configure logging
# With a fixed INFO level the logger.debug() call made by debug_print() was
# always filtered out; honour DEBUG_LOGGING so those records reach the file.
logging.basicConfig(
    filename="podman_mcp.log",
    level=logging.DEBUG if DEBUG else logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
logger = logging.getLogger("podman_mcp")
def debug_print(*args, **kwargs):
    """Print a ``[DEBUG]``-prefixed message and mirror it to the module logger.

    Does nothing unless the module-level ``DEBUG`` flag is true. Positional
    arguments are forwarded to ``print`` (together with any keyword
    arguments) and are also space-joined into a single debug log record.
    """
    if not DEBUG:
        return
    print("[DEBUG]", *args, **kwargs)
    text = " ".join(str(arg) for arg in args)
    logger.debug(text)
# --- Helper to run podman commands ---
def run_podman(args: List[str], timeout: float = 30) -> Dict[str, Any]:
    """Run a podman command and capture output.

    Args:
        args: Arguments placed after the ``podman`` executable.
        timeout: Seconds to wait before the command is abandoned.

    Returns:
        A dict with ``success`` (True when the exit status is 0), the
        stripped ``stdout`` and ``stderr`` text, and ``returncode``.
        When the command times out or cannot be run at all, ``success`` is
        False, ``returncode`` is -1 and ``stderr`` carries the reason.
    """
    try:
        cmd = ["podman"] + args
        logger.info(f"Running command: {' '.join(cmd)}")
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            # Container output is arbitrary bytes; substitute undecodable
            # ones instead of failing the whole call with a decode error.
            errors="replace",
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        logger.error("Command timed out")
        return {"success": False, "stdout": "", "stderr": "Command timed out", "returncode": -1}
    except Exception as e:
        # Best-effort boundary: report any launch failure (for example a
        # missing podman binary) to the caller instead of raising.
        logger.error(f"Command error: {e}")
        return {"success": False, "stdout": "", "stderr": str(e), "returncode": -1}
    return {
        "success": result.returncode == 0,
        "stdout": result.stdout.strip(),
        "stderr": result.stderr.strip(),
        "returncode": result.returncode,
    }
@dataclass
class Tool:
    """Describe one tool advertised to MCP clients."""

    # Identifier clients send as the tool name when calling the tool.
    name: str
    # Human-readable summary of what the tool does.
    description: str
    # Schema dict describing the arguments the tool accepts.
    inputSchema: Dict[str, Any]
class MCPBroadcaster:
    """Handles SSDP-style broadcasting for MCP server discovery.

    Periodically sends a JSON announcement over UDP multicast to
    ``SSDP_ADDR:MCP_DISCOVERY_PORT`` describing where this server's ``/mcp``
    endpoint can be reached.
    """

    def __init__(self, server_name: str, port: int):
        self.server_name = server_name
        self.port = port
        # Random identifier generated once per broadcaster instance.
        self.uuid = str(uuid.uuid4())
        self.running = False
        self.sock = None

    def get_local_ip(self):
        """Get the local IP address.

        Connects a UDP socket towards 8.8.8.8 and reads back the socket's
        own address. Falls back to ``'127.0.0.1'`` when the socket
        operations fail.
        """
        try:
            # The context manager closes the probe socket even when
            # connect() fails, which previously leaked the descriptor.
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
                probe.connect(('8.8.8.8', 80))
                ip = probe.getsockname()[0]
        except OSError:
            return '127.0.0.1'
        debug_print(f"Local IP address determined: {ip}")
        return ip

    def _build_announcement(self, host: str) -> bytes:
        """Serialize the discovery announcement for the given host address."""
        announcement = {
            "type": "mcp-announcement",
            "protocol": "MCP-DISCOVERY-v1",
            "uuid": self.uuid,
            "name": self.server_name,
            "host": host,
            "port": self.port,
            "endpoint": "/mcp",
            "protocol_type": "MCP-HTTP",
            "version": "1.0.0",
            # Wall-clock seconds since the epoch. The event loop clock used
            # before is monotonic with an arbitrary origin, so its value
            # could not be interpreted by another host.
            "timestamp": time.time(),
        }
        return json.dumps(announcement).encode('utf-8')

    def create_announcement(self):
        """Create MCP discovery announcement message"""
        return self._build_announcement(self.get_local_ip())

    async def start_broadcasting(self):
        """Start broadcasting server presence.

        Runs until ``stop_broadcasting`` clears ``self.running``. Returns
        immediately when broadcasting is disabled by configuration.
        """
        if not ENABLE_BROADCAST:
            logger.info("Broadcasting disabled via MCP_ENABLE_BROADCAST=false")
            return
        self.running = True
        # Create UDP socket for multicast
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        self.sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
        logger.info(f"Starting MCP broadcaster on {SSDP_ADDR}:{MCP_DISCOVERY_PORT}")
        logger.info(f"Broadcasting every {BROADCAST_INTERVAL} seconds")
        print(f"š Broadcasting enabled - {self.server_name} at {self.get_local_ip()}:{self.port}")
        while self.running:
            delay = BROADCAST_INTERVAL
            try:
                message = self.create_announcement()
                debug_print(f"Broadcasting MCP announcement: {message.decode('utf-8')}")
                self.sock.sendto(message, (SSDP_ADDR, MCP_DISCOVERY_PORT))
            except Exception as e:
                logger.error(f"Broadcast error: {e}")
                # Retry sooner after a failed send.
                delay = 5
            # Sleeping outside the try block keeps task cancellation from
            # being caught by the broad handler above.
            await asyncio.sleep(delay)

    def stop_broadcasting(self):
        """Stop broadcasting"""
        self.running = False
        if self.sock:
            self.sock.close()
            # Drop the closed socket so it is never reused by mistake.
            self.sock = None
        logger.info("Broadcasting stopped")
class PodmanMCPServer:
    """MCP Server for Podman container management.

    Handles JSON-RPC 2.0 messages (``initialize``, ``tools/list``,
    ``tools/call``, ``ping``) and maps each registered tool onto a
    ``podman`` sub-command executed through ``run_podman``.
    """

    def __init__(self, name: str, version: str):
        self.name = name
        self.version = version
        self.tools: List[Tool] = []
        # Per-session state and outbound message queues, keyed by session id.
        self.sessions: Dict[str, Dict[str, Any]] = {}
        self.message_queues: Dict[str, asyncio.Queue] = {}
        self._register_tools()

    # ----- schema construction helpers -----

    @staticmethod
    def _scalar(kind: str, description: str, **extra: Any) -> Dict[str, Any]:
        """Build a schema property of type *kind* (optionally with ``default``)."""
        return {"type": kind, "description": description, **extra}

    @staticmethod
    def _string_list(description: str, **extra: Any) -> Dict[str, Any]:
        """Build a schema property for an array of strings."""
        return {"type": "array", "items": {"type": "string"}, "description": description, **extra}

    @staticmethod
    def _object_schema(properties: Dict[str, Any], required: Optional[List[str]] = None) -> Dict[str, Any]:
        """Build an object schema; ``required`` is omitted when not given."""
        schema: Dict[str, Any] = {"type": "object", "properties": properties}
        if required:
            schema["required"] = list(required)
        return schema

    def _register_tools(self):
        """Register all Podman tools"""
        scalar = self._scalar
        string_list = self._string_list
        schema = self._object_schema
        container_desc = "Container name or ID"
        self.tools = [
            Tool(
                name="list_containers",
                description="List containers (running by default, all if requested)",
                inputSchema=schema({
                    "all": scalar("boolean", "Show all containers, not just running", default=False),
                }),
            ),
            Tool(
                name="container_info",
                description="Inspect a container by name or ID",
                inputSchema=schema(
                    {"container": scalar("string", container_desc)},
                    required=["container"],
                ),
            ),
            Tool(
                name="start_container",
                description="Start a stopped container",
                inputSchema=schema(
                    {"container": scalar("string", container_desc)},
                    required=["container"],
                ),
            ),
            Tool(
                name="stop_container",
                description="Stop a running container",
                inputSchema=schema(
                    {
                        "container": scalar("string", container_desc),
                        "timeout": scalar("integer", "Seconds to wait before killing container", default=10),
                    },
                    required=["container"],
                ),
            ),
            Tool(
                name="restart_container",
                description="Restart a container",
                inputSchema=schema(
                    {"container": scalar("string", container_desc)},
                    required=["container"],
                ),
            ),
            Tool(
                name="container_logs",
                description="Get logs from a container",
                inputSchema=schema(
                    {
                        "container": scalar("string", container_desc),
                        "tail": scalar("integer", "Number of lines to show from end of log", default=100),
                    },
                    required=["container"],
                ),
            ),
            Tool(
                name="run_container",
                description="Run a new container",
                inputSchema=schema(
                    {
                        "image": scalar("string", "Container image"),
                        "name": scalar("string", "Optional container name"),
                        "detach": scalar("boolean", "Run in background", default=True),
                        "ports": string_list("Port mappings (e.g., ['8080:80'])", default=[]),
                        "env": string_list("Environment variables (KEY=VAL)", default=[]),
                        "volumes": string_list("Volumes (e.g., ['/host:/container'])", default=[]),
                    },
                    required=["image"],
                ),
            ),
            Tool(
                name="remove_container",
                description="Remove a container",
                inputSchema=schema(
                    {
                        "container": scalar("string", container_desc),
                        "force": scalar("boolean", "Force remove running container", default=False),
                    },
                    required=["container"],
                ),
            ),
            Tool(
                name="exec_container",
                description="Execute a command inside a container",
                inputSchema=schema(
                    {
                        "container": scalar("string", container_desc),
                        "command": string_list("Command to execute"),
                    },
                    required=["container", "command"],
                ),
            ),
            Tool(
                name="list_images",
                description="List container images",
                inputSchema=schema({
                    "all": scalar("boolean", "Show all images including intermediate", default=False),
                }),
            ),
            Tool(
                name="pull_image",
                description="Pull a container image from a registry",
                inputSchema=schema(
                    {"image": scalar("string", "Image name with optional tag")},
                    required=["image"],
                ),
            ),
            Tool(
                name="container_stats",
                description="Get resource usage statistics for containers",
                inputSchema=schema({
                    "container": scalar("string", "Container name or ID (all containers if not specified)"),
                    "no_stream": scalar(
                        "boolean",
                        "Disable streaming stats and only pull the first result",
                        default=True,
                    ),
                }),
            ),
        ]

    # ----- JSON-RPC dispatch -----

    async def handle_message(self, message: Dict[str, Any], session_id: Optional[str] = None) -> Any:
        """Dispatch one JSON-RPC message.

        Returns a response dict, ``None`` for ``notifications/initialized``,
        or, for ``initialize``, a ``(response, session_id)`` tuple.
        """
        jsonrpc = message.get("jsonrpc")
        if jsonrpc != "2.0":
            return self.error_response(None, -32600, "Invalid Request")
        method = message.get("method")
        msg_id = message.get("id")
        params = message.get("params")
        if params is None:
            # An explicit JSON null is treated like an omitted params member.
            params = {}
        elif not isinstance(params, dict):
            # The handlers below read params by key only.
            return self.error_response(msg_id, -32602, "Invalid params")
        logger.info(f"Handling method: {method}")
        if method == "initialize":
            return await self.handle_initialize(msg_id, params, session_id)
        elif method == "tools/list":
            return await self.handle_tools_list(msg_id)
        elif method == "tools/call":
            return await self.handle_tools_call(msg_id, params)
        elif method == "ping":
            return self.success_response(msg_id, {})
        elif method == "notifications/initialized":
            return None
        else:
            return self.error_response(msg_id, -32601, f"Method not found: {method}")

    async def handle_initialize(self, msg_id: Any, params: Dict[str, Any], session_id: Optional[str]) -> tuple:
        """Create (or reset) a session and return ``(response, session_id)``.

        A fresh UUID is used as the session id when none is supplied.
        """
        if not session_id:
            session_id = str(uuid.uuid4())
        self.sessions[session_id] = {
            "initialized": True,
            "client_info": params.get("clientInfo", {})
        }
        self.message_queues[session_id] = asyncio.Queue()
        logger.info(f"Initialized session: {session_id}")
        return self.success_response(msg_id, {
            "protocolVersion": "2024-11-05",
            "serverInfo": {
                "name": self.name,
                "version": self.version
            },
            "capabilities": {
                "tools": {}
            }
        }), session_id

    async def handle_tools_list(self, msg_id: Any) -> Dict[str, Any]:
        """Return the registered tools as name/description/inputSchema dicts."""
        tools_list = [
            {
                "name": tool.name,
                "description": tool.description,
                "inputSchema": tool.inputSchema
            }
            for tool in self.tools
        ]
        return self.success_response(msg_id, {
            "tools": tools_list
        })

    async def handle_tools_call(self, msg_id: Any, params: Dict[str, Any]) -> Dict[str, Any]:
        """Run the named tool and wrap its result as a text content item.

        Unknown tools and rejected arguments yield a -32602 error response;
        any other failure while running the tool yields -32603.
        """
        tool_name = params.get("name")
        arguments = params.get("arguments")
        if arguments is None:
            arguments = {}
        elif not isinstance(arguments, dict):
            return self.error_response(msg_id, -32602, "Invalid arguments: expected an object")
        logger.info(f"Calling tool: {tool_name} with args: {arguments}")
        # Map tool names to handler functions
        tool_handlers = {
            "list_containers": self.list_containers,
            "container_info": self.container_info,
            "start_container": self.start_container,
            "stop_container": self.stop_container,
            "restart_container": self.restart_container,
            "container_logs": self.container_logs,
            "run_container": self.run_container,
            "remove_container": self.remove_container,
            "exec_container": self.exec_container,
            "list_images": self.list_images,
            "pull_image": self.pull_image,
            "container_stats": self.container_stats,
        }
        tool_func = tool_handlers.get(tool_name)
        if not tool_func:
            return self.error_response(msg_id, -32602, f"Unknown tool: {tool_name}")
        try:
            result = await tool_func(arguments)
        except ValueError as e:
            # Raised by the argument checks in the tool implementations.
            logger.error(f"Invalid tool arguments: {e}")
            return self.error_response(msg_id, -32602, f"Invalid arguments: {e}")
        except Exception as e:
            logger.error(f"Tool execution error: {e}")
            return self.error_response(msg_id, -32603, f"Tool execution error: {str(e)}")
        return self.success_response(msg_id, {
            "content": [
                {
                    "type": "text",
                    "text": json.dumps(result, indent=2)
                }
            ]
        })

    # ----- tool helpers -----

    @staticmethod
    def _require_str(args: Dict[str, Any], key: str) -> str:
        """Return ``args[key]`` after checking it is a safe positional argument.

        Raises:
            ValueError: if the value is missing, not a string, blank, or
                starts with ``-`` (which podman would parse as an option
                rather than as a container or image reference).
        """
        value = args.get(key)
        if not isinstance(value, str) or not value.strip():
            raise ValueError(f"'{key}' is required and must be a non-empty string")
        if value.startswith("-"):
            raise ValueError(f"'{key}' must not start with '-'")
        return value

    @staticmethod
    def _output(result: Dict[str, Any], success_text: Optional[str] = None) -> Dict[str, Any]:
        """Wrap a ``run_podman`` result in the ``{"output": ...}`` tool reply.

        On success the reply is *success_text* when given, otherwise the
        command's stdout; on failure it is ``"Error: <stderr>"``.
        """
        if not result["success"]:
            return {"output": f"Error: {result['stderr']}"}
        return {"output": result["stdout"] if success_text is None else success_text}

    async def _podman(self, cmd_args: List[str]) -> Dict[str, Any]:
        """Run ``run_podman`` in the default executor.

        ``run_podman`` blocks on a subprocess; running it in a worker
        thread keeps the event loop free to serve other requests.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, run_podman, cmd_args)

    # Tool implementations

    async def list_containers(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Run ``podman ps --format json`` (plus ``--all`` when requested)."""
        cmd_args = ["ps", "--format", "json"]
        if args.get("all", False):
            cmd_args.append("--all")
        return self._output(await self._podman(cmd_args))

    async def container_info(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Run ``podman inspect`` on the given container."""
        container = self._require_str(args, "container")
        return self._output(await self._podman(["inspect", container]))

    async def start_container(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Run ``podman start`` on the given container."""
        container = self._require_str(args, "container")
        result = await self._podman(["start", container])
        return self._output(result, f"Started container: {container}")

    async def stop_container(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Run ``podman stop -t <timeout>`` on the given container."""
        container = self._require_str(args, "container")
        timeout = args.get("timeout", 10)
        result = await self._podman(["stop", "-t", str(timeout), container])
        return self._output(result, f"Stopped container: {container}")

    async def restart_container(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Run ``podman restart`` on the given container."""
        container = self._require_str(args, "container")
        result = await self._podman(["restart", container])
        return self._output(result, f"Restarted container: {container}")

    async def container_logs(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Run ``podman logs --tail <tail>`` on the given container."""
        container = self._require_str(args, "container")
        tail = args.get("tail", 100)
        return self._output(await self._podman(["logs", "--tail", str(tail), container]))

    async def run_container(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Run ``podman run`` with the requested name, ports, env and volumes."""
        image = self._require_str(args, "image")
        name = args.get("name")
        cmd_args = ["run"]
        if args.get("detach", True):
            cmd_args.append("-d")
        if name:
            cmd_args.extend(["--name", name])
        # Each list value follows its own flag, so podman reads it as that
        # flag's value.
        for flag, key in (("-p", "ports"), ("-e", "env"), ("-v", "volumes")):
            for value in args.get(key, []):
                cmd_args.extend([flag, value])
        cmd_args.append(image)
        result = await self._podman(cmd_args)
        return self._output(result, f"Started container: {result['stdout']}")

    async def remove_container(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Run ``podman rm`` (with ``-f`` when forced) on the given container."""
        container = self._require_str(args, "container")
        cmd_args = ["rm"]
        if args.get("force", False):
            cmd_args.append("-f")
        cmd_args.append(container)
        result = await self._podman(cmd_args)
        return self._output(result, f"Removed container: {container}")

    async def exec_container(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Run ``podman exec <container> <command...>``."""
        container = self._require_str(args, "container")
        command = args.get("command", [])
        if (
            not isinstance(command, list)
            or not command
            or not all(isinstance(part, str) for part in command)
        ):
            raise ValueError("'command' must be a non-empty list of strings")
        return self._output(await self._podman(["exec", container] + command))

    async def list_images(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Run ``podman images --format json`` (plus ``--all`` when requested)."""
        cmd_args = ["images", "--format", "json"]
        if args.get("all", False):
            cmd_args.append("--all")
        return self._output(await self._podman(cmd_args))

    async def pull_image(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Run ``podman pull`` for the given image."""
        image = self._require_str(args, "image")
        result = await self._podman(["pull", image])
        return self._output(result, f"Pulled image: {image}")

    async def container_stats(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Run ``podman stats --format json`` for one container or all of them."""
        cmd_args = ["stats", "--format", "json"]
        if args.get("no_stream", True):
            cmd_args.append("--no-stream")
        if args.get("container"):
            # Optional here, but still checked when supplied.
            cmd_args.append(self._require_str(args, "container"))
        return self._output(await self._podman(cmd_args))

    # ----- response helpers -----

    def success_response(self, msg_id: Any, result: Dict[str, Any]) -> Dict[str, Any]:
        """Build a JSON-RPC 2.0 success response."""
        return {
            "jsonrpc": "2.0",
            "id": msg_id,
            "result": result
        }

    def error_response(self, msg_id: Any, code: int, message: str) -> Dict[str, Any]:
        """Build a JSON-RPC 2.0 error response."""
        return {
            "jsonrpc": "2.0",
            "id": msg_id,
            "error": {
                "code": code,
                "message": message
            }
        }

    def cleanup_session(self, session_id: str):
        """Forget a session and its message queue; unknown ids are ignored."""
        self.sessions.pop(session_id, None)
        self.message_queues.pop(session_id, None)
# Create instances
app = FastAPI(title=SERVER_NAME)
mcp_server = PodmanMCPServer(SERVER_NAME, "1.0.0")
broadcaster = MCPBroadcaster(SERVER_NAME, SERVER_PORT)

# Sessions travel in the Mcp-Session-Id header rather than in cookies, so
# credentialed CORS is not needed; combining allow_credentials=True with a
# wildcard origin is a documented misconfiguration. The session header is
# exposed so that browser-based clients are able to read it from responses.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
    expose_headers=["Mcp-Session-Id"],
)
@app.on_event("startup")
async def startup_event():
    """Start broadcaster on server startup"""
    if ENABLE_BROADCAST:
        # Keep a strong reference to the task: the event loop only holds a
        # weak one, so an unreferenced task may be garbage-collected before
        # it finishes.
        app.state.broadcast_task = asyncio.create_task(broadcaster.start_broadcasting())


@app.on_event("shutdown")
async def shutdown_event():
    """Stop broadcaster on shutdown"""
    broadcaster.stop_broadcasting()
    # Cancel the background task so it does not stay pending (it may be in
    # the middle of its sleep) while the event loop shuts down.
    task = getattr(app.state, "broadcast_task", None)
    if task is not None and not task.done():
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
@app.get("/")
async def root():
    """Describe this server: name, version, MCP endpoint and broadcast settings."""
    # The interval is only reported while broadcasting is enabled.
    interval = BROADCAST_INTERVAL if ENABLE_BROADCAST else None
    info = {
        "name": mcp_server.name,
        "version": mcp_server.version,
        "protocol": "MCP Streamable HTTP",
        "endpoint": "/mcp",
        "broadcasting": ENABLE_BROADCAST,
        "broadcast_interval": interval,
    }
    return info
# Response headers shared by both SSE responses.
_SSE_HEADERS = {
    "Cache-Control": "no-cache",
    "Connection": "keep-alive",
    "X-Accel-Buffering": "no",
}


async def _session_event_stream(
    request: Request,
    session_id: str,
    first_message: Optional[Dict[str, Any]] = None,
) -> AsyncGenerator[str, None]:
    """Yield SSE frames for one session until the client disconnects.

    Sends *first_message* (when given) as the first ``data:`` frame, then
    relays messages taken from the session's queue, emitting a keepalive
    comment whenever 30 seconds pass without one. The session is cleaned up
    when the relay loop ends. If the session has no queue, the stream ends
    after the optional first frame and the session is left untouched.
    """
    if first_message is not None:
        yield f"data: {json.dumps(first_message)}\n\n"
    queue = mcp_server.message_queues.get(session_id)
    if queue is None:
        return
    try:
        while True:
            if await request.is_disconnected():
                break
            try:
                message = await asyncio.wait_for(queue.get(), timeout=30.0)
            except asyncio.TimeoutError:
                yield ": keepalive\n\n"
            else:
                yield f"data: {json.dumps(message)}\n\n"
    finally:
        mcp_server.cleanup_session(session_id)


@app.post("/mcp")
@app.get("/mcp")
async def mcp_endpoint(
    request: Request,
    mcp_session_id: Optional[str] = Header(None, alias="Mcp-Session-Id")
):
    """Serve the MCP endpoint.

    GET opens an SSE stream for an existing session. POST carries a single
    JSON-RPC message: ``initialize`` creates a session and returns its id in
    the ``Mcp-Session-Id`` response header; every other method requires a
    known session id in that request header.

    Raises:
        HTTPException: 405 for a GET that does not accept
            ``text/event-stream``; 400 for a missing or unknown session.
    """
    accept_header = request.headers.get("accept", "")
    wants_sse = "text/event-stream" in accept_header
    wants_json = "application/json" in accept_header

    if request.method == "GET":
        if not wants_sse:
            raise HTTPException(status_code=405, detail="GET requires text/event-stream")
        if not mcp_session_id or mcp_session_id not in mcp_server.sessions:
            raise HTTPException(status_code=400, detail="Invalid session")
        return StreamingResponse(
            _session_event_stream(request, mcp_session_id),
            media_type="text/event-stream",
            headers=dict(_SSE_HEADERS),
        )

    try:
        body = await request.json()
    except ValueError:
        # ValueError covers json.JSONDecodeError as well as the
        # UnicodeDecodeError raised for bodies that are not valid text.
        error_response = mcp_server.error_response(None, -32700, "Parse error")
        return JSONResponse(content=error_response, status_code=400)
    if not isinstance(body, dict):
        # Valid JSON that is not an object (array, string, number, ...)
        # cannot be read by key below; reject it instead of failing with 500.
        error_response = mcp_server.error_response(None, -32600, "Invalid Request")
        return JSONResponse(content=error_response, status_code=400)

    is_initialize = body.get("method") == "initialize"
    if not is_initialize:
        if not mcp_session_id or mcp_session_id not in mcp_server.sessions:
            raise HTTPException(status_code=400, detail="Invalid session")

    result = await mcp_server.handle_message(body, mcp_session_id)

    if is_initialize and isinstance(result, tuple):
        response, new_session_id = result
        if wants_sse and not wants_json:
            return StreamingResponse(
                _session_event_stream(request, new_session_id, first_message=response),
                media_type="text/event-stream",
                headers={"Mcp-Session-Id": new_session_id, **_SSE_HEADERS},
            )
        return JSONResponse(
            content=response,
            headers={"Mcp-Session-Id": new_session_id}
        )

    if result is None:
        return JSONResponse(content={"status": "ok"})

    # The response is returned on this request only. It used to be pushed
    # onto the session queue as well, which delivered it a second time over
    # an open SSE stream and let the queue grow without bound when no
    # stream was consuming it.
    return JSONResponse(content=result)
@app.delete("/mcp")
async def mcp_delete(
    mcp_session_id: Optional[str] = Header(None, alias="Mcp-Session-Id")
):
    """Drop the session named by the ``Mcp-Session-Id`` header, if any.

    Always answers ``{"status": "ok"}``, whether or not a session id was
    supplied.
    """
    has_session_header = bool(mcp_session_id)
    if has_session_header:
        mcp_server.cleanup_session(mcp_session_id)
    payload = {"status": "ok"}
    return JSONResponse(content=payload)
if __name__ == "__main__":
    import uvicorn

    # Collect the start-up banner first, then print it line by line.
    rule = "=" * 60
    broadcast_state = 'ENABLED' if ENABLE_BROADCAST else 'DISABLED'
    banner = [
        rule,
        f"š {SERVER_NAME}",
        rule,
        f"Port: {SERVER_PORT}",
        f"Endpoint: http://localhost:{SERVER_PORT}/mcp",
        f"Broadcasting: {broadcast_state}",
    ]
    if ENABLE_BROADCAST:
        banner.append(f"Broadcast Interval: {BROADCAST_INTERVAL}s")
        banner.append(f"Multicast: {SSDP_ADDR}:{MCP_DISCOVERY_PORT}")
    banner.append(rule)
    for line in banner:
        print(line)

    # Serve on all interfaces at the configured port.
    uvicorn.run(app, host="0.0.0.0", port=SERVER_PORT)