# main.py • 18.1 kB
#!/usr/bin/env python3
"""
Podman MCP Server - Manage containers through MCP protocol
Runs in HTTP mode with streamable-http transport for discovery
Broadcasts itself on multicast for automatic discovery
"""
import os
import subprocess
import logging
import json
import socket
import asyncio
import uuid
import threading
import time
from typing import List, Dict, Any
from pydantic import Field
from mcp.server.fastmcp import FastMCP
from dotenv import load_dotenv
# --- Load env ---
# Load environment overrides through python-dotenv before any setting is read.
load_dotenv()

# Configure logging: INFO and above are written to podman_mcp.log.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
    filename="podman_mcp.log",
)
logger = logging.getLogger("podman_mcp")

# --- Configuration ---
# Every setting comes from the environment with a built-in fallback.
transport = os.getenv("MCP_TRANSPORT", "stdio")
server_name = os.getenv("MCP_SERVER_NAME", "Podman MCP Server")
enable_broadcast = os.getenv("MCP_ENABLE_BROADCAST", "true").lower() == "true"
broadcast_interval = int(os.getenv("MCP_BROADCAST_INTERVAL", "30"))

# Multicast settings
SSDP_ADDR = "239.255.255.250"
MCP_DISCOVERY_PORT = 5353

# Only the HTTP-style transports need a bind address; anything else uses the
# FastMCP defaults.
if transport in ("http", "streamable-http"):
    host = os.getenv("MCP_HOST", "0.0.0.0")
    port = int(os.getenv("MCP_PORT", "3000"))
    logger.info("Starting MCP server in %s mode at %s:%s", transport, host, port)
    mcp = FastMCP(name=server_name, host=host, port=port, debug=True)
else:
    mcp = FastMCP(name=server_name)
# --- Multicast Broadcaster ---
class MCPBroadcaster:
    """Broadcasts MCP server presence on multicast.

    An instance carries the advertised server name, port and transport plus a
    random UUID generated at construction. ``start_broadcasting`` blocks and is
    meant to run on its own thread; ``stop_broadcasting`` ends that loop.
    """

    def __init__(self, server_name: str, port: int, transport: str):
        self.server_name = server_name
        self.port = port
        self.transport = transport
        self.uuid = str(uuid.uuid4())
        self.running = False
        self.sock = None

    def get_local_ip(self):
        """Return the local IPv4 address chosen for outbound traffic.

        Connects a UDP socket towards 8.8.8.8 and reads back the local address
        the OS picked. Falls back to ``'127.0.0.1'`` when any socket operation
        fails.
        """
        try:
            # The context manager closes the probe socket even if connect() fails.
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
                probe.connect(('8.8.8.8', 80))
                return probe.getsockname()[0]
        except OSError:
            return '127.0.0.1'

    def create_announcement(self):
        """Create the MCP discovery announcement as UTF-8 encoded JSON bytes."""
        local_ip = self.get_local_ip()
        announcement = {
            "type": "mcp-announcement",
            "protocol": "MCP-DISCOVERY-v1",
            "uuid": self.uuid,
            "name": self.server_name,
            "host": local_ip,
            "port": self.port,
            "endpoint": "/mcp",
            "protocol_type": "MCP-HTTP",
            "transport": self.transport,
            "version": "1.0.0"
        }
        return json.dumps(announcement).encode('utf-8')

    def start_broadcasting(self):
        """Send an announcement every ``broadcast_interval`` seconds until stopped.

        Blocks the calling thread. Returns immediately when the module-level
        ``enable_broadcast`` flag is false.
        """
        if not enable_broadcast:
            logger.info("Broadcasting disabled")
            return
        self.running = True
        # Create UDP socket. Keep a local reference so the loop never
        # dereferences self.sock after stop_broadcasting() has cleared it.
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        self.sock = sock
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
        logger.info(f"Starting multicast broadcaster on {SSDP_ADDR}:{MCP_DISCOVERY_PORT}")
        logger.info(f"Broadcasting every {broadcast_interval} seconds")
        print(f"š Broadcasting {self.server_name} on {SSDP_ADDR}:{MCP_DISCOVERY_PORT}")
        # Always wait at least one 0.1 s tick so a zero or negative interval
        # cannot turn this loop into a busy spin.
        ticks = max(1, broadcast_interval * 10)
        while self.running:
            try:
                message = self.create_announcement()
                sock.sendto(message, (SSDP_ADDR, MCP_DISCOVERY_PORT))
                logger.debug(f"Broadcast sent: {message.decode('utf-8')}")
            except Exception as e:
                # A failed send after stop_broadcasting() closed the socket is
                # expected; only report errors while still running.
                if self.running:
                    logger.error(f"Broadcast error: {e}")
            # Sleep in small intervals so shutdown is responsive
            for _ in range(ticks):
                if not self.running:
                    break
                time.sleep(0.1)

    def stop_broadcasting(self):
        """Stop broadcasting and close the socket, if one was opened."""
        self.running = False
        sock, self.sock = self.sock, None
        if sock:
            sock.close()
        logger.info("Broadcasting stopped")
