Skip to main content
Glama
logs.py21.7 kB
"""Log streaming MCP tools.""" import asyncio import re import uuid from datetime import UTC, datetime from typing import Any import docker import structlog from ..core.config_loader import DockerMCPConfig from ..core.docker_context import DockerContextManager from ..core.error_response import DockerMCPErrorResponse, create_success_response from ..core.exceptions import DockerCommandError, DockerContextError from ..models.container import ContainerLogs, LogStreamRequest logger = structlog.get_logger() class LogTools: """Log management tools for MCP.""" def __init__(self, config: DockerMCPConfig, context_manager: DockerContextManager): self.config = config self.context_manager = context_manager self._init_log_sanitization_patterns() def _init_log_sanitization_patterns(self) -> None: """Initialize regex patterns for sanitizing sensitive information from logs.""" self.sanitization_patterns = [ # API Keys and tokens { "pattern": re.compile(r'\b[Aa]pi[_-]?[Kk]ey["\s]*[:=]["\s]*([a-zA-Z0-9\-_]{16,})', re.IGNORECASE), "replacement": r'api_key="[REDACTED_API_KEY]"', "description": "API keys" }, { "pattern": re.compile(r'\b[Tt]oken["\s]*[:=]["\s]*([a-zA-Z0-9\-_\.]{20,})', re.IGNORECASE), "replacement": r'token="[REDACTED_TOKEN]"', "description": "Authentication tokens" }, { "pattern": re.compile(r'\b[Bb]earer\s+([a-zA-Z0-9\-_\.]{20,})', re.IGNORECASE), "replacement": r'Bearer [REDACTED_BEARER_TOKEN]', "description": "Bearer tokens" }, # Password patterns { "pattern": re.compile(r'\b[Pp]assword["\s]*[:=]["\s]*([^\s"\']{6,})', re.IGNORECASE), "replacement": r'password="[REDACTED_PASSWORD]"', "description": "Passwords" }, { "pattern": re.compile(r'\b[Pp]asswd["\s]*[:=]["\s]*([^\s"\']{6,})', re.IGNORECASE), "replacement": r'passwd="[REDACTED_PASSWORD]"', "description": "Password abbreviations" }, # Secret patterns { "pattern": re.compile(r'\b[Ss]ecret["\s]*[:=]["\s]*([^\s"\']{8,})', re.IGNORECASE), "replacement": r'secret="[REDACTED_SECRET]"', "description": "Secrets" }, # Database connection strings { "pattern": re.compile(r'([a-zA-Z]+://[^:]+:)([^@]+)(@[^\s]+)', re.IGNORECASE), "replacement": r'\1[REDACTED_DB_PASSWORD]\3', "description": "Database connection strings" }, # Private keys (basic detection) { "pattern": re.compile(r'-----BEGIN\s+(?:RSA\s+)?PRIVATE\s+KEY-----.*?-----END\s+(?:RSA\s+)?PRIVATE\s+KEY-----', re.DOTALL | re.IGNORECASE), "replacement": '[REDACTED_PRIVATE_KEY]', "description": "Private keys" }, # AWS/Cloud credentials { "pattern": re.compile(r'\bAKIA[0-9A-Z]{16}\b', re.IGNORECASE), "replacement": '[REDACTED_AWS_ACCESS_KEY]', "description": "AWS access keys" }, { "pattern": re.compile(r'\b[0-9a-zA-Z/+]{40}\b'), "replacement": '[REDACTED_AWS_SECRET_KEY]', "description": "AWS secret keys" }, # Personal information patterns (basic) { "pattern": re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'), "replacement": '[REDACTED_EMAIL]', "description": "Email addresses" }, # Credit card numbers (basic detection) { "pattern": re.compile(r'\b(?:\d{4}[-\s]?){3}\d{4}\b'), "replacement": '[REDACTED_CARD_NUMBER]', "description": "Credit card numbers" }, # JWT tokens { "pattern": re.compile(r'\beyJ[a-zA-Z0-9\-_\.]{20,}\b'), "replacement": '[REDACTED_JWT_TOKEN]', "description": "JWT tokens" } ] def _sanitize_log_content(self, log_content: list[str]) -> list[str]: """Sanitize log content to remove sensitive information. Args: log_content: List of log lines to sanitize Returns: List of sanitized log lines """ if not log_content: return log_content sanitized_logs: list[str] = [] redaction_count = 0 for line in log_content: if not line or not isinstance(line, str): sanitized_logs.append(line) continue sanitized_line = line line_redactions = [] # Apply each sanitization pattern for pattern_info in self.sanitization_patterns: pattern = pattern_info["pattern"] replacement = pattern_info["replacement"] description = pattern_info["description"] # Count matches before replacement matches = pattern.findall(sanitized_line) if matches: line_redactions.append(f"{len(matches)} {description}") redaction_count += len(matches) # Apply sanitization sanitized_line = pattern.sub(replacement, sanitized_line) sanitized_logs.append(sanitized_line) # Log redaction details (without the actual sensitive data) if line_redactions: logger.debug( "Log line sanitized", redactions=line_redactions, original_length=len(line), sanitized_length=len(sanitized_line) ) if redaction_count > 0: # Count lines that were actually modified modified_lines = sum(1 for i, (original, sanitized) in enumerate(zip(log_content, sanitized_logs, strict=False)) if original != sanitized) logger.info( "Log content sanitized", total_redactions=redaction_count, total_lines=len(log_content), modified_lines=modified_lines ) return sanitized_logs def _build_error_response( self, host_id: str, operation: str, error_message: str, container_id: str | None = None, problem_type: str | None = None, **context, ) -> dict[str, Any]: """Build standardized error response with container context. Args: host_id: ID of the Docker host operation: Name of the operation that failed error_message: Error message to include container_id: Optional container ID for container-specific errors problem_type: Explicit error type specification for precise categorization. Valid values: 'container_not_found', 'host_not_found', 'docker_context_error', 'generic' **context: Additional context to include in error response """ base_context = {"host_id": host_id, "operation": operation} if container_id: base_context["container_id"] = container_id # Merge additional context parameters base_context.update(context) # Use explicit problem_type if provided for precise error categorization if problem_type == "container_not_found": return DockerMCPErrorResponse.container_not_found(host_id, container_id or "unknown") elif problem_type == "host_not_found": return DockerMCPErrorResponse.host_not_found(host_id) elif problem_type == "docker_context_error": return DockerMCPErrorResponse.docker_context_error(host_id, operation, error_message) elif problem_type == "generic": return DockerMCPErrorResponse.generic_error(error_message, base_context) # Fallback to pattern matching for backward compatibility when no explicit type provided if "not found" in error_message.lower(): if container_id: return DockerMCPErrorResponse.container_not_found(host_id, container_id) else: return DockerMCPErrorResponse.host_not_found(host_id) elif "could not connect" in error_message.lower(): return DockerMCPErrorResponse.docker_context_error(host_id, operation, error_message) else: return DockerMCPErrorResponse.generic_error(error_message, base_context) async def get_container_logs( self, host_id: str, container_id: str, lines: int = 100, since: str | None = None, timestamps: bool = False, ) -> dict[str, Any]: """Get logs from a container. Args: host_id: ID of the Docker host container_id: Container ID or name lines: Number of lines to retrieve (default: 100) since: Only logs since this timestamp (e.g., '2023-01-01T00:00:00Z') timestamps: Include timestamps in output Returns: Container logs """ try: client = await self.context_manager.get_client(host_id) if client is None: return self._build_error_response( host_id, "get_container_logs", f"Could not connect to Docker on host {host_id}", container_id, problem_type="docker_context_error", ) # Get container and retrieve logs using Docker SDK container = await asyncio.to_thread(client.containers.get, container_id) # Build kwargs for logs method logs_kwargs = { "tail": lines, "timestamps": timestamps, } if since: try: dt = datetime.fromisoformat(since.replace("Z", "+00:00")) logs_kwargs["since"] = int(dt.timestamp()) except Exception: logs_kwargs["since"] = since # fallback # Get logs using Docker SDK try: logs_bytes = await asyncio.to_thread(container.logs, **logs_kwargs) # Parse logs (logs_bytes is bytes, need to decode) logs_str = logs_bytes.decode("utf-8", errors="replace") logs_data = logs_str.strip().split("\n") if logs_str.strip() else [] except Exception as sdk_error: logger.warning( "Docker SDK logs failed, will use fallback", error=str(sdk_error), host_id=host_id, container_id=container_id ) logs_data = [] # Fallback: If no logs from SDK, try direct docker command if not logs_data or (len(logs_data) == 1 and not logs_data[0]): logger.debug( "No logs from Docker SDK, trying direct command", host_id=host_id, container_id=container_id ) # Try using docker logs command directly via context logs_cmd = f"logs --tail {lines} {container_id}" cmd_result = await self.context_manager.execute_docker_command(host_id, logs_cmd) if cmd_result and "output" in cmd_result: logs_str = cmd_result["output"] logs_data = logs_str.strip().split("\n") if logs_str.strip() else [] # Sanitize logs before returning sanitized_logs = self._sanitize_log_content(logs_data) # Create logs response logs = ContainerLogs( container_id=container_id, host_id=host_id, logs=sanitized_logs, timestamp=datetime.now(UTC), truncated=len(sanitized_logs) >= lines, ) logger.info( "Retrieved container logs", host_id=host_id, container_id=container_id, lines_returned=len(sanitized_logs), sanitization_applied=len(sanitized_logs) != len(logs_data) or any(s != o for s, o in zip(sanitized_logs, logs_data, strict=False)), ) return create_success_response( data=logs.model_dump(), context={ "host_id": host_id, "operation": "get_container_logs", "container_id": container_id, }, ) except docker.errors.NotFound: logger.error("Container not found for logs", host_id=host_id, container_id=container_id) return self._build_error_response( host_id, "get_container_logs", f"Container {container_id} not found", container_id, problem_type="container_not_found" ) except docker.errors.APIError as e: logger.error( "Docker API error getting container logs", host_id=host_id, container_id=container_id, error=str(e), ) return self._build_error_response( host_id, "get_container_logs", f"Failed to get logs: {str(e)}", container_id ) except (DockerCommandError, DockerContextError) as e: logger.error( "Failed to get container logs", host_id=host_id, container_id=container_id, error=str(e), ) return self._build_error_response(host_id, "get_container_logs", str(e), container_id) async def stream_container_logs_setup( self, host_id: str, container_id: str, follow: bool = True, tail: int = 100, since: str | None = None, timestamps: bool = False, ) -> dict[str, Any]: """Setup real-time log streaming for a container. This creates a streaming endpoint that can be used for real-time log monitoring. The actual streaming is handled by FastMCP's HTTP streaming capabilities. Args: host_id: ID of the Docker host container_id: Container ID or name follow: Continue streaming new logs (default: True) tail: Number of initial lines to include since: Only logs since this timestamp timestamps: Include timestamps in output Returns: Streaming configuration and endpoint information """ try: # Validate container exists and is accessible await self._validate_container_exists(host_id, container_id) # Create stream configuration stream_config = LogStreamRequest( host_id=host_id, container_id=container_id, follow=follow, tail=tail, since=since, timestamps=timestamps, ) # In a real implementation, this would register the stream # with FastMCP's streaming system and return an endpoint URL stream_id = f"{host_id}_{container_id}_{uuid.uuid4().hex}" logger.info( "Log stream setup created", host_id=host_id, container_id=container_id, stream_id=stream_id, ) return create_success_response( data={ "stream_id": stream_id, "stream_endpoint": f"/streams/logs/{stream_id}", "config": stream_config.model_dump(), "message": f"Log stream setup for container {container_id} on host {host_id}", "instructions": { "connect": "Connect to the streaming endpoint to receive real-time logs", "format": "Server-sent events (SSE)", "reconnect": "Client should handle reconnection on connection loss", }, }, context={ "host_id": host_id, "operation": "stream_container_logs_setup", "container_id": container_id, }, ) except (DockerCommandError, DockerContextError) as e: logger.error( "Failed to setup log stream", host_id=host_id, container_id=container_id, error=str(e), ) return self._build_error_response( host_id, "stream_container_logs_setup", str(e), container_id ) async def get_service_logs( self, host_id: str, service_name: str, lines: int = 100, since: str | None = None, timestamps: bool = False, ) -> dict[str, Any]: """Get logs from a Docker Compose service. Args: host_id: ID of the Docker host service_name: Name of the service lines: Number of lines to retrieve since: Only logs since this timestamp timestamps: Include timestamps in output Returns: Service logs """ try: # Build Docker Compose logs command cmd = f"compose logs --tail {lines}" if since: cmd += f" --since {since}" if timestamps: cmd += " --timestamps" cmd += f" {service_name}" result = await self.context_manager.execute_docker_command(host_id, cmd) # Parse logs logs_data = [] if isinstance(result, dict) and "output" in result: logs_data = result["output"].strip().split("\n") # Sanitize service logs before returning sanitized_logs = self._sanitize_log_content(logs_data) logger.info( "Retrieved service logs", host_id=host_id, service_name=service_name, lines_returned=len(sanitized_logs), sanitization_applied=len(sanitized_logs) != len(logs_data) or any(s != o for s, o in zip(sanitized_logs, logs_data, strict=False)), ) return create_success_response( data={ "service_name": service_name, "host_id": host_id, "logs": sanitized_logs, "truncated": len(sanitized_logs) >= lines, }, context={ "host_id": host_id, "operation": "get_service_logs", "service_name": service_name, }, ) except (DockerCommandError, DockerContextError) as e: logger.error( "Failed to get service logs", host_id=host_id, service_name=service_name, error=str(e), ) return self._build_error_response( host_id, "get_service_logs", str(e), service_name=service_name ) async def _validate_container_exists(self, host_id: str, container_id: str) -> None: """Validate that a container exists and is accessible.""" try: cmd = f"inspect {container_id}" await self.context_manager.execute_docker_command(host_id, cmd) # If we get here without exception, container exists logger.debug( "Container validation successful", host_id=host_id, container_id=container_id ) except DockerCommandError as e: if "No such container" in str(e): raise DockerCommandError( f"Container {container_id} not found on host {host_id}" ) from e raise async def _stream_logs_generator(self, stream_config: LogStreamRequest): """Generator for streaming logs (used internally by FastMCP streaming).""" # This would be used by FastMCP's streaming system # to continuously yield log lines as they become available try: # Build streaming command cmd = f"logs --tail {stream_config.tail}" if stream_config.follow: cmd += " --follow" if stream_config.since: cmd += f" --since {stream_config.since}" if stream_config.timestamps: cmd += " --timestamps" cmd += f" {stream_config.container_id}" # In a real implementation, this would use asyncio subprocess # to stream logs continuously logger.info( "Starting log stream", host_id=stream_config.host_id, container_id=stream_config.container_id, ) # Placeholder for actual streaming implementation yield f"data: Starting log stream for {stream_config.container_id}\n\n" except Exception as e: logger.error("Log streaming error", error=str(e)) yield f"data: Error: {str(e)}\n\n"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jmagar/docker-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server