"""Stack deployment MCP tools."""
import asyncio
import json
import shlex
import subprocess
import time
from collections.abc import Callable, Mapping
from datetime import datetime
from pathlib import Path
from typing import Any
import structlog
from ..core.compose_manager import ComposeManager
from ..core.config_loader import DockerHost, DockerMCPConfig
from ..core.docker_context import DockerContextManager
from ..core.exceptions import DockerCommandError, DockerContextError
from ..models.container import StackInfo
from ..utils import build_ssh_command
logger = structlog.get_logger()
class StackTools:
"""Stack deployment tools for MCP."""
def __init__(self, config: DockerMCPConfig, context_manager: DockerContextManager):
self.config = config
self.context_manager = context_manager
self.compose_manager = ComposeManager(config, context_manager)
async def deploy_stack(
self,
host_id: str,
stack_name: str,
compose_content: str,
environment: dict[str, str] | None = None,
pull_images: bool = True,
recreate: bool = False,
) -> dict[str, Any]:
"""Deploy a Docker Compose stack to a remote host with persistent compose files.
Args:
host_id: ID of the Docker host
stack_name: Name for the stack (used as project name)
compose_content: Docker Compose YAML content
environment: Environment variables for the stack
pull_images: Pull latest images before deploying
recreate: Recreate containers even if config hasn't changed
Returns:
Deployment result
"""
try:
# Validate stack name
if not self._validate_stack_name(stack_name):
return {
"success": False,
"error": f"Invalid stack name: {stack_name}. Must be alphanumeric with hyphens/underscores.",
"host_id": host_id,
"timestamp": datetime.now().isoformat(),
}
# Write compose file to persistent location on remote host
compose_file_path = await self.compose_manager.write_compose_file(
host_id, stack_name, compose_content
)
# Deploy using persistent compose file
result = await self._deploy_stack_with_persistent_file(
host_id, stack_name, compose_file_path, environment or {}, pull_images, recreate
)
logger.info(
"Stack deployment completed",
host_id=host_id,
stack_name=stack_name,
success=result["success"],
)
return result
except Exception as e:
logger.error(
"Stack deployment failed", host_id=host_id, stack_name=stack_name, error=str(e)
)
return {
"success": False,
"error": str(e),
"host_id": host_id,
"stack_name": stack_name,
"timestamp": datetime.now().isoformat(),
}
async def list_stacks(self, host_id: str) -> dict[str, Any]:
"""List Docker Compose stacks on a host.
Args:
host_id: ID of the Docker host
Returns:
List of stacks
"""
try:
host_config = self.config.hosts.get(host_id)
if not host_config:
return {
"success": False,
"error": f"Host '{host_id}' not found",
"host_id": host_id,
"timestamp": datetime.now().isoformat(),
}
compose_list_output = await self._run_ssh_command(
host_config,
"docker compose ls --all --format json",
timeout=30,
)
if compose_list_output.returncode != 0:
error_msg = compose_list_output.stderr.strip() or compose_list_output.stdout.strip()
logger.error(
"Failed to list compose projects",
host_id=host_id,
error=error_msg,
)
return {
"success": False,
"error": f"docker compose ls failed: {error_msg}",
"host_id": host_id,
"timestamp": datetime.now().isoformat(),
}
projects = self._parse_compose_ls(compose_list_output.stdout)
stacks: list[dict[str, Any]] = []
for project in projects:
project_name = project.get("Name") or project.get("name")
if not project_name:
continue
compose_files = project.get("ConfigFiles") or project.get("config_files") or ""
compose_file = compose_files.split(",")[0].strip() if compose_files else None
ps_output = await self._run_ssh_command(
host_config,
self._build_compose_ps_command(project_name, compose_file),
timeout=30,
)
if ps_output.returncode != 0:
error_msg = ps_output.stderr.strip() or ps_output.stdout.strip()
logger.debug(
"docker compose ps failed",
host_id=host_id,
project=project_name,
error=error_msg,
)
services_info: list[dict[str, Any]] = []
else:
services_info = self._parse_compose_ps(ps_output.stdout)
raw_service_names = {svc.get("Service") or svc.get("service") or svc.get("Name") for svc in services_info}
filtered_service_names = [name for name in raw_service_names if name]
service_names = sorted(filtered_service_names)
service_states = [
(svc.get("State") or svc.get("state") or "").lower() for svc in services_info
]
if service_states and all(state.startswith("running") or state.startswith("up") for state in service_states):
aggregate_status = "running"
elif any(state.startswith("running") or state.startswith("up") for state in service_states):
aggregate_status = "partial"
else:
aggregate_status = (project.get("Status") or project.get("status") or "unknown").lower()
stack_info = StackInfo(
name=project_name,
host_id=host_id,
services=service_names, # Now properly typed as list[str]
status=aggregate_status,
created=self._parse_datetime(project.get("CreatedAt") or project.get("created")),
updated=self._parse_datetime(project.get("UpdatedAt") or project.get("updated")),
compose_file=compose_file,
).model_dump()
stacks.append(stack_info)
logger.info("Listed stacks", host_id=host_id, count=len(stacks))
return {
"success": True,
"stacks": stacks,
"host_id": host_id,
"timestamp": datetime.now().isoformat(),
}
except Exception as e:
logger.error("Failed to list stacks", host_id=host_id, error=str(e))
return {
"success": False,
"error": str(e),
"host_id": host_id,
"timestamp": datetime.now().isoformat(),
}
async def stop_stack(self, host_id: str, stack_name: str) -> dict[str, Any]:
"""Stop a Docker Compose stack.
Args:
host_id: ID of the Docker host
stack_name: Name of the stack to stop
Returns:
Operation result
"""
try:
if not self._validate_stack_name(stack_name):
return {
"success": False,
"error": f"Invalid stack name: {stack_name}",
"host_id": host_id,
"stack_name": stack_name,
"timestamp": datetime.now().isoformat(),
}
cmd = f"compose --project-name {stack_name} stop"
await self.context_manager.execute_docker_command(host_id, cmd)
logger.info("Stack stopped", host_id=host_id, stack_name=stack_name)
return {
"success": True,
"message": f"Stack {stack_name} stopped successfully",
"host_id": host_id,
"stack_name": stack_name,
"timestamp": datetime.now().isoformat(),
}
except (DockerCommandError, DockerContextError) as e:
logger.error(
"Failed to stop stack", host_id=host_id, stack_name=stack_name, error=str(e)
)
return {
"success": False,
"error": str(e),
"host_id": host_id,
"stack_name": stack_name,
"timestamp": datetime.now().isoformat(),
}
async def remove_stack(
self, host_id: str, stack_name: str, remove_volumes: bool = False
) -> dict[str, Any]:
"""Remove a Docker Compose stack.
Args:
host_id: ID of the Docker host
stack_name: Name of the stack to remove
remove_volumes: Also remove associated volumes
Returns:
Operation result
"""
try:
if not self._validate_stack_name(stack_name):
return {
"success": False,
"error": f"Invalid stack name: {stack_name}",
"host_id": host_id,
"stack_name": stack_name,
"timestamp": datetime.now().isoformat(),
}
cmd = f"compose --project-name {stack_name} down"
if remove_volumes:
cmd += " --volumes"
await self.context_manager.execute_docker_command(host_id, cmd)
logger.info(
"Stack removed",
host_id=host_id,
stack_name=stack_name,
remove_volumes=remove_volumes,
)
return {
"success": True,
"message": f"Stack {stack_name} removed successfully",
"host_id": host_id,
"stack_name": stack_name,
"removed_volumes": remove_volumes,
"timestamp": datetime.now().isoformat(),
}
except (DockerCommandError, DockerContextError) as e:
logger.error(
"Failed to remove stack", host_id=host_id, stack_name=stack_name, error=str(e)
)
return {
"success": False,
"error": str(e),
"host_id": host_id,
"stack_name": stack_name,
"timestamp": datetime.now().isoformat(),
}
async def _deploy_stack_with_persistent_file(
self,
host_id: str,
stack_name: str,
compose_file_path: str,
environment: dict[str, str],
pull_images: bool,
recreate: bool,
) -> dict[str, Any]:
"""Deploy stack using Docker context with persistent compose file."""
try:
# Get Docker context for the host
context_name = await self.context_manager.ensure_context(host_id)
# Pull images first if requested
if pull_images:
try:
await self._execute_compose_with_file(
context_name, stack_name, compose_file_path, ["pull"], environment
)
logger.info(
"Images pulled successfully", host_id=host_id, stack_name=stack_name
)
except Exception as e:
logger.warning(
"Image pull failed, continuing with deployment",
host_id=host_id,
stack_name=stack_name,
error=str(e),
)
# Build deployment command arguments
up_args = ["up", "-d"]
if recreate:
up_args.append("--force-recreate")
# Deploy the stack
result = await self._execute_compose_with_file(
context_name, stack_name, compose_file_path, up_args, environment
)
logger.info(
"Stack deployed successfully",
host_id=host_id,
stack_name=stack_name,
compose_file=compose_file_path,
)
return {
"success": True,
"message": f"Stack {stack_name} deployed successfully",
"output": result,
"host_id": host_id,
"stack_name": stack_name,
"compose_file": compose_file_path,
"timestamp": datetime.now().isoformat(),
}
except (DockerCommandError, DockerContextError) as e:
logger.error(
"Stack deployment failed", host_id=host_id, stack_name=stack_name, error=str(e)
)
return {
"success": False,
"error": str(e),
"host_id": host_id,
"stack_name": stack_name,
"timestamp": datetime.now().isoformat(),
}
except Exception as e:
logger.error(
"Unexpected deployment error", host_id=host_id, stack_name=stack_name, error=str(e)
)
return {
"success": False,
"error": f"Deployment failed: {e}",
"host_id": host_id,
"stack_name": stack_name,
"timestamp": datetime.now().isoformat(),
}
async def _execute_compose_with_file(
self,
context_name: str,
project_name: str,
compose_file_path: str,
compose_args: list[str],
environment: dict[str, str] | None = None,
timeout: int = 300,
) -> str:
"""Execute docker compose command via SSH on remote host."""
# Extract directory from full path
compose_path = Path(compose_file_path)
project_directory = str(compose_path.parent)
# Build docker compose command to run on remote host
# Since we cd into the project directory, use relative path for compose file
compose_filename = compose_path.name
compose_cmd = [
"docker",
"compose",
"--project-name",
project_name,
"-f",
compose_filename,
]
compose_cmd.extend(compose_args)
# Get host config for SSH connection
host_id = self._extract_host_id_from_context(context_name)
host_config = self.config.hosts.get(host_id)
if not host_config:
raise DockerCommandError(f"Host {host_id} not found in configuration")
# Build SSH command
ssh_cmd = build_ssh_command(host_config)
# Build remote command
remote_cmd = self._build_remote_command(project_directory, compose_cmd, environment)
ssh_cmd.append(remote_cmd)
# Record start time and log operation start
start_time = time.monotonic()
compose_action = " ".join(compose_cmd)
logger.info(
"SSH compose operation started",
host_id=host_id,
ssh_host=host_config.hostname,
compose_action=compose_action, # Safe to log
# Note: remote_command may contain env vars with secrets - not logged
)
try:
result = await asyncio.to_thread(
subprocess.run, # nosec B603
ssh_cmd,
check=False,
text=True,
capture_output=True,
timeout=timeout,
)
# Calculate duration and log completion
duration = time.monotonic() - start_time
if result.returncode != 0:
error_msg = result.stderr.strip() or result.stdout.strip()
logger.error(
"SSH compose operation failed",
host_id=host_id,
ssh_host=host_config.hostname,
compose_action=compose_action,
duration=duration,
return_code=result.returncode,
error_message=error_msg
)
raise DockerCommandError(f"Docker compose command failed: {error_msg}")
logger.info(
"SSH compose operation completed",
host_id=host_id,
ssh_host=host_config.hostname,
compose_action=compose_action,
duration=duration
)
return result.stdout.strip()
except subprocess.TimeoutExpired as e:
duration = time.monotonic() - start_time
logger.error(
"SSH compose operation timed out",
host_id=host_id,
ssh_host=host_config.hostname,
compose_action=compose_action,
duration=duration,
timeout=timeout
)
raise DockerCommandError(f"Docker compose command timed out: {e}") from e
except Exception as e:
duration = time.monotonic() - start_time
logger.error(
"SSH compose operation failed with exception",
host_id=host_id,
ssh_host=host_config.hostname,
compose_action=compose_action,
duration=duration,
error=str(e)
)
if isinstance(e, DockerCommandError):
raise
raise DockerCommandError(f"Failed to execute docker compose: {e}") from e
def _extract_host_id_from_context(self, context_name: str) -> str:
"""Extract host_id from context_name."""
return (
context_name.replace("docker-mcp-", "")
if context_name.startswith("docker-mcp-")
else context_name
)
def _build_remote_command(
self, project_directory: str, compose_cmd: list[str], environment: dict[str, str] | None
) -> str:
"""Build remote Docker compose command with safe quoting."""
quoted_cd = f"cd {shlex.quote(project_directory)}"
env_prefix = ""
if environment:
parts = [f"{k}={shlex.quote(v)}" for k, v in environment.items()]
env_prefix = " " + " ".join(parts)
quoted_compose = " ".join(shlex.quote(arg) for arg in compose_cmd)
return f"{quoted_cd} &&{env_prefix} {quoted_compose}"
async def _run_ssh_command(
self, host: DockerHost, command: str, timeout: int = 60
) -> subprocess.CompletedProcess[str]:
ssh_cmd = build_ssh_command(host)
ssh_cmd.append(command)
return await asyncio.to_thread(
subprocess.run, # nosec B603
ssh_cmd,
check=False,
capture_output=True,
text=True,
timeout=timeout,
)
def _parse_compose_ls(self, output: str) -> list[dict[str, Any]]:
if not output:
return []
text = output.strip()
if not text:
return []
try:
parsed = json.loads(text)
if isinstance(parsed, dict):
return [parsed]
if isinstance(parsed, list):
return parsed
except json.JSONDecodeError:
entries: list[dict[str, Any]] = []
for line in text.splitlines():
stripped_line = line.strip()
if not stripped_line:
continue
try:
entries.append(json.loads(stripped_line))
except json.JSONDecodeError:
logger.debug("Failed to parse docker compose ls line", line=stripped_line)
return entries
return []
def _build_compose_ps_command(self, project_name: str, compose_file: str | None) -> str:
command = ["docker", "compose", "--project-name", project_name]
if compose_file:
command.extend(["-f", compose_file])
command.extend(["ps", "--all", "--format", "json"])
return " ".join(shlex.quote(part) for part in command)
def _parse_compose_ps(self, output: str) -> list[dict[str, Any]]:
if not output:
return []
text = output.strip()
if not text:
return []
try:
parsed = json.loads(text)
if isinstance(parsed, dict):
return [parsed]
if isinstance(parsed, list):
return parsed
except json.JSONDecodeError:
entries: list[dict[str, Any]] = []
for line in text.splitlines():
stripped_line = line.strip()
if not stripped_line:
continue
try:
entries.append(json.loads(stripped_line))
except json.JSONDecodeError:
logger.debug("Failed to parse docker compose ps line", line=stripped_line)
return entries
return []
def _parse_datetime(self, value: str | None) -> datetime | None:
if not value:
return None
value = value.strip()
if not value:
return None
try:
if value.endswith("Z"):
value = value[:-1] + "+00:00"
return datetime.fromisoformat(value)
except ValueError:
return None
async def manage_stack(
self, host_id: str, stack_name: str, action: str, options: dict[str, Any] | None = None
) -> dict[str, Any]:
"""Unified stack lifecycle management.
Note: Uses SSH execution instead of Docker context because Docker contexts
cannot access compose files on remote hosts. This is a fundamental Docker
limitation, not a bug in our code.
See: https://github.com/docker/compose/issues/9075
Args:
host_id: ID of the Docker host
stack_name: Name of the stack to manage
action: Action to perform (up, down, restart, build, pull, logs, ps)
options: Optional parameters for the action
Returns:
Operation result
"""
# Validate inputs
validation_error = self._validate_stack_inputs(host_id, stack_name, action)
if validation_error:
return validation_error
try:
options = options or {}
# Get compose file information
compose_info = await self._get_compose_file_info(host_id, stack_name)
# Always use SSH execution for consistency with deploy_stack
# This ensures we can access compose files on remote hosts and maintains
# consistent behavior across all stack operations
return await self._execute_stack_via_ssh(
host_id, stack_name, action, options, compose_info
)
except (DockerCommandError, DockerContextError) as e:
return self._build_error_response(host_id, stack_name, action, str(e))
async def _execute_stack_via_ssh(
self,
host_id: str,
stack_name: str,
action: str,
options: dict[str, Any],
compose_info: dict[str, Any],
) -> dict[str, Any]:
"""Execute stack command via SSH for all stack operations."""
try:
# Build compose arguments for the action
try:
compose_args = self._build_compose_args(action, options)
except ValueError as e:
return self._build_error_response(host_id, stack_name, action, str(e))
# Add service filter if specified
self._add_service_filter(compose_args, options)
# Get Docker context for SSH connection info
context_name = await self.context_manager.ensure_context(host_id)
# Validate compose file exists for the action
compose_file_validation = self._validate_compose_file_exists(
compose_info, action, host_id, stack_name
)
if isinstance(compose_file_validation, dict): # Error response
return compose_file_validation
compose_file_path = compose_file_validation
# Execute via SSH with appropriate timeout
timeout = self._determine_timeout(action)
result = await self._execute_compose_with_file(
context_name, stack_name, compose_file_path, compose_args, None, timeout
)
# Parse action-specific outputs
output_data = self._parse_action_output(action, result)
logger.info(
f"Stack {action} completed via SSH",
host_id=host_id,
stack_name=stack_name,
action=action,
)
return {
"success": True,
"message": f"Stack {stack_name} {action} completed successfully",
"host_id": host_id,
"stack_name": stack_name,
"action": action,
"options": options,
"output": result,
"data": output_data,
"execution_method": "ssh",
"timestamp": datetime.now().isoformat(),
}
except Exception as e:
return self._build_error_response(host_id, stack_name, action, str(e))
def _build_compose_args(
self, action: str, options: dict[str, Any]
) -> list[str]:
"""Build compose arguments based on action and options.
Raises:
ValueError: If the action is not supported
"""
# Get the argument builder for the action
builders = self._get_compose_args_builders()
builder = builders.get(action)
if builder:
return builder(options)
else:
# Raise exception instead of returning error dict
raise ValueError(f"Action '{action}' not supported")
def _get_compose_args_builders(self) -> Mapping[str, Callable[..., list[str]]]:
"""Get mapping of actions to their argument builders."""
return {
"ps": self._build_ps_args,
"down": self._build_down_args,
"restart": self._build_restart_args,
"logs": self._build_logs_args,
"pull": self._build_pull_args,
"build": self._build_build_args,
"up": self._build_up_args,
}
def _build_ps_args(self, options: dict[str, Any]) -> list[str]:
"""Build arguments for ps action."""
return ["ps", "--format", "json"]
def _build_down_args(self, options: dict[str, Any]) -> list[str]:
"""Build arguments for down action."""
args = ["down"]
if options.get("volumes", False):
args.append("--volumes")
if options.get("remove_orphans", False):
args.append("--remove-orphans")
return args
def _build_restart_args(self, options: dict[str, Any]) -> list[str]:
"""Build arguments for restart action."""
args = ["restart"]
if options.get("timeout"):
args.extend(["--timeout", str(options["timeout"])])
return args
def _build_logs_args(self, options: dict[str, Any]) -> list[str]:
"""Build arguments for logs action."""
args = ["logs"]
if options.get("follow", False):
args.append("--follow")
if options.get("tail"):
args.extend(["--tail", str(options["tail"])])
return args
def _build_pull_args(self, options: dict[str, Any]) -> list[str]:
"""Build arguments for pull action."""
args = ["pull"]
if options.get("ignore_pull_failures", False):
args.append("--ignore-pull-failures")
return args
def _build_build_args(self, options: dict[str, Any]) -> list[str]:
"""Build arguments for build action."""
args = ["build"]
if options.get("no_cache", False):
args.append("--no-cache")
if options.get("pull", False):
args.append("--pull")
return args
def _build_up_args(self, options: dict[str, Any]) -> list[str]:
"""Build arguments for up action."""
args = ["up", "-d"]
if options.get("force_recreate", False):
args.append("--force-recreate")
if options.get("build", False):
args.append("--build")
if options.get("pull"):
pull_policy = options.get("pull_policy", "always")
args.extend(["--pull", pull_policy])
return args
def _add_service_filter(self, compose_args: list[str], options: dict[str, Any]) -> None:
"""Add service filter to compose arguments if specified."""
if options.get("services"):
services = options["services"]
if isinstance(services, list):
compose_args.extend(services)
else:
compose_args.append(str(services))
def _validate_compose_file_exists(
self, compose_info: dict[str, Any], action: str, host_id: str, stack_name: str
) -> str | dict[str, Any]:
"""Validate compose file exists and return path or error."""
if compose_info["exists"]:
return compose_info["path"]
else:
if action == "up":
return self._build_error_response(
host_id,
stack_name,
action,
f"No compose file found for stack '{stack_name}'. Use deploy_stack for new deployments.",
)
return self._build_error_response(
host_id,
stack_name,
action,
f"Cannot {action} stack '{stack_name}': no compose file found. Stack may not exist or was not deployed via this tool.",
)
def _determine_timeout(self, action: str) -> int:
"""Determine timeout based on action type."""
return 300 if action in ["up", "build"] else 60
def _parse_action_output(self, action: str, result: str) -> dict[str, Any] | None:
"""Parse action-specific outputs."""
if action == "ps":
return self._parse_ps_output_from_ssh(result)
return None
def _parse_ps_output_from_ssh(self, result: str) -> dict[str, Any]:
"""Parse docker compose ps output from SSH execution."""
try:
s = result.strip()
# First try full JSON (array or object)
parsed = json.loads(s)
if isinstance(parsed, list):
return {"services": parsed}
return {"services": [parsed]}
except Exception:
# Fallback: JSON-lines
services = []
for line in result.strip().split("\n"):
try:
if line.strip():
services.append(json.loads(line))
except json.JSONDecodeError:
logger.warning("Failed to parse JSON line from compose ps", line=line)
continue
return {"services": services}
def _validate_stack_inputs(
self, host_id: str, stack_name: str, action: str
) -> dict[str, Any] | None:
"""Validate stack management inputs."""
valid_actions = ["up", "down", "restart", "build", "pull", "logs", "ps"]
if action not in valid_actions:
return {
"success": False,
"error": f"Invalid action '{action}'. Valid actions: {', '.join(valid_actions)}",
"host_id": host_id,
"stack_name": stack_name,
"timestamp": datetime.now().isoformat(),
}
if not self._validate_stack_name(stack_name):
return {
"success": False,
"error": f"Invalid stack name: {stack_name}",
"host_id": host_id,
"stack_name": stack_name,
"timestamp": datetime.now().isoformat(),
}
return None
async def get_stack_compose_content(self, host_id: str, stack_name: str) -> dict[str, Any]:
"""Get the docker-compose.yml content for a specific stack."""
try:
compose_info = await self._get_compose_file_info(host_id, stack_name)
if not compose_info["exists"]:
return {
"success": False,
"error": f"Compose file not found for stack '{stack_name}'",
"host_id": host_id,
"stack_name": stack_name,
"timestamp": datetime.now().isoformat(),
}
# Get the compose file path
compose_file_path = compose_info["path"]
# Read the file content via SSH using centralized command builder
host = self.config.hosts[host_id]
ssh_cmd = build_ssh_command(host)
ssh_cmd.append(f"cat {shlex.quote(compose_file_path)}")
try:
result = await asyncio.to_thread(
subprocess.run, # nosec B603
ssh_cmd,
check=False,
capture_output=True,
text=True,
timeout=30,
)
except subprocess.TimeoutExpired as timeout_err:
# Extract SSH target from command for context
ssh_target = f"{host.user}@{host.hostname}"
logger.error(
"SSH cat command timeout",
compose_file_path=compose_file_path,
ssh_target=ssh_target,
timeout_seconds=30,
original_error=str(timeout_err)
)
return {
"success": False,
"error": f"Timeout reading compose file at {compose_file_path} from {ssh_target} (30s timeout)",
"host_id": host_id,
"stack_name": stack_name,
"compose_file_path": compose_file_path,
"ssh_target": ssh_target,
"timeout_seconds": 30,
"timestamp": datetime.now().isoformat(),
}
if result.returncode == 0:
return {
"success": True,
"host_id": host_id,
"stack_name": stack_name,
"compose_content": result.stdout,
"compose_file_path": compose_file_path,
"timestamp": datetime.now().isoformat(),
}
else:
return {
"success": False,
"error": f"Failed to read compose file: {result.stderr}",
"host_id": host_id,
"stack_name": stack_name,
"timestamp": datetime.now().isoformat(),
}
except Exception as e:
return {
"success": False,
"error": f"Failed to get compose content: {str(e)}",
"host_id": host_id,
"stack_name": stack_name,
"timestamp": datetime.now().isoformat(),
}
async def _get_compose_file_info(self, host_id: str, stack_name: str) -> dict[str, Any]:
"""Get compose file information for a stack."""
compose_file_exists = await self.compose_manager.compose_file_exists(host_id, stack_name)
if compose_file_exists:
compose_file_path = await self.compose_manager.get_compose_file_path(
host_id, stack_name
)
# Use just the filename since we cd into the project directory
compose_filename = Path(compose_file_path).name
return {
"exists": True,
"path": compose_file_path,
"base_cmd": f"compose --project-name {stack_name} -f {compose_filename}",
}
else:
return {
"exists": False,
"path": None,
"base_cmd": f"compose --project-name {stack_name}",
}
def _build_error_response(
self, host_id: str, stack_name: str, action: str, error: str
) -> dict[str, Any]:
"""Build error response."""
logger.error(
f"Failed to {action} stack",
host_id=host_id,
stack_name=stack_name,
action=action,
error=error,
)
return {
"success": False,
"error": error,
"host_id": host_id,
"stack_name": stack_name,
"action": action,
"timestamp": datetime.now().isoformat(),
}
def _validate_stack_name(self, stack_name: str) -> bool:
"""Validate stack name for security and Docker Compose compatibility."""
import re
# Must be alphanumeric with hyphens/underscores, no spaces
pattern = r"^[a-zA-Z0-9][a-zA-Z0-9_-]*$"
if not re.match(pattern, stack_name):
return False
# Additional length check
if len(stack_name) > 63: # Docker limit
return False
# Reserved names
reserved = {"docker", "compose", "system", "network", "volume"}
if stack_name.lower() in reserved:
return False
return True