# --- Helper to run podman commands ---
def run_podman(args: List[str], timeout: int = 30) -> Dict[str, Any]:
    """Run a podman command and capture its output.

    Args:
        args: Arguments placed after the ``podman`` executable name.
        timeout: Seconds to wait before giving up on the command. Defaults to
            30, the limit that was previously hard-coded.

    Returns:
        A dict with ``success`` (True when the exit code is 0), stripped
        ``stdout`` and ``stderr`` text, and ``returncode``. If the command
        cannot be run or times out, ``success`` is False, ``returncode`` is -1
        and ``stderr`` holds the reason. No exception is propagated.
    """
    cmd = ["podman"] + args
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return {"success": False, "stdout": "", "stderr": "Command timed out", "returncode": -1}
    except FileNotFoundError:
        # Say which binary is missing rather than echoing a bare errno string.
        return {
            "success": False,
            "stdout": "",
            "stderr": "podman executable not found in PATH",
            "returncode": -1,
        }
    except Exception as e:
        return {"success": False, "stdout": "", "stderr": str(e), "returncode": -1}
    return {
        "success": result.returncode == 0,
        "stdout": result.stdout.strip(),
        "stderr": result.stderr.strip(),
        "returncode": result.returncode,
    }
# --- Tools ---
@mcp.tool(title="List containers", description="List containers (running by default, all if requested).")
def list_containers(all: bool = Field(False, description="Show all containers, not just running")) -> str:
    """Return the JSON output of ``podman ps``, or an ``Error: ...`` string on failure."""
    ps_args = ["ps", "--format", "json"]
    ps_args += ["--all"] if all else []
    outcome = run_podman(ps_args)
    if not outcome["success"]:
        return f"Error: {outcome['stderr']}"
    return outcome["stdout"]
@mcp.tool(title="Container info", description="Inspect a container by name or ID.")
def container_info(container: str = Field(..., description="Container name or ID")) -> str:
    """Return the output of ``podman inspect`` for one container, or an error string."""
    outcome = run_podman(["inspect", container])
    if outcome["success"]:
        return outcome["stdout"]
    return f"Error: {outcome['stderr']}"
@mcp.tool(title="Start container", description="Start a stopped container.")
def start_container(container: str = Field(..., description="Container name or ID")) -> str:
    """Run ``podman start`` and return a confirmation, or an error string."""
    outcome = run_podman(["start", container])
    if not outcome["success"]:
        return f"Error: {outcome['stderr']}"
    return f"Started container: {container}"
@mcp.tool(title="Stop container", description="Stop a running container.")
def stop_container(
    container: str = Field(..., description="Container name or ID"),
    timeout: int = Field(10, description="Seconds to wait before killing container"),
) -> str:
    """Run ``podman stop -t <timeout>`` and return a confirmation, or an error string."""
    stop_args = ["stop", "-t", str(timeout), container]
    outcome = run_podman(stop_args)
    if not outcome["success"]:
        return f"Error: {outcome['stderr']}"
    return f"Stopped container: {container}"
