Skip to main content
Glama
CLAUDE.md (8.51 kB)
# Core Module - Development Memory

## Configuration Management Patterns

### Pydantic Settings with Environment Variables

```python
class ServerConfig(BaseModel):
    host: str = Field(default="127.0.0.1", alias="FASTMCP_HOST")
    port: int = Field(default=8000, alias="FASTMCP_PORT")
    log_level: str = Field(default="INFO", alias="LOG_LEVEL")


class DockerMCPConfig(BaseSettings):
    hosts: dict[str, DockerHost] = Field(default_factory=dict)
    server: ServerConfig = Field(default_factory=ServerConfig)

    model_config = {"env_file": ".env", "extra": "ignore"}
```

### Configuration Hierarchy

1. Command line arguments (`--config`)
2. Project config (`config/hosts.yml`)
3. User config (`~/.config/docker-mcp/hosts.yml`)
4. Environment variables (Field aliases)

## Modern Docker Context Management (Python 3.11+)

### Async Context Management with Resource Cleanup

```python
import asyncio
import time
import weakref
from contextlib import AsyncExitStack, asynccontextmanager
from typing import Any, AsyncGenerator, Dict


class ModernDockerContextManager:
    """Modern async context manager with proper resource management."""

    def __init__(self, config: DockerMCPConfig):
        self.config = config
        self._context_cache: Dict[str, str] = {}  # host_id -> context_name
        self._connection_pools: Dict[str, AsyncConnectionPool] = {}
        self._cleanup_tasks: weakref.WeakSet = weakref.WeakSet()
        # One shared lock guards the context cache. NOTE: creating a new
        # asyncio.Lock() at each use site would provide no mutual
        # exclusion at all — every caller must acquire this same instance.
        self._cache_lock = asyncio.Lock()

    @asynccontextmanager
    async def docker_operation_context(
        self, host_id: str, timeout: float = 30.0
    ) -> AsyncGenerator[DockerOperationContext, None]:
        """Modern context manager for Docker operations with timeout and cleanup."""
        async with AsyncExitStack() as stack:
            # Ensure context exists with timeout
            async with asyncio.timeout(timeout):
                context_name = await self.ensure_context(host_id)

            # Create operation context
            op_context = DockerOperationContext(
                host_id=host_id,
                context_name=context_name,
                start_time=time.perf_counter(),
            )

            # Register cleanup
            cleanup_task = asyncio.create_task(self._cleanup_on_exit(op_context))
            self._cleanup_tasks.add(cleanup_task)
            stack.callback(cleanup_task.cancel)

            try:
                yield op_context
            finally:
                # Ensure proper cleanup even on cancellation
                if not cleanup_task.done():
                    cleanup_task.cancel()

    async def ensure_context(self, host_id: str) -> str:
        """Ensure Docker context exists with modern async patterns."""
        # 1. Check cache under the shared lock for concurrency safety
        async with self._cache_lock:
            if host_id in self._context_cache:
                context_name = self._context_cache[host_id]
                if await self._context_exists(context_name):
                    return context_name
                # Clear stale cache entry
                del self._context_cache[host_id]

        # 2. Create context with retry logic
        context_name = f"docker-mcp-{host_id}"
        max_retries = 3

        for attempt in range(max_retries):
            try:
                await self._create_docker_context(host_id, context_name)
                # Cache successful creation
                async with self._cache_lock:
                    self._context_cache[host_id] = context_name
                return context_name
            except* (DockerContextError, SSHConnectionError) as eg:
                if attempt == max_retries - 1:
                    # Log all attempts failed
                    errors = [str(e) for e in eg.exceptions]
                    logger.error(
                        "Failed to create Docker context after retries",
                        host_id=host_id,
                        attempts=max_retries,
                        errors=errors,
                    )
                    raise
                # Wait before retry with exponential backoff
                wait_time = 2 ** attempt
                await asyncio.sleep(wait_time)

    async def batch_ensure_contexts(
        self, host_ids: list[str]
    ) -> dict[str, str | Exception]:
        """Create multiple contexts concurrently with proper error handling."""

        async def ensure_single_context(host_id: str):
            try:
                context_name = await self.ensure_context(host_id)
                return host_id, context_name
            except Exception as e:
                return host_id, e

        # Use TaskGroup for concurrent context creation (Python 3.11+)
        results = {}
        async with asyncio.TaskGroup() as tg:
            tasks = [
                tg.create_task(ensure_single_context(host_id))
                for host_id in host_ids
            ]

        # Process results
        for task in tasks:
            host_id, result = await task
            results[host_id] = result

        return results


class DockerOperationContext:
    """Context object for Docker operations with automatic resource tracking."""

    def __init__(self, host_id: str, context_name: str, start_time: float):
        self.host_id = host_id
        self.context_name = context_name
        self.start_time = start_time
        self.resources: list[Any] = []
        self.logger = structlog.get_logger()

    def add_resource(self, resource: Any):
        """Track a resource for automatic cleanup."""
        self.resources.append(resource)

    async def cleanup(self):
        """Clean up all tracked resources (in reverse acquisition order)."""
        for resource in reversed(self.resources):
            try:
                if hasattr(resource, 'aclose'):
                    await resource.aclose()
                elif hasattr(resource, 'close'):
                    if asyncio.iscoroutinefunction(resource.close):
                        await resource.close()
                    else:
                        resource.close()
            except Exception as e:
                await self.logger.awarning(
                    "Error during resource cleanup",
                    resource_type=type(resource).__name__,
                    error=str(e),
                )
```

### SSH URL Construction

```python
# Build SSH URL for Docker context
ssh_url = f"ssh://{host_config.user}@{host_config.hostname}"
if host_config.port != 22:
    ssh_url += f":{host_config.port}"
```

### Subprocess Security

```python
# Mark legitimate Docker/SSH calls with security comment
result = await asyncio.get_event_loop().run_in_executor(
    None,
    lambda: subprocess.run(  # nosec B603
        cmd, check=False, capture_output=True, text=True
    )
)
```

## Compose File Management

### Path Resolution Strategy

```python
async def get_compose_path(self, host_id: str) -> str:
    # 1. Use explicit compose_path if configured
    if host_config.compose_path:
        return host_config.compose_path
    # 2. Auto-discover common paths
    return await self._auto_discover_compose_path(host_id)
```

## Hot Reload Pattern

### File Watching with Callbacks

```python
class ConfigFileWatcher:
    def __init__(
        self,
        config_path: str,
        reload_callback: Callable[[DockerMCPConfig], Awaitable[None]],
    ):
        self.reload_callback = reload_callback

    async def _watch_loop(self):
        async for changes in awatch(self.config_path):
            new_config = load_config(str(self.config_path))
            await self.reload_callback(new_config)
```

## SSH Config Import

### Validation Pattern

```python
class SSHConfigEntry:
    def is_valid(self) -> bool:
        # Skip wildcard entries
        if "*" in self.name or "?" in self.name:
            return False
        # Require hostname
        return bool(self.hostname)

    def to_docker_host(self) -> DockerHost:
        return DockerHost(
            hostname=self.hostname or self.name,
            user=self.user or "root",
            tags=["imported", "ssh-config"],
        )
```

## Exception Patterns

All core exceptions inherit from base:

- `DockerMCPError` - Base exception
- `DockerContextError` - Context operations
- `ConfigurationError` - Config loading/validation

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jmagar/docker-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.