Skip to main content
Glama
operations.py41.3 kB
""" Stack Operations Module Basic CRUD operations for Docker Compose stack management. Handles deployment, lifecycle management, listing, and compose file retrieval. """ from typing import Any import structlog from fastmcp.tools.tool import ToolResult from mcp.types import TextContent from ...core.config_loader import DockerMCPConfig from ...core.docker_context import DockerContextManager from ...tools.stacks import StackTools class StackOperations: """Core stack operations: deploy, manage, list, and compose file retrieval.""" def __init__(self, config: DockerMCPConfig, context_manager: DockerContextManager): self.config = config self.context_manager = context_manager self.stack_tools = StackTools(config, context_manager) self.logger = structlog.get_logger() def _validate_host(self, host_id: str) -> tuple[bool, str]: """Validate host exists in configuration.""" if host_id not in self.config.hosts: return False, f"Host '{host_id}' not found" return True, "" async def deploy_stack_with_partial_failure_handling( self, host_id: str, stack_name: str, compose_content: str, environment: dict[str, str] | None = None, pull_images: bool = True, recreate: bool = False, ) -> ToolResult: """Deploy a Docker Compose stack with comprehensive partial failure handling.""" try: is_valid, error_msg = self._validate_host(host_id) if not is_valid: return ToolResult( content=[TextContent(type="text", text=f"Error: {error_msg}")], structured_content={"success": False, "error": error_msg}, ) # Track service-level results service_results = { "successful_services": [], "failed_services": [], "partial_success": False, "recovery_options": [] } # First, attempt normal deployment result = await self.stack_tools.deploy_stack( host_id, stack_name, compose_content, environment, pull_images, recreate ) if result["success"]: # Deployment succeeded, but verify individual services await self._verify_service_status(host_id, stack_name, service_results) if service_results["failed_services"]: service_results["partial_success"] = True service_results["recovery_options"] = [ "retry_failed_services", "restart_failed_services", "rollback_stack" ] formatted_text = self._format_deployment_result(stack_name, result, service_results) structured = {**result, "service_details": service_results, "formatted_output": formatted_text} return ToolResult( content=[TextContent(type="text", text=formatted_text)], structured_content=structured, ) else: # Deployment failed, check for partial services that may have started await self._analyze_partial_deployment(host_id, stack_name, service_results) if service_results["successful_services"]: service_results["partial_success"] = True service_results["recovery_options"] = [ "retry_failed_services", "rollback_successful_services", "complete_manual_deployment" ] formatted_text = ( "Deployment failed with partial success. " f"{self._format_deployment_result(stack_name, result, service_results)}" ) structured = {**result, "service_details": service_results, "formatted_output": formatted_text} return ToolResult( content=[TextContent(type="text", text=formatted_text)], structured_content=structured, ) except Exception as e: self.logger.error( "Failed to deploy stack with partial failure handling", host_id=host_id, stack_name=stack_name, error=str(e) ) formatted_text = f"❌ Failed to deploy stack: {str(e)}" return ToolResult( content=[TextContent(type="text", text=formatted_text)], structured_content={ "success": False, "error": str(e), "host_id": host_id, "stack_name": stack_name, "formatted_output": formatted_text, }, ) async def deploy_stack( self, host_id: str, stack_name: str, compose_content: str, environment: dict[str, str] | None = None, pull_images: bool = True, recreate: bool = False, ) -> ToolResult: """Deploy a Docker Compose stack to a remote host.""" try: is_valid, error_msg = self._validate_host(host_id) if not is_valid: return ToolResult( content=[TextContent(type="text", text=f"Error: {error_msg}")], structured_content={"success": False, "error": error_msg}, ) # Use stack tools to deploy result = await self.stack_tools.deploy_stack( host_id, stack_name, compose_content, environment, pull_images, recreate ) if result["success"]: # Briefly wait for the project to become visible in list_stacks try: import asyncio as _asyncio await _asyncio.sleep(0.5) # Initial delay for deployment to settle for _ in range(5): list_result = await self.stack_tools.list_stacks(host_id) if any( isinstance(s, dict) and s.get("name", "").lower() == stack_name.lower() for s in list_result.get("stacks", []) ): break await _asyncio.sleep(1) except Exception as e: self.logger.debug( "Stack deployment verification failed", host_id=host_id, stack_name=stack_name, error=str(e), error_type=type(e).__name__, ) # Use enhanced deployment result formatting formatted_lines = self._format_deploy_result(result, stack_name, host_id) formatted_text = "\n".join(formatted_lines) structured = dict(result) structured["formatted_output"] = formatted_text return ToolResult( content=[TextContent(type="text", text=formatted_text)], structured_content=structured, ) else: formatted_text = ( "❌ Failed to deploy stack '", ) formatted_text = ( f"❌ Failed to deploy stack '{stack_name}': {result.get('error', 'Unknown error')}" ) structured = dict(result) structured["formatted_output"] = formatted_text return ToolResult( content=[TextContent(type="text", text=formatted_text)], structured_content=structured, ) except Exception as e: self.logger.error( "Failed to deploy stack", host_id=host_id, stack_name=stack_name, error=str(e) ) formatted_text = f"❌ Failed to deploy stack: {str(e)}" return ToolResult( content=[TextContent(type="text", text=formatted_text)], structured_content={ "success": False, "error": str(e), "host_id": host_id, "stack_name": stack_name, "formatted_output": formatted_text, }, ) async def manage_stack( self, host_id: str, stack_name: str, action: str, options: dict[str, Any] | None = None ) -> ToolResult: """Unified stack lifecycle management.""" try: is_valid, error_msg = self._validate_host(host_id) if not is_valid: return ToolResult( content=[TextContent(type="text", text=f"Error: {error_msg}")], structured_content={"success": False, "error": error_msg}, ) # Use stack tools to manage stack result = await self.stack_tools.manage_stack(host_id, stack_name, action, options) if result["success"]: message_lines = self._format_stack_action_result(result, stack_name, action) formatted_text = "\n".join(message_lines) structured = dict(result) structured["formatted_output"] = formatted_text return ToolResult( content=[TextContent(type="text", text=formatted_text)], structured_content=structured, ) else: formatted_text = ( f"❌ Failed to {action} stack '{stack_name}': {result.get('error', 'Unknown error')}" ) structured = dict(result) structured["formatted_output"] = formatted_text return ToolResult( content=[TextContent(type="text", text=formatted_text)], structured_content=structured, ) except Exception as e: self.logger.error( "Failed to manage stack", host_id=host_id, stack_name=stack_name, action=action, error=str(e), ) formatted_text = f"❌ Failed to {action} stack: {str(e)}" return ToolResult( content=[TextContent(type="text", text=formatted_text)], structured_content={ "success": False, "error": str(e), "host_id": host_id, "stack_name": stack_name, "action": action, "formatted_output": formatted_text, }, ) def _format_stack_action_result( self, result: dict[str, Any], stack_name: str, action: str ) -> list[str]: """Format stack action result for display with enhanced visual formatting.""" # Special handling for ps action if action == "ps": return self._format_ps_result(result, stack_name) # Header with action-specific emoji action_emoji = { "up": "🚀", "down": "🛑", "restart": "🔄", "build": "🔨", "pull": "📦", "logs": "📋" } emoji = action_emoji.get(action, "⚙️") message_lines = [] message_lines.append("─" * 50) message_lines.append(f"{emoji} Stack {action.title()}: {stack_name}") message_lines.append("─" * 50) if result.get("success"): message_lines.append(f"✅ Status: {action.upper()} completed successfully") else: message_lines.append(f"❌ Status: {action.upper()} failed") if error := result.get("error"): message_lines.append(f" Error: {error}") # Add output if available if output := result.get("output"): message_lines.append("") message_lines.append("📄 Command Output:") output_lines = output.split("\n") max_width = max((len(line) for line in output_lines), default=0) border = "┌" + "─" * (max_width + 2) + "┐" message_lines.append(border) for line in output_lines: message_lines.append(f"│ {line:<{max_width}} │") message_lines.append("└" + "─" * (max_width + 2) + "┘") message_lines.append("") return message_lines async def list_stacks(self, host_id: str) -> ToolResult: """List Docker Compose stacks on a host.""" try: is_valid, error_msg = self._validate_host(host_id) if not is_valid: return ToolResult( content=[TextContent(type="text", text=f"Error: {error_msg}")], structured_content={"success": False, "error": error_msg}, ) # Use stack tools to list stacks result = await self.stack_tools.list_stacks(host_id) if result["success"]: summary_lines = self._format_stacks_list(result, host_id) formatted_text = "\n".join(summary_lines) structured = dict(result) structured["formatted_output"] = formatted_text return ToolResult( content=[TextContent(type="text", text=formatted_text)], structured_content=structured, ) else: formatted_text = ( f"❌ Failed to list stacks: {result.get('error', 'Unknown error')}" ) structured = dict(result) structured["formatted_output"] = formatted_text return ToolResult( content=[TextContent(type="text", text=formatted_text)], structured_content=structured, ) except Exception as e: self.logger.error("Failed to list stacks", host_id=host_id, error=str(e)) formatted_text = f"❌ Failed to list stacks: {str(e)}" return ToolResult( content=[TextContent(type="text", text=formatted_text)], structured_content={ "success": False, "error": str(e), "host_id": host_id, "formatted_output": formatted_text, }, ) def _format_stacks_list(self, result: dict[str, Any], host_id: str) -> list[str]: """Format stacks list for display - enhanced visual hierarchy with NO truncation.""" stacks = result["stacks"] # Count stacks by status status_counts = {} for stack in stacks: status = stack.get("status", "unknown") status_counts[status] = status_counts.get(status, 0) + 1 status_summary = ", ".join(f"{status}: {count}" for status, count in status_counts.items()) summary_lines = [ f"Docker Compose Stacks on {host_id} ({len(stacks)} total)", f"Status breakdown: {status_summary}", "", ] summary_lines.extend([ " Stack Status Services", " ------------------------- ---------- --------------------", ]) for stack in stacks: status_indicator = {"running": "●", "partial": "◐", "stopped": "○"}.get( stack.get("status", "unknown"), "?" ) services = stack.get("services", []) if services: services_display = f"[{len(services)}] {', '.join(services)}" else: services_display = "[0]" stack_name = stack.get("name", "unknown") status = stack.get("status", "unknown") summary_lines.append( f"{status_indicator} {stack_name:<25} {status:<10} {services_display}" ) return summary_lines def _format_deploy_result(self, result: dict[str, Any], stack_name: str, host_id: str) -> list[str]: """Format deployment result with service-level progress visualization.""" lines = [] # Add header section self._add_deploy_header(lines, stack_name, host_id) # Add status section self._add_deploy_status(lines, result) # Add services section self._add_services_status(lines, result) # Add deployment info self._add_deploy_info(lines, result) # Add footer lines.append("") lines.append("═" * 60) return lines def _add_deploy_header(self, lines: list[str], stack_name: str, host_id: str) -> None: """Add deployment header to output lines.""" lines.append("═" * 60) lines.append(f"🚀 Stack Deployment: {stack_name} → {host_id}") lines.append("═" * 60) def _add_deploy_status(self, lines: list[str], result: dict[str, Any]) -> None: """Add deployment status to output lines.""" if result.get("success"): lines.append("✅ Deployment Status: SUCCESS") else: lines.append("❌ Deployment Status: FAILED") if error := result.get("error"): lines.append(f" Error: {error}") lines.append("") def _add_services_status(self, lines: list[str], result: dict[str, Any]) -> None: """Add services deployment status to output lines.""" services_data = result.get("data", {}).get("services", []) if not services_data: return lines.append("📋 Service Deployment Status:") lines.append("─" * 40) for service in services_data: name = service.get("Name", "Unknown") status = service.get("Status", "Unknown").lower() indicator, status_text = self._get_service_status_indicator(status) lines.append(f" {indicator} {name:<25} │ {status_text}") lines.append("") def _get_service_status_indicator(self, status: str) -> tuple[str, str]: """Get status indicator and text for a service.""" if "running" in status or "up" in status: if "healthy" in status: return "✅", "Healthy & Running" else: return "🟢", "Running" elif "starting" in status or "restarting" in status: return "🔄", "Starting" elif "unhealthy" in status: return "⚠️ ", "Unhealthy" elif "exited" in status or "stopped" in status: return "🔴", "Stopped" else: return "❓", status.title() def _add_deploy_info(self, lines: list[str], result: dict[str, Any]) -> None: """Add additional deployment information to output lines.""" if pull_info := result.get("pull_images"): lines.append(f"📦 Images: {'Pulled' if pull_info else 'Used cached'}") if recreate_info := result.get("recreate"): lines.append(f"🔄 Recreation: {'Forced' if recreate_info else 'Incremental'}") if compose_path := result.get("compose_path"): lines.append(f"📁 Deployed to: {compose_path}") def _format_ps_result(self, result: dict[str, Any], stack_name: str) -> list[str]: """Format ps action result with enhanced health status indicators.""" summary_lines: list[str] = [f"Stack Services: {stack_name}"] services = result.get("data", {}).get("services", []) if not services: summary_lines.append("No services reported (stack may be stopped)") return summary_lines # Add table header self._add_ps_table_header(summary_lines) # Process services and collect status counts status_counts = {"healthy": 0, "unhealthy": 0, "starting": 0, "stopped": 0} self._process_services_for_ps(summary_lines, services, status_counts) # Add summary self._add_ps_summary(summary_lines, services, status_counts) return summary_lines def _add_ps_table_header(self, summary_lines: list[str]) -> None: """Add table header for ps result.""" summary_lines.append("") summary_lines.append(f"{'State':<7} {'Service':<25} {'Health':<10} Ports") summary_lines.append( f"{'-' * 7:<7} {'-' * 25:<25} {'-' * 10:<10} {'-' * 30}" ) def _process_services_for_ps( self, summary_lines: list[str], services: list[dict[str, Any]], status_counts: dict[str, int] ) -> None: """Process services and add them to the ps result.""" for service in services: name = service.get("Name", "Unknown") status = (service.get("Status") or "").lower() ports = (service.get("Ports") or "-").replace("->", "→") state_icon, health_label = self._get_service_state_info(status, status_counts) summary_lines.append( f"{state_icon:<7} {name:<25} {health_label:<10} {ports}" ) def _get_service_state_info(self, status: str, status_counts: dict[str, int]) -> tuple[str, str]: """Get service state icon and health label based on status.""" if "running" in status or "up" in status: if "healthy" in status: status_counts["healthy"] += 1 return "●", "healthy" elif "unhealthy" in status: status_counts["unhealthy"] += 1 return "●", "unhealthy" else: status_counts["healthy"] += 1 return "●", "running" elif "starting" in status or "restarting" in status: status_counts["starting"] += 1 return "◐", "starting" elif "exited" in status or "stopped" in status: status_counts["stopped"] += 1 return "○", "stopped" else: status_counts["stopped"] += 1 return "?", "unknown" def _add_ps_summary( self, summary_lines: list[str], services: list[dict[str, Any]], status_counts: dict[str, int] ) -> None: """Add summary section to ps result.""" totals: list[str] = [] for status_type, count in status_counts.items(): if count > 0: totals.append(f"{status_type} {count}") summary_lines.append("") total_services = len(services) if totals: summary_lines.append(f"Summary: {total_services} services ({', '.join(totals)})") else: summary_lines.append(f"Summary: {total_services} services") async def _verify_service_status(self, host_id: str, stack_name: str, service_results: dict) -> None: """Verify the status of individual services after deployment.""" try: # Get stack services status ps_result = await self.stack_tools.manage_stack(host_id, stack_name, "ps") if ps_result.get("success") and ps_result.get("data", {}).get("services"): services = ps_result["data"]["services"] for service in services: service_name = service.get("Name", "Unknown") service_status = service.get("Status", "").lower() service_info = { "name": service_name, "status": service_status, "container_id": service.get("ID", ""), "image": service.get("Image", "") } if "running" in service_status or "up" in service_status: service_results["successful_services"].append(service_info) else: service_results["failed_services"].append(service_info) except Exception as e: self.logger.warning( "Failed to verify service status", host_id=host_id, stack_name=stack_name, error=str(e) ) # Add a generic failure indication service_results["failed_services"].append({ "name": "verification_failed", "status": "unknown", "error": str(e) }) async def _analyze_partial_deployment(self, host_id: str, stack_name: str, service_results: dict) -> None: """Analyze what services may have started despite deployment failure.""" try: # Check if any containers from this stack are running list_result = await self.stack_tools.list_stacks(host_id) if list_result.get("success") and list_result.get("stacks"): for stack in list_result["stacks"]: if stack.get("name") == stack_name: services = stack.get("services", []) stack_status = stack.get("status", "unknown") # If stack has partial status, some services might be running if stack_status == "partial" or services: for service_name in services: service_results["successful_services"].append({ "name": service_name, "status": "partially_running", "container_id": "unknown" }) break except Exception as e: self.logger.warning( "Failed to analyze partial deployment", host_id=host_id, stack_name=stack_name, error=str(e) ) def _format_deployment_result(self, stack_name: str, result: dict, service_results: dict) -> str: """Format deployment result with enhanced service details and visual hierarchy.""" lines = [] # Enhanced header with visual separation lines.append("═" * 60) if result.get("success"): lines.append(f"✅ Stack '{stack_name}' deployed successfully") else: lines.append(f"❌ Stack '{stack_name}' deployment failed: {result.get('error', 'Unknown error')}") lines.append("═" * 60) # Service status details with enhanced formatting if service_results.get("successful_services"): lines.append(f"\n🟢 Successful Services ({len(service_results['successful_services'])}):") lines.append("─" * 40) for service in service_results["successful_services"]: status = service.get('status', 'running') image = service.get('image', '') lines.append(f" ✅ {service['name']:<25} │ {status}") if image: lines.append(f" └─ Image: {image}") if service_results.get("failed_services"): lines.append(f"\n🔴 Failed Services ({len(service_results['failed_services'])}):") lines.append("─" * 40) for service in service_results["failed_services"]: status = service.get('status', 'failed') error = service.get('error', '') lines.append(f" ❌ {service['name']:<25} │ {status}") if error and error != 'failed': lines.append(f" └─ Error: {error}") # Recovery options with enhanced formatting self._format_recovery_section(lines, service_results) lines.append("═" * 60) return "\n".join(lines) def _format_service_sections(self, lines: list[str], service_results: dict) -> None: """Format successful and failed services sections.""" self._format_service_sections(lines, service_results) def _format_recovery_section(self, lines: list[str], service_results: dict) -> None: """Format recovery options and warnings section.""" if service_results.get("recovery_options"): lines.append("\n🔧 Recovery Options Available:") lines.append("─" * 30) for option in service_results["recovery_options"]: option_text = option.replace('_', ' ').title() lines.append(f" 🔄 {option_text}") if service_results.get("partial_success"): lines.append("\n⚠️ Partial deployment detected - manual intervention may be required") lines.append(" Consider using recovery options above to resolve issues") def _format_migrate_result(self, result: dict[str, Any], stack_name: str, source_host: str, target_host: str) -> list[str]: """Format migration result with clear step-by-step progress visualization.""" lines = [] # Add migration sections self._add_migration_header(lines, stack_name, source_host, target_host) self._add_migration_status(lines, result) self._add_migration_steps(lines, result) self._add_transfer_stats(lines, result) self._add_stack_status(lines, result) self._add_migration_summary(lines, result) self._add_migration_footer(lines, result) return lines def _add_migration_header(self, lines: list[str], stack_name: str, source_host: str, target_host: str) -> None: """Add migration header with stack name and host flow.""" lines.append("╔" + "═" * 58 + "╗") lines.append(f"║ 🚚 Stack Migration: {stack_name:<35} ║") lines.append(f"║ {source_host} ➡️ {target_host:<42} ║") lines.append("╚" + "═" * 58 + "╝") lines.append("") def _add_migration_status(self, lines: list[str], result: dict[str, Any]) -> None: """Add overall migration status.""" if result.get("overall_success") or result.get("success"): lines.append("✅ Migration Status: COMPLETED SUCCESSFULLY") else: lines.append("❌ Migration Status: FAILED") if error := result.get("error"): lines.append(f" Primary Error: {error}") lines.append("") def _add_migration_steps(self, lines: list[str], result: dict[str, Any]) -> None: """Add migration steps progress.""" steps = result.get("migration_steps", []) if not steps: return lines.append("📋 Migration Progress:") for i, step in enumerate(steps, 1): step_name = step.get("name", f"Step {i}") step_status = step.get("status", "unknown") step_duration = step.get("duration_seconds", 0) status_icon, status_text = self._get_step_status_info( step_status, step_duration, step ) lines.append(f" {i}. {status_icon} {step_name} :: {status_text}") lines.append("") def _get_step_status_info(self, status: str, duration: float, step: dict[str, Any]) -> tuple[str, str]: """Get step status icon and text.""" if status == "completed": return "✅", f"Completed ({duration:.1f}s)" elif status == "failed": status_text = f"Failed ({duration:.1f}s)" if step_error := step.get("error"): status_text += f" - {step_error}" return "❌", status_text elif status == "in_progress": return "🔄", "In Progress..." elif status == "skipped": return "⏭️", "Skipped" else: return "⏸️", "Pending" def _add_transfer_stats(self, lines: list[str], result: dict[str, Any]) -> None: """Add data transfer statistics.""" transfer_stats = result.get("transfer_stats", {}) if not transfer_stats: return lines.append("📊 Transfer Statistics:") lines.append("─" * 30) if bytes_transferred := transfer_stats.get("bytes_transferred"): lines.append(f" 📦 Data transferred: {self._format_bytes(bytes_transferred)}") if transfer_speed := transfer_stats.get("transfer_speed_mbps"): lines.append(f" ⚡ Transfer speed: {transfer_speed:.1f} MB/s") if files_count := transfer_stats.get("files_transferred"): lines.append(f" 📁 Files transferred: {files_count}") lines.append("") def _add_stack_status(self, lines: list[str], result: dict[str, Any]) -> None: """Add source and target stack status.""" self._add_source_stack_status(lines, result) self._add_target_stack_status(lines, result) def _add_source_stack_status(self, lines: list[str], result: dict[str, Any]) -> None: """Add source stack status information.""" source_status = result.get("source_stack_status") if not source_status: return lines.append("🔹 Source Stack Status:") lines.append(f" Status: {source_status.get('status', 'unknown')}") if source_status.get("stopped"): lines.append(" ✅ Successfully stopped for migration") if source_status.get("removed") and result.get("remove_source"): lines.append(" 🗑️ Removed after successful migration") def _add_target_stack_status(self, lines: list[str], result: dict[str, Any]) -> None: """Add target stack status information.""" target_status = result.get("target_stack_status") if not target_status: return lines.append("") lines.append("🔸 Target Stack Status:") lines.append(f" Status: {target_status.get('status', 'unknown')}") target_services = target_status.get("services", []) if target_services: lines.append(f" Services: {len(target_services)} running") for service in target_services: service_name = service.get("name", "Unknown") service_status = service.get("status", "unknown") lines.append(f" • {service_name}: {service_status}") def _add_migration_summary(self, lines: list[str], result: dict[str, Any]) -> None: """Add migration summary information.""" lines.append("") lines.append("📈 Migration Summary:") lines.append("─" * 25) total_duration = result.get("total_duration_seconds", 0) lines.append(f" ⏱️ Total duration: {total_duration:.1f} seconds") if result.get("dry_run"): lines.append(" 🧪 Mode: Dry run (no actual changes made)") else: lines.append(" 🚀 Mode: Live migration") # Recommendations recommendations = result.get("recommendations", []) if recommendations: lines.append("") lines.append("💡 Recommendations:") lines.append("─" * 20) for rec in recommendations: lines.append(f" • {rec}") def _add_migration_footer(self, lines: list[str], result: dict[str, Any]) -> None: """Add migration result footer.""" lines.append("") lines.append("╔" + "═" * 58 + "╗") if result.get("overall_success") or result.get("success"): lines.append("║ ✅ Migration completed successfully! ║") else: lines.append("║ ❌ Migration failed - check logs for details ║") lines.append("╚" + "═" * 58 + "╝") def _format_bytes(self, bytes_count: int) -> str: """Format bytes count in human-readable format.""" if bytes_count < 1024: return f"{bytes_count} B" elif bytes_count < 1024 * 1024: return f"{bytes_count / 1024:.1f} KB" elif bytes_count < 1024 * 1024 * 1024: return f"{bytes_count / (1024 * 1024):.1f} MB" else: return f"{bytes_count / (1024 * 1024 * 1024):.1f} GB" async def retry_failed_services(self, host_id: str, stack_name: str, failed_services: list[str]) -> ToolResult: """Retry deployment for specific failed services.""" try: is_valid, error_msg = self._validate_host(host_id) if not is_valid: return ToolResult( content=[TextContent(type="text", text=f"Error: {error_msg}")], structured_content={"success": False, "error": error_msg}, ) retry_results = { "retried_services": [], "successful_retries": [], "failed_retries": [] } for service_name in failed_services: try: # Try to restart the specific service restart_result = await self.stack_tools.manage_stack( host_id, stack_name, "restart", {"services": [service_name]} ) retry_results["retried_services"].append(service_name) if restart_result.get("success"): retry_results["successful_retries"].append(service_name) else: retry_results["failed_retries"].append({ "service": service_name, "error": restart_result.get("error", "Unknown error") }) except Exception as e: retry_results["failed_retries"].append({ "service": service_name, "error": str(e) }) # Format result message message_lines = [f"Service retry operation completed for stack '{stack_name}'"] if retry_results["successful_retries"]: message_lines.append(f"✓ Successfully restarted: {', '.join(retry_results['successful_retries'])}") if retry_results["failed_retries"]: message_lines.append("✗ Failed to restart:") for failure in retry_results["failed_retries"]: message_lines.append(f" • {failure['service']}: {failure['error']}") overall_success = len(retry_results["successful_retries"]) > 0 and len(retry_results["failed_retries"]) == 0 return ToolResult( content=[TextContent(type="text", text="\n".join(message_lines))], structured_content={ "success": overall_success, "retry_results": retry_results, "host_id": host_id, "stack_name": stack_name } ) except Exception as e: self.logger.error( "Failed to retry services", host_id=host_id, stack_name=stack_name, services=failed_services, error=str(e) ) return ToolResult( content=[TextContent(type="text", text=f"❌ Failed to retry services: {str(e)}")], structured_content={ "success": False, "error": str(e), "host_id": host_id, "stack_name": stack_name, }, ) async def get_stack_compose_file(self, host_id: str, stack_name: str) -> ToolResult: """Get the docker-compose.yml content for a specific stack.""" try: is_valid, error_msg = self._validate_host(host_id) if not is_valid: return ToolResult( content=[TextContent(type="text", text=f"Error: {error_msg}")], structured_content={"success": False, "error": error_msg}, ) # Use stack tools to get the compose file content result = await self.stack_tools.get_stack_compose_content(host_id, stack_name) if result["success"]: compose_content = result.get("compose_content", "") return ToolResult( content=[TextContent(type="text", text=compose_content)], structured_content={ "success": True, "host_id": host_id, "stack_name": stack_name, "compose_content": compose_content, }, ) else: return ToolResult( content=[TextContent(type="text", text=f"❌ {result['error']}")], structured_content=result, ) except Exception as e: self.logger.error( "Failed to get stack compose file", host_id=host_id, stack_name=stack_name, error=str(e), ) return ToolResult( content=[TextContent(type="text", text=f"❌ Failed to get compose file: {str(e)}")], structured_content={ "success": False, "error": str(e), "host_id": host_id, "stack_name": stack_name, }, )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jmagar/docker-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server