@mcp.tool(title="Restart container", description="Restart a container.")
def restart_container(container: str = Field(..., description="Container name or ID")) -> str:
    """Run ``podman restart`` and return a confirmation, or an error string."""
    outcome = run_podman(["restart", container])
    if not outcome["success"]:
        return f"Error: {outcome['stderr']}"
    return f"Restarted container: {container}"
@mcp.tool(title="Container logs", description="Get logs from a container.")
def container_logs(
    container: str = Field(..., description="Container name or ID"),
    tail: int = Field(100, description="Number of lines to show from end of log"),
) -> str:
    """Return the captured stdout of ``podman logs --tail <tail>``, or an error string."""
    log_args = ["logs", "--tail", str(tail), container]
    outcome = run_podman(log_args)
    if outcome["success"]:
        return outcome["stdout"]
    return f"Error: {outcome['stderr']}"
@mcp.tool(title="Run container", description="Run a new container.")
def run_container(
    image: str = Field(..., description="Container image"),
    name: str = Field(None, description="Optional name"),
    detach: bool = Field(True, description="Run in background"),
    ports: List[str] = Field(default_factory=list, description="Port mappings (e.g., ['8080:80'])"),
    env: List[str] = Field(default_factory=list, description="Environment variables (KEY=VAL)"),
    volumes: List[str] = Field(default_factory=list, description="Volumes (e.g., ['/host:/container'])"),
) -> str:
    """Build and execute a ``podman run`` command line.

    Options are emitted in a fixed order (``-d``, ``--name``, every ``-p``,
    every ``-e``, every ``-v``) followed by the image. Returns podman's stdout
    prefixed with ``Started container:`` on success, otherwise an error string.
    """
    podman_args = ["run"]
    if detach:
        podman_args.append("-d")
    if name:
        podman_args += ["--name", name]
    # Each repeatable option contributes one "<flag> <value>" pair per entry.
    for flag, values in (("-p", ports), ("-e", env), ("-v", volumes)):
        for value in values:
            podman_args += [flag, value]
    podman_args.append(image)

    outcome = run_podman(podman_args)
    if not outcome["success"]:
        return f"Error: {outcome['stderr']}"
    return f"Started container: {outcome['stdout']}"
@mcp.tool(title="Remove container", description="Remove a container.")
def remove_container(
    container: str = Field(..., description="Container name or ID"),
    force: bool = Field(False, description="Force remove running container"),
) -> str:
    """Run ``podman rm`` (adding ``-f`` when forced) and return a confirmation or error string."""
    rm_args = ["rm", "-f", container] if force else ["rm", container]
    outcome = run_podman(rm_args)
    if not outcome["success"]:
        return f"Error: {outcome['stderr']}"
    return f"Removed container: {container}"
@mcp.tool(title="Exec in container", description="Execute a command inside a container.")
def exec_container(
    container: str = Field(...),
    command: List[str] = Field(..., description="Command to execute"),
) -> str:
    """Run ``podman exec <container> <command...>`` and return its stdout, or an error string."""
    outcome = run_podman(["exec", container] + command)
    if outcome["success"]:
        return outcome["stdout"]
    return f"Error: {outcome['stderr']}"
@mcp.tool(title="List images", description="List container images.")
def list_images(all: bool = Field(False, description="Show all images including intermediate")) -> str:
    """Return the JSON output of ``podman images``, or an ``Error: ...`` string on failure."""
    image_args = ["images", "--format", "json"]
    image_args += ["--all"] if all else []
    outcome = run_podman(image_args)
    if not outcome["success"]:
        return f"Error: {outcome['stderr']}"
    return outcome["stdout"]
