Skip to main content
Glama
host.py (94.3 kB)
""" Host Management Service Business logic for Docker host management operations. """ import asyncio import shlex import textwrap from pathlib import Path from typing import TYPE_CHECKING, Any, cast if TYPE_CHECKING: from docker_mcp.core.docker_context import DockerContextManager else: DockerContextManager = "DockerContextManager" import structlog from ..constants import APPDATA_PATH, COMPOSE_PATH, DOCKER_COMPOSE_WORKING_DIR, HOST_ID from ..core.config_loader import DockerHost, DockerMCPConfig, load_config, save_config from ..utils import build_ssh_command class HostService: """Service for Docker host management operations.""" def __init__( self, config: DockerMCPConfig, context_manager: "DockerContextManager | None" = None, cache_manager=None, ): self.config = config self.context_manager = context_manager self.logger = structlog.get_logger() self._config_lock = asyncio.Lock() async def add_docker_host( self, host_id: str, ssh_host: str, ssh_user: str, ssh_port: int = 22, ssh_key_path: str | None = None, description: str = "", tags: list[str] | None = None, compose_path: str | None = None, enabled: bool = True, ) -> dict[str, Any]: """Add a new Docker host for management. 
Args: host_id: Unique identifier for the host ssh_host: SSH hostname or IP address ssh_user: SSH username ssh_port: SSH port (default: 22) ssh_key_path: Path to SSH private key description: Human-readable description tags: Tags for host categorization compose_path: Path where compose files are stored on this host enabled: Whether the host is enabled for use Returns: Operation result """ tags = tags or [] try: host_config = DockerHost( hostname=ssh_host, user=ssh_user, port=ssh_port, identity_file=ssh_key_path, description=description, tags=tags, compose_path=compose_path, enabled=enabled, ) # Always test connection (auto-enabled) connection_tested = await self._test_ssh_connection( ssh_host, ssh_user, ssh_port, ssh_key_path ) if not connection_tested: error_message = ( f"SSH connection test failed for {ssh_user}@{ssh_host}:{ssh_port}" ) result = { "success": False, "error": error_message, HOST_ID: host_id, "hostname": ssh_host, "user": ssh_user, "port": ssh_port, "connection_tested": False, } result["formatted_output"] = self._format_error_output( f"Host add failed ({host_id})", error_message, ) return result # Add to configuration self.config.hosts[host_id] = host_config # Always save configuration changes to disk await asyncio.to_thread( save_config, self.config, getattr(self.config, "config_file", None) ) self.logger.info( "Docker host added", host_id=host_id, hostname=ssh_host, tested=connection_tested ) result = { "success": True, "message": f"Host {host_id} added successfully (SSH connection verified)", HOST_ID: host_id, "hostname": ssh_host, "user": ssh_user, "port": ssh_port, "description": description, "tags": tags, "compose_path": compose_path, "enabled": enabled, "connection_tested": connection_tested, } if ssh_key_path: result["identity_file"] = ssh_key_path result["formatted_output"] = self._format_add_host_output( host_id, ssh_host, ssh_user, ssh_port, connection_tested, tags, compose_path, description, enabled, ) return result except Exception as e: 
error_message = str(e) self.logger.error("Failed to add host", host_id=host_id, error=error_message) result = {"success": False, "error": error_message, HOST_ID: host_id} result["formatted_output"] = self._format_error_output( f"Host add failed ({host_id})", error_message ) return result async def list_docker_hosts(self) -> dict[str, Any]: """List all configured Docker hosts. Returns: List of host configurations """ try: hosts: list[dict[str, Any]] = [] enabled_hosts = 0 for host_id, host_config in sorted(self.config.hosts.items()): host_data = self._serialize_host_config(host_id, host_config) if host_config.enabled: enabled_hosts += 1 hosts.append(host_data) summary_lines = self._format_host_list_output(hosts, enabled_hosts) formatted_output = "\n".join(summary_lines) return { "formatted_output": formatted_output, "success": True, "hosts": hosts, "count": len(hosts), "enabled": enabled_hosts, } except Exception as e: self.logger.error("Failed to list hosts", error=str(e)) error_message = str(e) return { "formatted_output": self._format_error_output( "Host list failed", error_message ), "success": False, "error": error_message, } def validate_host_exists(self, host_id: str) -> tuple[bool, str]: """Validate that a host exists in configuration. Args: host_id: Host identifier to validate Returns: Tuple of (is_valid, error_message) """ if host_id not in self.config.hosts: return False, f"Host '{host_id}' not found" return True, "" def get_host_config(self, host_id: str) -> DockerHost | None: """Get host configuration by ID. 
Args: host_id: Host identifier Returns: Host configuration or None if not found """ return self.config.hosts.get(host_id) async def edit_docker_host( self, host_id: str, ssh_host: str | None = None, ssh_user: str | None = None, ssh_port: int | None = None, ssh_key_path: str | None = None, description: str | None = None, tags: list[str] | None = None, compose_path: str | None = None, appdata_path: str | None = None, enabled: bool | None = None, ) -> dict[str, Any]: """Edit an existing Docker host configuration. Args: host_id: Unique identifier for the host ssh_host: SSH hostname or IP address (optional update) ssh_user: SSH username (optional update) ssh_port: SSH port (optional update) ssh_key_path: Path to SSH private key (optional update) description: Human-readable description (optional update) tags: Tags for host categorization (optional update) compose_path: Path where compose files are stored (optional update) appdata_path: Path where container data is stored (optional update) enabled: Whether the host is enabled (optional update) Returns: Operation result """ try: if host_id not in self.config.hosts: error_message = f"Host '{host_id}' not found" return { "success": False, "error": error_message, HOST_ID: host_id, "formatted_output": self._format_error_output( "Host edit failed", error_message ), } current_host = self.config.hosts[host_id] updated_config: dict[str, Any] = { "hostname": ssh_host if ssh_host is not None and ssh_host != "" else current_host.hostname, "user": ssh_user if ssh_user is not None and ssh_user != "" else current_host.user, "port": ssh_port if ssh_port is not None else current_host.port, "identity_file": ssh_key_path if ssh_key_path is not None and ssh_key_path != "" else current_host.identity_file, "description": description if description is not None and description != "" else current_host.description, "tags": tags if tags is not None else current_host.tags, COMPOSE_PATH: compose_path if compose_path is not None and compose_path != "" 
else current_host.compose_path, APPDATA_PATH: appdata_path if appdata_path is not None and appdata_path != "" else current_host.appdata_path, "enabled": enabled if enabled is not None else current_host.enabled, } try: new_host_config = DockerHost(**updated_config) except Exception as validation_error: error_message = f"Configuration validation failed: {validation_error}" return { "success": False, "error": error_message, HOST_ID: host_id, "formatted_output": self._format_error_output( "Host edit failed", error_message ), } changes = self._calculate_host_changes(current_host, new_host_config) self.config.hosts[host_id] = new_host_config try: await asyncio.to_thread( save_config, self.config, getattr(self.config, "config_file", None) ) except Exception as save_error: self.config.hosts[host_id] = current_host error_message = f"Failed to save configuration: {save_error}" return { "success": False, "error": error_message, HOST_ID: host_id, "formatted_output": self._format_error_output( "Host edit failed", error_message ), } self.logger.info("Docker host updated", host_id=host_id) host_snapshot = self._serialize_host_config(host_id, new_host_config) message = "Host {host_id} updated successfully" if changes else "Host {host_id} unchanged" message = message.format(host_id=host_id) result = { "success": True, "message": message, HOST_ID: host_id, "changes": changes, "updated_fields": list(changes.keys()), "host": host_snapshot, } result["formatted_output"] = self._format_edit_host_output( host_id, new_host_config, changes ) return result except Exception as e: error_message = str(e) self.logger.error("Failed to edit host", host_id=host_id, error=error_message) return { "success": False, "error": error_message, HOST_ID: host_id, "formatted_output": self._format_error_output( "Host edit failed", error_message ), } async def remove_docker_host(self, host_id: str) -> dict[str, Any]: """Remove a Docker host from configuration. 
Args: host_id: Unique identifier for the host to remove Returns: Operation result """ try: if host_id not in self.config.hosts: error_message = f"Host '{host_id}' not found" return { "success": False, "error": error_message, HOST_ID: host_id, "formatted_output": self._format_error_output( "Host remove failed", error_message ), } host_config = self.config.hosts[host_id] hostname = host_config.hostname del self.config.hosts[host_id] await asyncio.to_thread( save_config, self.config, getattr(self.config, "config_file", None) ) self.logger.info("Docker host removed", host_id=host_id, hostname=hostname) result = { "success": True, "message": f"Host {host_id} ({hostname}) removed successfully", HOST_ID: host_id, "hostname": hostname, } result["formatted_output"] = self._format_remove_host_output(host_id, hostname) return result except Exception as e: error_message = str(e) self.logger.error("Failed to remove host", host_id=host_id, error=error_message) return { "success": False, "error": error_message, HOST_ID: host_id, "formatted_output": self._format_error_output( "Host remove failed", error_message ), } async def test_connection(self, host_id: str) -> dict[str, Any]: """Test SSH connection to a Docker host. 
Args: host_id: Host identifier to test connection for Returns: Connection test result """ try: if host_id not in self.config.hosts: error_message = f"Host '{host_id}' not found" return { "success": False, "error": error_message, HOST_ID: host_id, "formatted_output": self._format_error_output( "Connection test failed", error_message ), } host = self.config.hosts[host_id] # Build SSH command for connection test ssh_cmd = [ "ssh", "-o", "BatchMode=yes", "-o", "ConnectTimeout=10", "-o", "StrictHostKeyChecking=accept-new", ] if host.port != 22: ssh_cmd.extend(["-p", str(host.port)]) if host.identity_file: ssh_cmd.extend(["-i", host.identity_file]) ssh_cmd.append(f"{host.user}@{host.hostname}") ssh_cmd.append( "echo 'connection_test_ok' && docker version --format '{{.Server.Version}}' 2>/dev/null && docker info --format '{{.ServerVersion}}' >/dev/null 2>&1 && echo 'docker_daemon_ok' || echo 'docker_daemon_error'" ) # Execute SSH test process = await asyncio.create_subprocess_exec( *ssh_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() output = stdout.decode().strip() error_output = stderr.decode().strip() if process.returncode == 0 and "connection_test_ok" in output: # Enhanced Docker availability and daemon checks docker_version = None docker_daemon_accessible = "docker_daemon_ok" in output docker_version_available = "docker_daemon_error" not in output # Extract Docker version if available lines = output.split("\n") for line in lines: if line and line not in ["connection_test_ok", "docker_daemon_ok", "docker_daemon_error"]: docker_version = line.strip() break # Determine overall Docker status if docker_daemon_accessible and docker_version: docker_status = "fully_available" docker_message = "Docker daemon is running and accessible" elif docker_version_available and docker_version: docker_status = "version_only" docker_message = "Docker installed but daemon may not be accessible" else: docker_status = 
"not_available" docker_message = "Docker not found or not accessible" result = { "success": True, "message": "SSH connection successful", HOST_ID: host_id, "hostname": host.hostname, "port": host.port, "docker_available": docker_version is not None, "docker_daemon_accessible": docker_daemon_accessible, "docker_version": docker_version, "docker_status": docker_status, "docker_message": docker_message, } result["formatted_output"] = self._format_test_connection_output( host_id, host.hostname, host.port, docker_status, docker_version, docker_message, ) return result else: # Enhanced SSH error handling with specific guidance detailed_error = self._analyze_ssh_error(error_output, process.returncode or 0, host) error_message = detailed_error["error"] result = { "success": False, "error": error_message, "error_type": detailed_error["error_type"], "troubleshooting_guidance": detailed_error["guidance"], HOST_ID: host_id, "hostname": host.hostname, "port": host.port, } result["formatted_output"] = self._format_error_output( "Connection test failed", error_message, detailed_error.get("guidance"), ) return result except Exception as e: error_message = f"Connection test failed: {str(e)}" return { "success": False, "error": error_message, HOST_ID: host_id, "formatted_output": self._format_error_output( "Connection test failed", error_message ), } async def discover_host_capabilities(self, host_id: str) -> dict[str, Any]: """Discover host capabilities including paths. 
Args: host_id: Host identifier to discover capabilities for Returns: Discovery results with recommendations """ try: # Force reload configuration from disk to avoid stale in-memory state await self._reload_config(host_id) # Check if host exists if host_id not in self.config.hosts: return {"success": False, "error": f"Host '{host_id}' not found", HOST_ID: host_id} host = self.config.hosts[host_id] # Run discovery operations in parallel with timeout discovery_results = await self._run_parallel_discovery(host, host_id) if "error" in discovery_results: return discovery_results # Process discovery results compose_result, appdata_result = self._process_discovery_results( discovery_results ) # Compile base capabilities capabilities = { "success": True, HOST_ID: host_id, "compose_discovery": compose_result, "appdata_discovery": appdata_result, "recommendations": [], } # Generate recommendations await self._generate_recommendations( capabilities, compose_result, appdata_result, host_id ) # Add overall guidance if needed self._add_overall_guidance( capabilities, compose_result, appdata_result, host_id ) self.logger.info( "Host capabilities discovered", host_id=host_id, compose_paths_found=len(compose_result["paths"]), appdata_paths_found=len(appdata_result["paths"]), ) return capabilities except Exception as e: self.logger.error("Failed to discover host capabilities", host_id=host_id, error=str(e)) return { "success": False, "error": f"Discovery failed: {str(e)}", HOST_ID: host_id, } async def _reload_config(self, host_id: str) -> None: """Reload configuration from disk to avoid stale in-memory state.""" try: config_file_path = getattr(self.config, "config_file", None) fresh_config = await asyncio.to_thread(load_config, config_file_path) async with self._config_lock: self.config = fresh_config self.logger.info( "Reloaded configuration from disk before discovery", host_id=host_id, config_file_path=config_file_path, ) except Exception as reload_error: self.logger.warning( 
"Failed to reload config from disk, using in-memory config", host_id=host_id, error=str(reload_error), ) async def _run_parallel_discovery(self, host, host_id: str) -> dict[str, Any]: """Run discovery operations in parallel with timeout.""" try: results = await asyncio.wait_for( asyncio.gather( self._discover_compose_paths(host), self._discover_appdata_paths(host), return_exceptions=True, ), timeout=30.0, # 30 second timeout per host ) return {"results": results} except TimeoutError: self.logger.warning(f"Discovery timed out for host {host_id}") return { "success": False, "error": "Discovery timed out after 30 seconds", HOST_ID: host_id, } def _process_discovery_results( self, discovery_data: dict[str, Any] ) -> tuple[dict[str, Any], dict[str, Any]]: """Process discovery results into structured data.""" results = discovery_data["results"] compose_result: dict[str, Any] = ( {**results[0], "success": True} if not isinstance(results[0], Exception) else {"paths": [], "recommended": None, "success": False} ) appdata_result: dict[str, Any] = ( {**results[1], "success": True} if not isinstance(results[1], Exception) else {"paths": [], "recommended": None, "success": False} ) return compose_result, appdata_result async def _generate_recommendations( self, capabilities: dict[str, Any], compose_result: dict[str, Any], appdata_result: dict[str, Any], host_id: str, ) -> None: """Generate configuration recommendations.""" # Add compose path recommendation if compose_result["recommended"]: capabilities["recommendations"].append( { "type": COMPOSE_PATH, "message": f"Set compose_path to '{compose_result['recommended']}'", "value": compose_result["recommended"], } ) # Add appdata path recommendation if appdata_result["recommended"]: capabilities["recommendations"].append( { "type": APPDATA_PATH, "message": f"Set appdata_path to '{appdata_result['recommended']}'", "value": appdata_result["recommended"], } ) def _add_overall_guidance( self, capabilities: dict[str, Any], 
compose_result: dict[str, Any], appdata_result: dict[str, Any], host_id: str, ) -> None: """Add overall guidance if discovery found nothing useful.""" total_paths_found = len(cast(list, compose_result["paths"])) + len( cast(list, appdata_result["paths"]) ) has_useful_discovery = ( total_paths_found > 0 or len(cast(list, capabilities["recommendations"])) > 0 ) if not has_useful_discovery: capabilities["overall_guidance"] = ( "Discovery found no automatic configuration. This is common for:\n" "• New hosts with no containers yet\n" "• Hosts using Docker volumes instead of bind mounts\n" "• Custom deployment methods\n\n" "Next steps:\n" f"1. Deploy containers: docker_compose deploy {host_id} <stack_name>\n" f"2. Manually configure paths: docker_hosts edit {host_id} compose_path /path appdata_path /path\n" "3. Check the guidance in compose_discovery and appdata_discovery for specific suggestions" ) async def discover_all_hosts(self) -> dict[str, Any]: """Discover capabilities for all configured hosts. 
Returns: Discovery results for all hosts with summary """ try: discovery_tasks = [] host_ids = [] # Create discovery tasks for all enabled hosts for host_id, host_config in self.config.hosts.items(): if host_config.enabled: discovery_tasks.append(self.discover_host_capabilities(host_id)) host_ids.append(host_id) if not host_ids: return { "success": True, "action": "discover_all", "total_hosts": 0, "message": "No enabled hosts to discover", "discoveries": {}, } self.logger.info( "Starting discovery for all hosts", total_hosts=len(host_ids), host_ids=host_ids ) # Run all discoveries in parallel with timeout try: results = await asyncio.wait_for( asyncio.gather(*discovery_tasks, return_exceptions=True), timeout=60.0, # 60 second timeout for all discoveries ) except TimeoutError: self.logger.warning("Discovery operation timed out after 60 seconds") # Return partial results - what we have so far results = [Exception("Discovery timed out") for _ in host_ids] # Compile results discoveries = {} successful_discoveries = 0 failed_discoveries = 0 for host_id, result in zip(host_ids, results, strict=False): if isinstance(result, Exception): discoveries[host_id] = { "success": False, "error": str(result), HOST_ID: host_id, } failed_discoveries += 1 self.logger.error("Host discovery failed", host_id=host_id, error=str(result)) else: result = cast(dict[str, Any], result) discoveries[host_id] = result if result.get("success", False): successful_discoveries += 1 else: failed_discoveries += 1 self.logger.info( "Discovery completed for all hosts", total_hosts=len(host_ids), successful=successful_discoveries, failed=failed_discoveries, ) return { "success": True, "action": "discover_all", "total_hosts": len(host_ids), "successful_discoveries": successful_discoveries, "failed_discoveries": failed_discoveries, "discoveries": discoveries, "summary": f"Discovered {successful_discoveries}/{len(host_ids)} hosts successfully", } except Exception as e: self.logger.error("Failed to discover 
all hosts", error=str(e)) return { "success": False, "error": f"Discovery failed: {str(e)}", "action": "discover_all", } async def discover_all_hosts_sequential(self) -> dict[str, Any]: """Discover capabilities for all hosts SEQUENTIALLY to avoid timeouts. Processes hosts one at a time to prevent SSH channel exhaustion and avoid parallel processing overload that causes timeouts. Returns: Discovery results for all hosts with summary """ try: self.logger.info( "Starting sequential discovery for all hosts", total_hosts=len(self.config.hosts) ) # Collect enabled hosts first enabled_hosts = self._collect_enabled_hosts() if not enabled_hosts: return self._create_empty_discovery_result() # Process each host sequentially ( discoveries, successful_discoveries, failed_discoveries, ) = await self._process_hosts_sequentially(enabled_hosts) # Calculate summary statistics and return results discovery_stats = self._calculate_discovery_statistics(discoveries) return self._create_discovery_summary( enabled_hosts, successful_discoveries, failed_discoveries, discoveries, discovery_stats, ) except Exception as e: self.logger.error("Sequential discovery failed", error=str(e)) return { "success": False, "error": f"Sequential discovery failed: {str(e)}", "action": "discover_all", } def _collect_enabled_hosts(self) -> list[str]: """Collect list of enabled hosts.""" enabled_hosts = [] for host_id, host_config in self.config.hosts.items(): if host_config.enabled: enabled_hosts.append(host_id) return enabled_hosts def _create_empty_discovery_result(self) -> dict[str, Any]: """Create result for when no enabled hosts are found.""" return { "success": True, "action": "discover_all", "total_hosts": 0, "successful_discoveries": 0, "failed_discoveries": 0, "discoveries": {}, "summary": "No enabled hosts to discover", } async def _process_hosts_sequentially( self, enabled_hosts: list[str] ) -> tuple[dict[str, Any], int, int]: """Process each host discovery sequentially.""" discoveries = {} 
successful_discoveries = 0 failed_discoveries = 0 for i, host_id in enumerate(enabled_hosts, 1): self.logger.info(f"Starting discovery for host {host_id} ({i}/{len(enabled_hosts)})") result, success = await self._process_single_host_discovery(host_id) discoveries[host_id] = result if success: successful_discoveries += 1 self.logger.info(f"Discovery completed successfully for host {host_id}") else: failed_discoveries += 1 self.logger.warning( f"Discovery failed for host {host_id}: {result.get('error', 'Unknown error')}" ) return discoveries, successful_discoveries, failed_discoveries async def _process_single_host_discovery(self, host_id: str) -> tuple[dict[str, Any], bool]: """Process discovery for a single host with error handling.""" try: result = await asyncio.wait_for( self.discover_host_capabilities(host_id), timeout=30.0, # 30 seconds per host ) return result, result.get("success", False) except TimeoutError: error_msg = "Discovery timed out after 30 seconds" self.logger.error(f"Discovery timed out for host {host_id}") return {"success": False, "error": error_msg, HOST_ID: host_id}, False except Exception as e: error_msg = str(e) self.logger.error(f"Discovery failed for host {host_id}: {error_msg}") return {"success": False, "error": error_msg, HOST_ID: host_id}, False def _calculate_discovery_statistics(self, discoveries: dict[str, Any]) -> dict[str, int]: """Calculate summary statistics from discovery results.""" total_recommendations = 0 total_paths_found = 0 for discovery in discoveries.values(): if discovery.get("success") and discovery.get("recommendations"): total_recommendations += len(discovery["recommendations"]) if discovery.get("compose_discovery", {}).get("paths"): total_paths_found += len(discovery["compose_discovery"]["paths"]) if discovery.get("appdata_discovery", {}).get("paths"): total_paths_found += len(discovery["appdata_discovery"]["paths"]) return { "total_recommendations": total_recommendations, "total_paths_found": total_paths_found, } 
def _create_discovery_summary( self, enabled_hosts: list[str], successful: int, failed: int, discoveries: dict[str, Any], stats: dict[str, int], ) -> dict[str, Any]: """Create comprehensive discovery results summary.""" return { "success": True, "action": "discover_all", "total_hosts": len(enabled_hosts), "successful_discoveries": successful, "failed_discoveries": failed, "discoveries": discoveries, "summary": f"Discovered {successful}/{len(enabled_hosts)} hosts successfully", "discovery_summary": {"total_hosts_discovered": len(discoveries), **stats}, } async def _discover_compose_paths(self, host: DockerHost) -> dict[str, Any]: """Discover Docker Compose file locations from running containers.""" try: # Use SSH for compose path discovery return await self._discover_compose_paths_ssh(host) except Exception as e: self.logger.error("Compose path discovery failed", host_id=host.hostname, error=str(e)) return {"success": False, "paths": [], "recommended": None, "error": str(e)} async def _discover_compose_paths_ssh(self, host: DockerHost) -> dict[str, Any]: """Discover compose paths using SSH (fallback method).""" try: ssh_cmd = build_ssh_command(host) # Get compose working directories from all containers with compose labels inspect_cmd = ssh_cmd + [ f"docker ps -aq --no-trunc | xargs -r docker inspect --format '{{{{index .Config.Labels \"{DOCKER_COMPOSE_WORKING_DIR}\"}}}}' 2>/dev/null | grep -v '^$' | sort | uniq" ] process = await asyncio.create_subprocess_exec( *inspect_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, _ = await process.communicate() if process.returncode == 0 and stdout: # Extract unique compose directories compose_dirs = [d.strip() for d in stdout.decode().strip().split("\n") if d.strip()] if compose_dirs: # Find common base path by counting occurrences path_counts = {} for compose_dir in compose_dirs: # Get parent directory (where compose files typically live) parent_dir = str(Path(compose_dir).parent) 
path_counts[parent_dir] = path_counts.get(parent_dir, 0) + 1 # Recommend the path with most compose projects recommended = ( max(path_counts.items(), key=lambda x: x[1])[0] if path_counts else None ) return { "success": True, "paths": list(path_counts.keys()), "recommended": recommended, } # Fallback to file system search if no running containers with compose labels fallback_cmd = ssh_cmd + [ "find /opt /srv /home /mnt -maxdepth 3 \\( -name 'docker-compose.*' -o -name 'compose.*' \\) 2>/dev/null | head -10" ] process = await asyncio.create_subprocess_exec( *fallback_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, _ = await process.communicate() if process.returncode == 0 and stdout: compose_files = stdout.decode().strip().split("\n") directories = list(set(str(Path(f).parent) for f in compose_files if f.strip())) recommended = self._recommend_compose_path(directories) return {"success": True, "paths": directories, "recommended": recommended} return {"success": True, "paths": [], "recommended": None} except Exception as e: self.logger.warning( "Failed to discover compose paths", hostname=host.hostname, error=str(e) ) return {"success": True, "paths": [], "recommended": None} async def _discover_appdata_paths(self, host: DockerHost) -> dict[str, Any]: """Discover appdata/volume storage locations from container bind mounts.""" try: # Use SSH for appdata path discovery return await self._discover_appdata_paths_ssh(host) except Exception as e: self.logger.error("Appdata path discovery failed", host_id=host.hostname, error=str(e)) return {"success": False, "paths": [], "recommended": None, "error": str(e)} async def _discover_appdata_paths_ssh(self, host: DockerHost) -> dict[str, Any]: """Discover appdata paths using SSH (fallback method).""" try: ssh_cmd = build_ssh_command(host) # First try to discover from container bind mounts result = await self._discover_from_bind_mounts(ssh_cmd) if result: return result # Fallback to checking common 
appdata locations return await self._discover_from_common_paths(ssh_cmd) except Exception as e: self.logger.warning( "Failed to discover appdata paths", hostname=host.hostname, error=str(e) ) return {"success": True, "paths": [], "recommended": None} async def _discover_from_bind_mounts(self, ssh_cmd: list[str]) -> dict[str, Any] | None: """Discover appdata paths by analyzing container bind mounts.""" inspect_cmd = ssh_cmd + [ "docker ps -aq --no-trunc | xargs -r docker inspect --format '{{range .Mounts}}{{if eq .Type \"bind\"}}{{.Source}}{{\"\\n\"}}{{end}}{{end}}' 2>/dev/null | grep -v '^$' | sort | uniq" ] process = await asyncio.create_subprocess_exec( *inspect_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, _ = await process.communicate() if process.returncode == 0 and stdout: bind_mounts = [m.strip() for m in stdout.decode().strip().split("\n") if m.strip()] if bind_mounts: base_path_counts = self._analyze_bind_mount_paths(bind_mounts) if base_path_counts: # Recommend the path with most mounted volumes recommended = max(base_path_counts.items(), key=lambda x: x[1])[0] return { "success": True, "paths": list(base_path_counts.keys()), "recommended": recommended, } return None def _analyze_bind_mount_paths(self, bind_mounts: list[str]) -> dict[str, int]: """Analyze bind mount paths to find common base paths.""" base_path_counts = {} for mount_path in bind_mounts: # Skip system paths and temporary mounts if mount_path.startswith(("/proc", "/sys", "/dev", "/tmp", "/var/run")): continue # Find potential base appdata paths path_parts = Path(mount_path).parts for i in range(2, min(5, len(path_parts))): # Check 2-4 levels deep potential_base = str(Path(*path_parts[:i])) if potential_base not in ["/", "/home", "/opt", "/srv", "/mnt"]: base_path_counts[potential_base] = base_path_counts.get(potential_base, 0) + 1 return base_path_counts async def _discover_from_common_paths(self, ssh_cmd: list[str]) -> dict[str, Any]: """Discover appdata 
paths by checking common locations.""" search_paths = [ "/opt/appdata", "/srv/docker", "/data", "/mnt/appdata", "/mnt/docker", "/opt/docker-data", ] existing_paths = [] for path in search_paths: if await self._test_path_exists_writable(ssh_cmd, path): existing_paths.append(path) recommended = existing_paths[0] if existing_paths else None return {"success": True, "paths": existing_paths, "recommended": recommended} async def _test_path_exists_writable(self, ssh_cmd: list[str], path: str) -> bool: """Test if a path exists and is writable.""" test_cmd = ssh_cmd + [ f"test -d {shlex.quote(path)} && test -w {shlex.quote(path)} && echo {shlex.quote(path)}" ] process = await asyncio.create_subprocess_exec( *test_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, _ = await process.communicate() return process.returncode == 0 and bool(stdout.strip()) def _recommend_compose_path(self, paths: list[str]) -> str | None: """Recommend the best compose path from discovered options using smart detection.""" if not paths: return None # If only one path, return it if len(paths) == 1: return paths[0] # Prefer persistent storage paths over system paths persistent_storage_paths = [] system_paths = [] for path in paths: # Persistent storage locations (more reliable for user data) if any(path.startswith(prefix) for prefix in ["/mnt/", "/data/", "/srv/"]): persistent_storage_paths.append(path) # System paths (less preferred for user data) elif any(path.startswith(prefix) for prefix in ["/opt/", "/home/"]): system_paths.append(path) else: # Unknown path, treat as persistent storage persistent_storage_paths.append(path) # Prefer persistent storage paths if persistent_storage_paths: return persistent_storage_paths[0] # Fall back to system paths if system_paths: return system_paths[0] # Final fallback return paths[0] def _analyze_ssh_error(self, error_output: str, return_code: int, host: DockerHost) -> dict[str, Any]: """Analyze SSH error output and provide specific 
troubleshooting guidance.""" error_lower = error_output.lower() # Authentication failures if "permission denied" in error_lower or "authentication failed" in error_lower: if "publickey" in error_lower: return { "error": "SSH key authentication failed", "error_type": "key_authentication_failure", "guidance": [ f"Verify SSH key exists: {host.identity_file or '~/.ssh/id_rsa'}", f"Check key permissions: chmod 600 {host.identity_file or '~/.ssh/id_rsa'}", f"Ensure key is added to {host.user}@{host.hostname}:~/.ssh/authorized_keys", "Test manually: ssh -i /path/to/key user@host" ] } else: return { "error": "SSH authentication failed", "error_type": "general_authentication_failure", "guidance": [ "Check username and password/key credentials", "Verify user account exists on remote host", "Check if SSH key is configured correctly", f"Test connection manually: ssh {host.user}@{host.hostname}" ] } # Connection refused elif "connection refused" in error_lower: return { "error": f"Connection refused to {host.hostname}:{host.port}", "error_type": "connection_refused", "guidance": [ f"Check if SSH service is running on {host.hostname}", f"Verify port {host.port} is correct for SSH", f"Check firewall rules allowing port {host.port}", "Ensure host is reachable: ping " + host.hostname ] } # Host key verification elif "host key verification failed" in error_lower or "known_hosts" in error_lower: return { "error": "Host key verification failed", "error_type": "host_key_verification_failure", "guidance": [ "Remove old host key: ssh-keygen -R " + host.hostname, "Accept new host key manually: ssh " + f"{host.user}@{host.hostname}", "Or disable host key checking (less secure): StrictHostKeyChecking=no", "Verify you're connecting to the correct host" ] } # Network timeouts elif "timeout" in error_lower or "timed out" in error_lower: return { "error": f"Connection timeout to {host.hostname}", "error_type": "connection_timeout", "guidance": [ f"Check if {host.hostname} is reachable: ping 
{host.hostname}", "Verify network connectivity and routing", "Check if there are firewalls blocking the connection", f"Try increasing timeout or using different port than {host.port}" ] } # Name resolution elif "not known" in error_lower or "name resolution" in error_lower: return { "error": f"Hostname resolution failed for {host.hostname}", "error_type": "name_resolution_failure", "guidance": [ f"Check if hostname {host.hostname} is correct", "Verify DNS resolution: nslookup " + host.hostname, "Try using IP address instead of hostname", "Check /etc/hosts file for local hostname entries" ] } # Key file issues elif "no such file" in error_lower and "identity" in error_lower: return { "error": f"SSH key file not found: {host.identity_file}", "error_type": "key_file_not_found", "guidance": [ f"Verify key file exists: ls -la {host.identity_file}", "Generate new SSH key: ssh-keygen -t rsa -b 4096", "Copy key to remote host: ssh-copy-id " + f"{host.user}@{host.hostname}", "Check file path is absolute and correct" ] } # Generic SSH errors else: return { "error": f"SSH connection failed: {error_output}", "error_type": "unknown_ssh_error", "guidance": [ f"Check SSH service status on {host.hostname}", f"Verify connection details: {host.user}@{host.hostname}:{host.port}", "Test with verbose SSH: ssh -v " + f"{host.user}@{host.hostname}", "Check system logs for more details" ] } async def handle_action(self, action, **params) -> dict[str, Any]: """Unified action handler for all host operations. This method consolidates all dispatcher logic from server.py into the service layer. 
""" try: # Normalize action to HostAction enum when provided as string if isinstance(action, str): from ..models.enums import HostAction try: action = HostAction(action.lower().strip()) except ValueError: return { "success": False, "error": f"Unknown action: {action}", "valid_actions": [a.value for a in HostAction], } # Get action handlers mapping handlers = self._get_action_handlers() handler = handlers.get(action) if handler: return await handler(**params) else: return { "success": False, "error": f"Unknown action: {action}", "valid_actions": [ "list", "add", "edit", "remove", "test_connection", "discover", "ports", "import_ssh", "cleanup", ], } except Exception as e: self.logger.error("host service action error", action=action, error=str(e)) return {"success": False, "error": f"Service action failed: {str(e)}", "action": action} def _get_action_handlers(self) -> dict: """Get mapping of actions to handler methods.""" from ..models.enums import HostAction return { HostAction.LIST: self._handle_list_action, HostAction.ADD: self._handle_add_action, HostAction.EDIT: self._handle_edit_action, HostAction.REMOVE: self._handle_remove_action, HostAction.TEST_CONNECTION: self._handle_test_connection_action, HostAction.DISCOVER: self._handle_discover_action, HostAction.PORTS: self._handle_ports_action, HostAction.IMPORT_SSH: self._handle_import_ssh_action, HostAction.CLEANUP: self._handle_cleanup_action, } async def _handle_list_action(self, **params) -> dict[str, Any]: """Handle LIST action.""" result = await self.list_docker_hosts() return ( result if isinstance(result, dict) else {"success": False, "error": "Invalid result format"} ) async def _handle_add_action(self, **params) -> dict[str, Any]: """Handle ADD action.""" host_id = params.get("host_id", "") ssh_host = params.get("ssh_host", "") ssh_user = params.get("ssh_user", "") ssh_port = params.get("ssh_port", 22) ssh_key_path = params.get("ssh_key_path") description = params.get("description", "") tags = 
params.get("tags", []) compose_path = params.get("compose_path") enabled = params.get("enabled", True) if not host_id: error_message = "host_id is required for add action" return { "success": False, "error": error_message, "formatted_output": self._format_error_output( "Host add failed", error_message ), } if not ssh_host: error_message = "ssh_host is required for add action" return { "success": False, "error": error_message, "formatted_output": self._format_error_output( "Host add failed", error_message ), } if not ssh_user: error_message = "ssh_user is required for add action" return { "success": False, "error": error_message, "formatted_output": self._format_error_output( "Host add failed", error_message ), } if not (1 <= ssh_port <= 65535): return { "success": False, "error": f"ssh_port must be between 1 and 65535, got {ssh_port}", "formatted_output": self._format_error_output( "Host add failed", f"ssh_port must be between 1 and 65535, got {ssh_port}", ), } result = await self.add_docker_host( host_id, ssh_host, ssh_user, ssh_port, ssh_key_path, description, tags, compose_path, enabled, ) # Auto-run discovery if host was added successfully if result.get("success"): discovery_result = await self.discover_host_capabilities(host_id) if discovery_result.get("success") and discovery_result.get("recommendations"): result["discovery"] = discovery_result result["message"] += " (Discovery completed - check recommendations)" if result.get("formatted_output"): result["formatted_output"] += self._format_discovery_appendix( host_id, discovery_result ) else: result["formatted_output"] = self._format_discovery_appendix( host_id, discovery_result ).strip() return result async def _handle_edit_action(self, **params) -> dict[str, Any]: """Handle EDIT action.""" host_id = params.get("host_id", "") if not host_id: error_message = "host_id is required for edit action" return { "success": False, "error": error_message, "formatted_output": self._format_error_output( "Host edit 
failed", error_message ), } return await self.edit_docker_host( host_id, params.get("ssh_host"), params.get("ssh_user"), params.get("ssh_port"), params.get("ssh_key_path"), params.get("description"), params.get("tags"), params.get("compose_path"), params.get("appdata_path"), params.get("enabled"), ) async def _handle_remove_action(self, **params) -> dict[str, Any]: """Handle REMOVE action.""" host_id = params.get("host_id", "") if not host_id: error_message = "host_id is required for remove action" return { "success": False, "error": error_message, "formatted_output": self._format_error_output( "Host remove failed", error_message ), } return await self.remove_docker_host(host_id) async def _handle_test_connection_action(self, **params) -> dict[str, Any]: """Handle TEST_CONNECTION action.""" host_id = params.get("host_id", "") if not host_id: error_message = "host_id is required for test_connection action" return { "success": False, "error": error_message, "formatted_output": self._format_error_output( "Connection test failed", error_message ), } return await self.test_connection(host_id) async def _handle_discover_action(self, **params) -> dict[str, Any]: """Handle DISCOVER action.""" host_id = params.get("host_id", "") if host_id == "all" or not host_id: result = await self.discover_all_hosts_sequential() return self._format_discover_all_result(result) else: result = await self.discover_host_capabilities(host_id) return self._format_discover_result(result, host_id) async def _handle_ports_action(self, **params) -> dict[str, Any]: """Handle PORTS action.""" from ..services.container import ContainerService host_id = params.get("host_id", "") port = params.get("port", 0) if not host_id: return {"success": False, "error": "host_id is required for ports action"} if self.context_manager is None: return {"success": False, "error": "Context manager not available"} container_service = ContainerService(self.config, self.context_manager) if port > 0: result = await 
container_service.check_port_availability(host_id, port) # For specific port checks, return structured content (no complex formatting needed) return cast(dict[str, Any], result.structured_content) else: result = await container_service.list_host_ports(host_id) # CRITICAL: Preserve the formatted content from ContainerService.list_host_ports() # The ContainerService creates beautiful token-efficient formatting like: # "Port Usage on tootie\nFound 136 ports across 67 containers\nPORT MAPPINGS:\n container [project]: 8080→80/tcp" # But we were stripping this and returning only raw JSON, losing the user-friendly formatting # The structured_content is still available but we need to include the formatted text too # Extract the formatted content and include it in the response formatted_content = "" if hasattr(result, 'content') and result.content: formatted_content = result.content[0].text if result.content[0].text else "" # Get the structured content structured_data = cast(dict[str, Any], result.structured_content) # Add the formatted output to the structured data so FastMCP can display it structured_data["formatted_output"] = formatted_content return structured_data async def _handle_import_ssh_action(self, **params) -> dict[str, Any]: """Handle IMPORT_SSH action.""" from ..services import ConfigService ssh_config_path = params.get("ssh_config_path") selected_hosts = params.get("selected_hosts") config_service = ConfigService(self.config, self.context_manager) # type: ignore[arg-type] config_path = getattr(self.config, "config_file", None) result = await config_service.import_ssh_config( ssh_config_path, selected_hosts, config_path ) formatted_text = "" if hasattr(result, "structured_content"): if result.structured_content: import_result = result.structured_content if hasattr(result, "content") and result.content: formatted_text = getattr(result.content[0], "text", "") or "" else: self.logger.error( "import_ssh_config returned invalid/missing structured_content", 
ssh_config_path=ssh_config_path, selected_hosts=selected_hosts, result_type=type(result).__name__, has_content=hasattr(result, "content"), content_preview=str(result.content)[:200] if hasattr(result, "content") else None, ) import_result = { "success": False, "error": "import_ssh_config returned invalid/missing structured_content", "detail": f"Expected structured data but received: {type(result.structured_content).__name__}", "operation": "import_ssh_config", } else: import_result = result if formatted_text: import_result["formatted_output"] = formatted_text # Auto-run discovery on imported hosts if import was successful if import_result.get("success") and import_result.get("imported_hosts"): discovered_hosts = [] for host_info in import_result["imported_hosts"]: host_id = host_info[HOST_ID] try: test_result = await self.test_connection(host_id) discovery_result = await self.discover_host_capabilities(host_id) discovered_hosts.append( { HOST_ID: host_id, "connection_test": test_result.get("success", False), "discovery": discovery_result.get("success", False), "recommendations": discovery_result.get("recommendations", []), } ) except Exception as e: self.logger.error( "Auto-discovery failed for imported host", host_id=host_id, error=str(e) ) discovered_hosts.append( { HOST_ID: host_id, "connection_test": False, "discovery": False, "error": str(e), } ) import_result["auto_discovery"] = {"completed": True, "results": discovered_hosts} import_result["message"] = ( import_result.get("message", "") + " (Auto-discovery completed for imported hosts)" ) summary_text = self._format_import_result_output(import_result) existing = import_result.get("formatted_output", "").strip() import_result["formatted_output"] = ( f"{existing}\n\n{summary_text}" if existing else summary_text ) elif "formatted_output" not in import_result: import_result["formatted_output"] = self._format_import_result_output(import_result) return import_result async def _handle_cleanup_action(self, **params) 
-> dict[str, Any]: """Handle CLEANUP action.""" from ..services import CleanupService host_id = params.get("host_id", "") cleanup_type = params.get("cleanup_type") cleanup_service = CleanupService(self.config) # Handle cleanup operations if not host_id: error_message = "host_id is required for cleanup action" return { "success": False, "error": error_message, "formatted_output": self._format_error_output( "Cleanup failed", error_message ), } if not cleanup_type: error_message = "cleanup_type is required for cleanup action" return { "success": False, "error": error_message, "formatted_output": self._format_error_output( "Cleanup failed", error_message ), } if cleanup_type not in ["check", "safe", "moderate", "aggressive"]: return { "success": False, "error": "cleanup_type must be one of: check, safe, moderate, aggressive", "formatted_output": self._format_error_output( "Cleanup failed", "cleanup_type must be one of: check, safe, moderate, aggressive", ), } result = await cleanup_service.docker_cleanup(host_id, cleanup_type) if result.get("success") and "formatted_output" not in result: result["formatted_output"] = self._format_cleanup_output(result) elif not result.get("success") and "formatted_output" not in result: result["formatted_output"] = self._format_error_output( "Cleanup failed", result.get("error", "Unknown error") ) return result def _format_discover_result(self, result: dict[str, Any], host_id: str) -> dict[str, Any]: """Format discovery result for single host.""" if not result.get("success"): if "formatted_output" not in result: result["formatted_output"] = self._format_error_output( f"Discovery failed for {host_id}", result.get("error", "Unknown error") ) return result # Add discovery summary information discovery_count = 0 if result.get("compose_discovery", {}).get("paths"): discovery_count += len(result["compose_discovery"]["paths"]) if result.get("appdata_discovery", {}).get("paths"): discovery_count += len(result["appdata_discovery"]["paths"]) 
result["discovery_summary"] = { HOST_ID: host_id, "paths_discovered": discovery_count, "recommendations_count": len(result.get("recommendations", [])), } # Collect and format all guidance messages for display guidance_messages = [] if compose_guidance := result.get("compose_discovery", {}).get("guidance"): guidance_messages.append(f"📁 **Compose Paths**: {compose_guidance}") if appdata_guidance := result.get("appdata_discovery", {}).get("guidance"): guidance_messages.append(f"💾 **Appdata Paths**: {appdata_guidance}") if overall_guidance := result.get("overall_guidance"): guidance_messages.append(f"💡 **Overall Guidance**: {overall_guidance}") # Add formatted guidance to result if any guidance exists if guidance_messages: result["helpful_guidance"] = "\n\n".join(guidance_messages) result["formatted_output"] = self._format_discover_single_output(host_id, result) return result def _format_discover_all_result(self, result: dict[str, Any]) -> dict[str, Any]: """Format discovery result for all hosts.""" if not result.get("success"): if "formatted_output" not in result: result["formatted_output"] = self._format_error_output( "Discovery failed", result.get("error", "Unknown error") ) return result # Add summary statistics total_recommendations = 0 total_paths = 0 discoveries = result.get("discoveries", {}) for host_discovery in discoveries.values(): if host_discovery.get("success"): total_recommendations += len(host_discovery.get("recommendations", [])) compose_paths = len(host_discovery.get("compose_discovery", {}).get("paths", [])) appdata_paths = len(host_discovery.get("appdata_discovery", {}).get("paths", [])) total_paths += compose_paths + appdata_paths result["discovery_summary"] = { "total_hosts_discovered": result.get("successful_discoveries", 0), "total_recommendations": total_recommendations, "total_paths_found": total_paths, } result["formatted_output"] = self._format_discover_all_output(result) return result def _format_host_list_output( self, hosts: 
list[dict[str, Any]], enabled_hosts: int ) -> list[str]: total_hosts = len(hosts) lines = [f"Docker Hosts ({total_hosts} configured)"] if total_hosts == 0: lines.append("No hosts configured. Use `docker_hosts add` to register a host.") return lines lines.append(f"Enabled: {enabled_hosts}/{total_hosts}") lines.append("") header = f"{'Host':<12} {'Address':<22} {'Status':<3} {'Details'}" separator = f"{'-' * 12:<12} {'-' * 22:<22} {'-' * 3:<3} {'-' * 48}" lines.extend([header, separator]) for host in hosts: host_id = cast(str, host.get(HOST_ID, "unknown")) hostname = str(host.get("hostname", "-")) or "-" port = host.get("port") port_display = str(port) if port not in (None, "", 0) else "-" address = f"{hostname}:{port_display}" if port_display != "-" else hostname status_icon = "✓" if host.get("enabled", True) else "✗" address_lines = self._wrap_value(address, 22) detail_lines = self._host_detail_lines(host) row_count = max(len(address_lines), len(detail_lines)) for index in range(row_count): host_column = host_id if index == 0 else "" address_column = address_lines[index] if index < len(address_lines) else "" status_column = status_icon if index == 0 else "" detail_column = detail_lines[index] if index < len(detail_lines) else "" lines.append( f"{host_column:<12} {address_column:<22} {status_column:<3} {detail_column}" ) return lines def _host_detail_lines(self, host: dict[str, Any]) -> list[str]: tags = host.get("tags") or [] details: list[str] = [] tag_line = "/".join(str(tag) for tag in tags if str(tag).strip()) if tag_line: details.append(tag_line) description = host.get("description") if description: details.extend(self._wrap_value(str(description), 48)) compose_path = host.get(COMPOSE_PATH) if compose_path: details.extend(self._wrap_labelled_value("Compose", str(compose_path), 48)) appdata_path = host.get(APPDATA_PATH) if appdata_path: details.extend(self._wrap_labelled_value("Appdata", str(appdata_path), 48)) return details or ["-"] def 
_format_detail_entries( self, entries: list[tuple[str, str]], width: int = 48 ) -> list[str]: if not entries: return ["-"] lines: list[str] = [] for label, value in entries: if value is None or value == "": continue lines.extend(self._wrap_labelled_value(label, value, width)) return lines or ["-"] def _wrap_labelled_value(self, label: str, value: str, width: int) -> list[str]: available_width = max(width - len(label) - 2, 12) wrapped_value = self._wrap_value( value, available_width, prefer_path=label.lower() in {"compose", "appdata"}, ) formatted_lines = [f"{label}: {wrapped_value[0]}"] indent = " " * (len(label) + 2) for continuation in wrapped_value[1:]: formatted_lines.append(f"{indent}{continuation}") return formatted_lines def _wrap_value(self, value: str, width: int, prefer_path: bool = False) -> list[str]: if width <= 0 or len(value) <= width: return [value] if prefer_path or "/" in value: spaced = value.replace("/", "/ ") wrapped = textwrap.wrap( spaced, width=width, break_long_words=False, break_on_hyphens=False, ) cleaned = [line.replace("/ ", "/").strip() for line in wrapped if line.strip()] return cleaned or [value] wrapped = textwrap.wrap( value, width=width, break_long_words=False, break_on_hyphens=False, ) return wrapped or [value] def _format_error_output( self, prefix: str, message: str, guidance: list[str] | None = None ) -> str: lines = [f"❌ {prefix}: {message}"] if guidance: lines.append("Guidance:") for item in guidance: lines.append(f" • {item}") return "\n".join(lines) def _format_add_host_output( self, host_id: str, hostname: str, user: str, port: int, connection_tested: bool, tags: list[str] | None, compose_path: str | None, description: str, enabled: bool, ) -> str: """Format host addition output with enhanced visual structure.""" connection_status = "✅ Connected" if connection_tested else "⚠️ Not tested" enabled_status = "✅ Enabled" if enabled else "❌ Disabled" lines = [] # Header with visual separator lines.append("═" * 60) 
        lines.append("🚀 New Docker Host Added Successfully")
        lines.append("═" * 60)

        # Host information section
        lines.append("")
        lines.append("📋 Host Configuration:")
        lines.append(f" Host ID: {host_id}")
        lines.append(f" Address: {hostname}:{port}")
        lines.append(f" SSH User: {user}")

        # Status section
        lines.append("")
        lines.append("📊 Status:")
        lines.append(f" Connection: {connection_status}")
        lines.append(f" State: {enabled_status}")

        # Optional details section — only emitted when at least one value is set
        if compose_path or description or tags:
            lines.append("")
            lines.append("🔧 Additional Configuration:")
            if compose_path:
                lines.extend(self._wrap_labelled_value(" Compose Path", compose_path, 70))
            if description:
                lines.extend(self._wrap_labelled_value(" Description", description, 70))
            if tags:
                lines.extend(self._wrap_labelled_value(" Tags", ", ".join(tags), 70))

        # Next steps
        lines.append("")
        lines.append("💡 Next Steps:")
        lines.append(" • Run 'docker_hosts test_connection' to verify connectivity")
        lines.append(" • Run 'docker_hosts discover' to find compose paths")
        lines.append(" • Deploy stacks with 'docker_compose deploy'")
        lines.append("─" * 60)

        return "\n".join(lines)

    def _format_discovery_appendix(
        self, host_id: str, discovery_result: dict[str, Any]
    ) -> str:
        """Build a one-line appendix summarizing discovery counts for a host.

        Returns a newline-prefixed summary of the number of compose paths,
        appdata paths, and recommendations found in *discovery_result*.
        """
        compose_paths = discovery_result.get("compose_discovery", {}).get("paths", [])
        appdata_paths = discovery_result.get("appdata_discovery", {}).get("paths", [])
        recommendations = discovery_result.get("recommendations", [])
        return (
            "\n"
            + f"Discovery summary for {host_id}: "
            + f"compose {len(compose_paths)}, appdata {len(appdata_paths)}, "
            + f"recommendations {len(recommendations)}"
        )

    def _serialize_host_config(self, host_id: str, host_config: DockerHost) -> dict[str, Any]:
        """Flatten a DockerHost config into a plain dict for tool output.

        Note: both the HOST_ID constant key and the literal "id" key carry
        host_id — presumably for backward compatibility; verify against callers.
        """
        return {
            HOST_ID: host_id,
            "id": host_id,
            "hostname": host_config.hostname,
            "user": host_config.user,
            "port": host_config.port,
            "identity_file": host_config.identity_file,
            "description": host_config.description,
            "tags": host_config.tags,
            "enabled": host_config.enabled,
            COMPOSE_PATH: host_config.compose_path,
            APPDATA_PATH: host_config.appdata_path,
        }

    def _calculate_host_changes(
        self, current: DockerHost, new: DockerHost
    ) -> dict[str, dict[str, Any]]:
        """Diff two DockerHost configs attribute by attribute.

        Returns:
            Mapping of field name -> {"old": previous value, "new": new value}
            containing only the attributes whose values differ.
        """
        changes: dict[str, dict[str, Any]] = {}
        # attribute name on DockerHost -> key used in the returned change set
        # (currently identical, but kept as an explicit mapping)
        mapping = {
            "hostname": "hostname",
            "user": "user",
            "port": "port",
            "identity_file": "identity_file",
            "description": "description",
            "tags": "tags",
            "compose_path": "compose_path",
            "appdata_path": "appdata_path",
            "enabled": "enabled",
        }
        for attr, key in mapping.items():
            old_value = getattr(current, attr)
            new_value = getattr(new, attr)
            if old_value != new_value:
                changes[key] = {"old": old_value, "new": new_value}
        return changes

    def _format_edit_host_output(
        self, host_id: str, host: DockerHost, changes: dict[str, dict[str, Any]]
    ) -> str:
        """Format host edit output with enhanced visual structure and change details."""
        lines = []

        # Header with visual separator
        lines.append("═" * 60)
        lines.append("✏️ Docker Host Configuration Updated")
        lines.append("═" * 60)

        # Host identification
        lines.append("")
        lines.append(f"📋 Host: {host_id}")
        lines.append(f" Address: {host.hostname}:{host.port}")
        lines.append(f" Status: {'✅ Enabled' if host.enabled else '❌ Disabled'}")

        # Changes section
        lines.append("")
        if changes:
            lines.append(f"🔄 Modified Fields ({len(changes)}):")
            for field, values in sorted(changes.items()):
                old_val = values.get("old", "")
                new_val = values.get("new", "")
                # Format the change with before/after
                lines.append(f" • {field}:")
                lines.append(f" Before: {old_val if old_val else '(none)'}")
                lines.append(f" After: {new_val if new_val else '(none)'}")
        else:
            lines.append("ℹ️ No changes detected (configuration already up to date)")

        # Current configuration section
        lines.append("")
        lines.append("🔧 Current Configuration:")
        lines.append(f" SSH User: {host.user}")
        if host.compose_path:
            lines.extend(self._wrap_labelled_value(" Compose Path", host.compose_path, 70))
        if host.appdata_path:
            lines.extend(self._wrap_labelled_value(" Appdata Path", host.appdata_path, 70))
        if host.tags:
            lines.extend(self._wrap_labelled_value(" Tags", ", ".join(host.tags), 70))
        if host.description:
            lines.extend(self._wrap_labelled_value(" Description", host.description, 70))

        # Next steps
        lines.append("")
        lines.append("💡 Next Steps:")
        lines.append(" • Run 'docker_hosts test_connection' to verify changes")
        lines.append(" • Run 'docker_hosts list' to see updated configuration")
        lines.append("─" * 60)

        return "\n".join(lines)

    def _format_remove_host_output(self, host_id: str, hostname: str) -> str:
        """Format enhanced host removal output with context and visual structure."""
        lines = []

        # Header with visual separator
        lines.append("═" * 50)
        lines.append("🗑️ Host Removal Complete")
        lines.append("═" * 50)

        # Host details
        lines.append(f"✅ Host ID: {host_id}")
        lines.append(f" Hostname: {hostname}")

        # Get host details if available from config before removal
        # NOTE(review): self.config is assigned in __init__, so this guard is
        # effectively always true for a normally constructed instance — confirm intent.
        if self.config and hasattr(self.config, "hosts"):
            # Try to get additional context from other sources
            lines.append("")
            lines.append("📝 Removal Summary:")
            lines.append(" • Configuration entries cleaned")
            lines.append(" • Docker context removed (if exists)")
            lines.append(" • Hot-reload triggered")

        lines.append("")
        lines.append("💡 Next Steps:")
        lines.append(" • Run 'docker_hosts list' to verify removal")
        lines.append(" • Add new host with 'docker_hosts add' if needed")
        lines.append("─" * 50)

        return "\n".join(lines)

    def _format_test_connection_output(
        self,
        host_id: str,
        hostname: str,
        port: int,
        docker_status: str,
        docker_version: str | None,
        docker_message: str,
    ) -> str:
        """Render a compact three-line SSH + Docker connectivity report.

        docker_status selects an icon: "fully_available" -> ✓,
        "version_only" -> ◐, "not_available" -> ✗, anything else -> ?.
        """
        status_icon = {
            "fully_available": "✓",
            "version_only": "◐",
            "not_available": "✗",
        }.get(docker_status, "?")
        lines = [f"SSH OK: {host_id} {hostname}:{port}"]
        lines.append(f"Docker: {status_icon} {docker_message}")
        if docker_version:
            lines.append(f"Version: {docker_version}")
        return "\n".join(lines)

    def _format_discover_single_output(
        self, host_id: str, discovery: dict[str, Any]
    ) -> str:
        """Assemble the full discovery report for one host from section helpers."""
        lines = self._build_discovery_header(host_id, discovery)
        self._add_compose_paths(lines, discovery)
        self._add_appdata_paths(lines, discovery)
        self._add_recommendations(lines, discovery)
        self._add_helpful_guidance(lines, discovery)
        return "\n".join(lines).strip()

    def _build_discovery_header(self, host_id: str, discovery: dict[str, Any]) -> list[str]:
        """Build discovery output header."""
        compose_count = len(discovery.get("compose_discovery", {}).get("paths", []))
        appdata_count = len(discovery.get("appdata_discovery", {}).get("paths", []))
        recommendations_count = len(discovery.get("recommendations", []))
        return [
            f"Host Discovery on {host_id}",
            (
                f"Compose paths: {compose_count} | "
                f"Appdata paths: {appdata_count} | "
                f"Recommendations: {recommendations_count}"
            ),
            "",
        ]

    def _add_compose_paths(self, lines: list[str], discovery: dict[str, Any]) -> None:
        """Add compose paths to discovery output (mutates *lines* in place)."""
        compose_paths = discovery.get("compose_discovery", {}).get("paths", [])
        if compose_paths:
            lines.append("Compose paths:")
            for path in compose_paths:
                # prefer_path keeps wrapping path-aware (break on separators)
                for wrapped in self._wrap_value(path, 72, prefer_path=True):
                    lines.append(f" {wrapped}")
        else:
            lines.append("Compose paths: none detected")
        lines.append("")

    def _add_appdata_paths(self, lines: list[str], discovery: dict[str, Any]) -> None:
        """Add appdata paths to discovery output (mutates *lines* in place)."""
        appdata_paths = discovery.get("appdata_discovery", {}).get("paths", [])
        if appdata_paths:
            lines.append("Appdata paths:")
            for path in appdata_paths:
                for wrapped in self._wrap_value(path, 72, prefer_path=True):
                    lines.append(f" {wrapped}")
        else:
            lines.append("Appdata paths: none detected")

    def _add_recommendations(self, lines: list[str], discovery: dict[str, Any]) -> None:
        """Add recommendations to discovery output (mutates *lines* in place)."""
        recommendations = discovery.get("recommendations", [])
        if recommendations:
            lines.append("")
            lines.append("Recommendations:")
            for recommendation in recommendations:
                message = self._extract_recommendation_message(recommendation)
                for wrapped in self._wrap_value(message, 72):
                    lines.append(f" • {wrapped}")

    def _extract_recommendation_message(self, recommendation: Any) -> str:
        """Extract message from recommendation object."""
        if isinstance(recommendation, dict):
            return recommendation.get("message", str(recommendation))
        return str(recommendation)

    def _add_helpful_guidance(self, lines: list[str], discovery: dict[str, Any]) -> None:
        """Add helpful guidance to discovery output."""
        if discovery.get("helpful_guidance"):
            lines.append("")
            for wrapped in self._wrap_value(discovery["helpful_guidance"], 72):
                lines.append(wrapped)

    def _format_discover_all_output(self, result: dict[str, Any]) -> str:
        """Render an aligned summary table of discovery results across all hosts."""
        discoveries = result.get("discoveries", {})
        total_hosts = result.get("total_hosts", len(discoveries))
        successful = result.get("successful_discoveries", 0)
        summary = result.get("discovery_summary", {})
        total_paths = summary.get("total_paths_found", 0)
        total_recommendations = summary.get("total_recommendations", 0)
        lines = [
            "Host Discovery (all)",
            (
                f"Hosts: {total_hosts} | Successful: {successful} | "
                f"Paths: {total_paths} | Recommendations: {total_recommendations}"
            ),
            "",
        ]
        # Fixed-width columns; widths must match the per-row format below
        header = f"{'Host':<14} {'OK':<3} {'Compose':<7} {'Appdata':<7} {'Recs':<4} {'Notes'}"
        separator = f"{'-' * 14:<14} {'-' * 3:<3} {'-' * 7:<7} {'-' * 7:<7} {'-' * 4:<4} {'-' * 32}"
        lines.extend([header, separator])
        for host_id in sorted(discoveries):
            discovery = discoveries[host_id]
            compose_count = len(discovery.get("compose_discovery", {}).get("paths", []))
            appdata_count = len(discovery.get("appdata_discovery", {}).get("paths", []))
            rec_count = len(discovery.get("recommendations", []))
            ok_icon = "✓" if discovery.get("success") else "✗"
            note = discovery.get("error", "")
            lines.append(
                f"{host_id:<14} {ok_icon:<3} {compose_count:<7} {appdata_count:<7} {rec_count:<4} {note}"
            )
            # Wrap long error notes onto continuation rows under the Notes column
            if note:
                for wrapped in self._wrap_value(note, 48):
                    lines.append(f"{'':<14} {'':<3} {'':<7} {'':<7} {'':<4} {wrapped}")
        return "\n".join(lines)

    def _format_import_result_output(self, import_result: dict[str, Any]) -> str:
        """Format the outcome of an SSH-config host import.

        Handles three shapes of *import_result*: failure, a
        "selection_required" intermediate step, and a completed import
        (optionally with auto-discovery results).
        """
        if not import_result.get("success", False):
            return self._format_error_output(
                "SSH import failed", import_result.get("error", "Unknown error")
            )
        if import_result.get("action") == "selection_required":
            available = len(import_result.get("importable_hosts", []))
            return f"SSH config parsed. {available} host(s) available for import. Provide `selected_hosts` to continue."
        imported_hosts = import_result.get("imported_hosts", [])
        host_lines = []
        for host_info in imported_hosts:
            host_lines.append(
                f" • {host_info.get(HOST_ID)} ({host_info.get('hostname')})"
            )
        summary = [
            f"Imported {len(imported_hosts)} host(s) from SSH config",
            *host_lines,
        ]
        if import_result.get("auto_discovery", {}).get("completed"):
            summary.append(
                f"Auto-discovery complete for {len(import_result['auto_discovery'].get('results', []))} host(s)"
            )
        return "\n".join(summary)

    def _format_cleanup_output(self, result: dict[str, Any]) -> str:
        """Dispatch cleanup formatting: "check" mode vs actual execution."""
        host_id = result.get("host_id", "?")
        cleanup_type = result.get("cleanup_type", result.get("mode", "unknown"))
        lines = [f"Cleanup ({cleanup_type}) on {host_id}"]
        if cleanup_type == "check":
            self._add_cleanup_check_output(lines, result)
        else:
            self._add_cleanup_execution_output(lines, result)
        self._add_cleanup_message(lines, result)
        return "\n".join(lines)

    def _add_cleanup_check_output(self, lines: list[str], result: dict[str, Any]) -> None:
        """Add check cleanup output."""
        reclaimable = result.get("total_reclaimable", "0B")
        percentage = result.get("reclaimable_percentage", 0)
        lines.append(f"Reclaimable: {reclaimable} ({percentage}%)")
        self._add_cleanup_summary_details(lines, result.get("summary", {}))
        self._add_cleanup_recommendations(lines, result.get("recommendations", []))

    def _add_cleanup_summary_details(self, lines: list[str], summary: dict[str, Any]) -> None:
        """Add cleanup summary details."""
        for resource, details in summary.items():
            # Skip malformed entries; only dict-shaped details are formattable
            if not isinstance(details, dict):
                continue
            resource_line = self._format_cleanup_resource_details(resource, details)
            if resource_line:
                lines.append(resource_line)

    def _format_cleanup_resource_details(self, resource: str, details: dict[str, Any]) -> str:
        """Format cleanup resource details.

        Returns an empty string when none of the known keys are present.
        """
        parts = []
        if "stopped" in details:
            parts.append(f"stopped {details.get('stopped')}")
        if "unused" in details:
            parts.append(f"unused {details.get('unused')}")
        if "reclaimable_space" in details:
            parts.append(f"reclaim {details.get('reclaimable_space')}")
        if "size" in details:
            parts.append(f"size {details.get('size')}")
        return f"{resource.title()}: {', '.join(parts)}" if parts else ""

    def _add_cleanup_recommendations(self, lines: list[str], recommendations: list[str]) -> None:
        """Add cleanup recommendations."""
        if recommendations:
            lines.append("")
            lines.append("Recommendations:")
            for recommendation in recommendations:
                lines.append(f" • {recommendation}")

    def _add_cleanup_execution_output(self, lines: list[str], result: dict[str, Any]) -> None:
        """Add execution cleanup output."""
        results = result.get("results", [])
        for entry in results:
            resource = entry.get("resource_type", "resource")
            if entry.get("success"):
                lines.append(
                    f"• {resource}: reclaimed {entry.get('space_reclaimed', '0B')}"
                )
            else:
                lines.append(
                    f"• {resource}: failed ({entry.get('error', 'unknown error')})"
                )

    def _add_cleanup_message(self, lines: list[str], result: dict[str, Any]) -> None:
        """Add cleanup message if present."""
        if result.get("message"):
            lines.append("")
            lines.append(result["message"])

    async def _test_ssh_connection(
        self, hostname: str, user: str, port: int = 22, identity_file: str | None = None
    ) -> bool:
        """Test SSH connection with raw parameters before adding host to config.

        Args:
            hostname: SSH hostname or IP address
            user: SSH username
            port: SSH port (default: 22)
            identity_file: Path to SSH private key file

        Returns:
            True if SSH connection successful, False otherwise
        """
        try:
            # Build SSH command for connection test (similar to test_connection method)
            # BatchMode prevents interactive password prompts from hanging the call
            ssh_cmd = [
                "ssh",
                "-o",
                "BatchMode=yes",
                "-o",
                "ConnectTimeout=10",
                "-o",
                "StrictHostKeyChecking=accept-new",
            ]
            if port != 22:
                ssh_cmd.extend(["-p", str(port)])
            if identity_file:
                ssh_cmd.extend(["-i", identity_file])
            ssh_cmd.append(f"{user}@{hostname}")
            ssh_cmd.append("echo 'connection_test_ok'")

            # Execute SSH test
            process = await asyncio.create_subprocess_exec(
                *ssh_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
            )
            stdout, stderr = await process.communicate()
            output = stdout.decode().strip()

            # Check if connection was successful: require both a zero exit code
            # and the sentinel string in stdout (guards against partial output)
            success = process.returncode == 0 and "connection_test_ok" in output
            if success:
                self.logger.info(
                    "SSH connection test successful", hostname=hostname, user=user, port=port
                )
            else:
                self.logger.warning(
                    "SSH connection test failed",
                    hostname=hostname,
                    user=user,
                    port=port,
                    returncode=process.returncode,
                    stderr=stderr.decode().strip()[:200],
                )
            return success
        except Exception as e:
            # Best-effort probe: any failure (DNS, timeout, subprocess error)
            # is reported as an unreachable host rather than raised to the caller
            self.logger.error(
                "SSH connection test exception",
                hostname=hostname,
                user=user,
                port=port,
                error=str(e),
            )
            return False

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jmagar/docker-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.