Skip to main content
Glama

MCP Dockerized Server

by antpavlenko
main.py (15.4 kB)
import os
import logging
import secrets
import json
from datetime import datetime, timezone
from typing import Dict, List, Any, Optional
from pathlib import Path
import importlib
import importlib.util
import inspect
import sys
import threading

import uvicorn
from fastapi import FastAPI, HTTPException, Depends, Security, status  # type: ignore
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials  # type: ignore
from pydantic import BaseModel
from dotenv import load_dotenv

# Optional hot-reload dependencies
try:
    from watchdog.observers import Observer  # type: ignore
    from watchdog.events import FileSystemEventHandler  # type: ignore
except Exception:
    Observer = None  # type: ignore
    FileSystemEventHandler = object  # type: ignore

from mcp_tools.base import BaseMCPTool
from mcp_tools.console_tool import ConsoleTool
from mcp_tools.system_info_tool import SystemInfoTool

# Load environment variables
load_dotenv()

VERSION = "1.1.0"


def _utc_timestamp() -> str:
    """Return the current UTC time as a naive ISO-8601 string.

    Yields the same format as ``datetime.utcnow().isoformat()`` (no UTC
    offset suffix) without calling ``utcnow()``, which is deprecated since
    Python 3.12.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()


class MCPServer:
    """HTTP server exposing MCP tools behind bearer-token authentication.

    On construction the server reads its settings from the environment
    (``MCPD_PORT``, ``MCPD_LOG_LEVEL``, ``MCPD_API_KEY_LENGTH``), loads or
    creates API keys under ``data/api_keys.json``, loads the built-in tools
    plus any ``*_tool.py`` file found in ``mcp_tools/``, registers the FastAPI
    routes and, when ``watchdog`` is importable, starts a file watcher that
    hot-loads tool files.
    """

    # Files in the tools directory that are never loaded dynamically: the two
    # built-ins are imported statically, the others are not tools.
    _STATIC_TOOL_FILES = frozenset(
        {"console_tool.py", "system_info_tool.py", "base.py", "__init__.py"}
    )

    def __init__(self):
        self.port = int(os.getenv("MCPD_PORT", 666))
        self.log_level = os.getenv("MCPD_LOG_LEVEL", "INFO").upper()
        self.api_key_length = int(os.getenv("MCPD_API_KEY_LENGTH", 32))

        # Setup logging
        self.setup_logging()

        # Initialize data directory and API keys
        self.data_dir = Path("data")
        self.data_dir.mkdir(exist_ok=True)
        self.api_keys_file = self.data_dir / "api_keys.json"

        # Load or generate initial API key
        self.api_keys = self.load_api_keys()
        if not self.api_keys:
            initial_key = self.generate_api_key()
            self.logger.info(f"🔑 Initial API Key generated: {initial_key}")

        # Initialize MCP tools
        self.tools: Dict[str, BaseMCPTool] = {}
        # Per-tool bookkeeping: source file path and module name, keyed by tool name.
        self._tool_meta: Dict[str, Dict[str, Any]] = {}
        self.tools_lock = threading.RLock()
        self.tools_dir = Path("mcp_tools")
        self.tools_dir.mkdir(exist_ok=True)
        self.load_tools()

        # Setup FastAPI
        self.app = FastAPI(
            title="MCP Dockerized Server",
            description="A Model Context Protocol server with extensible tools",
            version=VERSION
        )
        self.setup_routes()

        # Security
        self.security = HTTPBearer()

        # Start file watcher for hot-loading tools if watchdog is available
        if Observer is not None:
            self._start_tools_watcher()
        else:
            self.logger.warning("watchdog not installed; hot-loading of python tools is disabled. Install 'watchdog' to enable.")

    def setup_logging(self):
        """Setup logging with timestamp and configurable level"""
        logging.basicConfig(
            level=getattr(logging, self.log_level),
            format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        self.logger = logging.getLogger("mcp-server")

    def load_api_keys(self) -> List[str]:
        """Load API keys from the keys file.

        Returns:
            The list stored under ``"keys"`` in the JSON file, or an empty
            list when the file is missing, has no ``"keys"`` entry, or cannot
            be read/parsed (the error is logged).
        """
        if not self.api_keys_file.exists():
            return []
        try:
            with open(self.api_keys_file, 'r', encoding="utf-8") as f:
                data = json.load(f)
            return data.get('keys', [])
        except Exception as e:
            self.logger.error(f"Error loading API keys: {e}")
            return []

    def save_api_keys(self):
        """Write the current API keys to the keys file; failures are logged, not raised."""
        try:
            with open(self.api_keys_file, 'w', encoding="utf-8") as f:
                json.dump({'keys': self.api_keys}, f, indent=2)
        except Exception as e:
            self.logger.error(f"Error saving API keys: {e}")

    def generate_api_key(self) -> str:
        """Generate a new API key, add it to the active keys and persist them.

        Returns:
            The newly generated URL-safe key.
        """
        api_key = secrets.token_urlsafe(self.api_key_length)
        self.api_keys.append(api_key)
        self.save_api_keys()
        self.logger.info("New API key generated and saved")
        return api_key

    def validate_api_key(self, credentials: HTTPAuthorizationCredentials = Security(HTTPBearer())):
        """Validate the bearer token against the active API keys.

        Returns:
            The presented key when it matches an active key.

        Raises:
            HTTPException: 401 when the token matches no active key.
        """
        # Compare as bytes so compare_digest accepts non-ASCII input, and use
        # it instead of ``in`` so the comparison time does not depend on how
        # many leading characters of a key the caller guessed correctly.
        supplied = credentials.credentials.encode("utf-8")
        matched = False
        for key in list(self.api_keys):
            # Non-string entries (e.g. from a hand-edited keys file) never match.
            if isinstance(key, str) and secrets.compare_digest(supplied, key.encode("utf-8")):
                matched = True
        if not matched:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid API key"
            )
        return credentials.credentials

    def _load_tool_from_module(self, module, src_path: Path) -> Optional[BaseMCPTool]:
        """Discover a BaseMCPTool subclass in a loaded module, instantiate and register it.

        Abstract classes are skipped. A class defined in ``module`` itself is
        preferred over one merely imported into it, so a tool file that
        imports another tool class to extend it registers its own class.

        Returns:
            The registered tool instance, or ``None`` when the module holds no
            usable subclass.
        """
        candidates = [
            obj
            for _, obj in inspect.getmembers(module, inspect.isclass)
            if issubclass(obj, BaseMCPTool)
            and obj is not BaseMCPTool
            and not inspect.isabstract(obj)
        ]
        if not candidates:
            return None
        defined_here = [cls for cls in candidates if cls.__module__ == module.__name__]
        tool_cls = (defined_here or candidates)[0]

        tool: BaseMCPTool = tool_cls()  # type: ignore
        with self.tools_lock:
            # Replace if exists
            self.tools[tool.name] = tool
            self._tool_meta[tool.name] = {"path": str(src_path), "module": module.__name__}
        self.logger.info(f"Loaded tool: {tool.name} from {src_path}")
        return tool

    def _import_module_from_path(self, path: Path) -> Optional[object]:
        """Dynamically import a Python module from a file path with a unique module name.

        The module name embeds the file's modification time so a changed file
        is imported under a new name.

        Returns:
            The imported module, or ``None`` when no import spec could be
            built or the import failed (the error is logged).
        """
        registered_name: Optional[str] = None
        try:
            unique_name = f"mcp_dynamic.{path.stem}_{int(path.stat().st_mtime)}"
            spec = importlib.util.spec_from_file_location(unique_name, str(path))
            if spec and spec.loader:
                module = importlib.util.module_from_spec(spec)
                sys.modules[unique_name] = module
                registered_name = unique_name
                spec.loader.exec_module(module)  # type: ignore
                return module
        except Exception as e:
            # Do not leave a half-initialised module behind in sys.modules.
            if registered_name is not None:
                sys.modules.pop(registered_name, None)
            self.logger.error(f"Failed to import module from {path}: {e}")
        return None

    def _load_tool_from_file(self, path: Path) -> Optional[BaseMCPTool]:
        """Import the file at ``path`` and register the tool it defines, if any."""
        module = self._import_module_from_path(path)
        if not module:
            return None
        return self._load_tool_from_module(module, path)

    def _unload_tools_from_path(self, path: Path):
        """Unload any tool previously loaded from the given path."""
        with self.tools_lock:
            to_remove = [name for name, meta in self._tool_meta.items() if meta.get("path") == str(path)]
            for name in to_remove:
                self.tools.pop(name, None)
                mod_name = self._tool_meta[name].get("module")
                if mod_name:
                    # Drop the module so a later load starts from a clean import.
                    sys.modules.pop(mod_name, None)
                del self._tool_meta[name]
                self.logger.info(f"Unloaded tool: {name} from {path}")

    def load_tools(self):
        """Load all available MCP tools (built-ins + dynamically discovered)."""
        # Built-in tools
        with self.tools_lock:
            console_tool = ConsoleTool()
            self.tools[console_tool.name] = console_tool
            self._tool_meta[console_tool.name] = {"path": str(self.tools_dir / "console_tool.py"), "module": ConsoleTool.__module__}
            self.logger.info(f"Loaded tool: {console_tool.name}")

            system_info_tool = SystemInfoTool()
            self.tools[system_info_tool.name] = system_info_tool
            self._tool_meta[system_info_tool.name] = {"path": str(self.tools_dir / "system_info_tool.py"), "module": SystemInfoTool.__module__}
            self.logger.info(f"Loaded tool: {system_info_tool.name}")

        # Discover other tools in mcp_tools directory
        if self.tools_dir.exists():
            for tool_file in self.tools_dir.glob("*_tool.py"):
                # Skip the built-ins already loaded statically
                if tool_file.name in self._STATIC_TOOL_FILES:
                    continue
                try:
                    self._load_tool_from_file(tool_file)
                except Exception as e:
                    self.logger.error(f"Error loading tool {tool_file}: {e}")

    def _start_tools_watcher(self):
        """Start a daemon watchdog observer that hot-loads ``*_tool.py`` files in the tools directory."""

        class _Handler(FileSystemEventHandler):
            def __init__(self, outer: "MCPServer"):
                super().__init__()
                self._outer = outer

            def _run_guarded(self, action: str, path: Path, *steps) -> None:
                # A failing tool (import error, constructor raising, ...) must
                # not escape into the observer's dispatch thread; log instead.
                try:
                    for step in steps:
                        step(path)
                except Exception as e:
                    self._outer.logger.error(f"Error handling {action} tool file {path}: {e}")

            def on_created(self, event):  # type: ignore[override]
                p = Path(event.src_path)
                if p.is_file() and p.name.endswith("_tool.py"):
                    self._outer.logger.info(f"Detected new tool file: {p}")
                    self._run_guarded("new", p, self._outer._load_tool_from_file)

            def on_modified(self, event):  # type: ignore[override]
                p = Path(event.src_path)
                if p.is_file() and p.name.endswith("_tool.py"):
                    self._outer.logger.info(f"Detected modified tool file: {p}")
                    self._run_guarded(
                        "modified",
                        p,
                        self._outer._unload_tools_from_path,
                        self._outer._load_tool_from_file,
                    )

            def on_deleted(self, event):  # type: ignore[override]
                # No is_file() check here: the file no longer exists.
                p = Path(event.src_path)
                if p.name.endswith("_tool.py"):
                    self._outer.logger.info(f"Detected deleted tool file: {p}")
                    self._run_guarded("deleted", p, self._outer._unload_tools_from_path)

        self._observer = Observer()
        handler = _Handler(self)
        self._observer.schedule(handler, str(self.tools_dir), recursive=False)
        self._observer.daemon = True
        self._observer.start()
        self.logger.info(f"Started watchdog observer for tools in {self.tools_dir}")

    def setup_routes(self):
        """Setup FastAPI routes"""

        @self.app.get("/health")
        async def health_check():
            """Health check endpoint"""
            with self.tools_lock:
                tools_count = len(self.tools)
            return {
                "status": "healthy",
                "timestamp": _utc_timestamp(),
                "version": VERSION,
                "tools_count": tools_count
            }

        @self.app.post("/api/generate-key")
        async def generate_new_api_key(api_key: str = Depends(self.validate_api_key)):
            """Generate a new API key"""
            new_key = self.generate_api_key()
            return {"api_key": new_key, "message": "New API key generated successfully"}

        @self.app.get("/api/tools")
        async def list_tools(api_key: str = Depends(self.validate_api_key)):
            """Get description of all available tools"""
            with self.tools_lock:
                tools_info = [
                    {
                        "name": tool.name,
                        "description": tool.description,
                        "parameters": tool.get_parameters_schema()
                    }
                    for tool in self.tools.values()
                ]
            return {"tools": tools_info}

        # NOTE: this fixed path must be registered BEFORE the parameterised
        # "/api/tools/{tool_name}" route below. Routes are matched in
        # registration order, so otherwise "reload" would be captured as a
        # tool name and this endpoint could never be reached.
        @self.app.post("/api/tools/reload")
        async def reload_tools(api_key: str = Depends(self.validate_api_key)):
            """Manually rescan the tools directory and reload python tools (useful when watchdog is unavailable)."""
            reloaded = []
            errors = []
            if self.tools_dir.exists():
                for tool_file in self.tools_dir.glob("*_tool.py"):
                    if tool_file.name in self._STATIC_TOOL_FILES:
                        continue
                    try:
                        self._unload_tools_from_path(tool_file)
                        tool = self._load_tool_from_file(tool_file)
                        if tool:
                            reloaded.append(tool.name)
                    except Exception as e:
                        errors.append({"file": str(tool_file), "error": str(e)})
            return {"reloaded": reloaded, "errors": errors, "timestamp": _utc_timestamp()}

        @self.app.post("/api/tools/{tool_name}")
        async def execute_tool(
            tool_name: str,
            request: Dict[str, Any],
            api_key: str = Depends(self.validate_api_key)
        ):
            """Execute a specific tool"""
            # Hold the lock only for the lookup, not while the tool runs.
            with self.tools_lock:
                tool = self.tools.get(tool_name)
            if not tool:
                raise HTTPException(status_code=404, detail=f"Tool '{tool_name}' not found")
            try:
                result = await tool.execute(request)
                return {
                    "tool": tool_name,
                    "result": result,
                    "timestamp": _utc_timestamp()
                }
            except Exception as e:
                self.logger.error(f"Error executing tool {tool_name}: {e}")
                raise HTTPException(
                    status_code=500,
                    detail=f"Tool execution failed: {str(e)}"
                )

        @self.app.get("/api/mcp/initialize")
        async def mcp_initialize(api_key: str = Depends(self.validate_api_key)):
            """MCP Protocol: Initialize connection"""
            return {
                "protocolVersion": "2024-11-05",
                "capabilities": {"tools": {"listChanged": True}},
                "serverInfo": {"name": "mcp-dockerized", "version": VERSION}
            }

        @self.app.get("/api/mcp/tools/list")
        async def mcp_list_tools(api_key: str = Depends(self.validate_api_key)):
            """MCP Protocol: List available tools"""
            with self.tools_lock:
                tools_list = [
                    {"name": t.name, "description": t.description, "inputSchema": t.get_parameters_schema()}
                    for t in self.tools.values()
                ]
            return {"tools": tools_list}

        @self.app.post("/api/mcp/tools/call")
        async def mcp_call_tool(
            request: Dict[str, Any],
            api_key: str = Depends(self.validate_api_key)
        ):
            """MCP Protocol: Call a tool"""
            tool_name = request.get("name")
            arguments = request.get("arguments", {})
            with self.tools_lock:
                tool = self.tools.get(tool_name)
            # Failures are reported in the response body via "isError" rather
            # than as an HTTP error status.
            if not tool:
                return {"isError": True, "content": [{"type": "text", "text": f"Tool '{tool_name}' not found"}]}
            try:
                result = await tool.execute(arguments)
                return {"content": [{"type": "text", "text": str(result)}]}
            except Exception as e:
                return {"isError": True, "content": [{"type": "text", "text": f"Tool execution failed: {str(e)}"}]}

    def run(self):
        """Start the MCP server"""
        self.logger.info(f"🚀 Starting MCP Dockerized Server on port {self.port}")
        self.logger.info(f"📊 Log level: {self.log_level}")
        self.logger.info(f"🔧 Loaded {len(self.tools)} tools: {list(self.tools.keys())}")
        if self.api_keys:
            self.logger.info(f"🔑 Active API keys: {len(self.api_keys)}")
            # NOTE(review): this writes a live credential to the log on every
            # start; kept as-is because operators may rely on it to obtain the key.
            self.logger.info(f"🔑 First API key: {self.api_keys[0]}")
        uvicorn.run(
            self.app,
            host="0.0.0.0",
            port=self.port,
            log_level=self.log_level.lower()
        )


if __name__ == "__main__":
    server = MCPServer()
    server.run()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/antpavlenko/mcp_dockerized'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.