@mcp.tool(title="Pull image", description="Pull a container image from a registry.")
def pull_image(image: str = Field(..., description="Image name with optional tag")) -> str:
    """Run ``podman pull`` and return a confirmation, or an error string."""
    outcome = run_podman(["pull", image])
    if not outcome["success"]:
        return f"Error: {outcome['stderr']}"
    return f"Pulled image: {image}"
@mcp.tool(title="Container stats", description="Get resource usage statistics for containers.")
def container_stats(
    container: str = Field(None, description="Container name or ID (all containers if not specified)"),
    no_stream: bool = Field(True, description="Disable streaming stats and only pull the first result")
) -> str:
    """Return the JSON output of ``podman stats``, or an error string.

    ``--no-stream`` is added when ``no_stream`` is truthy, and the container is
    appended as a positional argument only when one is given.
    """
    stats_args = ["stats", "--format", "json"]
    stats_args += ["--no-stream"] if no_stream else []
    stats_args += [container] if container else []
    outcome = run_podman(stats_args)
    if outcome["success"]:
        return outcome["stdout"]
    return f"Error: {outcome['stderr']}"
# --- Entrypoint ---
if __name__ == "__main__":
    # FastMCP.run() accepts "stdio", "sse" and "streamable-http". This file
    # also treats "http" as an HTTP mode, so translate that shorthand before
    # starting the server instead of failing with an unknown-transport error.
    run_transport = "streamable-http" if transport == "http" else transport
    http_mode = run_transport == "streamable-http"
    # Show the values read from the environment rather than fixed literals.
    display_host = os.getenv("MCP_HOST", "0.0.0.0")
    display_port = os.getenv("MCP_PORT", "3000")

    logger.info(f"Starting MCP server with transport={transport}")
    print("=" * 60)
    print(f"Starting {server_name}")
    print("=" * 60)
    print(f"Transport: {transport}")
    if http_mode:
        print(f"Host: {display_host}")
        print(f"Port: {display_port}")
        print(f"Endpoint: http://localhost:{display_port}/mcp")
        print(f"Broadcasting: {'ENABLED' if enable_broadcast else 'DISABLED'}")
        if enable_broadcast:
            print(f"Broadcast interval: {broadcast_interval}s")
            print(f"Multicast: {SSDP_ADDR}:{MCP_DISCOVERY_PORT}")
    print("=" * 60)

    # Start broadcaster if in HTTP mode. The thread is a daemon so it cannot
    # keep the process alive once the server exits.
    broadcaster = None
    if http_mode and enable_broadcast:
        broadcaster = MCPBroadcaster(server_name, int(display_port), transport)
        broadcaster_thread = threading.Thread(target=broadcaster.start_broadcasting, daemon=True)
        broadcaster_thread.start()
    try:
        mcp.run(transport=run_transport)
    finally:
        # Stop announcing the server even when run() exits via an exception.
        if broadcaster:
            broadcaster.stop_broadcasting()
# --- Helper to run podman commands ---
def run_podman(args: List[str], timeout: int = 30) -> Dict[str, Any]:
    """Run a podman command and capture its output.

    Args:
        args: Arguments placed after the ``podman`` executable name.
        timeout: Seconds to wait before giving up on the command. Defaults to
            30, the limit that was previously hard-coded.

    Returns:
        A dict with ``success`` (True when the exit code is 0), stripped
        ``stdout`` and ``stderr`` text, and ``returncode``. If the command
        cannot be run or times out, ``success`` is False, ``returncode`` is -1
        and ``stderr`` holds the reason. No exception is propagated.
    """
    cmd = ["podman"] + args
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return {"success": False, "stdout": "", "stderr": "Command timed out", "returncode": -1}
    except FileNotFoundError:
        # Say which binary is missing rather than echoing a bare errno string.
        return {
            "success": False,
            "stdout": "",
            "stderr": "podman executable not found in PATH",
            "returncode": -1,
        }
    except Exception as e:
        return {"success": False, "stdout": "", "stderr": str(e), "returncode": -1}
    return {
        "success": result.returncode == 0,
        "stdout": result.stdout.strip(),
        "stderr": result.stderr.strip(),
        "returncode": result.returncode,
    }
