Skip to main content
Glama
handlers.py (38.1 kB)
"""Async handlers backing the Docker MCP tools (containers, images, volumes, compose).

Every public ``handle_*`` coroutine takes the tool ``arguments`` mapping and
returns a list with a single ``TextContent``; failures are reported as text
rather than raised.
"""

from typing import List, Dict, Any
import asyncio
import os
import shlex
import yaml
import platform
from python_on_whales import DockerClient
from mcp.types import TextContent, Tool, Prompt, PromptArgument, GetPromptResult, PromptMessage
from .docker_executor import DockerComposeExecutor

# Ensure docker is in PATH
if '/usr/bin' not in os.environ.get('PATH', ''):
    os.environ['PATH'] = f"/usr/bin:{os.environ.get('PATH', '')}"

docker_client = DockerClient()


def _text(message: str) -> List[TextContent]:
    """Wrap *message* in the single-element TextContent list every handler returns."""
    return [TextContent(type="text", text=message)]


def _format_port_bindings(port_bindings, include_host_ip: bool) -> List[str]:
    """Render inspect-style port bindings as ``host->container`` strings.

    ``port_bindings`` maps a container port to a list of binding dicts (or
    ``None``); bindings without a ``HostPort`` are skipped.
    """
    formatted = []
    for container_port, host_bindings in (port_bindings or {}).items():
        for binding in host_bindings or []:
            host_port = binding.get('HostPort', '')
            if not host_port:
                continue
            if include_host_ip:
                host_ip = binding.get('HostIp', '0.0.0.0')
                formatted.append(f"{host_ip}:{host_port}->{container_port}")
            else:
                formatted.append(f"{host_port}->{container_port}")
    return formatted


def _first_attr(obj, names, default=0):
    """Return the first non-None attribute of *obj* among *names*, else *default*."""
    for name in names:
        value = getattr(obj, name, None)
        if value is not None:
            return value
    return default


async def parse_port_mapping(host_key: str, container_port: str | int) -> tuple[str, str] | tuple[str, str, str]:
    """Convert one ``ports`` entry into a publish tuple.

    A ``/protocol`` suffix may appear on either the host key or the container
    port. Only ``udp`` is carried through as a third tuple element; any other
    protocol yields a two-element ``(host_port, container_port)`` tuple.
    """
    if '/' in str(host_key):
        host_port, protocol = host_key.split('/')
        if protocol.lower() == 'udp':
            return (str(host_port), str(container_port), 'udp')
        return (str(host_port), str(container_port))

    if isinstance(container_port, str) and '/' in container_port:
        port, protocol = container_port.split('/')
        if protocol.lower() == 'udp':
            return (str(host_key), port, 'udp')
        return (str(host_key), port)

    return (str(host_key), str(container_port))


