Skip to main content
Glama
container.py63.2 kB
""" Container Management Service Business logic for Docker container operations with formatted output. """ from datetime import UTC, datetime from typing import TYPE_CHECKING, Any if TYPE_CHECKING: from docker_mcp.core.docker_context import DockerContextManager import structlog from fastmcp.tools.tool import ToolResult from mcp.types import TextContent from ..constants import CONTAINER_ID, HOST_ID from ..core.config_loader import DockerMCPConfig from ..tools.containers import ContainerTools from ..utils import validate_host from .logs import LogsService class ContainerService: """Service for Docker container management operations.""" def __init__( self, config: DockerMCPConfig, context_manager: "DockerContextManager", logs_service: LogsService | None = None, ): self.config = config self.context_manager = context_manager self.container_tools = ContainerTools(config, context_manager) self.logs_service = logs_service or LogsService(config, context_manager) self.logger = structlog.get_logger() def _build_error_response( self, host_id: str, container_id: str | None, action: str | None, error: Exception, message: str, ) -> dict[str, Any]: """Build a standardized error response.""" self.logger.error( "container service error", host_id=host_id, container_id=container_id, action=action, error=str(error), error_type=type(error).__name__, ) error_text = str(error) if error else "" formatted_message = f"❌ {message}" if error_text and error_text not in message: formatted_message = f"{formatted_message}\nDetails: {error_text}" return { "success": False, "message": message, "host_id": host_id, "container_id": container_id, "action": action, "error": str(error), "error_type": type(error).__name__, "timestamp": datetime.now(UTC).isoformat(), "formatted_output": formatted_message, } def _validate_container_safety(self, container_id: str) -> tuple[bool, str]: """Validate container is safe for testing operations.""" # List of production containers that should not be operated on during tests production_containers = { "opengist", "nextcloud", "plex", "portainer", "traefik", "mysql", "postgres", "redis", "mongodb", "elasticsearch", "grafana", "prometheus", "nginx-proxy", "ssl-companion", } # Check if this looks like a production container if container_id.lower() in production_containers: return ( False, f"Safety check failed: '{container_id}' appears to be a production container. Use test containers for testing.", ) # Allow test containers (those with "test" prefix or specific test patterns) if ( container_id.startswith("test-") or "test" in container_id.lower() or container_id.startswith("mcp-") ): return True, "" # For other containers, issue a warning but allow operation # This preserves backward compatibility while encouraging safe practices self.logger.warning( "Operating on container that may be production", container_id=container_id, recommendation="Use test containers (test-*) for safer testing", ) return True, "" async def _check_container_exists(self, host_id: str, container_id: str) -> dict[str, Any]: """Check if a container exists on the host before performing operations.""" try: # Use container tools to get container info (which checks existence) container_result = await self.container_tools.get_container_info(host_id, container_id) if "error" in container_result: # Try to provide helpful suggestions suggestion = "" error_lower = container_result["error"].lower() if "not found" in error_lower: # Get list of available containers to suggest alternatives containers_result = await self.container_tools.list_containers( host_id, all_containers=True, limit=1000, offset=0, ) if containers_result.get("success") and containers_result.get("containers"): container_names = [ c.get("name", "") for c in containers_result["containers"] ] # Find similar names similar_names = [ name for name in container_names if container_id.lower() in name.lower() or name.lower() in container_id.lower() ] if similar_names: suggestion = f"Did you mean one of: {', '.join(similar_names)}?" elif container_names: suggestion = ( f"Available containers: {', '.join(container_names)}" ) return { "exists": False, "error": container_result["error"], "suggestion": suggestion } # Container exists, extract info from result container_info = container_result.get("info", container_result) return {"exists": True, "info": container_info} except Exception as e: return { "exists": False, "error": f"Failed to check container existence: {str(e)}", "suggestion": "Verify the container name and try again" } def _enhance_operation_result(self, result: dict[str, Any], host_id: str, container_id: str, action: str) -> dict[str, Any]: """Enhance operation result with context and user-friendly messaging.""" from datetime import UTC, datetime enhanced = result.copy() # Add operation context enhanced["operation_context"] = { "host_id": host_id, "container_id": container_id, "action": action, "timestamp": datetime.now(UTC).isoformat(), "operation_type": "container_management" } # Create user-friendly messages if result.get("success"): action_messages = { "start": f"Container '{container_id}' started successfully on {host_id}", "stop": f"Container '{container_id}' stopped successfully on {host_id}", "restart": f"Container '{container_id}' restarted successfully on {host_id}", "remove": f"Container '{container_id}' removed successfully from {host_id}", "pause": f"Container '{container_id}' paused successfully on {host_id}", "unpause": f"Container '{container_id}' unpaused successfully on {host_id}" } enhanced["user_message"] = action_messages.get(action, f"Container operation '{action}' completed successfully") # Add helpful next steps if action == "start": enhanced["next_steps"] = [ "Check container logs if needed: docker_container logs", "Monitor container status: docker_container info" ] elif action == "stop": enhanced["next_steps"] = [ "Container can be restarted with: docker_container start", "Remove container if no longer needed: docker_container remove" ] else: error_message = result.get("message", result.get("error", "Unknown error")) enhanced["user_message"] = f"Failed to {action} container '{container_id}' on {host_id}: {error_message}" # Add troubleshooting hints enhanced["troubleshooting_hints"] = [ "Verify the container name is correct", "Check if you have sufficient permissions", "Ensure the container is in the correct state for this operation" ] if "permission denied" in error_message.lower(): enhanced["troubleshooting_hints"].insert(0, "Check Docker daemon permissions and user group membership") elif "already" in error_message.lower(): enhanced["troubleshooting_hints"].insert(0, "Container may already be in the target state") # Preserve original message in raw_message for debugging enhanced["raw_message"] = result.get("message", "") return enhanced async def list_containers( self, host_id: str, all_containers: bool = False, limit: int = 20, offset: int = 0 ) -> ToolResult: """List containers on a specific Docker host with pagination.""" try: is_valid, error_msg = validate_host(self.config, host_id) if not is_valid: return ToolResult( content=[TextContent(type="text", text=f"Error: {error_msg}")], structured_content={"success": False, "error": error_msg}, ) # Use container tools to get containers with pagination result = await self.container_tools.list_containers( host_id, all_containers, limit, offset ) # Create clean, professional summary containers = result["containers"] pagination = result["pagination"] summary_lines = [ f"Docker Containers on {host_id}", f"Showing {pagination['returned']} of {pagination['total']} containers", "", " Container Ports Project State", " ---------------------------------------- -------------------------------- ---------------------- ----------------", ] for container in containers: summary_lines.append(self._format_container_summary(container)) if pagination["has_next"]: summary_lines.append("") summary_lines.append( f"Next page: Use offset={pagination['offset'] + pagination['limit']}" ) formatted_text = "\n".join(summary_lines) return ToolResult( content=[TextContent(type="text", text=formatted_text)], structured_content={ "success": True, HOST_ID: host_id, "containers": containers, "pagination": pagination, "formatted_output": formatted_text, }, ) except Exception as e: self.logger.error("Failed to list containers", host_id=host_id, error=str(e)) formatted_text = f"❌ Failed to list containers: {str(e)}" return ToolResult( content=[TextContent(type="text", text=formatted_text)], structured_content={ "success": False, "error": str(e), HOST_ID: host_id, "formatted_output": formatted_text, }, ) def _format_container_summary(self, container: dict[str, Any]) -> str: """Format container information for display in a single table row.""" # Enhanced status indicators with more states state = container.get("state", "unknown") status_indicators = { "running": "●", "exited": "○", "stopped": "○", "paused": "⏸", "restarting": "◐", "created": "◯", "dead": "✗", "removing": "⊗" } status_indicator = status_indicators.get(state, "?") # Show ALL ports without truncation ports = container.get("ports", []) if ports: port_mappings: list[str] = [] for port in ports: if ":" in port and "→" in port: host_part = port.split(":", 1)[1].split("→", 1)[0] container_part = port.split("→", 1)[1] if "→" in port else "" port_mappings.append( f"{host_part}→{container_part}" if container_part else host_part ) else: port_mappings.append(port) ports_display = ", ".join(port_mappings) if port_mappings else ", ".join(ports) else: ports_display = "-" name = container.get("name", "-") project = container.get("compose_project") or "-" state_display = container.get("state", "-") return ( f"{status_indicator} {name:<38}" f" {ports_display:<32}" f" {project:<22}" f" {state_display}" ) async def get_container_info(self, host_id: str, container_id: str) -> ToolResult: """Get detailed information about a specific container.""" try: is_valid, error_msg = validate_host(self.config, host_id) if not is_valid: return ToolResult( content=[TextContent(type="text", text=f"Error: {error_msg}")], structured_content={"success": False, "error": error_msg}, ) # Use container tools to get container info container_result = await self.container_tools.get_container_info(host_id, container_id) if "error" in container_result: return ToolResult( content=[TextContent(type="text", text=f"Error: {container_result['error']}")], structured_content={ "success": False, "error": container_result["error"], HOST_ID: host_id, CONTAINER_ID: container_id, }, ) info_payload = container_result.get("data") container_info = info_payload if isinstance(info_payload, dict) else container_result summary_lines = self._format_container_details(container_info, container_id) formatted_text = "\n".join(summary_lines) return ToolResult( content=[TextContent(type="text", text=formatted_text)], structured_content={ "success": True, HOST_ID: host_id, CONTAINER_ID: container_id, "info": container_info, "timestamp": container_result.get("timestamp"), "formatted_output": formatted_text, }, ) except Exception as e: self.logger.error( "Failed to get container info", host_id=host_id, container_id=container_id, error=str(e), ) formatted_text = f"❌ Failed to get container info: {str(e)}" return ToolResult( content=[TextContent(type="text", text=formatted_text)], structured_content={ "success": False, "error": str(e), HOST_ID: host_id, CONTAINER_ID: container_id, "formatted_output": formatted_text, }, ) def _format_container_info(self, container_info: dict[str, Any], container_id: str) -> list[str]: """Format comprehensive container information with ALL details in clean, structured format.""" summary_lines = self._format_basic_container_info(container_info, container_id) self._add_runtime_info(summary_lines, container_info) self._add_resource_limits(summary_lines, container_info) self._add_network_info(summary_lines, container_info) self._add_port_info(summary_lines, container_info) self._add_volume_info(summary_lines, container_info) self._add_environment_info(summary_lines, container_info) self._add_compose_info(summary_lines, container_info) self._add_labels_info(summary_lines, container_info) return summary_lines def _format_basic_container_info(self, container_info: dict[str, Any], container_id: str) -> list[str]: """Format basic container information header.""" name = container_info.get("name", container_id) state = container_info.get("state", "unknown") image = container_info.get("image", "unknown") created = container_info.get("created", "unknown") status_display = self._get_status_display(state) return [ f"━━━ Container Details: {name} ━━━", f"Container ID: {container_id}", f"Short ID: {container_id[:12]}", f"Status: {status_display}", f"Image: {image}", f"Created: {created}", "" ] def _get_status_display(self, state: str) -> str: """Get formatted status display with indicators.""" status_indicators = { "running": "● Running", "exited": "○ Exited", "stopped": "○ Stopped", "paused": "⏸ Paused", "restarting": "◐ Restarting", "created": "◯ Created", "dead": "✗ Dead", "removing": "⊗ Removing" } return status_indicators.get(state, f"? {state.title()}") def _add_runtime_info(self, lines: list[str], container_info: dict[str, Any]) -> None: """Add runtime configuration information.""" runtime_info = [] if container_info.get("command"): runtime_info.append(f"Command: {container_info['command']}") if container_info.get("args"): runtime_info.append(f"Args: {', '.join(container_info['args'])}") if container_info.get("working_dir"): runtime_info.append(f"Working Dir: {container_info['working_dir']}") if container_info.get("user"): runtime_info.append(f"User: {container_info['user']}") if runtime_info: lines.extend(["Runtime Configuration:"] + [f" {info}" for info in runtime_info] + [""]) def _add_resource_limits(self, lines: list[str], container_info: dict[str, Any]) -> None: """Add resource limits information.""" if container_info.get("memory_limit") or container_info.get("cpu_limit"): lines.append("Resource Limits:") if container_info.get("memory_limit"): lines.append(f" Memory: {container_info['memory_limit']}") if container_info.get("cpu_limit"): lines.append(f" CPU: {container_info['cpu_limit']}") lines.append("") def _add_network_info(self, lines: list[str], container_info: dict[str, Any]) -> None: """Add network information.""" networks = container_info.get("networks", []) if networks: lines.append("Networks:") for network in networks: if isinstance(network, dict): network_name = network.get("name", "unknown") network_ip = network.get("ip", "") lines.append(f" • {network_name}" + (f" ({network_ip})" if network_ip else "")) else: lines.append(f" • {network}") lines.append("") def _add_port_info(self, lines: list[str], container_info: dict[str, Any]) -> None: """Add port mapping information.""" ports = container_info.get("ports", {}) if ports: lines.extend(self._format_port_mappings(ports)) def _add_volume_info(self, lines: list[str], container_info: dict[str, Any]) -> None: """Add volume mount information.""" volumes = container_info.get("volumes", []) mounts = container_info.get("mounts", []) all_mounts = volumes + mounts if all_mounts: lines.append("Volume Mounts:") for mount in all_mounts: if isinstance(mount, dict): source = mount.get("source", mount.get("Source", "")) target = mount.get("target", mount.get("Destination", "")) mount_type = mount.get("type", mount.get("Type", "bind")) mode = mount.get("mode", mount.get("Mode", "rw")) lines.append(f" • {source} → {target} ({mount_type}, {mode})") else: lines.append(f" • {mount}") lines.append("") def _add_environment_info(self, lines: list[str], container_info: dict[str, Any]) -> None: """Add environment variables information.""" env_vars = container_info.get("environment", []) if env_vars: lines.append("Environment Variables:") for env_var in env_vars: if self._is_sensitive_env_var(env_var): var_name = env_var.split("=")[0] if "=" in env_var else env_var lines.append(f" • {var_name}=[REDACTED]") else: lines.append(f" • {env_var}") lines.append("") def _is_sensitive_env_var(self, env_var: str) -> bool: """Check if environment variable contains sensitive information.""" sensitive_keywords = ["PASSWORD", "SECRET", "TOKEN", "KEY", "PRIVATE"] return any(sensitive in env_var.upper() for sensitive in sensitive_keywords) def _add_compose_info(self, lines: list[str], container_info: dict[str, Any]) -> None: """Add Docker Compose information.""" compose_project = container_info.get("compose_project", "") if compose_project: lines.append("Docker Compose:") lines.append(f" • Project: {compose_project}") compose_file = container_info.get("compose_file", "") if compose_file: lines.append(f" • File: {compose_file}") compose_service = container_info.get("compose_service", "") if compose_service: lines.append(f" • Service: {compose_service}") lines.append("") def _add_labels_info(self, lines: list[str], container_info: dict[str, Any]) -> None: """Add container labels information.""" labels = container_info.get("labels", {}) if labels: lines.append("Labels:") important_labels, other_labels = self._categorize_labels(labels) for key, value in important_labels: lines.append(f" • {key}: {value}") for key, value in other_labels: lines.append(f" • {key}: {value}") lines.append("") def _categorize_labels(self, labels: dict[str, Any]) -> tuple[list[tuple[str, Any]], list[tuple[str, Any]]]: """Categorize labels into important and other groups.""" important_labels = [] other_labels = [] important_prefixes = ["com.docker.compose", "traefik", "org.label-schema"] for key, value in labels.items(): if any(prefix in key for prefix in important_prefixes): important_labels.append((key, value)) else: other_labels.append((key, value)) return important_labels, other_labels def _format_container_details( self, container_info: dict[str, Any], container_id: str ) -> list[str]: """Format detailed container information for display (legacy method - delegates to _format_container_info).""" return self._format_container_info(container_info, container_id) def _format_port_mappings(self, ports: dict[str, Any]) -> list[str]: """Format port mappings for display.""" lines = ["Port Mappings:"] for container_port, host_mappings in ports.items(): if host_mappings: for mapping in host_mappings: host_ip = mapping.get("HostIp", "0.0.0.0") # nosec B104 - Docker port mapping host_port = mapping.get("HostPort", "") lines.append(f" {host_ip}:{host_port} -> {container_port}") else: lines.append(f" {container_port} (not exposed)") return lines def _format_pull_progress(self, image_name: str, host_id: str, phase: str) -> str: """Format image pull progress with visual indicators.""" if phase == "starting": return f"◐ Starting image pull: {image_name}\n → Host: {host_id}\n → Status: Initiating pull operation..." return f"◐ Pulling {image_name} on {host_id}" def _format_pull_success(self, result: dict[str, Any], image_name: str, host_id: str) -> str: """Format successful image pull with detailed feedback.""" message = result.get("message", "Image pulled successfully") # Extract useful information from result size = result.get("size", "") digest = result.get("digest", "") layers = result.get("layers", 0) formatted_lines = [ f"✅ Image pull completed: {image_name}", f" → Host: {host_id}", f" → Status: {message}" ] # Add additional details if available if size: formatted_lines.append(f" → Size: {size}") if layers and layers > 0: formatted_lines.append(f" → Layers: {layers}") if digest: formatted_lines.append(f" → Digest: {digest}") formatted_lines.append(" ✓ Ready for use") return "\n".join(formatted_lines) def _format_pull_error(self, result: dict[str, Any], image_name: str, host_id: str) -> str: """Format image pull error with helpful context.""" error_msg = result.get("message", result.get("error", "Unknown error")) formatted_lines = [ f"❌ Image pull failed: {image_name}", f" → Host: {host_id}", f" → Error: {error_msg}" ] # Add troubleshooting hints based on error type if "not found" in error_msg.lower(): formatted_lines.extend([ "", "Troubleshooting:", " • Verify the image name and tag are correct", " • Check if the image exists in the registry", " • Ensure you have access to the registry" ]) elif "permission" in error_msg.lower() or "unauthorized" in error_msg.lower(): formatted_lines.extend([ "", "Troubleshooting:", " • Check Docker registry authentication", " • Verify access permissions for the image", " • Try logging in to the registry first" ]) elif "network" in error_msg.lower() or "timeout" in error_msg.lower(): formatted_lines.extend([ "", "Troubleshooting:", " • Check network connectivity to the registry", " • Verify DNS resolution", " • Try again - network issues may be temporary" ]) return "\n".join(formatted_lines) def _format_container_logs(self, logs: list[str], container_id: str, host_id: str, lines_requested: int, truncated: bool) -> str: """Format container logs with enhanced structure and visual indicators.""" if not logs: return f"📝 No logs found for {container_id} on {host_id}" # Create header with status indicators status_icon = "⚠️" if truncated else "📝" header_lines = [ f"{status_icon} Container Logs: {container_id}", f" → Host: {host_id}", f" → Lines: {len(logs)}/{lines_requested}" + (" (truncated)" if truncated else " (complete)"), "─" * 60 ] # Process log lines with optional enhancements processed_logs = [] for i, log_line in enumerate(logs): # Add line numbers for better reference (optional, only for short logs) if len(logs) <= 50: processed_logs.append(f"{i+1:3d} | {log_line}") else: processed_logs.append(log_line) # Add footer with additional information footer_lines = [ "─" * 60 ] if truncated: footer_lines.append("⚠️ Logs were truncated - use a larger 'lines' parameter to see more") footer_lines.append(f"🔍 Use 'docker_container logs {container_id} --lines <N>' for more control") all_lines = header_lines + processed_logs + footer_lines return "\n".join(all_lines) def _format_operation_result(self, result: dict[str, Any], operation: str, context: dict[str, Any]) -> str: """Format consistent operation results for start/stop/restart responses with visual indicators.""" host_id = context.get("host_id", "unknown") container_id = context.get("container_id", "unknown") if result.get("success"): # Success formatting with operation-specific icons and messages operation_details = { "start": { "icon": "▶️", "action": "started", "next_steps": [ "Check logs: docker_container logs", "Monitor status: docker_container info", "View ports: docker_hosts ports" ] }, "stop": { "icon": "⏹️", "action": "stopped", "next_steps": [ "Restart if needed: docker_container start", "Remove if done: docker_container remove" ] }, "restart": { "icon": "🔄", "action": "restarted", "next_steps": [ "Check logs: docker_container logs", "Verify functionality: docker_container info" ] }, "pause": { "icon": "⏸️", "action": "paused", "next_steps": [ "Resume with: docker_container unpause" ] }, "unpause": { "icon": "▶️", "action": "resumed", "next_steps": [ "Check status: docker_container info" ] }, "remove": { "icon": "🗑️", "action": "removed", "next_steps": [ "Deploy new instance if needed" ] } } details = operation_details.get(operation, { "icon": "✅", "action": f"{operation}ped", "next_steps": [] }) formatted_lines = [ f"{details['icon']} Container {details['action']}: {container_id}", f" → Host: {host_id}", f" → Operation: {operation.title()}", ] # Add timing information if available if result.get("duration"): formatted_lines.append(f" → Duration: {result['duration']}s") # Add next steps if details["next_steps"]: formatted_lines.extend(["", "Next steps:"] + [f" • {step}" for step in details["next_steps"]]) return "\n".join(formatted_lines) else: # Error formatting with troubleshooting hints error_msg = result.get("message", result.get("error", "Unknown error")) formatted_lines = [ f"❌ Container {operation} failed: {container_id}", f" → Host: {host_id}", f" → Error: {error_msg}" ] # Add operation-specific troubleshooting if operation == "start": formatted_lines.extend([ "", "Troubleshooting:", " • Check if container is already running", " • Verify port conflicts: docker_hosts ports", " • Check container logs for errors", " • Ensure sufficient resources (CPU, memory)" ]) elif operation == "stop": formatted_lines.extend([ "", "Troubleshooting:", " • Try with force flag if container is unresponsive", " • Check if container is already stopped", " • Verify container exists: docker_container info" ]) elif operation == "restart": formatted_lines.extend([ "", "Troubleshooting:", " • Try stop then start separately", " • Check container health before restart", " • Verify no resource conflicts" ]) return "\n".join(formatted_lines) async def manage_container( self, host_id: str, container_id: str, action: str, force: bool = False, timeout: int = 10 ) -> ToolResult: """Unified container action management.""" try: is_valid, error_msg = validate_host(self.config, host_id) if not is_valid: return ToolResult( content=[TextContent(type="text", text=f"Error: {error_msg}")], structured_content={"success": False, "error": error_msg}, ) # Safety check for production containers is_safe, safety_msg = self._validate_container_safety(container_id) if not is_safe: self.logger.warning( "Container operation blocked by safety check", host_id=host_id, container_id=container_id, action=action, reason=safety_msg, ) return ToolResult( content=[TextContent(type="text", text=f"⚠️ {safety_msg}")], structured_content={ "success": False, "error": safety_msg, "safety_blocked": True, }, ) # Use container tools to manage container result = await self.container_tools.manage_container( host_id, container_id, action, force, timeout ) # Enhance response with operation context and user-friendly formatting enhanced_result = self._enhance_operation_result(result, host_id, container_id, action) # Use new _format_operation_result for consistent formatting context = {"host_id": host_id, "container_id": container_id} formatted_text = self._format_operation_result(enhanced_result, action, context) enhanced_result["formatted_output"] = formatted_text return ToolResult( content=[TextContent(type="text", text=formatted_text)], structured_content=enhanced_result, ) except Exception as e: self.logger.error( "Failed to manage container", host_id=host_id, container_id=container_id, action=action, error=str(e), ) formatted_text = f"❌ Failed to {action} container: {str(e)}" return ToolResult( content=[TextContent(type="text", text=formatted_text)], structured_content={ "success": False, "error": str(e), HOST_ID: host_id, CONTAINER_ID: container_id, "action": action, "formatted_output": formatted_text, }, ) async def pull_image(self, host_id: str, image_name: str) -> ToolResult: """Pull a Docker image on a remote host with enhanced progress indicators.""" try: is_valid, error_msg = validate_host(self.config, host_id) if not is_valid: return ToolResult( content=[TextContent(type="text", text=f"Error: {error_msg}")], structured_content={"success": False, "error": error_msg}, ) # Enhanced formatting for pull operation with progress indicators formatted_text = self._format_pull_progress(image_name, host_id, "starting") # Use container tools to pull image result = await self.container_tools.pull_image(host_id, image_name) if result["success"]: formatted_text = self._format_pull_success(result, image_name, host_id) result = dict(result) result["formatted_output"] = formatted_text return ToolResult( content=[TextContent(type="text", text=formatted_text)], structured_content=result, ) else: formatted_text = self._format_pull_error(result, image_name, host_id) result = dict(result) result["formatted_output"] = formatted_text return ToolResult( content=[TextContent(type="text", text=formatted_text)], structured_content=result, ) except Exception as e: self.logger.error( "Failed to pull image", host_id=host_id, image_name=image_name, error=str(e), ) formatted_text = f"❌ Image pull failed: {image_name}\n Host: {host_id}\n Error: {str(e)}" return ToolResult( content=[TextContent(type="text", text=formatted_text)], structured_content={ "success": False, "error": str(e), HOST_ID: host_id, "image_name": image_name, "formatted_output": formatted_text, }, ) async def list_host_ports(self, host_id: str) -> ToolResult: """List all ports currently in use by containers on a Docker host (includes stopped containers).""" try: is_valid, error_msg = validate_host(self.config, host_id) if not is_valid: return ToolResult( content=[TextContent(type="text", text=f"Error: {error_msg}")], structured_content={"success": False, "error": error_msg}, ) # Use container tools to get port information (always include stopped containers) result = await self.container_tools.list_host_ports(host_id) # Extract data from the response structure data = result.get("data", {}) summary_lines = self._format_port_usage_summary(result, host_id) formatted_text = "\n".join(summary_lines) return ToolResult( content=[TextContent(type="text", text=formatted_text)], structured_content={ "success": True, HOST_ID: host_id, "total_ports": data.get("total_ports", 0), "total_containers": data.get("total_containers", 0), "port_mappings": data.get("port_mappings", []), "conflicts": data.get("conflicts", []), "summary": data.get("summary", {}), "cached": result.get("cached", False), "timestamp": result.get("timestamp"), "formatted_output": formatted_text, }, ) except Exception as e: self.logger.error("Failed to list host ports", host_id=host_id, error=str(e)) formatted_text = f"❌ Failed to list host ports: {str(e)}" return ToolResult( content=[TextContent(type="text", text=formatted_text)], structured_content={ "success": False, "error": str(e), HOST_ID: host_id, "formatted_output": formatted_text, }, ) def _format_port_usage_summary(self, result: dict[str, Any], host_id: str) -> list[str]: """Format comprehensive port usage summary.""" data = result.get("data", {}) port_mappings = data.get("port_mappings", []) conflicts = data.get("conflicts", []) summary = data.get("summary", {}) summary_lines = [ f"Port Usage on {host_id}", f"Found {data.get('total_ports', 0)} exposed ports across {data.get('total_containers', 0)} containers", "", ] # Show summary statistics if summary.get("protocol_counts"): protocol_info = ", ".join( [f"{protocol}: {count}" for protocol, count in summary["protocol_counts"].items()] ) summary_lines.append(f"Protocols: {protocol_info}") if summary.get("port_range_usage"): ranges = summary["port_range_usage"] range_info = f"System: {ranges.get('0-1023', 0)}, User: {ranges.get('1024-49151', 0)}, Dynamic: {ranges.get('49152-65535', 0)}" summary_lines.append(f"Port ranges: {range_info}") if conflicts: summary_lines.append(f"⚠️ {len(conflicts)} port conflicts detected!") summary_lines.append("") # Show port conflicts first (if any) if conflicts: summary_lines.extend(self._format_port_conflicts(conflicts)) # Show all port mappings if port_mappings: summary_lines.extend(self._format_port_mapping_details(port_mappings)) else: summary_lines.append("No exposed ports found.") # Add helpful notes if conflicts: summary_lines.extend( [ "", "Note: Port conflicts occur when multiple containers try to bind to the same host port.", "Only one container can successfully bind - others may fail to start or function incorrectly.", ] ) return summary_lines def _format_port_conflicts(self, conflicts: list[dict[str, Any]]) -> list[str]: """Format port conflict information.""" lines = ["PORT CONFLICTS:"] for conflict in conflicts: host_port = conflict["host_port"] protocol = conflict["protocol"] host_ip = conflict["host_ip"] containers = conflict["affected_containers"] lines.append(f"❌ {host_ip}:{host_port}/{protocol} used by: {', '.join(containers)}") lines.append("") return lines def _format_port_mapping_details(self, port_mappings: list[dict[str, Any]]) -> list[str]: """Format port mapping information grouped by container for efficiency.""" if not port_mappings: return ["No exposed ports found."] lines = ["PORT MAPPINGS:"] # Group ports by container for efficient display by_container = {} conflicts_found = [] for mapping in port_mappings: container_key = mapping.get("container_name", "unknown") if container_key not in by_container: by_container[container_key] = { "ports": [], "compose_project": mapping.get("compose_project", ""), "container_id": mapping.get("container_id", ""), } # Format: host_port→container_port/protocol using safe defaults host_port = mapping.get("host_port", "") container_port = mapping.get("container_port", "") protocol = mapping.get("protocol", "") port_str = f"{host_port}→{container_port}/{protocol}" if mapping.get("is_conflict", False): port_str = f"⚠️{port_str}" conflicts_found.append(f"{host_port}/{protocol}") by_container[container_key]["ports"].append(port_str) # Display grouped by container for container_name, container_data in sorted(by_container.items()): ports_str = ", ".join(container_data["ports"]) project_info = ( f" [{container_data['compose_project']}]" if container_data["compose_project"] else "" ) lines.append(f" {container_name}{project_info}: {ports_str}") # Add conflicts summary if any if conflicts_found: lines.append("") lines.append(f"⚠️ Conflicts detected on ports: {', '.join(conflicts_found)}") return lines async def check_port_availability(self, host_id: str, port: int) -> ToolResult: """Check if a specific port is available on a host. Args: host_id: Host identifier to check port: Port number to check Returns: Port availability information """ try: # Validate host validation_result = self._validate_port_check_host(host_id) if validation_result is not None: return validation_result # Get port usage data port_data = await self._get_port_usage_data(host_id) if "error" in port_data: return self._build_port_check_error_toolresult( host_id, port, port_data["error"] ) # Find conflicts conflicts = self._find_port_conflicts(port_data.get("port_mappings", []), port) is_available = len(conflicts) == 0 # Build response response = self._build_port_availability_response( host_id, port, is_available, conflicts ) formatted_output = response.get("formatted_output", "") return ToolResult( content=[TextContent(type="text", text=formatted_output)], structured_content=response, ) except Exception as e: return self._handle_port_check_error(host_id, port, e) def _validate_port_check_host(self, host_id: str) -> ToolResult | None: """Validate host for port checking.""" is_valid, error_msg = validate_host(self.config, host_id) if not is_valid: formatted_output = f"❌ Port check failed: {error_msg}" return ToolResult( content=[TextContent(type="text", text=formatted_output)], structured_content={ "success": False, "error": error_msg, "formatted_output": formatted_output, }, ) return None async def _get_port_usage_data(self, host_id: str) -> dict[str, Any]: """Get current port usage data for the host.""" return await self.container_tools.list_host_ports(host_id) def _find_port_conflicts(self, port_mappings: list[dict[str, Any]], port: int) -> list[dict[str, Any]]: """Find conflicts for the specified port.""" conflicts = [] for mapping in port_mappings: if mapping.get("host_port") == str(port): conflicts.append( { "container_name": mapping.get("container_name"), CONTAINER_ID: mapping.get(CONTAINER_ID), "image": mapping.get("image"), "protocol": mapping.get("protocol", "tcp"), } ) return conflicts def _build_port_availability_response( self, host_id: str, port: int, is_available: bool, conflicts: list[dict[str, Any]] ) -> dict[str, Any]: """Build port availability response.""" response: dict[str, Any] = { "success": True, HOST_ID: host_id, "port": port, "available": is_available, "conflicts": conflicts, "message": f"Port {port} is {'available' if is_available else 'in use'}", } response["formatted_output"] = self._format_port_availability_output( host_id, port, is_available, conflicts ) return response def _format_port_availability_output( self, host_id: str, port: int, is_available: bool, conflicts: list[dict[str, Any]] ) -> str: """Format port availability output.""" if is_available: return f"Port {port} is available on {host_id}" formatted_lines = [f"Port {port} is in use on {host_id}"] conflicts_preview = self._format_conflicts_preview(conflicts) if conflicts_preview: formatted_lines.append("Conflicts:") formatted_lines.extend(conflicts_preview) return "\n".join(formatted_lines) def _format_conflicts_preview(self, conflicts: list[dict[str, Any]]) -> list[str]: """Format conflicts preview for display.""" if not conflicts: return [] conflicts_preview = [] for conflict in conflicts: name = conflict.get("container_name", "unknown") protocol = conflict.get("protocol", "tcp").upper() conflicts_preview.append(f" • {name} ({protocol})") return conflicts_preview def _build_port_check_error_toolresult( self, host_id: str, port: int, error_message: str ) -> ToolResult: formatted_output = f"❌ Port check failed: {error_message}" return ToolResult( content=[TextContent(type="text", text=formatted_output)], structured_content={ "success": False, "error": error_message, HOST_ID: host_id, "port": port, "formatted_output": formatted_output, }, ) def _handle_port_check_error(self, host_id: str, port: int, error: Exception) -> ToolResult: """Handle port check errors.""" self.logger.error( "Failed to check port availability", host_id=host_id, port=port, error=str(error) ) return self._build_port_check_error_toolresult(host_id, port, str(error)) async def handle_action(self, action, **params) -> dict[str, Any]: """Unified action handler for all container operations. This method consolidates all dispatcher logic from server.py into the service layer. """ try: # Import dependencies for this handler from ..models.enums import ContainerAction # Extract common parameters host_id = params.get("host_id", "") container_id = params.get("container_id", "") image_name = params.get("image_name", "") all_containers = params.get("all_containers", False) limit = params.get("limit", 20) offset = params.get("offset", 0) follow = params.get("follow", False) lines = params.get("lines", 100) force = params.get("force", False) timeout = params.get("timeout", 10) # Route to appropriate handler if action == ContainerAction.LIST: return await self._handle_list_action(host_id, all_containers, limit, offset) elif action == ContainerAction.INFO: return await self._handle_info_action(host_id, container_id) elif action in [ ContainerAction.START, ContainerAction.STOP, ContainerAction.RESTART, ContainerAction.REMOVE, ]: return await self._handle_management_actions( action, host_id, container_id, force, timeout ) elif action == ContainerAction.LOGS: return await self._handle_logs_action(host_id, container_id, lines, follow) elif action == ContainerAction.PULL or (isinstance(action, str) and action == "pull"): return await self._handle_pull_action(host_id, image_name or container_id) else: return self._handle_unknown_action(action) except Exception as e: self.logger.error("container service action error", action=action, error=str(e)) return {"success": False, "error": f"Service action failed: {str(e)}", "action": action} async def _handle_list_action( self, host_id: str, all_containers: bool, limit: int, offset: int ) -> dict[str, Any]: """Handle list container action.""" if not host_id: return self._build_error_response( host_id="", container_id=None, action="list", error=ValueError("host_id missing"), message="host_id is required for list action", ) # Validate pagination parameters if limit < 1 or limit > 1000: return self._build_error_response( host_id=host_id, container_id=None, action="list", error=ValueError("invalid limit"), message="limit must be between 1 and 1000", ) if offset < 0: return self._build_error_response( host_id=host_id, container_id=None, action="list", error=ValueError("invalid offset"), message="offset must be >= 0", ) result = await self.list_containers(host_id, all_containers, limit, offset) return self._extract_structured_content(result) async def _handle_info_action(self, host_id: str, container_id: str) -> dict[str, Any]: """Handle container info action.""" if not host_id: return self._build_error_response( host_id="", container_id=None, action="info", error=ValueError("host_id missing"), message="host_id is required for info action", ) if not container_id: return self._build_error_response( host_id=host_id, container_id=None, action="info", error=ValueError("container_id missing"), message="container_id is required for info action", ) info_result = await self.get_container_info(host_id, container_id) return self._extract_structured_content(info_result) async def _handle_management_actions( self, action, host_id: str, container_id: str, force: bool, timeout: int ) -> dict[str, Any]: """Handle container management actions (start, stop, restart, etc.).""" if not host_id: return self._build_error_response( host_id="", container_id=None, action=str(action), error=ValueError("host_id missing"), message=f"host_id is required for {action} action", ) if not container_id: return self._build_error_response( host_id=host_id, container_id=None, action=str(action), error=ValueError("container_id missing"), message=f"container_id is required for {action} action", ) # Validate timeout parameter if timeout < 1 or timeout > 300: return self._build_error_response( host_id=host_id, container_id=container_id, action=str(action), error=ValueError("invalid timeout"), message="timeout must be between 1 and 300 seconds", ) result = await self.manage_container(host_id, container_id, action.value, force, timeout) return self._extract_structured_content(result) async def _handle_logs_action( self, host_id: str, container_id: str, lines: int, follow: bool ) -> dict[str, Any]: """Handle container logs action.""" if not host_id: return self._build_error_response( host_id="", container_id=None, action="logs", error=ValueError("host_id missing"), message="host_id is required for logs action", ) if not container_id: return self._build_error_response( host_id=host_id, container_id=None, action="logs", error=ValueError("container_id missing"), message="container_id is required for logs action", ) # Validate lines parameter if lines < 1 or lines > 10000: return self._build_error_response( host_id=host_id, container_id=container_id, action="logs", error=ValueError("invalid lines parameter"), message="lines must be between 1 and 10000", ) try: logs_result = await self.logs_service.get_container_logs( host_id=host_id, container_id=container_id, lines=lines, since=None, timestamps=False, ) logs: list[str] = [] truncated = False if isinstance(logs_result, dict): # Preferred shape: success response with payload under "data" if isinstance(logs_result.get("data"), dict): data = logs_result["data"] logs = data.get("logs", []) or [] truncated = data.get("truncated", False) # Legacy shape: logs returned at the top level elif "logs" in logs_result: logs = logs_result.get("logs", []) or [] truncated = logs_result.get("truncated", False) # Ensure we always return a list even if upstream gave us something unexpected if not isinstance(logs, list): logs = [] # Enhanced logs formatting with better structure and visual indicators formatted_text = self._format_container_logs(logs, container_id, host_id, lines, truncated) return { "success": True, "host_id": host_id, "container_id": container_id, "logs": logs, "lines_requested": lines, "lines_returned": len(logs), "truncated": truncated, "follow": follow, "formatted_output": formatted_text, } except Exception as e: return self._build_error_response( host_id=host_id, container_id=container_id, action="logs", error=e, message="Failed to get container logs", ) async def _handle_pull_action(self, host_id: str, image_name: str) -> dict[str, Any]: """Handle image pull action.""" if not host_id: return self._build_error_response( host_id="", container_id=None, action="pull", error=ValueError("host_id missing"), message="host_id is required for pull action", ) if not image_name: return self._build_error_response( host_id=host_id, container_id=None, action="pull", error=ValueError("image name missing"), message="image_name is required for pull action", ) result = await self.pull_image(host_id, image_name) return self._extract_structured_content(result) def _handle_unknown_action(self, action) -> dict[str, Any]: """Handle unknown action.""" formatted_text = f"❌ Unknown action: {action}" return { "success": False, "error": f"Unknown action: {action}", "valid_actions": [ "list", "info", "start", "stop", "restart", "remove", "logs", "pull", ], "formatted_output": formatted_text, } def _extract_structured_content(self, result) -> dict[str, Any]: """Extract structured content from ToolResult.""" if hasattr(result, "structured_content") and result.structured_content is not None: structured = result.structured_content if not isinstance(structured, dict): structured = dict(structured) else: structured = dict(structured) formatted_text = "" if hasattr(result, "content") and result.content: first_content = result.content[0] formatted_text = getattr(first_content, "text", "") or "" if formatted_text and "formatted_output" not in structured: structured["formatted_output"] = formatted_text if "formatted_output" in structured: formatted_value = structured["formatted_output"] ordered = {"formatted_output": formatted_value} for key, value in structured.items(): if key == "formatted_output": continue ordered[key] = value return ordered return structured return {"success": False, "error": "Invalid result format"}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jmagar/docker-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server