# --- Tools ---
@mcp.tool(title="List containers", description="List containers (running by default, all if requested).")
def list_containers(all: bool = Field(False, description="Show all containers, not just running")) -> str:
    """Return the JSON output of ``podman ps``, or an ``Error: ...`` string on failure."""
    ps_args = ["ps", "--format", "json"]
    ps_args += ["--all"] if all else []
    outcome = run_podman(ps_args)
    if not outcome["success"]:
        return f"Error: {outcome['stderr']}"
    return outcome["stdout"]
@mcp.tool(title="Container info", description="Inspect a container by name or ID.")
def container_info(container: str = Field(..., description="Container name or ID")) -> str:
    """Return the output of ``podman inspect`` for one container, or an error string."""
    outcome = run_podman(["inspect", container])
    if outcome["success"]:
        return outcome["stdout"]
    return f"Error: {outcome['stderr']}"
@mcp.tool(title="Start container", description="Start a stopped container.")
def start_container(container: str = Field(..., description="Container name or ID")) -> str:
    """Run ``podman start`` and return a confirmation, or an error string."""
    outcome = run_podman(["start", container])
    if not outcome["success"]:
        return f"Error: {outcome['stderr']}"
    return f"Started container: {container}"
@mcp.tool(title="Stop container", description="Stop a running container.")
def stop_container(
    container: str = Field(..., description="Container name or ID"),
    timeout: int = Field(10, description="Seconds to wait before killing container"),
) -> str:
    """Run ``podman stop -t <timeout>`` and return a confirmation, or an error string."""
    stop_args = ["stop", "-t", str(timeout), container]
    outcome = run_podman(stop_args)
    if not outcome["success"]:
        return f"Error: {outcome['stderr']}"
    return f"Stopped container: {container}"
@mcp.tool(title="Restart container", description="Restart a container.")
def restart_container(container: str = Field(..., description="Container name or ID")) -> str:
    """Run ``podman restart`` and return a confirmation, or an error string."""
    outcome = run_podman(["restart", container])
    if not outcome["success"]:
        return f"Error: {outcome['stderr']}"
    return f"Restarted container: {container}"
@mcp.tool(title="Container logs", description="Get logs from a container.")
def container_logs(
    container: str = Field(..., description="Container name or ID"),
    tail: int = Field(100, description="Number of lines to show from end of log"),
) -> str:
    """Return the captured stdout of ``podman logs --tail <tail>``, or an error string."""
    log_args = ["logs", "--tail", str(tail), container]
    outcome = run_podman(log_args)
    if outcome["success"]:
        return outcome["stdout"]
    return f"Error: {outcome['stderr']}"
@mcp.tool(title="Run container", description="Run a new container.")
def run_container(
    image: str = Field(..., description="Container image"),
    name: str = Field(None, description="Optional name"),
    detach: bool = Field(True, description="Run in background"),
    ports: List[str] = Field(default_factory=list, description="Port mappings (e.g., ['8080:80'])"),
    env: List[str] = Field(default_factory=list, description="Environment variables (KEY=VAL)"),
    volumes: List[str] = Field(default_factory=list, description="Volumes (e.g., ['/host:/container'])"),
) -> str:
    """Build and execute a ``podman run`` command line.

    Options are emitted in a fixed order (``-d``, ``--name``, every ``-p``,
    every ``-e``, every ``-v``) followed by the image. Returns podman's stdout
    prefixed with ``Started container:`` on success, otherwise an error string.
    """
    podman_args = ["run"]
    if detach:
        podman_args.append("-d")
    if name:
        podman_args += ["--name", name]
    # Each repeatable option contributes one "<flag> <value>" pair per entry.
    for flag, values in (("-p", ports), ("-e", env), ("-v", volumes)):
        for value in values:
            podman_args += [flag, value]
    podman_args.append(image)

    outcome = run_podman(podman_args)
    if not outcome["success"]:
        return f"Error: {outcome['stderr']}"
    return f"Started container: {outcome['stdout']}"
