config.py (28.1 kB)
""" Configuration Management Service Business logic for configuration discovery, import, and management operations. """ import asyncio import difflib from typing import TYPE_CHECKING, Any if TYPE_CHECKING: from docker_mcp.core.docker_context import DockerContextManager from docker_mcp.core.ssh_config_parser import SSHConfigEntry import structlog from fastmcp.tools.tool import ToolResult from mcp.types import TextContent from ..core.compose_manager import ComposeManager from ..core.config_loader import DockerMCPConfig, save_config from ..core.ssh_config_parser import SSHConfigParser from ..utils import validate_host class ConfigService: """Service for configuration management operations.""" def __init__(self, config: DockerMCPConfig, context_manager: "DockerContextManager"): self.config = config self.context_manager = context_manager self.compose_manager = ComposeManager(config, context_manager) self.logger = structlog.get_logger() async def update_host_config(self, host_id: str, compose_path: str) -> ToolResult: """Update host configuration with compose file path.""" try: is_valid, error_msg = validate_host(self.config, host_id) if not is_valid: return ToolResult( content=[TextContent(type="text", text=f"Error: {error_msg}")], structured_content={"success": False, "error": error_msg}, ) # Update the host configuration self.config.hosts[host_id].compose_path = compose_path # In a production system, this would persist to hosts.yml # For now, we'll just update the in-memory configuration self.logger.info( "Host configuration updated", host_id=host_id, compose_path=compose_path ) return ToolResult( content=[ TextContent( type="text", text=f"Success: Updated compose path for {host_id} to {compose_path}", ) ], structured_content={ "success": True, "host_id": host_id, "compose_path": compose_path, "message": "Host configuration updated (in-memory only)", }, ) except Exception as e: self.logger.error( "Failed to update host config", host_id=host_id, compose_path=compose_path, error=str(e), ) return ToolResult( content=[ TextContent(type="text", text=f"❌ Failed to update host config: {str(e)}") ], structured_content={"success": False, "error": str(e), "host_id": host_id}, ) async def discover_compose_paths(self, host_id: str | None = None) -> ToolResult: """Discover Docker Compose file locations and guide user through configuration.""" try: discovery_results = [] hosts_to_check = [host_id] if host_id else list(self.config.hosts.keys()) if host_id: is_valid, error_msg = validate_host(self.config, host_id) if not is_valid: return ToolResult( content=[TextContent(type="text", text=f"Error: {error_msg}")], structured_content={"success": False, "error": error_msg}, ) # Discover compose locations for each host discovery_results = await self._perform_discovery(hosts_to_check) # Format results for user summary_lines, recommendations = self._format_discovery_results(discovery_results) return ToolResult( content=[TextContent(type="text", text="\n".join(summary_lines))], structured_content={ "success": True, "discovery_results": discovery_results, "recommendations": recommendations, "hosts_analyzed": len(discovery_results), }, ) except Exception as e: self.logger.error("Failed to discover compose paths", host_id=host_id, error=str(e)) return ToolResult( content=[ TextContent(type="text", text=f"❌ Failed to discover compose paths: {str(e)}") ], structured_content={"success": False, "error": str(e), "host_id": host_id}, ) async def _perform_discovery(self, hosts_to_check: list[str]) -> list[dict[str, Any]]: 
"""Perform compose path discovery for specified hosts with concurrency.""" enabled_hosts = [ host_id for host_id in hosts_to_check if self.config.hosts[host_id].enabled ] if not enabled_hosts: return [] self.logger.info( "Starting compose discovery", total_hosts=len(enabled_hosts), hosts=enabled_hosts ) # Run discovery operations concurrently async def discover_single_host(host_id: str) -> dict[str, Any]: self.logger.info("Discovering compose locations", host_id=host_id) return await self.compose_manager.discover_compose_locations(host_id) # Run discovery operations concurrently with error handling (Python 3.11 compatible) tasks = [asyncio.create_task(discover_single_host(host_id)) for host_id in enabled_hosts] # Use asyncio.gather with return_exceptions for Python 3.11 compatibility results = await asyncio.gather(*tasks, return_exceptions=True) # Process results and handle exceptions discovery_results: list[dict[str, Any]] = [] failed_count = 0 for i, result in enumerate(results): host_id = enabled_hosts[i] if isinstance(result, Exception): failed_count += 1 self.logger.error("Discovery failed for host", host_id=host_id, error=str(result)) elif isinstance(result, dict): discovery_results.append(result) if failed_count > 0: self.logger.warning( "Discovery completed with some failures", successful=len(discovery_results), failed=failed_count, total=len(enabled_hosts), ) return discovery_results def _format_discovery_results( self, discovery_results: list[dict[str, Any]] ) -> tuple[list[str], list[dict[str, Any]]]: """Format discovery results for display.""" summary_lines = ["Docker Compose Path Discovery Results", "=" * 45, ""] recommendations = [] for result in discovery_results: host_id = result["host_id"] host_config = self.config.hosts[host_id] summary_lines.extend(self._format_host_discovery(result, host_config, recommendations)) # Add configuration instructions if recommendations: summary_lines.extend(self._format_recommendations(recommendations)) else: summary_lines.append( "✅ All hosts are properly configured or have no stacks requiring configuration." 
) return summary_lines, recommendations def _format_host_discovery( self, result: dict[str, Any], host_config: Any, recommendations: list[dict[str, Any]] ) -> list[str]: """Format discovery results for a single host.""" host_id = result["host_id"] lines = [ f"Host: {host_id} ({host_config.hostname})", "-" * 30, ] # Show current configuration status current_path = host_config.compose_path if current_path: lines.append(f"Currently configured path: {current_path}") else: lines.append("No compose path currently configured") # Show discovery results lines.append(f"Analysis: {result['analysis']}") if result["stacks_found"]: lines.append(f"Found stacks ({len(result['stacks_found'])}):") for stack in result["stacks_found"]: lines.append(f" • {stack['name']}: {stack['compose_file']}") # Show compose locations breakdown if result["compose_locations"]: lines.append("Compose file locations:") for location, data in result["compose_locations"].items(): stacks_list = ", ".join(data["stacks"]) lines.append(f" • {location}: {data['count']} stacks ({stacks_list})") # Generate recommendation suggested_path = result["suggested_path"] if suggested_path: self._add_recommendation( lines, recommendations, host_id, current_path, suggested_path, result ) lines.append("") return lines def _add_recommendation( self, lines: list[str], recommendations: list[dict[str, Any]], host_id: str, current_path: str | None, suggested_path: str, result: dict[str, Any], ) -> None: """Add recommendation for host configuration.""" if current_path == suggested_path: lines.append("✅ Current configuration matches discovery results") elif current_path: lines.append(f"💡 Recommendation: Consider changing to {suggested_path}") recommendations.append( { "host_id": host_id, "current_path": current_path, "suggested_path": suggested_path, "reason": result["analysis"], } ) else: lines.append(f"💡 Recommendation: Set compose_path to {suggested_path}") recommendations.append( { "host_id": host_id, "current_path": None, "suggested_path": suggested_path, "reason": result["analysis"], } ) def _format_recommendations(self, recommendations: list[dict[str, Any]]) -> list[str]: """Format configuration recommendations.""" lines = ["Configuration Recommendations:", "=" * 30, ""] for rec in recommendations: if rec["current_path"]: lines.append( f"Host '{rec['host_id']}': Update compose path from {rec['current_path']} to {rec['suggested_path']}" ) else: lines.append( f"Host '{rec['host_id']}': Set compose path to {rec['suggested_path']}" ) lines.append(f" Reason: {rec['reason']}") lines.append( f" Command: update_host_config(host_id='{rec['host_id']}', compose_path='{rec['suggested_path']}')" ) lines.append("") lines.extend( [ "Note: Each stack will be stored in its own subdirectory:", " {compose_path}/{stack_name}/docker-compose.yml", "", "Example: If compose_path is '/mnt/user/compose' and you deploy", "a stack named 'myapp', it will be stored at:", " /mnt/user/compose/myapp/docker-compose.yml", ] ) return lines async def import_ssh_config( self, ssh_config_path: str | None = None, selected_hosts: str | None = None, config_path: str | None = None, ) -> ToolResult: """Import hosts from SSH config with interactive selection and compose path discovery.""" try: # Initialize SSH config parser ssh_parser = SSHConfigParser(ssh_config_path) # Validate SSH config file is_valid, status_message = await asyncio.to_thread(ssh_parser.validate_config_file) if not is_valid: return ToolResult( content=[ TextContent(type="text", text=f"❌ SSH Config Error: {status_message}") 
], structured_content={"success": False, "error": status_message}, ) # Get importable hosts importable_hosts = await asyncio.to_thread(ssh_parser.get_importable_hosts) if not importable_hosts: return ToolResult( content=[ TextContent(type="text", text="❌ No importable hosts found in SSH config") ], structured_content={"success": False, "error": "No importable hosts found"}, ) # Handle host selection if selected_hosts is None: return self._show_host_selection(importable_hosts) # Parse and import selected hosts hosts_to_import = self._parse_host_selection(selected_hosts, importable_hosts) if isinstance(hosts_to_import, ToolResult): # Error case return hosts_to_import # Process selected hosts imported_hosts, compose_path_configs = await self._import_selected_hosts( hosts_to_import ) if not imported_hosts: return ToolResult( content=[ TextContent( type="text", text="❌ No new hosts to import (all selected hosts already exist)", ) ], structured_content={"success": False, "error": "No new hosts to import"}, ) # Save configuration config_file_to_use = config_path or getattr(self.config, "config_file", None) if config_file_to_use: await asyncio.to_thread(save_config, self.config, config_file_to_use) # Build result summary summary_lines = self._format_import_results(imported_hosts, compose_path_configs, config_file_to_use) self.logger.info( "SSH config import completed", imported_hosts=len(imported_hosts), compose_paths_configured=len(compose_path_configs), ) return ToolResult( content=[TextContent(type="text", text="\n".join(summary_lines))], structured_content={ "success": True, "imported_hosts": imported_hosts, "compose_path_configs": compose_path_configs, "total_imported": len(imported_hosts), }, ) except Exception as e: self.logger.error("SSH config import failed", error=str(e)) return ToolResult( content=[TextContent(type="text", text=f"❌ SSH config import failed: {str(e)}")], structured_content={"success": False, "error": str(e)}, ) def _show_host_selection(self, importable_hosts: list["SSHConfigEntry"]) -> ToolResult: """Show available hosts for selection.""" summary_lines = [ "SSH Config Import - Host Selection", "=" * 40, "", f"Found {len(importable_hosts)} importable hosts in SSH config:", "", ] # Display hosts with 1-based indexing for user-friendly selection for i, entry in enumerate(importable_hosts, 1): hostname = entry.hostname or entry.name user = entry.user or "root" port_info = f":{entry.port}" if entry.port != 22 else "" summary_lines.append(f" [{i}] {entry.name}") summary_lines.append(f" → {user}@{hostname}{port_info}") if entry.identity_file: summary_lines.append(f" → Key: {entry.identity_file}") summary_lines.append("") summary_lines.extend( [ "To import specific hosts, use any of these formats:", "", "By index:", ' import_ssh_config(selected_hosts="1,3,5") # Import hosts 1, 3, and 5', "", "By hostname:", ' import_ssh_config(selected_hosts="dookie,squirts,slam") # Import by name', "", "Mixed format:", ' import_ssh_config(selected_hosts="1,dookie,5") # Mix indices and names', "", "Import all hosts:", ' import_ssh_config(selected_hosts="all")', "", "Note: Hostname matching is fuzzy and case-insensitive.", ] ) return ToolResult( content=[TextContent(type="text", text="\n".join(summary_lines))], structured_content={ "success": True, "action": "selection_required", "importable_hosts": [ { "index": i, "name": entry.name, "hostname": entry.hostname or entry.name, "user": entry.user or "root", "port": entry.port, } for i, entry in enumerate(importable_hosts, 1) ], }, ) def 
_fuzzy_match_host( self, query: str, importable_hosts: list["SSHConfigEntry"] ) -> tuple["SSHConfigEntry", float] | None: """Find best matching host using fuzzy string matching. Args: query: The search term (hostname or partial match) importable_hosts: List of SSH host entries to search Returns: Tuple of (matched_host, confidence_score) or None if no good match """ best_match = None best_score = 0.0 min_threshold = 0.6 # Minimum similarity score to consider a match for host in importable_hosts: # Get all searchable strings for this host searchable_strings = [ host.name.lower(), # SSH config name (host.hostname or host.name).lower(), # Hostname ] # Check for exact matches first query_lower = query.lower().strip() for search_string in searchable_strings: if query_lower == search_string: return host, 1.0 # Perfect match # Check for partial matches for search_string in searchable_strings: # Check if query is contained in string (partial match) if query_lower in search_string: score = len(query_lower) / len(search_string) if score > best_score: best_match = host best_score = score # Use difflib for fuzzy matching similarity = difflib.SequenceMatcher(None, query_lower, search_string).ratio() if similarity > best_score and similarity >= min_threshold: best_match = host best_score = similarity return (best_match, best_score) if best_match else None def _parse_host_selection( self, selected_hosts: str, importable_hosts: list["SSHConfigEntry"] ) -> list["SSHConfigEntry"] | ToolResult: """Parse host selection string and return hosts to import. Supports multiple formats: - "all" - select all hosts - "1,3,5" - numeric indices - "dookie,squirts,slam" - hostnames with fuzzy matching - "1,dookie,3" - mixed indices and hostnames """ if selected_hosts.lower().strip() == "all": return importable_hosts # Split by comma and clean up whitespace selection_items = [item.strip() for item in selected_hosts.split(",") if item.strip()] if not selection_items: return self._create_empty_selection_error() hosts_to_import, errors = self._process_selection_items(selection_items, importable_hosts) if errors: return self._create_selection_error(errors, importable_hosts) return self._remove_duplicate_hosts(hosts_to_import) def _create_empty_selection_error(self) -> ToolResult: """Create error result for empty selection.""" return ToolResult( content=[ TextContent( type="text", text="❌ Empty selection. 
Provide host indices, hostnames, or 'all'", ) ], structured_content={"success": False, "error": "Empty selection"}, ) def _process_selection_items( self, selection_items: list[str], importable_hosts: list["SSHConfigEntry"] ) -> tuple[list["SSHConfigEntry"], list[str]]: """Process each selection item and return hosts and errors.""" hosts_to_import = [] errors = [] for item in selection_items: host, error = self._process_single_selection_item(item, importable_hosts) if host: hosts_to_import.append(host) if error: errors.append(error) return hosts_to_import, errors def _process_single_selection_item( self, item: str, importable_hosts: list["SSHConfigEntry"] ) -> tuple["SSHConfigEntry | None", str | None]: """Process a single selection item and return host and optional error.""" # Try to parse as numeric index first (user provides 1-based, convert to 0-based array access) try: idx = int(item) if 1 <= idx <= len(importable_hosts): # Convert from 1-based user input to 0-based array index return importable_hosts[idx - 1], None else: return None, f"Index {idx} out of range (1-{len(importable_hosts)})" except ValueError: # Not a number, try hostname matching pass # Try fuzzy matching for hostname match_result = self._fuzzy_match_host(item, importable_hosts) if match_result: matched_host, confidence = match_result # Log fuzzy matches with low confidence for user awareness if confidence < 0.9: self.logger.info( "Fuzzy matched hostname", query=item, matched=matched_host.name, confidence=confidence, ) return matched_host, None else: return None, f"No match found for '{item}'" def _create_selection_error( self, errors: list[str], importable_hosts: list["SSHConfigEntry"] ) -> ToolResult: """Create error result for selection errors.""" error_msg = "Selection errors: " + "; ".join(errors) available_names = [host.name for host in importable_hosts] return ToolResult( content=[ TextContent( type="text", text=f"❌ {error_msg}\n\nAvailable hosts: {', '.join(available_names)}\nUse indices (1-{len(importable_hosts)}), hostnames, or 'all'", ) ], structured_content={ "success": False, "error": error_msg, "available_hosts": available_names, }, ) def _remove_duplicate_hosts( self, hosts_to_import: list["SSHConfigEntry"] ) -> list["SSHConfigEntry"]: """Remove duplicate hosts while preserving order.""" unique_hosts = [] seen_names = set() for host in hosts_to_import: if host.name not in seen_names: unique_hosts.append(host) seen_names.add(host.name) return unique_hosts async def _import_selected_hosts( self, hosts_to_import: list, ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: """Import selected hosts with compose path discovery.""" imported_hosts = [] compose_path_configs = [] for ssh_entry in hosts_to_import: host_id = ssh_entry.name # Skip if host already exists if host_id in self.config.hosts: self.logger.warning("Host already exists, skipping", host_id=host_id) continue # Convert SSH entry to DockerHost docker_host = ssh_entry.to_docker_host() # Temporarily add host to config for compose path discovery self.config.hosts[host_id] = docker_host # Discover compose path for this host compose_path = await self._discover_compose_path_for_host(host_id) # Set compose path if discovered or provided if compose_path: docker_host.compose_path = compose_path compose_path_configs.append( { "host_id": host_id, "compose_path": compose_path, "discovered": True, } ) # Update the host in configuration self.config.hosts[host_id] = docker_host imported_hosts.append( { "host_id": host_id, "hostname": docker_host.hostname, 
"user": docker_host.user, "port": docker_host.port, "compose_path": docker_host.compose_path, } ) return imported_hosts, compose_path_configs async def _discover_compose_path_for_host( self, host_id: str, ) -> str | None: """Discover compose path for a specific host.""" # Try to discover compose path try: discovery_result = await self.compose_manager.discover_compose_locations(host_id) return discovery_result.get("suggested_path") except Exception as e: self.logger.debug( "Could not discover compose path for new host", host_id=host_id, error=str(e), ) return None def _format_import_results( self, imported_hosts: list[dict[str, Any]], compose_path_configs: list[dict[str, Any]], config_file_path: str | None = None, ) -> list[str]: """Format import results for display.""" summary_lines = [ "✅ SSH Config Import Completed", "=" * 35, "", f"Successfully imported {len(imported_hosts)} hosts:", "", ] for host_info in imported_hosts: summary_lines.append( f"• {host_info['host_id']} ({host_info['user']}@{host_info['hostname']})" ) if host_info["compose_path"]: summary_lines.append(f" Compose path: {host_info['compose_path']}") summary_lines.append("") if compose_path_configs: summary_lines.extend(["Compose Path Configuration:", "─" * 28]) for config in compose_path_configs: source = "discovered" if config["discovered"] else "manually set" summary_lines.append(f"• {config['host_id']}: {config['compose_path']} ({source})") summary_lines.append("") # Show actual config file path where configuration was saved config_file_message = f"Configuration saved to {config_file_path}." if config_file_path else "Configuration updated in memory." summary_lines.extend( [ config_file_message, "You can now use these hosts with deploy_stack and other tools.", ] ) return summary_lines


MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jmagar/docker-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.