class DockerHandlers:
    """Namespace of static coroutine handlers, one per Docker tool."""

    # Seconds allowed for pull + run in handle_create_container.
    TIMEOUT_AMOUNT = 200

    @staticmethod
    async def handle_create_container(arguments: Dict[str, Any]) -> List[TextContent]:
        """Pull the image if it is missing locally, then run a detached container.

        Reads ``image`` (required), ``name``, ``ports`` and ``environment``
        from *arguments*. The whole pull-and-run sequence is bounded by
        ``TIMEOUT_AMOUNT`` seconds.
        """
        try:
            image = arguments["image"]
            container_name = arguments.get("name")
            ports = arguments.get("ports", {})
            environment = arguments.get("environment", {})

            if not image:
                raise ValueError("Image name cannot be empty")

            port_mappings = []
            for host_key, container_port in ports.items():
                port_mappings.append(await parse_port_mapping(host_key, container_port))

            async def pull_and_run():
                # The existence check is a blocking client call too, so it is
                # run in a worker thread like the pull and run calls.
                image_present = await asyncio.to_thread(docker_client.image.exists, image)
                if not image_present:
                    await asyncio.to_thread(docker_client.image.pull, image)
                return await asyncio.to_thread(
                    docker_client.container.run,
                    image,
                    name=container_name,
                    publish=port_mappings,
                    envs=environment,
                    detach=True,
                )

            container = await asyncio.wait_for(pull_and_run(), timeout=DockerHandlers.TIMEOUT_AMOUNT)
            return _text(f"Created container '{container.name}' (ID: {container.id})")
        except asyncio.TimeoutError:
            return _text(f"Operation timed out after {DockerHandlers.TIMEOUT_AMOUNT} seconds")
        except Exception as e:
            return _text(f"Error creating container: {str(e)} | Arguments: {arguments}")

    @staticmethod
    async def handle_deploy_compose(arguments: Dict[str, Any]) -> List[TextContent]:
        """Deploy a compose stack from a file path or from inline YAML.

        Requires ``project_name`` and one of ``compose_file`` / ``compose_yaml``.
        Inline YAML is written to a temporary compose file that is removed
        after the deployment attempt.
        """
        debug_info = []
        try:
            compose_yaml = arguments.get("compose_yaml")
            compose_file = arguments.get("compose_file")
            project_name = arguments.get("project_name")

            if not project_name:
                raise ValueError("Missing required project_name")
            if not compose_yaml and not compose_file:
                raise ValueError("Either compose_yaml or compose_file must be provided")

            if compose_file:
                # Handle local file path
                if not os.path.exists(compose_file):
                    raise ValueError(f"Docker Compose file not found: {compose_file}")
                compose_path = compose_file
                debug_info.append(f"Using local compose file: {compose_file}")
                cleanup_needed = False
            else:
                # Handle inline YAML content
                yaml_content = DockerHandlers._process_yaml(compose_yaml, debug_info)
                compose_path = DockerHandlers._save_compose_file(yaml_content, project_name)
                cleanup_needed = True

            try:
                result = await DockerHandlers._deploy_stack(compose_path, project_name, debug_info)
                return _text(result)
            finally:
                # Only files this handler generated are deleted.
                if cleanup_needed:
                    DockerHandlers._cleanup_files(compose_path)
        except Exception as e:
            debug_output = "\n".join(debug_info)
            return _text(f"Error deploying compose stack: {str(e)}\n\nDebug Information:\n{debug_output}")

    @staticmethod
    def _process_yaml(compose_yaml: str, debug_info: List[str]) -> dict:
        """Parse inline compose YAML with ``yaml.safe_load`` and record it in *debug_info*.

        Raises:
            ValueError: if the text is not valid YAML or does not parse to a mapping.
        """
        debug_info.append("=== Original YAML ===")
        debug_info.append(compose_yaml)
        try:
            yaml_content = yaml.safe_load(compose_yaml)
        except yaml.YAMLError as e:
            raise ValueError(f"Invalid YAML format: {str(e)}") from e

        debug_info.append("\n=== Loaded YAML Structure ===")
        debug_info.append(str(yaml_content))
        # A scalar or list parses fine but cannot be a compose document.
        if not isinstance(yaml_content, dict):
            raise ValueError("Invalid YAML format: compose content must be a mapping")
        return yaml_content

    @staticmethod
    def _save_compose_file(yaml_content: dict, project_name: str) -> str:
        """Write *yaml_content* to ``docker_compose_files/<project>-docker-compose.yml``.

        The directory is created under the current working directory. Returns
        the path of the written file.
        """
        compose_dir = os.path.join(os.getcwd(), "docker_compose_files")
        os.makedirs(compose_dir, exist_ok=True)

        compose_yaml = yaml.safe_dump(yaml_content, default_flow_style=False, sort_keys=False)
        compose_path = os.path.join(compose_dir, f"{project_name}-docker-compose.yml")

        with open(compose_path, 'w', encoding='utf-8') as f:
            f.write(compose_yaml)
            f.flush()
            if platform.system() != 'Windows':
                os.fsync(f.fileno())

        return compose_path

    @staticmethod
    async def _deploy_stack(compose_path: str, project_name: str, debug_info: List[str]) -> str:
        """Run compose ``down`` then ``up`` and return a summary with the ``ps`` output.

        Failures of ``down`` are recorded as warnings only; a failing ``up``
        (exception or non-zero return code) is raised to the caller.
        """
        compose = DockerComposeExecutor(compose_path, project_name)

        for command in [compose.down, compose.up]:
            try:
                code, out, err = await command()
                debug_info.extend([
                    f"\n=== {command.__name__.capitalize()} Command ===",
                    f"Return Code: {code}",
                    f"Stdout: {out}",
                    f"Stderr: {err}",
                ])
                if code != 0 and command == compose.up:
                    raise Exception(f"Deploy failed with code {code}: {err}")
            except Exception as e:
                if command != compose.down:
                    raise
                debug_info.append(f"Warning during {command.__name__}: {str(e)}")

        code, out, err = await compose.ps()
        service_info = out if code == 0 else "Unable to list services"

        return (f"Successfully deployed compose stack '{project_name}'\n"
                f"Running services:\n{service_info}\n\n"
                f"Debug Info:\n{chr(10).join(debug_info)}")

    @staticmethod
    def _cleanup_files(compose_path: str) -> None:
        """Best-effort removal of a generated compose file and its directory if empty."""
        try:
            if os.path.exists(compose_path):
                os.remove(compose_path)
            compose_dir = os.path.dirname(compose_path)
            if os.path.exists(compose_dir) and not os.listdir(compose_dir):
                os.rmdir(compose_dir)
        except Exception as e:
            print(f"Warning during cleanup: {str(e)}")

    @staticmethod
    async def handle_get_logs(arguments: Dict[str, Any]) -> List[TextContent]:
        """Return logs for ``container_name``.

        Optional arguments: ``tail`` (default 100), ``follow``, ``timestamps``,
        ``since`` and ``until``. Containers in the ``dead`` or ``removing``
        state are reported without attempting to read logs.
        """
        debug_info = []
        try:
            container_name = arguments.get("container_name")
            if not container_name:
                raise ValueError("Missing required container_name")

            # Optional parameters
            tail = arguments.get("tail", 100)
            follow = arguments.get("follow", False)
            timestamps = arguments.get("timestamps", False)
            since = arguments.get("since", None)
            until = arguments.get("until", None)

            debug_info.append(
                f"Fetching logs for container '{container_name}' (tail={tail}, follow={follow}, timestamps={timestamps})")

            # First, check if container exists and get its current state
            try:
                container_info = await asyncio.to_thread(docker_client.container.inspect, container_name)
                container_state = container_info.state.status
                debug_info.append(f"Container state: {container_state}")
                if container_state in ["dead", "removing"]:
                    return _text(f"Container '{container_name}' is {container_state} and cannot provide logs")
            except Exception as inspect_error:
                # Continue anyway, the logs command might still work
                debug_info.append(f"Could not inspect container: {str(inspect_error)}")

            # NOTE(review): follow=True presumably blocks until the container
            # stops; confirm whether a timeout is wanted here.
            log_kwargs = {"tail": tail, "follow": follow, "timestamps": timestamps}
            if since:
                log_kwargs["since"] = since
            if until:
                log_kwargs["until"] = until

            logs = await asyncio.to_thread(docker_client.container.logs, container_name, **log_kwargs)

            output = f"Logs for container '{container_name}'"
            if since or until:
                output += f" (from {since or 'start'} to {until or 'now'})"
            output += f":\n{logs}"
            return _text(output)
        except Exception as e:
            error_msg = str(e)
            # Provide more helpful error messages
            if "dead or marked for removal" in error_msg:
                return _text(f"Container '{container_name}' has been removed or recreated. Please use 'list-containers' to find the current container name.")
            if "No such container" in error_msg:
                return _text(f"Container '{container_name}' not found. Please use 'list-containers' to see available containers.")
            debug_output = "\n".join(debug_info)
            return _text(f"Error retrieving logs: {error_msg}\n\nDebug Information:\n{debug_output}")

    @staticmethod
    async def handle_list_containers(arguments: Dict[str, Any] | None) -> List[TextContent]:
        """List containers, optionally filtered by status, name, id, label or network.

        ``all`` defaults to True. Filters are applied client-side after the
        full list is fetched.
        """
        debug_info = []
        try:
            show_all = True if not arguments else arguments.get("all", True)
            filters = arguments.get("filters", {}) if arguments else {}
            debug_info.append(f"Listing Docker containers (all={show_all}, filters={filters})")

            containers = await asyncio.to_thread(docker_client.container.list, all=show_all)

            if filters:
                filtered_containers = []
                for c in containers:
                    # Status filter
                    if filters.get("status") and c.state.status != filters["status"]:
                        continue
                    # Name filter (substring match)
                    if filters.get("name") and filters["name"] not in c.name:
                        continue
                    # ID filter (prefix matching)
                    if filters.get("id") and not c.id.startswith(filters["id"]):
                        continue
                    # Label filter: "key=value" needs an exact value, "key" only presence
                    if filters.get("label"):
                        labels = c.config.labels or {}
                        if "=" in filters["label"]:
                            key, value = filters["label"].split("=", 1)
                            label_match = labels.get(key) == value
                        else:
                            label_match = filters["label"] in labels
                        if not label_match:
                            continue
                    # Network filter
                    if filters.get("network"):
                        try:
                            inspect_data = await asyncio.to_thread(docker_client.container.inspect, c.name)
                            if filters["network"] not in inspect_data.network_settings.networks:
                                continue
                        except Exception:
                            continue
                    filtered_containers.append(c)
                containers = filtered_containers

            container_info = []
            for c in containers:
                ports = []
                # Reset per container so a failed inspect can never leave the
                # previous container's data to be read by the health check.
                inspect_data = None
                try:
                    inspect_data = await asyncio.to_thread(docker_client.container.inspect, c.name)
                    ports = _format_port_bindings(inspect_data.network_settings.ports, include_host_ip=True)
                except Exception:
                    # Fallback to basic port info
                    if hasattr(c, 'ports') and c.ports:
                        for port_mapping in c.ports.items():
                            ports.append(f"{port_mapping[0]}->{port_mapping[1]}")

                ports_str = ", ".join(ports) if ports else "No ports"

                status = c.state.status
                if status == "running" and hasattr(c.state, 'started_at'):
                    status = f"Up {c.state.running_for}" if hasattr(c.state, 'running_for') else "Running"
                    # Append the health status when the container reports one
                    health = getattr(getattr(inspect_data, 'state', None), 'health', None)
                    health_status = getattr(health, 'status', None)
                    if health_status:
                        status = f"{status} ({health_status})"
                elif status == "exited":
                    status = f"Exited ({c.state.exit_code})"

                info = f"• {c.name} ({c.id[:12]})\n Image: {c.config.image}\n Status: {status}\n Ports: {ports_str}"
                container_info.append(info)

            result = "\n\n".join(container_info) if container_info else "No containers found"

            output = "Docker Containers"
            if filters:
                filter_desc = [f"{key}={value}" for key, value in filters.items()]
                output += f" (filtered by: {', '.join(filter_desc)})"
            output += f":\n\n{result}"

            if container_info:
                output += f"\n\nTotal: {len(container_info)} container(s)"

            return _text(output)
        except Exception as e:
            debug_output = "\n".join(debug_info)
            return _text(f"Error listing containers: {str(e)}\n\nDebug Information:\n{debug_output}")

    @staticmethod
    async def handle_stop_container(arguments: Dict[str, Any]) -> List[TextContent]:
        """Stop the container named by ``container_name``."""
        try:
            container_name = arguments.get("container_name")
            if not container_name:
                raise ValueError("Missing required container_name")
            await asyncio.to_thread(docker_client.container.stop, container_name)
            return _text(f"Successfully stopped container '{container_name}'")
        except Exception as e:
            return _text(f"Error stopping container: {str(e)}")

    @staticmethod
    async def handle_start_container(arguments: Dict[str, Any]) -> List[TextContent]:
        """Start the container named by ``container_name``."""
        try:
            container_name = arguments.get("container_name")
            if not container_name:
                raise ValueError("Missing required container_name")
            await asyncio.to_thread(docker_client.container.start, container_name)
            return _text(f"Successfully started container '{container_name}'")
        except Exception as e:
            return _text(f"Error starting container: {str(e)}")

    @staticmethod
    async def handle_remove_container(arguments: Dict[str, Any]) -> List[TextContent]:
        """Remove ``container_name``; a running container is refused unless ``force`` is set."""
        try:
            container_name = arguments.get("container_name")
            force = arguments.get("force", False)
            if not container_name:
                raise ValueError("Missing required container_name")

            if not force:
                try:
                    container_info = await asyncio.to_thread(docker_client.container.inspect, container_name)
                    if container_info.state.status == "running":
                        return _text(f"Container '{container_name}' is running. Use force=true to remove it, or stop it first.")
                except Exception:
                    # Inspect is only a pre-check; let remove report the real error.
                    pass

            await asyncio.to_thread(docker_client.container.remove, container_name, force=force)
            return _text(f"Successfully removed container '{container_name}'")
        except Exception as e:
            return _text(f"Error removing container: {str(e)}")

    @staticmethod
    async def handle_list_images(arguments: Dict[str, Any] | None) -> List[TextContent]:
        """List local images with tags, short id, size in MB and creation time."""
        try:
            images = await asyncio.to_thread(docker_client.image.list)
            image_info = []
            for img in images:
                tags = img.repo_tags if hasattr(img, 'repo_tags') else []
                tag_str = ", ".join(tags) if tags else "<none>"

                size_mb = img.size / (1024 * 1024) if hasattr(img, 'size') else 0
                size_str = f"{size_mb:.1f}MB"

                created = img.attrs.get('Created', 'Unknown') if hasattr(img, 'attrs') else 'Unknown'

                info = f"• {tag_str}\n ID: {img.id[:12]}\n Size: {size_str}\n Created: {created}"
                image_info.append(info)

            result = "\n\n".join(image_info) if image_info else "No images found"
            return _text(f"Docker Images:\n\n{result}")
        except Exception as e:
            return _text(f"Error listing images: {str(e)}")

    @staticmethod
    async def handle_pull_image(arguments: Dict[str, Any]) -> List[TextContent]:
        """Pull the image named by ``image``."""
        try:
            image = arguments.get("image")
            if not image:
                raise ValueError("Missing required image name")
            await asyncio.to_thread(docker_client.image.pull, image)
            return _text(f"Successfully pulled image '{image}'")
        except Exception as e:
            return _text(f"Error pulling image: {str(e)}")

    @staticmethod
    async def handle_remove_image(arguments: Dict[str, Any]) -> List[TextContent]:
        """Remove the image named by ``image`` (name or ID), optionally with ``force``."""
        try:
            image = arguments.get("image")
            force = arguments.get("force", False)
            if not image:
                raise ValueError("Missing required image name or ID")
            await asyncio.to_thread(docker_client.image.remove, image, force=force)
            return _text(f"Successfully removed image '{image}'")
        except Exception as e:
            return _text(f"Error removing image: {str(e)}")

    @staticmethod
    async def handle_list_volumes(arguments: Dict[str, Any] | None) -> List[TextContent]:
        """List volumes (optionally using ``filters``) with driver, scope, mountpoint and labels."""
        try:
            filters = arguments.get("filters", {}) if arguments else {}
            volumes = await asyncio.to_thread(docker_client.volume.list, filters=filters)

            volume_info = []
            for vol in volumes:
                name = vol.name if hasattr(vol, 'name') else 'Unknown'
                driver = vol.driver if hasattr(vol, 'driver') else 'local'
                mountpoint = vol.mountpoint if hasattr(vol, 'mountpoint') else 'Unknown'

                labels = vol.labels if hasattr(vol, 'labels') else {}
                label_str = ", ".join([f"{k}={v}" for k, v in labels.items()]) if labels else "No labels"

                scope = vol.scope if hasattr(vol, 'scope') else 'local'

                info = f"• {name}\n Driver: {driver}\n Scope: {scope}\n Mountpoint: {mountpoint}\n Labels: {label_str}"
                volume_info.append(info)

            result = "\n\n".join(volume_info) if volume_info else "No volumes found"
            return _text(f"Docker Volumes:\n\n{result}")
        except Exception as e:
            return _text(f"Error listing volumes: {str(e)}")

    @staticmethod
    async def handle_remove_volume(arguments: Dict[str, Any]) -> List[TextContent]:
        """Remove the volume named by ``volume_name``, optionally with ``force``."""
        try:
            volume_name = arguments.get("volume_name")
            force = arguments.get("force", False)
            if not volume_name:
                raise ValueError("Missing required volume_name")

            # Only forward `force` when it was requested, so the default path
            # does not depend on the client accepting that keyword.
            # NOTE(review): confirm volume.remove supports `force` in the
            # pinned python-on-whales version.
            remove_kwargs = {"force": force} if force else {}
            await asyncio.to_thread(docker_client.volume.remove, volume_name, **remove_kwargs)
            return _text(f"Successfully removed volume '{volume_name}'")
        except Exception as e:
            return _text(f"Error removing volume: {str(e)}")

    @staticmethod
    async def handle_compose_down(arguments: Dict[str, Any]) -> List[TextContent]:
        """Run ``docker-compose down`` for ``project_name``.

        Uses ``compose_file`` when given; otherwise the generated file under
        ``docker_compose_files`` if it exists; otherwise the project name
        alone. ``remove_volumes`` adds ``--volumes`` and ``remove_images``
        adds ``--rmi all``.
        """
        debug_info = []
        try:
            project_name = arguments.get("project_name")
            compose_file = arguments.get("compose_file")
            remove_volumes = arguments.get("remove_volumes", False)
            remove_images = arguments.get("remove_images", False)

            if not project_name:
                raise ValueError("Missing required project_name")

            if compose_file:
                if not os.path.exists(compose_file):
                    raise ValueError(f"Docker Compose file not found: {compose_file}")
                compose_path = compose_file
                debug_info.append(f"Using local compose file: {compose_file}")
            else:
                # Try to find existing compose file for the project
                compose_dir = os.path.join(os.getcwd(), "docker_compose_files")
                compose_path = os.path.join(compose_dir, f"{project_name}-docker-compose.yml")
                # If no file exists, we'll try to stop by project name only
                if not os.path.exists(compose_path):
                    compose_path = None
                    debug_info.append(f"No compose file found, attempting to stop by project name: {project_name}")

            # The command is assembled here directly; no DockerComposeExecutor
            # is built because nothing used it and compose_path may be None.
            cmd = ["docker-compose"]
            if compose_path:
                cmd += ["-f", compose_path]
            cmd += ["-p", project_name, "down"]
            if remove_volumes:
                cmd.append("--volumes")
            if remove_images:
                cmd += ["--rmi", "all"]

            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            stdout, stderr = await process.communicate()
            code, out, err = process.returncode, stdout.decode(), stderr.decode()

            debug_info.extend([
                "\n=== Docker Compose Down ===",
                f"Return Code: {code}",
                f"Stdout: {out}",
                f"Stderr: {err}",
            ])

            if code == 0:
                result = f"Successfully stopped and removed compose stack '{project_name}'"
                if remove_volumes:
                    result += " (volumes removed)"
                if remove_images:
                    result += " (images removed)"
                result += f"\n\nOutput:\n{out}"
            else:
                result = f"Failed to stop compose stack '{project_name}': {err}"

            return _text(f"{result}\n\nDebug Info:\n{chr(10).join(debug_info)}")
        except Exception as e:
            debug_output = "\n".join(debug_info)
            return _text(f"Error stopping compose stack: {str(e)}\n\nDebug Information:\n{debug_output}")

    @staticmethod
    async def handle_get_container_stats(arguments: Dict[str, Any]) -> List[TextContent]:
        """Report CPU, memory, network and block I/O figures for ``container_name``."""
        try:
            container_name = arguments.get("container_name")
            if not container_name:
                raise ValueError("Missing required container_name")

            stats_list = await asyncio.to_thread(docker_client.container.stats, container_name)
            if not stats_list:
                return _text(f"No stats available for container '{container_name}'")

            # Get the first (and usually only) stats object
            stats = stats_list[0]

            try:
                cpu_percent = _first_attr(stats, ('cpu_percentage',), 0.0)
                memory_usage = _first_attr(stats, ('memory_used',))
                memory_limit = _first_attr(stats, ('memory_limit',))
                memory_percent = _first_attr(stats, ('memory_percentage',))

                # The originally used names are tried first; the second name in
                # each pair is believed to be the one python-on-whales'
                # ContainerStats exposes, so the figures are not always zero.
                # NOTE(review): confirm against the pinned library version.
                network_rx = _first_attr(stats, ('network_rx_bytes', 'net_download'))
                network_tx = _first_attr(stats, ('network_tx_bytes', 'net_upload'))
                block_read = _first_attr(stats, ('blkio_read_bytes', 'block_read'))
                block_write = _first_attr(stats, ('blkio_write_bytes', 'block_write'))

                mb = 1024 * 1024
                stats_info = "\n".join([
                    f"Container Stats for '{container_name}':",
                    f"CPU Usage: {cpu_percent:.2f}%",
                    f"Memory Usage: {memory_usage / mb:.2f} MB / {memory_limit / mb:.2f} MB ({memory_percent:.2f}%)",
                    f"Network I/O: RX {network_rx / mb:.2f} MB / TX {network_tx / mb:.2f} MB",
                    f"Block I/O: Read {block_read / mb:.2f} MB / Write {block_write / mb:.2f} MB",
                ])
                return _text(stats_info)
            except AttributeError as ae:
                # If the object doesn't have expected attributes, show what it has
                available_attrs = [attr for attr in dir(stats) if not attr.startswith('_')]
                return _text(f"Error: ContainerStats object structure is different than expected.\nAvailable attributes: {', '.join(available_attrs)}\nError: {str(ae)}")
        except Exception as e:
            return _text(f"Error getting container stats: {str(e)}")

    @staticmethod
    async def handle_exec_container(arguments: Dict[str, Any]) -> List[TextContent]:
        """Execute ``command`` inside ``container_name``.

        Optional arguments: ``user``, ``workdir``, ``env``, ``privileged`` and
        ``detach``. A string command is split with ``shlex.split``; a list is
        passed through unchanged.
        """
        try:
            container_name = arguments.get("container_name")
            command = arguments.get("command")
            if not container_name or not command:
                raise ValueError("Missing required container_name or command")

            user = arguments.get("user")
            workdir = arguments.get("workdir")
            env = arguments.get("env", {})
            privileged = arguments.get("privileged", False)
            detach = arguments.get("detach", False)

            # Only options that were actually supplied are forwarded.
            exec_kwargs = {}
            if user:
                exec_kwargs["user"] = user
            if workdir:
                exec_kwargs["workdir"] = workdir
            if env:
                # The client's keyword is `envs`, as already used for
                # container.run in handle_create_container.
                exec_kwargs["envs"] = env
            if privileged:
                exec_kwargs["privileged"] = privileged
            if detach:
                exec_kwargs["detach"] = detach

            command_list = shlex.split(command) if isinstance(command, str) else command

            result = await asyncio.to_thread(
                docker_client.container.execute,
                container_name,
                command_list,
                **exec_kwargs,
            )

            if detach:
                return _text(f"Command executed in background in container '{container_name}'")

            output = f"Executed in container '{container_name}':\n$ {command}\n\n"
            output += str(result) if result else "(No output)"
            return _text(output)
        except Exception as e:
            error_msg = str(e)
            if "No such container" in error_msg:
                return _text(f"Container '{container_name}' not found. Please use 'list-containers' to see available containers.")
            if "is not running" in error_msg:
                return _text(f"Container '{container_name}' is not running. Only running containers can execute commands.")
            return _text(f"Error executing command: {error_msg}")

    @staticmethod
    async def handle_compose_ps(arguments: Dict[str, Any]) -> List[TextContent]:
        """List containers whose ``com.docker.compose.project`` label equals ``project_name``."""
        try:
            project_name = arguments.get("project_name")
            if not project_name:
                raise ValueError("Missing required project_name")

            show_all = arguments.get("all", False)

            containers = await asyncio.to_thread(docker_client.container.list, all=show_all)

            # Filter by compose project; a container without labels never matches.
            compose_containers = [
                c for c in containers
                if (c.config.labels or {}).get("com.docker.compose.project") == project_name
            ]

            if not compose_containers:
                return _text(f"No containers found for project '{project_name}'")

            # Format output similar to docker-compose ps
            output_lines = [f"Containers for project '{project_name}':", ""]
            output_lines.append(f"{'SERVICE':<20} {'STATUS':<20} {'PORTS'}")
            output_lines.append("-" * 70)

            for c in compose_containers:
                service_name = (c.config.labels or {}).get("com.docker.compose.service", "unknown")
                status = c.state.status

                ports = []
                try:
                    inspect_data = await asyncio.to_thread(docker_client.container.inspect, c.name)
                    ports = _format_port_bindings(inspect_data.network_settings.ports, include_host_ip=False)
                except Exception:
                    # Port details are optional; the row is still listed.
                    pass

                ports_str = ", ".join(ports) if ports else "No ports"
                output_lines.append(f"{service_name:<20} {status:<20} {ports_str}")

            output_lines.append(f"\nTotal: {len(compose_containers)} container(s)")
            return _text("\n".join(output_lines))
        except Exception as e:
            return _text(f"Error listing compose containers: {str(e)}")

    @staticmethod
    async def handle_compose_logs(arguments: Dict[str, Any]) -> List[TextContent]:
        """Collect logs from every container of ``project_name``, optionally one ``service``.

        Each log line is prefixed with ``[<service>_<container-number>]``. A
        failure to read one container's logs is reported inline and does not
        stop the others.
        """
        try:
            project_name = arguments.get("project_name")
            if not project_name:
                raise ValueError("Missing required project_name")

            service = arguments.get("service")
            tail = arguments.get("tail", 100)
            follow = arguments.get("follow", False)
            timestamps = arguments.get("timestamps", False)

            containers = await asyncio.to_thread(docker_client.container.list, all=True)

            # Filter by compose project and optionally by service
            target_containers = []
            for c in containers:
                labels = c.config.labels or {}
                if labels.get("com.docker.compose.project") != project_name:
                    continue
                if not service or labels.get("com.docker.compose.service") == service:
                    target_containers.append(c)

            if not target_containers:
                if service:
                    return _text(f"No containers found for service '{service}' in project '{project_name}'")
                return _text(f"No containers found for project '{project_name}'")

            output_lines = [f"Logs for project '{project_name}'"]
            if service:
                output_lines[0] += f" (service: {service})"
            output_lines.append("=" * 60)

            for c in target_containers:
                labels = c.config.labels or {}
                service_name = labels.get("com.docker.compose.service", "unknown")
                container_num = labels.get("com.docker.compose.container-number", "1")

                log_kwargs = {"tail": tail, "follow": follow, "timestamps": timestamps}

                try:
                    logs = await asyncio.to_thread(docker_client.container.logs, c.name, **log_kwargs)
                    # Add service prefix to each log line
                    if logs:
                        for line in logs.strip().split('\n'):
                            output_lines.append(f"[{service_name}_{container_num}] {line}")
                except Exception as e:
                    output_lines.append(f"[{service_name}_{container_num}] Error getting logs: {str(e)}")

            return _text("\n".join(output_lines))
        except Exception as e:
            return _text(f"Error getting compose logs: {str(e)}")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/diegofornalha/docker-mcp-py'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.