@mcp.tool(title="Remove container", description="Remove a container.")
def remove_container(
    container: str = Field(..., description="Container name or ID"),
    force: bool = Field(False, description="Force remove running container"),
) -> str:
    """Run ``podman rm`` (adding ``-f`` when forced) and return a confirmation or error string."""
    rm_args = ["rm", "-f", container] if force else ["rm", container]
    outcome = run_podman(rm_args)
    if not outcome["success"]:
        return f"Error: {outcome['stderr']}"
    return f"Removed container: {container}"
@mcp.tool(title="Exec in container", description="Execute a command inside a container.")
def exec_container(
    container: str = Field(...),
    command: List[str] = Field(..., description="Command to execute"),
) -> str:
    """Run ``podman exec <container> <command...>`` and return its stdout, or an error string."""
    outcome = run_podman(["exec", container] + command)
    if outcome["success"]:
        return outcome["stdout"]
    return f"Error: {outcome['stderr']}"
@mcp.tool(title="List images", description="List container images.")
def list_images(all: bool = Field(False, description="Show all images including intermediate")) -> str:
    """Return the JSON output of ``podman images``, or an ``Error: ...`` string on failure."""
    image_args = ["images", "--format", "json"]
    image_args += ["--all"] if all else []
    outcome = run_podman(image_args)
    if not outcome["success"]:
        return f"Error: {outcome['stderr']}"
    return outcome["stdout"]
@mcp.tool(title="Pull image", description="Pull a container image from a registry.")
def pull_image(image: str = Field(..., description="Image name with optional tag")) -> str:
    """Run ``podman pull`` and return a confirmation, or an error string."""
    outcome = run_podman(["pull", image])
    if not outcome["success"]:
        return f"Error: {outcome['stderr']}"
    return f"Pulled image: {image}"
@mcp.tool(title="Container stats", description="Get resource usage statistics for containers.")
def container_stats(
    container: str = Field(None, description="Container name or ID (all containers if not specified)"),
    no_stream: bool = Field(True, description="Disable streaming stats and only pull the first result")
) -> str:
    """Return the JSON output of ``podman stats``, or an error string.

    ``--no-stream`` is added when ``no_stream`` is truthy, and the container is
    appended as a positional argument only when one is given.
    """
    stats_args = ["stats", "--format", "json"]
    stats_args += ["--no-stream"] if no_stream else []
    stats_args += [container] if container else []
    outcome = run_podman(stats_args)
    if outcome["success"]:
        return outcome["stdout"]
    return f"Error: {outcome['stderr']}"
# --- Entrypoint ---
if __name__ == "__main__":
    # NOTE(review): an earlier `if __name__ == "__main__":` guard in this file
    # already starts the server (and the broadcaster); this one looks like a
    # leftover duplicate. Confirm which entrypoint should remain.
    #
    # FastMCP.run() accepts "stdio", "sse" and "streamable-http". This file
    # also treats "http" as an HTTP mode, so translate that shorthand before
    # starting the server instead of failing with an unknown-transport error.
    run_transport = "streamable-http" if transport == "http" else transport
    # Show the values read from the environment rather than fixed literals.
    display_host = os.getenv("MCP_HOST", "0.0.0.0")
    display_port = os.getenv("MCP_PORT", "3000")

    logger.info(f"Starting MCP server with transport={transport}")
    print("=" * 60)
    print(f"Starting {server_name}")
    print("=" * 60)
    print(f"Transport: {transport}")
    if run_transport == "streamable-http":
        print(f"Host: {display_host}")
        print(f"Port: {display_port}")
        print(f"Endpoint: http://localhost:{display_port}/mcp")
    print("=" * 60)
    mcp.run(transport=run_transport)