Daytona MCP Python Interpreter

by nkkko
Verified
#!/usr/bin/env python import shlex import asyncio import json import logging from logging.handlers import RotatingFileHandler import os from pathlib import Path import sys import time import uuid import base64 import mimetypes import tempfile from typing import List, Optional, Any, Union from dotenv import load_dotenv from daytona_sdk import Daytona, DaytonaConfig, CreateWorkspaceParams, Workspace from daytona_sdk.process import ExecuteResponse from daytona_sdk.filesystem import FileSystem from mcp.server import Server from mcp.server.stdio import stdio_server from mcp.types import Tool, TextContent, ImageContent, EmbeddedResource # Custom exception classes for better error handling class DaytonaError(Exception): """Base exception class for all Daytona-related errors.""" pass class WorkspaceError(DaytonaError): """Exception raised for workspace-related errors.""" pass class WorkspaceInitializationError(WorkspaceError): """Exception raised when workspace initialization fails.""" pass class WorkspaceNotFoundError(WorkspaceError): """Exception raised when a workspace is not found.""" pass class WorkspaceQuotaExceededError(WorkspaceError): """Exception raised when CPU quota is exceeded.""" pass class FileSystemError(DaytonaError): """Exception raised for filesystem-related errors.""" pass class FileNotAccessibleError(FileSystemError): """Exception raised when a file cannot be accessed.""" pass class FileTooLargeError(FileSystemError): """Exception raised when a file is too large to process.""" pass class CommandExecutionError(DaytonaError): """Exception raised when a command execution fails.""" pass class ConfigurationError(DaytonaError): """Exception raised for configuration-related errors.""" pass class NetworkError(DaytonaError): """Exception raised for network-related errors.""" pass # Initialize mimetypes mimetypes.init() # Uncomment the following line only if api_client is necessary and correctly imported # from daytona_sdk import api_client # Configure logging LOG_FILE = '/tmp/daytona-interpreter.log' LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' # File to track workspace ID across multiple processes WORKSPACE_TRACKING_FILE = '/tmp/daytona-workspace.json' WORKSPACE_LOCK_FILE = '/tmp/daytona-workspace.lock' # Removed global flag approach for better inter-process coordination # Now using file locking for the entire initialization process HOST = os.getenv("HOST", "0.0.0.0") PORT = int(os.getenv("PORT", 8000)) def setup_logging() -> logging.Logger: """Configure logging with file and console output""" logger = logging.getLogger("daytona-interpreter") logger.setLevel(logging.DEBUG) if not logger.hasHandlers(): # File handler file_handler = RotatingFileHandler( LOG_FILE, maxBytes=10*1024*1024, backupCount=5, encoding='utf-8' ) file_handler.setFormatter(logging.Formatter(LOG_FORMAT)) # Console handler # console_handler = logging.StreamHandler(sys.stderr) # console_handler.setFormatter(logging.Formatter(LOG_FORMAT)) logger.addHandler(file_handler) # logger.addHandler(console_handler) return logger class Config: """Server configuration class that loads environment variables for MCP Daytona setup""" def __init__(self): # Load environment variables from .env file try: load_dotenv() logger = logging.getLogger("daytona-interpreter") # Required API key for authentication self.api_key = os.getenv('MCP_DAYTONA_API_KEY') if not self.api_key: raise ConfigurationError("MCP_DAYTONA_API_KEY environment variable is required") else: logger.info("MCP_DAYTONA_API_KEY loaded successfully.") # Optional configuration with defaults self.server_url = os.getenv('MCP_DAYTONA_SERVER_URL', 'https://app.daytona.io/api') # Validate server URL format if not self.server_url.startswith(('http://', 'https://')): logger.warning(f"Invalid server URL format: {self.server_url}, adding https://") self.server_url = f"https://{self.server_url}" self.target = os.getenv('MCP_DAYTONA_TARGET', 'eu') # Validate and parse timeout timeout_str = os.getenv('MCP_DAYTONA_TIMEOUT', '180.0') try: self.timeout = float(timeout_str) if self.timeout <= 0: logger.warning(f"Invalid timeout value: {self.timeout}, using default of 180.0") self.timeout = 180.0 except ValueError: logger.warning(f"Invalid timeout format: {timeout_str}, using default of 180.0") self.timeout = 180.0 self.verify_ssl = os.getenv('MCP_VERIFY_SSL', 'false').lower() == 'true' # Optional debug logging self._log_config() except ConfigurationError: # Re-raise ConfigurationError exceptions raise except Exception as e: # Convert other exceptions to ConfigurationError with context logger.error(f"Configuration initialization error: {e}", exc_info=True) raise ConfigurationError(f"Failed to initialize configuration: {str(e)}") from e def _log_config(self) -> None: """Logs the current configuration settings excluding sensitive information.""" logger = logging.getLogger("daytona-interpreter") logger.debug("Configuration Loaded:") logger.debug(f" Server URL: {self.server_url}") logger.debug(f" Target: {self.target}") logger.debug(f" Timeout: {self.timeout}") logger.debug(f" Verify SSL: {self.verify_ssl}") class DaytonaInterpreter: """ MCP Server implementation for executing Python code and shell commands in Daytona workspaces using the Daytona SDK. Handles workspace creation, file operations, and command execution. """ def __init__(self, logger: logging.Logger, config: Config): # Initialize core components self.logger = logger self.config = config # Initialize Daytona SDK client self.daytona = Daytona( config=DaytonaConfig( api_key=self.config.api_key, server_url=self.config.server_url, target=self.config.target ) ) self.workspace: Optional[Workspace] = None # Current workspace instance self.filesystem: Optional[FileSystem] = None # FileSystem instance for the workspace # Initialize MCP server self.server = Server("daytona-interpreter") # Setup MCP handlers self.setup_handlers() # Setup empty resources list handler to prevent "Method not found" errors @self.server.list_resources() async def list_resources(): return [] # Setup empty prompts list handler to prevent "Method not found" errors @self.server.list_prompts() async def list_prompts(): return [] self.logger.info("Initialized DaytonaInterpreter with Daytona SDK and MCP Server") def setup_notification_handlers(self): """ Configure handlers for various MCP protocol notifications. Each handler processes specific notification types and performs appropriate actions. """ async def handle_cancel_request(params: dict[str, Any]) -> None: self.logger.info("Received cancellation request") await self.cleanup_workspace() async def handle_progress(params: dict[str, Any]) -> None: if 'progressToken' in params and 'progress' in params: self.logger.debug(f"Progress update: {params}") async def handle_initialized(params: dict[str, Any]) -> None: self.logger.debug("Received initialized notification") async def handle_roots_list_changed(params: dict[str, Any]) -> None: self.logger.debug("Received roots list changed notification") async def handle_cancelled(params: dict[str, Any]) -> None: self.logger.info(f"Received cancelled notification: {params}") if 'requestId' in params and 'reason' in params: self.logger.info(f"Request {params['requestId']} was cancelled: {params['reason']}") # Handle shutdown if we get a cancel notification with a specific error message that # indicates connection is closing if 'reason' in params and 'connection' in str(params.get('reason', '')).lower(): self.logger.info("Cancel notification indicates connection is closing, initiating cleanup") asyncio.create_task(self.cleanup_workspace()) # Otherwise don't trigger workspace cleanup on individual request cancellations # as this is likely just timeout on individual requests, not session termination async def handle_shutdown(params: dict[str, Any]) -> None: """Handle shutdown notification.""" self.logger.info("Received shutdown notification") # Perform cleanup but don't await it to prevent blocking asyncio.create_task(self.cleanup_workspace()) async def handle_unknown_notification(method: str, params: dict[str, Any]) -> None: """Handle any unknown notifications gracefully.""" # Don't log warnings for common notifications to reduce log noise if method not in ["notifications/cancelled"]: self.logger.warning(f"Received unknown notification method: {method} with params: {params}") # Register notification handlers with error handling wrapped around each handler handlers = { "$/cancelRequest": handle_cancel_request, "notifications/progress": handle_progress, "notifications/initialized": handle_initialized, "notifications/roots/list_changed": handle_roots_list_changed, "notifications/cancelled": handle_cancelled, "notifications/shutdown": handle_shutdown, "*": handle_unknown_notification # Add a catch-all handler for any unknown notifications } # Wrap all handlers with error handling wrapped_handlers = {} for method, handler in handlers.items(): async def wrapped_handler(params: dict[str, Any], method=method, original_handler=handler) -> None: try: await original_handler(params) except Exception as e: self.logger.error(f"Error in notification handler for {method}: {e}", exc_info=True) wrapped_handlers[method] = wrapped_handler self.server.notification_handlers.update(wrapped_handlers) def setup_handlers(self): """ Configure server request handlers for tool listing and execution. Defines available tools and their execution logic using the Daytona SDK. """ self.setup_notification_handlers() @self.server.list_tools() async def list_tools() -> List[Tool]: """ Define available tools: 1. shell_exec: Executes shell commands in workspace 2. file_download: Downloads a file from the workspace 3. file_upload: Uploads a file to the workspace 4. git_clone: Clones a Git repository into the workspace 5. web_preview: Generates a preview URL for web servers """ return [ Tool( name="shell_exec", description="Execute shell commands in the ephemeral Daytona Linux environment. Returns full stdout and stderr output with exit codes. Commands have workspace user permissions and can install packages, modify files, and interact with running services. Always use /tmp directory. Use verbose flags where available for better output.", inputSchema={ "type": "object", "properties": { "command": {"type": "string", "description": "Shell command to execute. Always use verbose where available."} }, "required": ["command"] } ), Tool( name="file_download", description="Download files from the Daytona workspace with smart handling for different file types and sizes. Supports text, binary, images, PDFs, and other formats with automatic content type detection. Results can be returned as text, base64-encoded data, or embedded resources.", inputSchema={ "type": "object", "properties": { "file_path": {"type": "string", "description": "Path to the file in the workspace"}, "max_size_mb": {"type": "number", "description": "Maximum file size in MB to download automatically (default: 5.0)"}, "download_option": {"type": "string", "description": "Option for handling large files: 'download_partial', 'convert_to_text', 'compress_file', or 'force_download'"}, "chunk_size_kb": {"type": "integer", "description": "Size of each chunk in KB when downloading partially (default: 100)"} }, "required": ["file_path"] } ), Tool( name="git_clone", description="Clone Git repositories into the Daytona workspace with customizable options. Supports branch/tag selection, shallow clones, Git LFS for large files, and SSH/HTTPS authentication. Cloned content persists during the session and can be accessed by other tools. Returns repository structure and metadata for easier navigation.", inputSchema={ "type": "object", "properties": { "repo_url": {"type": "string", "description": "URL of the Git repository to clone"}, "target_path": {"type": "string", "description": "Target directory to clone into (default: repository name)"}, "branch": {"type": "string", "description": "Branch to checkout (default: repository default branch)"}, "depth": {"type": "integer", "description": "Depth of history to clone (default: 1 for shallow clone)"}, "lfs": {"type": "boolean", "description": "Whether to enable Git LFS (default: false)"} }, "required": ["repo_url"] } ), Tool( name="file_upload", description="Upload files to the Daytona workspace from text or base64-encoded binary content. Creates necessary parent directories automatically and verifies successful writes. Files persist during the session and have appropriate permissions for further tool operations. Supports overwrite controls and maintains original file formats.", inputSchema={ "type": "object", "properties": { "file_path": {"type": "string", "description": "Path where the file should be created in the workspace"}, "content": {"type": "string", "description": "Content to write to the file (text or base64-encoded binary)"}, "encoding": {"type": "string", "description": "Encoding of the content: 'text' (default) or 'base64'"}, "overwrite": {"type": "boolean", "description": "Whether to overwrite the file if it already exists (default: true)"} }, "required": ["file_path", "content"] } ), Tool( name="web_preview", description="Generate accessible preview URLs for web applications running in the Daytona workspace. Creates a secure tunnel to expose local ports externally without configuration. Validates if a server is actually running on the specified port and provides diagnostic information for troubleshooting. Supports custom descriptions and metadata for better organization of multiple services.", inputSchema={ "type": "object", "properties": { "port": {"type": "integer", "description": "The port number the server is running on"}, "description": {"type": "string", "description": "Optional description of the server (default: empty string)"}, "check_server": {"type": "boolean", "description": "Whether to check if a server is running on the port (default: true)"} }, "required": ["port"] } ) ] @self.server.call_tool() async def call_tool(name: str, arguments: dict) -> List[Union[TextContent, ImageContent, EmbeddedResource]]: """ Handle tool execution requests from MCP. Uses Daytona SDK to execute Python code or shell commands within the workspace. """ if not self.workspace: self.logger.error("Workspace is not initialized.") raise RuntimeError("Workspace is not initialized.") if name == "shell_exec": command = arguments.get("command") if not command: raise ValueError("Command argument is required") try: result = await self.execute_command(command) return [TextContent(type="text", text=result)] except Exception as e: self.logger.error(f"Error executing tool '{name}': {e}", exc_info=True) return [TextContent(type="text", text=f"Error executing tool: {e}")] elif name == "file_download": file_path = arguments.get("file_path") if not file_path: raise ValueError("File path argument is required") # Extract optional parameters max_size_mb = arguments.get("max_size_mb", 5.0) download_option = arguments.get("download_option") chunk_size_kb = arguments.get("chunk_size_kb", 100) try: # Add extra debug logging self.logger.info(f"Using file_download with: path={file_path}, max_size={max_size_mb}MB, option={download_option}, chunk_size={chunk_size_kb}KB") # Validate inputs before calling file_downloader if max_size_mb is not None and not isinstance(max_size_mb, (int, float)): try: max_size_mb = float(max_size_mb) except (ValueError, TypeError): self.logger.warning(f"Invalid max_size_mb value: {max_size_mb}, using default of 5.0") max_size_mb = 5.0 if chunk_size_kb is not None and not isinstance(chunk_size_kb, (int, float)): try: chunk_size_kb = int(chunk_size_kb) except (ValueError, TypeError): self.logger.warning(f"Invalid chunk_size_kb value: {chunk_size_kb}, using default of 100") chunk_size_kb = 100 # Call our improved file_downloader function result = file_downloader( path=file_path, max_size_mb=max_size_mb, download_option=download_option, chunk_size_kb=chunk_size_kb ) self.logger.info(f"Download result: success={result.get('success', False)}") # Check if we got a download options response (for large files) if result.get("file_too_large"): options_text = ( f"File '{result.get('filename')}' is {result.get('file_size_mb')}MB which exceeds the " f"{result.get('max_size_mb')}MB limit.\n\n" "Available options:\n" "- download_partial: Download first part of the file\n" "- convert_to_text: Try to convert file to plain text\n" "- compress_file: Compress the file before downloading\n" "- force_download: Download the entire file anyway\n\n" "To proceed, call file_download again with the desired option, for example:\n" f"file_download(file_path='{file_path}', download_option='download_partial')" ) return [TextContent(type="text", text=options_text)] # For successful downloads, process the content if result.get("success"): # If partial download, convert_to_text, or compressed, add a message if result.get("partial") or result.get("converted") or result.get("compressed"): message = result.get("message", "") content = result.get("content", b"") # For binary content, determine content type content_type = result.get("content_type", "application/octet-stream") # For image content types, return as ImageContent if content_type.startswith("image/"): # Convert binary content to base64 base64_content = base64.b64encode(content).decode('utf-8') return [ ImageContent(type="image", data=base64_content, mimeType=content_type), TextContent(type="text", text=message) ] else: # For text content try: # Try to decode as text if it's a text type if content_type.startswith("text/"): text_content = content.decode('utf-8') return [TextContent(type="text", text=f"{message}\n\n{text_content}")] else: # Return as embedded resource for binary files base64_content = base64.b64encode(content).decode('utf-8') return [ EmbeddedResource( type="resource", resource={ "uri": f"file://{file_path}", "data": base64_content, "mimeType": content_type } ), TextContent(type="text", text=message) ] except UnicodeDecodeError: # If we can't decode as text, treat as binary base64_content = base64.b64encode(content).decode('utf-8') return [ EmbeddedResource( type="resource", resource={ "uri": f"file://{file_path}", "data": base64_content, "mimeType": content_type } ), TextContent(type="text", text=message) ] # For standard downloads, use process_file_content content = result.get("content", b"") if content: return await self.process_file_content(file_path, content) # For errors, return the error message with specific error handling based on error type if not result.get("success"): error_msg = result.get('error', 'Unknown error') error_type = result.get('error_type', 'UnknownError') # Format user-friendly error messages based on error type if error_type == "FileNotAccessibleError": friendly_msg = f"File not accessible: {error_msg}. Please check if the file exists and you have permission to access it." elif error_type == "FileTooLargeError": friendly_msg = f"File too large: {error_msg}. Try downloading with a specific option like 'download_partial'." elif error_type == "WorkspaceQuotaExceededError": friendly_msg = f"Resource limit exceeded: {error_msg}. Please try again later or contact support." elif error_type == "NetworkError": friendly_msg = f"Network error: {error_msg}. Please check your connection and try again." elif error_type in ["FileSystemError", "WorkspaceError", "WorkspaceInitializationError"]: friendly_msg = f"System error ({error_type}): {error_msg}. Please try again or contact support if the issue persists." else: friendly_msg = f"Error downloading file: {error_msg}" self.logger.error(f"Error type: {error_type}, Message: {error_msg}") return [TextContent(type="text", text=friendly_msg)] # Fallback for unexpected results return [TextContent(type="text", text=f"Unexpected download result: {json.dumps(result, default=str)}")] except Exception as e: self.logger.error(f"Error in file_downloader: {e}", exc_info=True) # Classify exception type for better error messages if isinstance(e, FileNotAccessibleError): error_msg = f"File not accessible: {str(e)}. Please check if the file exists and you have permission to access it." elif isinstance(e, FileTooLargeError): error_msg = f"File too large: {str(e)}. Try downloading with a specific option." elif isinstance(e, FileSystemError): error_msg = f"File system error: {str(e)}. Please try again." elif isinstance(e, NetworkError): error_msg = f"Network error: {str(e)}. Please check your connection and try again." elif isinstance(e, WorkspaceError): error_msg = f"Workspace error: {str(e)}. Please try again later." else: error_msg = f"Error downloading file: {str(e)}" # Return error as text return [TextContent(type="text", text=error_msg)] elif name == "git_clone": repo_url = arguments.get("repo_url") if not repo_url: raise ValueError("Repository URL is required") # Extract optional parameters target_path = arguments.get("target_path") branch = arguments.get("branch") depth = arguments.get("depth", 1) lfs = arguments.get("lfs", False) try: # Add debug logging self.logger.info(f"Using git_clone with: url={repo_url}, target={target_path}, branch={branch}, depth={depth}, lfs={lfs}") # Call the git_clone function result = git_repo_cloner( repo_url=repo_url, target_path=target_path, branch=branch, depth=depth, lfs=lfs ) # Handle errors if not result.get("success", False): error_msg = f"Error cloning repository: {result.get('error', 'Unknown error')}" self.logger.error(error_msg) return [TextContent(type="text", text=error_msg)] # Format successful clone result target_dir = result.get("target_directory", "repo") total_files = result.get("total_files", 0) files_sample = result.get("files_sample", []) repo_info = result.get("repository_info", "") # Build response response_parts = [ f"Repository cloned successfully into '{target_dir}'", f"Total files: {total_files}", f"\nRepository information:\n{repo_info}" ] if files_sample: sample_list = "\n".join(files_sample[:20]) # Show first 20 files file_count_msg = "" if len(files_sample) > 20: file_count_msg = f"...and {len(files_sample) - 20} more files" response_parts.append(f"\nSample files:\n{sample_list}\n{file_count_msg}") if total_files > len(files_sample): response_parts.append(f"\n(Showing {len(files_sample)} of {total_files} total files)") return [TextContent(type="text", text="\n".join(response_parts))] except Exception as e: self.logger.error(f"Error in git_repo_cloner: {e}", exc_info=True) return [TextContent(type="text", text=f"Error cloning repository: {str(e)}")] elif name == "file_upload": file_path = arguments.get("file_path") content = arguments.get("content") encoding = arguments.get("encoding", "text") overwrite = arguments.get("overwrite", True) if not file_path: raise ValueError("File path is required") if content is None: raise ValueError("Content is required") try: # Add debug logging self.logger.info(f"Using file_upload with: path={file_path}, encoding={encoding}, overwrite={overwrite}") # Call the file_uploader function result = file_uploader( file_path=file_path, content=content, encoding=encoding, overwrite=overwrite ) self.logger.info(f"Upload result: success={result.get('success', False)}") if result.get("success"): return [TextContent(type="text", text=result.get("message", "File uploaded successfully"))] else: return [TextContent(type="text", text=f"Error uploading file: {result.get('error', 'Unknown error')}")] except Exception as e: self.logger.error(f"Error in file_upload: {e}", exc_info=True) return [TextContent(type="text", text=f"Error uploading file: {str(e)}")] elif name == "web_preview": port = arguments.get("port") if port is None: raise ValueError("Port number is required") # Validate port is a valid number try: port = int(port) if port < 1 or port > 65535: raise ValueError(f"Invalid port number: {port}. Must be between 1 and 65535.") except ValueError as e: return [TextContent(type="text", text=f"Invalid port: {e}")] # Extract optional parameters description = arguments.get("description", "") check_server = arguments.get("check_server", True) try: # Call the web_preview function self.logger.info(f"Using web_preview with port={port}") result = preview_link_generator( port=port, description=description, check_server=check_server ) # Handle errors if not result.get("success", False): error_msg = f"Error generating preview link: {result.get('error', 'Unknown error')}" # Add process info if available to help diagnose the issue process_info = result.get("process_info", "") if process_info and process_info != "No process found": error_msg += f"\n\nProcess found on port {port}:\n{process_info}" self.logger.error(error_msg) return [TextContent(type="text", text=error_msg)] # Format successful result preview_url = result.get("preview_url", "") accessible = result.get("accessible", None) status_code = result.get("status_code", None) # Create a response message with the preview URL response_parts = [] # Add the description if provided if description: response_parts.append(f"# {description}") # Add the preview URL (primary information) response_parts.append(f"Preview URL: {preview_url}") # Add accessibility status if checked if check_server and accessible is not None: if accessible: response_parts.append(f"URL is accessible (status code: {status_code})") else: response_parts.append(f"Warning: URL is not accessible. The server may not be properly configured to accept external requests.") # Add any additional notes note = result.get("note", "") if note: response_parts.append(f"Note: {note}") # Add useful information for debugging response_parts.append(f"\nPort: {port}") response_parts.append(f"Workspace ID: {result.get('workspace_id', '')}") # Create clickable link using markdown response_parts.append(f"\n[Click here to open preview]({preview_url})") return [TextContent(type="text", text="\n".join(response_parts))] except Exception as e: self.logger.error(f"Error in preview_link_generator: {e}", exc_info=True) return [TextContent(type="text", text=f"Error generating preview link: {str(e)}")] else: self.logger.error(f"Unknown tool: {name}") raise ValueError(f"Unknown tool: {name}") async def initialize_workspace(self) -> None: """ Initialize the Daytona workspace using the SDK. Creates a new workspace if it doesn't exist, or reuses an existing one. Uses a file-based lock to coordinate between processes. IMPORTANT: This method enforces a single workspace per session by using a tracking file to store and retrieve the active workspace ID. Raises: WorkspaceInitializationError: If workspace initialization fails WorkspaceQuotaExceededError: If CPU quota is exceeded WorkspaceNotFoundError: If a referenced workspace cannot be found NetworkError: If network-related errors occur """ # First check if this instance already has a workspace if self.workspace: self.logger.info(f"Instance already has workspace ID: {self.workspace.id}") return # Use file lock for the ENTIRE initialization process to prevent race conditions # This is critical to prevent multiple processes from initializing at the same time try: # Try to get the lock with a timeout with FileLock(WORKSPACE_LOCK_FILE): self.logger.info(f"Acquired file lock for workspace initialization (process {os.getpid()})") # Check if another process has already initialized the workspace workspace_id, created_at = get_active_workspace(self.filesystem if hasattr(self, 'filesystem') else None) if workspace_id: try: self.logger.info(f"Found active workspace ID: {workspace_id} (process {os.getpid()})") # Try to get the workspace from Daytona try: workspaces = self.daytona.list() except Exception as list_err: if "Unauthorized" in str(list_err) or "401" in str(list_err): raise NetworkError("Authentication failed when listing workspaces. Please check your API key.") elif "Connection" in str(list_err) or "Timeout" in str(list_err): raise NetworkError(f"Network error when listing workspaces: {str(list_err)}") else: raise WorkspaceError(f"Failed to list workspaces: {str(list_err)}") for workspace in workspaces: if workspace.id == workspace_id: # Reuse the existing workspace self.workspace = workspace # Initialize filesystem for this workspace try: # The API has changed in daytona-sdk 0.10.2 # We need to create the FileSystem instance with proper parameters # In SDK 0.10.2, toolbox_api is available directly on the Daytona instance toolbox_api = self.daytona.toolbox_api # Create filesystem with the workspace instance and toolbox_api self.filesystem = FileSystem(instance=self.workspace, toolbox_api=toolbox_api) self.logger.info(f"Reusing existing workspace ID: {workspace_id}") return except Exception as fs_err: self.logger.error(f"Failed to initialize filesystem for workspace {workspace_id}: {fs_err}") raise FileSystemError(f"Failed to initialize filesystem: {str(fs_err)}") # If we get here, the workspace in the file wasn't found in Daytona self.logger.warning(f"Workspace {workspace_id} not found in Daytona, clearing tracking") # Use filesystem if available for this instance clear_active_workspace(self.filesystem if hasattr(self, 'filesystem') else None) raise WorkspaceNotFoundError(f"Workspace {workspace_id} not found in Daytona") except (WorkspaceNotFoundError, FileSystemError, NetworkError, WorkspaceError) as specific_error: # Clear tracking file to prevent future issues clear_active_workspace(self.filesystem if hasattr(self, 'filesystem') else None) raise except Exception as e: self.logger.warning(f"Error fetching workspace: {e}") # Clear tracking file to prevent future issues clear_active_workspace(self.filesystem if hasattr(self, 'filesystem') else None) raise WorkspaceError(f"Error fetching workspace: {str(e)}") # If we get here, we need to create a new workspace if os.path.exists(WORKSPACE_TRACKING_FILE): self.logger.info(f"Clearing stale workspace tracking file") # Use filesystem if available for this instance clear_active_workspace(self.filesystem if hasattr(self, 'filesystem') else None) # Only create a new workspace if we don't have a valid tracking file self.logger.info(f"Creating a new Daytona workspace (process {os.getpid()})") params = CreateWorkspaceParams( language="python", os_user="workspace" # Fix: use os_user instead of user parameter # Additional parameters can be defined here ) # Check if we have any existing workspaces and terminate them if needed try: # List existing workspaces to avoid creating too many try: workspaces = self.daytona.list() except Exception as list_err: if "Unauthorized" in str(list_err) or "401" in str(list_err): raise NetworkError("Authentication failed when listing workspaces. Please check your API key.") elif "Connection" in str(list_err) or "Timeout" in str(list_err): raise NetworkError(f"Network error when listing workspaces: {str(list_err)}") else: self.logger.warning(f"Error listing workspaces: {list_err}") # Continue without failing - we'll attempt creation anyway workspaces = [] if len(workspaces) > 0: self.logger.info(f"Found {len(workspaces)} existing workspaces, removing oldest") # Sort by creation time (oldest first) if available try: workspaces.sort(key=lambda ws: getattr(ws, 'created_at', 0)) # Remove oldest workspace if workspaces: oldest = workspaces[0] self.logger.info(f"Removing oldest workspace: {oldest.id}") self.daytona.remove(oldest) except Exception as e: self.logger.warning(f"Error sorting/removing workspaces: {e}") except Exception as e: self.logger.warning(f"Error listing workspaces: {e}") # Add a retry mechanism for workspace creation max_retries = 3 retry_count = 0 retry_delay = 2.0 last_error = None while retry_count < max_retries: try: self.workspace = self.daytona.create(params) workspace_id = self.workspace.id # Initialize filesystem for this workspace try: toolbox_api = self.daytona.toolbox_api self.filesystem = FileSystem(instance=self.workspace, toolbox_api=toolbox_api) self.logger.info(f"Created Workspace ID: {workspace_id}") # Save workspace ID to tracking file for other processes to reuse # This must happen BEFORE releasing the lock to prevent race conditions set_active_workspace(workspace_id, self.filesystem) self.logger.info(f"Registered workspace ID {workspace_id} in tracking file") break except Exception as fs_err: self.logger.error(f"Failed to initialize filesystem for workspace {workspace_id}: {fs_err}") # Try to clean up the workspace we just created try: self.daytona.remove(self.workspace) self.logger.info(f"Cleaned up workspace {workspace_id} after filesystem initialization failed") except Exception as cleanup_err: self.logger.warning(f"Failed to clean up workspace after error: {cleanup_err}") raise FileSystemError(f"Failed to initialize filesystem: {str(fs_err)}") except Exception as e: last_error = e error_str = str(e) # Handle quota exceeded errors if "Total CPU quota exceeded" in error_str or "quota" in error_str.lower(): # Extract the quota information if available quota_info = "" try: import re match = re.search(r"Total CPU quota exceeded \((\d+) > (\d+)\)", error_str) if match: current = match.group(1) limit = match.group(2) quota_info = f" (Current: {current}, Limit: {limit})" except: pass self.logger.error(f"CPU quota exceeded error{quota_info}: {e}") # Provide more user-friendly error message error_message = ( f"Daytona CPU quota exceeded{quota_info}. Please delete unused workspaces " "from your Daytona account or upgrade your plan to continue." ) self.logger.error(error_message) # Don't retry on quota errors raise WorkspaceQuotaExceededError(error_message) # Handle network-related errors elif "Connection" in error_str or "Timeout" in error_str: self.logger.warning(f"Network error during workspace creation: {e}") if retry_count >= max_retries - 1: raise NetworkError(f"Network error during workspace creation: {str(e)}") # Handle authentication errors elif "Unauthorized" in error_str or "401" in str(e): raise NetworkError("Authentication failed when creating workspace. Please check your API key.") retry_count += 1 if retry_count >= max_retries: self.logger.error(f"Workspace creation failed after {max_retries} attempts") raise WorkspaceInitializationError(f"Failed to create workspace after {max_retries} attempts: {str(last_error)}") self.logger.warning(f"Workspace creation attempt {retry_count} failed: {e}, retrying in {retry_delay}s") await asyncio.sleep(retry_delay) retry_delay *= 1.5 # Exponential backoff except (WorkspaceInitializationError, WorkspaceQuotaExceededError, WorkspaceNotFoundError, FileSystemError, NetworkError, WorkspaceError): # Re-raise specific exceptions without wrapping raise except Exception as e: self.logger.error(f"Failed to create/find workspace: {e}", exc_info=True) raise WorkspaceInitializationError(f"Failed to create/find workspace: {str(e)}") async def execute_python_code(self, code: str) -> str: """ Execute Python code in the Daytona workspace using the SDK. Returns the execution result as a JSON string. Uses a wrapped code approach with robust error handling, environment diagnostics, and improved temporary directory management. Results are clearly marked for easier parsing. """ if not self.workspace: self.logger.error("Workspace is not initialized.") raise RuntimeError("Workspace is not initialized.") # Create wrapped code with all the requested enhancements wrapped_code = f""" import tempfile import os import base64 import json import io import sys import platform import shutil import uuid from pathlib import Path import traceback # Dictionary to collect execution results result_dict = {{ "stdout": "", "stderr": "", "exit_code": 0, "environment": {{}}, "images": [] }} # Setup output capture original_stdout = sys.stdout original_stderr = sys.stderr stdout_capture = io.StringIO() stderr_capture = io.StringIO() sys.stdout = stdout_capture sys.stderr = stderr_capture try: # Collect environment diagnostics result_dict["environment"] = {{ "python_version": platform.python_version(), "platform": platform.platform(), "user": os.getenv("USER", "unknown"), "home": os.path.expanduser("~"), "cwd": os.getcwd(), "pid": os.getpid() }} # Print diagnostics to stdout print(f"Python {{platform.python_version()}} on {{platform.platform()}}") print(f"Process ID: {{os.getpid()}}") print(f"Working directory: {{os.getcwd()}}") # Multiple fallback options for temporary directory temp_dir = None temp_dirs_to_try = [ os.path.expanduser("~/.daytona_tmp"), # First try user home "/tmp/daytona_tmp", # Then try system /tmp os.getcwd(), # Then try current directory os.path.join(os.path.expanduser("~"), ".cache") # Then try .cache ] # Try creating temp directory in each location for dir_path in temp_dirs_to_try: try: # Make sure parent directory exists with appropriate permissions parent = os.path.dirname(dir_path) if parent and not os.path.exists(parent): os.makedirs(parent, mode=0o777, exist_ok=True) # Create the directory if it doesn't exist if not os.path.exists(dir_path): os.makedirs(dir_path, mode=0o777, exist_ok=True) # Set as tempdir and create a unique subdirectory tempfile.tempdir = dir_path temp_dir = tempfile.mkdtemp(prefix='daytona_execution_') print(f"Created temporary directory: {{temp_dir}}") break except Exception: error = traceback.format_exc() print(f"Failed to use {{dir_path}} as temp directory:") print(error) # If all fallbacks failed, use a uuid-based directory in the current path if not temp_dir: try: temp_dir = os.path.join(os.getcwd(), f"daytona_tmp_{{uuid.uuid4().hex}}") os.makedirs(temp_dir, exist_ok=True) print(f"Using last-resort temporary directory: {{temp_dir}}") except Exception: error = traceback.format_exc() print(f"Failed to create last-resort temp directory: {{error}}") # Continue with temp_dir=None, will use current directory # Store original working directory original_dir = os.getcwd() # Change to temp directory if it exists if temp_dir and os.path.exists(temp_dir): try: os.chdir(temp_dir) print(f"Changed to working directory: {{os.getcwd()}}") except Exception: print(f"Failed to change to {{temp_dir}}, using current directory") # Create globals and locals dictionaries for execution globals_dict = {{'__name__': '__main__'}} locals_dict = {{}} # Add common packages to globals to make them available try: # Standard library modules import datetime, math, random, re, collections, itertools globals_dict.update({{ 'datetime': datetime, 'math': math, 'random': random, 're': re, 'collections': collections, 'itertools': itertools, 'os': os, 'sys': sys, 'json': json, 'Path': Path }}) # Try importing common data science packages try: import numpy as np globals_dict['np'] = np print("NumPy successfully imported") except ImportError: print("Warning: numpy not available") try: import pandas as pd globals_dict['pd'] = pd print("Pandas successfully imported") except ImportError: print("Warning: pandas not available") try: import matplotlib matplotlib.use('Agg') # Non-interactive backend import matplotlib.pyplot as plt globals_dict['plt'] = plt globals_dict['matplotlib'] = matplotlib print("Matplotlib successfully imported") except ImportError: print("Warning: matplotlib not available") except Exception: print("Warning: Error importing common packages") print(traceback.format_exc()) # Execute the user code with exec() in the prepared environment try: print("\\n--- Executing Code ---") exec(r'''{code}''', globals_dict, locals_dict) print("--- Code execution completed successfully ---\\n") except Exception: traceback_str = traceback.format_exc() print(f"\\n--- Error executing code ---\\n{{traceback_str}}\\n--- End of error ---\\n") result_dict["exit_code"] = 1 # Check for matplotlib figures if 'plt' in globals_dict and hasattr(globals_dict['plt'], 'get_fignums'): plt = globals_dict['plt'] if plt.get_fignums(): try: print("Saving unsaved matplotlib figures...") for fig_num in plt.get_fignums(): fig = plt.figure(fig_num) img_path = f"figure_{{fig_num}}.png" fig.savefig(img_path) print(f"Saved figure {{fig_num}} to {{img_path}}") plt.close('all') except Exception: print("Error saving matplotlib figures:") print(traceback.format_exc()) # Collect and encode any image files image_files = [f for f in os.listdir('.') if f.endswith(('.png', '.jpg', '.jpeg', '.gif', '.svg'))] if image_files: print(f"Found {{len(image_files)}} image files: {{image_files}}") for img_file in image_files: try: with open(img_file, 'rb') as f: img_data = f.read() if img_data: img_base64 = base64.b64encode(img_data).decode('utf-8') mime_type = "image/png" if img_file.endswith('.jpg') or img_file.endswith('.jpeg'): mime_type = "image/jpeg" elif img_file.endswith('.gif'): mime_type = "image/gif" elif img_file.endswith('.svg'): mime_type = "image/svg+xml" # Add to result result_dict["images"].append({{ "data": img_base64, "mime_type": mime_type, "filename": img_file, "size": len(img_data) }}) print(f"Added {{img_file}} to results ({{len(img_data)}} bytes)") except Exception: print(f"Error processing image file {{img_file}}:") print(traceback.format_exc()) # Restore stdout, stderr before final output sys.stdout = original_stdout sys.stderr = original_stderr # Get captured output result_dict["stdout"] = stdout_capture.getvalue() result_dict["stderr"] = stderr_capture.getvalue() # Cleanup temporary directory if it exists if temp_dir and os.path.exists(temp_dir): try: # Change back to original directory first if original_dir: os.chdir(original_dir) # Delete temp directory shutil.rmtree(temp_dir, ignore_errors=True) print(f"Cleaned up temporary directory: {{temp_dir}}") except Exception: print(f"Failed to remove temporary directory {{temp_dir}}") # Print results with special marker for parsing result_json = json.dumps(result_dict, indent=2) print("RESULT_JSON:" + result_json) except Exception: # Global exception handler for the entire wrapper sys.stdout = original_stdout sys.stderr = original_stderr error_trace = traceback.format_exc() print(f"Critical error in code execution wrapper:\\n{{error_trace}}") # Ensure we still return a valid JSON result result_dict = {{ "stdout": stdout_capture.getvalue(), "stderr": stderr_capture.getvalue() + "\\n" + error_trace, "exit_code": 2, "images": [] }} # Print with marker for parsing result_json = json.dumps(result_dict, indent=2) print("RESULT_JSON:" + result_json) """ # Execute the wrapped code in the workspace try: # Log the code execution attempt self.logger.debug(f"Executing wrapped Python code with length: {len(wrapped_code)}") # Execute using the SDK response = self.workspace.process.code_run(wrapped_code) # Process the results raw_result = str(response.result).strip() if response.result else "" exit_code = response.exit_code if hasattr(response, 'exit_code') else -1 # Log a truncated version of the output log_output = raw_result[:500] + "..." if len(raw_result) > 500 else raw_result self.logger.info(f"Code execution completed with exit code {exit_code}, output length: {len(raw_result)}") self.logger.debug(f"Execution Output (truncated):\n{log_output}") # Look for the RESULT_JSON marker in the output result_json = None marker = "RESULT_JSON:" marker_pos = raw_result.find(marker) if marker_pos >= 0: # Extract JSON after the marker json_str = raw_result[marker_pos + len(marker):].strip() try: result_data = json.loads(json_str) self.logger.info("Successfully parsed execution result JSON") # Process images if present if result_data.get("images"): self.logger.info(f"Found {len(result_data['images'])} images in result") # If only one image, include it in the main result if len(result_data["images"]) == 1: img = result_data["images"][0] result_data["image"] = img["data"] result_data["metadata"] = { "filename": img["filename"], "size": img["size"], "type": img["mime_type"].split("/")[-1] } # Return the formatted result return json.dumps(result_data, indent=2) except json.JSONDecodeError: self.logger.warning(f"Failed to parse result JSON, marker found but JSON invalid") # Continue with fallback # Fallback: If no marker or invalid JSON, return the raw output self.logger.warning("No result marker found, returning raw output") return json.dumps({ "stdout": raw_result, "stderr": "", "exit_code": exit_code }, indent=2) except Exception as exc: # Capture the error and return it in a structured format error_info = str(exc) self.logger.error(f"Error executing Python code: {error_info}") # Return a properly formatted error result return json.dumps({ "stdout": "", "stderr": f"Error executing code: {error_info}", "exit_code": -1 }, indent=2) async def execute_command(self, command: str) -> str: """ Execute a shell command in the Daytona workspace using the SDK. Returns the execution result as a JSON string. Args: command: The shell command to execute Returns: JSON string containing stdout, stderr, and exit_code Raises: WorkspaceError: If workspace is not initialized CommandExecutionError: If command execution fails NetworkError: If connection issues occur during execution """ if not self.workspace: self.logger.error("Workspace is not initialized.") raise WorkspaceError("Workspace is not initialized.") try: # Validate command input if not command or not isinstance(command, str): raise ValueError("Command must be a non-empty string") # For commands containing && or cd, execute them as a single shell command if '&&' in command or command.strip().startswith('cd '): # Wrap the entire command in /bin/sh -c command = f'/bin/sh -c {shlex.quote(command)}' else: # For simple commands, just use shlex.quote on arguments if needed command = command.strip() self.logger.debug(f"Executing command: {command}") try: # Execute shell command using the SDK response: ExecuteResponse = self.workspace.process.exec(command) self.logger.debug(f"ExecuteResponse received: {type(response)}") # Handle the response result result = str(response.result).strip() if response.result else "" exit_code = response.exit_code if hasattr(response, 'exit_code') else -1 self.logger.info(f"Command completed with exit code: {exit_code}, output length: {len(result)}") # Truncate log output if too long log_output = result[:500] + "..." if len(result) > 500 else result self.logger.debug(f"Command Output (truncated):\n{log_output}") # Check for high exit code (error conditions) if exit_code > 0: self.logger.warning(f"Command exited with non-zero status: {exit_code}") # Return the execution output as JSON return json.dumps({ "stdout": result, "stderr": "", "exit_code": exit_code }, indent=2) except Exception as e: error_str = str(e) self.logger.error(f"Error during command execution: {e}", exc_info=True) # Classify error types for better handling if "Connection" in error_str or "Timeout" in error_str: raise NetworkError(f"Network error during command execution: {error_str}") elif "Unauthorized" in error_str or "401" in str(e): raise NetworkError("Authentication failed during command execution. Please check your API key.") else: raise CommandExecutionError(f"Command execution failed: {error_str}") except (NetworkError, CommandExecutionError, WorkspaceError, ValueError) as specific_error: # For specific error types, return formatted output with appropriate error info self.logger.error(f"Command execution error: {specific_error}") return json.dumps({ "stdout": "", "stderr": str(specific_error), "exit_code": -1, "error_type": specific_error.__class__.__name__ }, indent=2) except Exception as e: # For unexpected errors, wrap in CommandExecutionError but preserve original self.logger.error(f"Unexpected error executing command: {e}", exc_info=True) error = CommandExecutionError(f"Unexpected error: {str(e)}") return json.dumps({ "stdout": "", "stderr": str(error), "exit_code": -1, "error_type": "CommandExecutionError" }, indent=2) async def cleanup_workspace(self) -> None: """ Clean up the Daytona workspace by removing it using the SDK. Uses a file lock to coordinate between processes. This method handles workspace cleanup with error recovery and ensures proper coordination between multiple processes accessing the same workspace. """ if not self.workspace: self.logger.debug("No workspace to clean up for this instance") return # Store workspace ID for logging workspace_id = self.workspace.id try: # Use file lock to ensure only one process cleans up with FileLock(WORKSPACE_LOCK_FILE): # Check if this instance's workspace is the active workspace active_id, _ = get_active_workspace(self.filesystem if hasattr(self, 'filesystem') else None) if active_id and active_id == workspace_id: self.logger.info(f"Starting cleanup for workspace ID: {workspace_id}") # Attempt to remove the workspace with retry mechanism max_retries = 2 retry_count = 0 retry_delay = 1.0 success = False last_error = None while retry_count < max_retries and not success: try: # Remove the workspace self.daytona.remove(self.workspace) success = True # Clear tracking file so other processes know it's gone clear_active_workspace(self.filesystem if hasattr(self, 'filesystem') else None) # Log success self.logger.info(f"Successfully removed Workspace ID: {workspace_id}") except Exception as e: last_error = e retry_count += 1 error_str = str(e) # Handle different error cases if "not found" in error_str.lower() or "404" in error_str: # Workspace already removed, just clear tracking self.logger.info(f"Workspace {workspace_id} already removed, clearing tracking") clear_active_workspace(self.filesystem if hasattr(self, 'filesystem') else None) success = True break elif "Connection" in error_str or "Timeout" in error_str: # Network errors might be temporary, retry self.logger.warning(f"Network error during workspace removal: {e}, retrying...") elif "Unauthorized" in error_str or "401" in str(e): # Auth errors won't be fixed by retry self.logger.error(f"Authentication error during workspace removal: {e}") break else: self.logger.warning(f"Failed to remove workspace (attempt {retry_count}): {e}") if retry_count < max_retries: await asyncio.sleep(retry_delay) retry_delay *= 1.5 # If we've exhausted retries and still failed, log but don't raise if not success and last_error: self.logger.error(f"Failed to remove workspace after {max_retries} attempts: {last_error}") # Last-ditch effort: at least clear tracking file try: clear_active_workspace(self.filesystem if hasattr(self, 'filesystem') else None) self.logger.info("Cleared workspace tracking file despite removal failure") except Exception as tracking_error: self.logger.error(f"Failed to clear tracking file: {tracking_error}") else: self.logger.info(f"This process doesn't own the active workspace, skipping cleanup") except Exception as e: # Log lock acquisition errors but don't crash self.logger.error(f"Error during workspace cleanup lock acquisition: {e}", exc_info=True) # Try cleanup without lock as last resort try: self.logger.warning("Attempting cleanup without lock as fallback") self.daytona.remove(self.workspace) self.logger.info(f"Successfully removed Workspace ID: {workspace_id} without lock") except Exception as fallback_error: self.logger.error(f"Fallback cleanup also failed: {fallback_error}") finally: # Always clear this instance's references regardless of cleanup success self.workspace = None self.filesystem = None self.logger.debug("Cleared workspace and filesystem references") async def process_file_content(self, file_path: str, file_content: bytes) -> List[Union[TextContent, ImageContent, EmbeddedResource]]: """ Process downloaded file content and return in appropriate format. """ try: # If download succeeded, get the file extension ext = Path(file_path).suffix.lower() filename = Path(file_path).name # Log success for debugging self.logger.info(f"Processing file: {file_path}, size: {len(file_content)} bytes") # Special case: Check if this might be a JSON file containing a base64 image # This handles matplotlib plots saved as JSON with embedded base64 images if ext == '.json': try: # Try to parse as JSON first text_content = file_content.decode('utf-8') json_data = json.loads(text_content) # Look for a base64 image field (common pattern in matplotlib exports) if isinstance(json_data, dict) and 'image' in json_data and isinstance(json_data['image'], str): # This might be a matplotlib plot with embedded image try: # Verify it's valid base64 image_data = json_data['image'] # Try to decode a small part to validate it's base64 base64.b64decode(image_data[:20] + "=" * ((4 - len(image_data[:20]) % 4) % 4)) # If we get here, it seems to be a valid base64 string self.logger.info(f"Detected JSON with embedded base64 image in {file_path}") # Get metadata if available metadata = json_data.get('metadata', {}) metadata_text = "" if metadata: try: # Format metadata nicely metadata_items = [] if isinstance(metadata, dict): for key, value in metadata.items(): if key != 'elements': # Skip detailed elements array metadata_items.append(f"{key}: {value}") # Handle elements separately if present if 'elements' in metadata and isinstance(metadata['elements'], list): elements = metadata['elements'] if elements: metadata_items.append(f"elements: [{len(elements)} items]") if metadata_items: metadata_text = "\n\nMetadata:\n" + "\n".join(metadata_items) except Exception as e: self.logger.warning(f"Error formatting metadata: {e}") # Return both the image and any metadata as text mime_type = "image/png" # Default for matplotlib return [ ImageContent(type="image", data=image_data, mimeType=mime_type), *([TextContent(type="text", text=metadata_text)] if metadata_text else []) ] except Exception as e: self.logger.debug(f"Not a valid base64 image in JSON: {e}") # Continue with normal JSON handling # If not a special case, continue with normal text handling return [TextContent(type="text", text=text_content)] except Exception as e: self.logger.debug(f"Error parsing as JSON with image: {e}") # Continue with regular file handling # Handle standard image files image_extensions = ['.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp', '.svg'] if ext in image_extensions: try: # Convert binary content to base64 base64_content = base64.b64encode(file_content).decode('utf-8') # Determine MIME type based on extension # Use mimetypes library to get the correct MIME type mime_type = mimetypes.guess_type(file_path)[0] if not mime_type: # Fallback for common image types mime_type = f"image/{ext[1:]}" if ext[1:] != 'svg' else "image/svg+xml" if ext[1:] == 'jpg': mime_type = "image/jpeg" self.logger.info(f"Downloaded image file: {file_path} ({mime_type})") result = [ImageContent(type="image", data=base64_content, mimeType=mime_type)] # Check result is valid self.logger.info(f"Created ImageContent successfully: {type(result[0])}") return result except Exception as e: self.logger.error(f"Error creating ImageContent: {e}", exc_info=True) # Fallback to text description return [TextContent( type="text", text=f"Image downloaded but could not be displayed. File: {file_path}, Size: {len(file_content)} bytes." )] # Handle text files - try to decode as UTF-8 try: # Try to decode as text text_content = file_content.decode('utf-8') self.logger.info(f"Downloaded text file: {file_path}") # For matplotlib plots that might be plain text with base64 content if len(text_content) > 1000 and 'base64' in text_content[:1000] and ( 'matplotlib' in text_content[:1000] or 'plt' in text_content[:1000]): try: # Try to extract a base64 string import re match = re.search(r"base64,([A-Za-z0-9+/=]+)", text_content) if match: base64_content = match.group(1) self.logger.info(f"Found embedded base64 content in text file: {file_path}") return [ ImageContent(type="image", data=base64_content, mimeType="image/png"), TextContent(type="text", text="Found embedded image in text content.") ] except Exception as e: self.logger.debug(f"Error extracting base64 from text: {e}") # Continue with normal text handling # Wrap the result in a try-except to avoid any unexpected errors try: return [TextContent(type="text", text=text_content)] except Exception as e: self.logger.error(f"Error creating TextContent: {e}", exc_info=True) # Fallback to plain string with truncation return [TextContent(type="text", text=f"File content (first 1000 chars): {text_content[:1000]}"+(len(text_content)>1000 and "..." or ""))] except UnicodeDecodeError: # If we can't decode as text, return as binary base64_content = base64.b64encode(file_content).decode('utf-8') # For binary files, try to return as an attachment with base64 data self.logger.info(f"Downloaded binary file: {file_path}") # For PDF and other document files document_extensions = ['.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx'] if ext.lower() in document_extensions: # Get the MIME type mime_type = mimetypes.guess_type(file_path)[0] if not mime_type: # Default to application/octet-stream if cannot determine mime_type = "application/octet-stream" self.logger.info(f"Detected document file: {file_path} ({mime_type})") try: return [EmbeddedResource( type="resource", resource={ "uri": f"file://{file_path}", "data": base64_content, "mimeType": mime_type } )] except Exception as e: self.logger.error(f"Error creating EmbeddedResource: {e}", exc_info=True) # Fallback to text return [TextContent( type="text", text=f"Binary file downloaded but could not be embedded. File size: {len(file_content)} bytes." )] # For other binary types, try to detect if it's an image based on content try: # Check for common image headers if (len(file_content) > 4 and (file_content[:4] == b'\x89PNG' or # PNG file_content[:3] == b'\xff\xd8\xff' or # JPEG file_content[:4] == b'GIF8' or # GIF file_content[:2] == b'BM')): # BMP self.logger.info(f"Detected binary image file without correct extension: {file_path}") # Guess mime type from headers mime_type = "image/png" if file_content[:3] == b'\xff\xd8\xff': mime_type = "image/jpeg" elif file_content[:4] == b'GIF8': mime_type = "image/gif" elif file_content[:2] == b'BM': mime_type = "image/bmp" return [ImageContent(type="image", data=base64_content, mimeType=mime_type)] except Exception as e: self.logger.debug(f"Error detecting binary image: {e}") # If none of the special cases match, return generic binary message return [TextContent( type="text", text=f"Binary file downloaded but can't be displayed directly. File size: {len(file_content)} bytes." )] except Exception as e: self.logger.error(f"Error processing file {file_path}: {e}", exc_info=True) # Return error as text rather than raising an exception # to prevent the server from crashing return [TextContent( type="text", text=f"Error processing file: {str(e)}" )] async def download_file(self, file_path: str) -> List[Union[TextContent, ImageContent, EmbeddedResource]]: """ Download a file from the Daytona workspace. Returns the file content either as text or as a base64 encoded image. Handles special cases like matplotlib plots stored as JSON with embedded base64 images. """ if not self.workspace: self.logger.error("Workspace is not initialized.") raise RuntimeError("Workspace is not initialized.") try: # Need to check if the file exists first # Use the filesystem API to download the file content if not self.filesystem: self.logger.error("Filesystem is not initialized.") raise RuntimeError("Filesystem is not initialized.") # Check if file exists using a command instead of filesystem.exists() # which doesn't exist in the Daytona SDK try: response = self.workspace.process.exec(f"test -f {shlex.quote(file_path)} && echo 'exists' || echo 'not exists'") if "exists" not in str(response.result): raise FileNotFoundError(f"File not found: {file_path}") except Exception as e: self.logger.warning(f"Error checking if file exists: {e}, will try to download anyway") file_content = self.filesystem.download_file(file_path) return await self.process_file_content(file_path, file_content) except Exception as e: self.logger.error(f"Error downloading file {file_path}: {e}", exc_info=True) # Return error as text rather than raising an exception # to prevent the server from crashing return [TextContent( type="text", text=f"Error downloading file: {str(e)}" )] async def cleanup(self) -> None: """ Perform full cleanup of resources: 1. Clean up workspace if it exists 2. Close Daytona SDK client connection if necessary 3. Ensure workspace tracking files are cleaned up """ try: self.logger.info("Starting full cleanup procedure") await self.cleanup_workspace() # Attempt to clean up tracking files if we exit unexpectedly try: # Only attempt if we have an active workspace if self.workspace: # Check if our workspace is the active one active_id, _ = get_active_workspace(self.filesystem if hasattr(self, 'filesystem') else None) if active_id == self.workspace.id: # We own the tracking file, so clean it up clear_active_workspace(self.filesystem if hasattr(self, 'filesystem') else None) self.logger.info("Cleared workspace tracking file") except Exception as e: self.logger.warning(f"Error clearing workspace tracking: {e}") self.logger.info("Cleanup procedure completed") # Additional cleanup steps can be added here if the SDK requires except Exception as e: self.logger.error(f"Error during cleanup: {e}", exc_info=True) # Don't raise the exception to prevent crashes async def run(self) -> None: """ Main server execution loop: 1. Initialize workspace 2. Run MCP server with stdio communication 3. Handle cleanup on shutdown """ try: # Before initializing a workspace, check if we have any existing ones and clean up # This helps prevent multiple sandbox issues try: # List and clean up existing workspaces workspaces = self.daytona.list() if len(workspaces) > 0: self.logger.info(f"Found {len(workspaces)} existing workspaces at startup, cleaning up") for workspace in workspaces: try: self.logger.info(f"Removing existing workspace: {workspace.id}") self.daytona.remove(workspace) except Exception as e: self.logger.warning(f"Error removing workspace {workspace.id}: {e}") except Exception as e: self.logger.warning(f"Error listing/cleaning workspaces at startup: {e}") # Only initialize workspace once at the beginning of the run # and don't initialize workspace in each stdio_server context try: await self.initialize_workspace() except Exception as e: # Special handling for quota errors to make them more visible if "CPU quota exceeded" in str(e): error_message = [ "\n\n======= DAYTONA ERROR =======", "CPU quota exceeded on Daytona server. To resolve this issue:", "1. Log into your Daytona account and delete unused workspaces", "2. Or upgrade your Daytona plan for higher CPU quota" ] # Try automatic cleanup of old workspaces first try: error_message.append("\nAttempting automatic cleanup of workspaces older than 1 day...") cleaned, errors, remaining = cleanup_stale_workspaces(self.daytona, max_age_days=1, logger=self.logger) if cleaned > 0: error_message.append(f"Successfully cleaned up {cleaned} old workspaces.") error_message.append("Retrying workspace creation...") # Try again to create the workspace try: await self.initialize_workspace() error_message.append("Workspace created successfully after cleanup!") error_message.append("============================\n") print("\n".join(error_message), file=sys.stderr) # If we reach here, we succeeded - continue with the run self.logger.info("Workspace initialization succeeded after cleanup") # Skip straight to server startup except Exception as retry_error: error_message.append(f"Retry failed after cleanup: {retry_error}") # Continue with normal error flow else: error_message.append(f"No old workspaces found to clean up ({remaining} workspaces remain).") except Exception as cleanup_error: error_message.append(f"Automatic cleanup failed: {cleanup_error}") # If we're still here, the initialization failed even after cleanup # Try to list existing workspaces to help user identify what to clean up manually try: workspaces = self.daytona.list() if workspaces: error_message.append("\nExisting workspaces:") for ws in workspaces: created_info = "" if hasattr(ws, 'created_at'): if isinstance(ws.created_at, (int, float)): # Convert timestamp to readable date from datetime import datetime created_info = datetime.fromtimestamp(ws.created_at).strftime('%Y-%m-%d %H:%M:%S') else: created_info = str(ws.created_at) error_message.append(f"- {ws.id} (created: {created_info or 'unknown'})") except Exception as list_error: self.logger.warning(f"Failed to list workspaces: {list_error}") error_message.append("\nCould not list existing workspaces due to an error.") error_message.append("============================\n") full_message = "\n".join(error_message) print(full_message, file=sys.stderr) # Print to stderr for visibility self.logger.error(full_message) # Only re-raise if we don't have a workspace (meaning cleanup didn't work) if not self.workspace: raise else: # For non-quota errors, always re-raise raise # Now run the MCP server async with stdio_server() as streams: try: # Add additional debug logging for server lifetime self.logger.info("Starting MCP server with stdio communication") # Create a keep-alive task that periodically logs to keep connection alive async def keep_alive(): while True: try: await asyncio.sleep(30) # Log every 30 seconds self.logger.debug("Server keep-alive ping") except asyncio.CancelledError: self.logger.debug("Keep-alive task cancelled") break # Start the keep-alive task keep_alive_task = asyncio.create_task(keep_alive()) try: await self.server.run( streams[0], streams[1], self.server.create_initialization_options() ) finally: # Make sure to cancel the keep-alive task keep_alive_task.cancel() # No need to await the cancelled task, it's causing BrokenResourceError # when the underlying stream is already closed except BaseExceptionGroup as e: # Handle ExceptionGroup (introduced in Python 3.11) if any(isinstance(exc, asyncio.CancelledError) for exc in e.exceptions): self.logger.info("Server was cancelled") elif any(isinstance(exc, (BrokenPipeError, ConnectionResetError)) or hasattr(exc, '__class__') and 'BrokenResourceError' in exc.__class__.__name__ for exc in e.exceptions): self.logger.info("Client disconnected unexpectedly") elif any("notifications/cancelled" in str(exc) for exc in e.exceptions): self.logger.info("Server received cancel notification, handling gracefully") elif any("ValidationError" in str(exc) for exc in e.exceptions): self.logger.info("Encountered validation error in notification handling, continuing") else: # Just log the error but don't re-raise it to prevent crashes filtered_exceptions = [exc for exc in e.exceptions if not ( hasattr(exc, '__class__') and 'BrokenResourceError' in exc.__class__.__name__ )] if filtered_exceptions: self.logger.error(f"Server error during run: {e}", exc_info=True) else: self.logger.info("Server shutdown initiated due to connection close") except asyncio.CancelledError: self.logger.info("Server task was cancelled") except BrokenPipeError: self.logger.info("Client pipe was broken") except ConnectionResetError: self.logger.info("Connection was reset by peer") except Exception as e: # Check for anyio BrokenResourceError by name to avoid import dependencies if hasattr(e, '__class__') and 'BrokenResourceError' in e.__class__.__name__: self.logger.info("Client resource was broken or closed") else: self.logger.error(f"Unhandled exception in MCP server: {e}", exc_info=True) finally: self.logger.info("MCP server shutdown initiated, starting cleanup") await self.cleanup() self.logger.info("Cleanup completed") except Exception as e: self.logger.error(f"Server error during run: {e}", exc_info=True) await self.cleanup() # Don't re-raise the exception to prevent crashing the process # Just log it and exit gracefully # Global variable to track interpreter instance within a process _interpreter_instance = None class FileLock: """Simple file-based lock for inter-process coordination.""" def __init__(self, lock_file, timeout_seconds=10): self.lock_file = lock_file self.lock_fd = None self.timeout_seconds = timeout_seconds self.logger = logging.getLogger("daytona-interpreter") def acquire(self): """Acquire the lock. Returns True if successful, False otherwise.""" start_time = time.time() # Keep trying until we get the lock or timeout while time.time() - start_time < self.timeout_seconds: try: # Create lock directory if it doesn't exist, with permissive permissions lock_dir = os.path.dirname(self.lock_file) try: os.makedirs(lock_dir, mode=0o777, exist_ok=True) except Exception as e: self.logger.warning(f"Failed to create lock directory with permissions: {e}") # Try again without setting permissions os.makedirs(lock_dir, exist_ok=True) # Open the lock file exclusively self.lock_fd = os.open(self.lock_file, os.O_CREAT | os.O_EXCL | os.O_WRONLY) # Write the current process ID os.write(self.lock_fd, str(os.getpid()).encode()) return True except (OSError, IOError): # Another process already has the lock, wait a bit and retry self.logger.debug(f"Lock file {self.lock_file} is held by another process, waiting...") time.sleep(0.5) # Check if the lock file still exists - might have been released if not os.path.exists(self.lock_file): self.logger.debug("Lock file no longer exists, will retry") continue # Check if the lock is stale (older than 60 seconds) try: lock_stat = os.stat(self.lock_file) if time.time() - lock_stat.st_mtime > 60: self.logger.warning(f"Found stale lock file (over 60s old), removing") try: os.unlink(self.lock_file) continue # retry immediately except: pass # If we can't remove it, just wait and retry normally except: pass # If we can't stat the file, just wait and retry self.logger.warning(f"Failed to acquire lock after {self.timeout_seconds} seconds") return False def release(self): """Release the lock if held.""" if self.lock_fd is not None: os.close(self.lock_fd) try: os.unlink(self.lock_file) except (OSError, IOError): pass # Lock file already gone self.lock_fd = None def __enter__(self): # Only return self if lock was successfully acquired if not self.acquire(): # Wait and retry a few times for _ in range(3): time.sleep(0.2) if self.acquire(): break return self def __exit__(self, exc_type, exc_val, exc_tb): self.release() def get_content_type(file_path: str) -> str: """Determine the content type of a file based on its extension.""" mime_type, _ = mimetypes.guess_type(file_path) if mime_type: return mime_type # Default content types for common extensions ext = os.path.splitext(file_path.lower())[1] content_types = { '.txt': 'text/plain', '.md': 'text/markdown', '.json': 'application/json', '.py': 'text/x-python', '.html': 'text/html', '.css': 'text/css', '.js': 'application/javascript', '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg', '.png': 'image/png', '.gif': 'image/gif', '.svg': 'image/svg+xml', '.pdf': 'application/pdf', '.doc': 'application/msword', '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document', '.xls': 'application/vnd.ms-excel', '.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', '.zip': 'application/zip', '.tar': 'application/x-tar', '.gz': 'application/gzip', } return content_types.get(ext, 'application/octet-stream') def preview_link_generator(port: int, description: str = "", check_server: bool = True): """ Generate a preview link for a web server running inside the Daytona workspace. Used by the web_preview tool. Args: port: The port number the server is running on description: Optional description of the server check_server: Whether to check if the server is running (default: True) Returns: Dict containing preview link information """ try: logger = logging.getLogger("daytona-interpreter") logger.info(f"Generating preview link for port {port}") # Initialize Daytona using the current interpreter's instance if possible global _interpreter_instance if _interpreter_instance and _interpreter_instance.workspace: logger.info("Using existing workspace from interpreter instance") workspace = _interpreter_instance.workspace else: logger.info("Creating new Daytona workspace") daytona = Daytona() workspace = daytona.create() # Check if the server is running on the specified port if check_server: logger.info(f"Checking if server is running on port {port}") check_cmd = f"curl -s -o /dev/null -w '%{{http_code}}' http://localhost:{port} --max-time 2 || echo 'error'" check_result = workspace.process.exec(check_cmd) response = str(check_result.result).strip() # If we can't connect or get an error response if response == 'error' or response.startswith('0'): logger.warning(f"No server detected on port {port}") # Check what might be using the port ps_cmd = f"ps aux | grep ':{port}' | grep -v grep || echo 'No process found'" ps_result = workspace.process.exec(ps_cmd) process_info = str(ps_result.result).strip() return { "success": False, "error": f"No server detected on port {port}. Please make sure your server is running.", "port": port, "process_info": process_info } # Extract the necessary domain information from workspace metadata try: # Extract node domain from provider metadata (JSON) node_domain = json.loads(workspace.instance.info.provider_metadata)['nodeDomain'] # Format the preview URL preview_url = f"http://{port}-{workspace.id}.{node_domain}" # Test that the URL is accessible via curl with timeout if check_server: # Test via port forwarding to make sure it's accessible check_cmd = f"curl -s -o /dev/null -w '%{{http_code}}' {preview_url} --max-time 3 || echo 'error'" check_result = workspace.process.exec(check_cmd) response = str(check_result.result).strip() accessible = response != 'error' and not response.startswith('0') status_code = response if response.isdigit() else None logger.info(f"Preview URL {preview_url} check result: {response}") else: accessible = None status_code = None # Return the formatted preview URL and metadata return { "success": True, "preview_url": preview_url, "port": port, "workspace_id": workspace.id, "node_domain": node_domain, "description": description, "accessible": accessible, "status_code": status_code } except Exception as e: logger.error(f"Error extracting domain information: {e}") # Try alternate method to get domain info try: # Extract from workspace info workspace_info = workspace.info() domains_info = str(workspace_info) # Look for domain pattern in the info import re domain_match = re.search(r'domain[\'"]?\s*:\s*[\'"]([^"\'\s]+)[\'"]', domains_info) if domain_match: node_domain = domain_match.group(1) preview_url = f"http://{port}-{workspace.id}.{node_domain}" return { "success": True, "preview_url": preview_url, "port": port, "workspace_id": workspace.id, "node_domain": node_domain, "description": description, "note": "Domain extracted using fallback method" } except Exception as fallback_error: logger.error(f"Fallback domain extraction failed: {fallback_error}") return { "success": False, "error": f"Failed to generate preview link: {str(e)}", "port": port } except Exception as e: logger.error(f"Error generating preview link: {e}", exc_info=True) return { "success": False, "error": str(e), "port": port } def git_repo_cloner(repo_url: str, target_path: str = None, branch: str = None, depth: int = 1, lfs: bool = False): """ Clone a Git repository into the Daytona workspace. Used by the git_clone tool. Args: repo_url: The URL of the Git repository to clone (https or ssh) target_path: Target directory to clone into (default: repository name) branch: Branch to checkout (default: repository default branch) depth: Depth of history to clone (default: 1 for shallow clone) lfs: Whether to enable Git LFS (default: False) Returns: Dict containing clone operation results and file list """ try: logger = logging.getLogger("daytona-interpreter") logger.info(f"Cloning Git repository: {repo_url}") # Initialize Daytona using the current interpreter's instance if possible global _interpreter_instance if _interpreter_instance and _interpreter_instance.workspace: logger.info("Using existing workspace from interpreter instance") workspace = _interpreter_instance.workspace else: logger.info("Creating new Daytona workspace") daytona = Daytona() workspace = daytona.create() # Extract repo name from URL for default target path import re repo_name = re.search(r"([^/]+)(?:\.git)?$", repo_url) if repo_name: repo_name = repo_name.group(1) else: repo_name = "repo" # Use provided target path or default to repo name target_dir = target_path or repo_name # Prepare the git clone command clone_cmd = f"git clone" # Add depth parameter for shallow clone if specified if depth > 0: clone_cmd += f" --depth {depth}" # Add branch parameter if specified if branch: clone_cmd += f" --branch {branch}" # Add the repository URL clone_cmd += f" {shlex.quote(repo_url)}" # Add target directory if it's not the default if target_path: clone_cmd += f" {shlex.quote(target_path)}" # Execute the clone command logger.info(f"Executing git clone command: {clone_cmd}") clone_result = workspace.process.exec(clone_cmd, timeout=180) # Longer timeout for large repos # Check if clone was successful if clone_result.exit_code != 0: logger.error(f"Git clone failed with exit code {clone_result.exit_code}") return { "success": False, "error": f"Git clone failed: {clone_result.result}", "exit_code": clone_result.exit_code } # If Git LFS is enabled, fetch LFS content if lfs: logger.info("Git LFS enabled, fetching LFS content") try: # Move into the cloned directory cd_cmd = f"cd {shlex.quote(target_dir)}" # Setup and pull LFS content lfs_cmd = f"{cd_cmd} && git lfs install && git lfs pull" lfs_result = workspace.process.exec(lfs_cmd, timeout=180) if lfs_result.exit_code != 0: logger.warning(f"Git LFS pull had issues: {lfs_result.result}") except Exception as e: logger.warning(f"Error with Git LFS: {e}") # List files in the cloned repository try: ls_cmd = f"find {shlex.quote(target_dir)} -type f -not -path '*/\\.git/*' | sort | head -n 100" ls_result = workspace.process.exec(ls_cmd) file_list = str(ls_result.result).strip().split('\n') # Get repository info info_cmd = f"cd {shlex.quote(target_dir)} && git log -1 --pretty=format:'%h %an <%ae> %ad %s' && echo '' && git branch -v" info_result = workspace.process.exec(info_cmd) repo_info = str(info_result.result).strip() # Count total files count_cmd = f"find {shlex.quote(target_dir)} -type f -not -path '*/\\.git/*' | wc -l" count_result = workspace.process.exec(count_cmd) total_files = int(str(count_result.result).strip()) return { "success": True, "repository": repo_url, "target_directory": target_dir, "branch": branch, "depth": depth, "files_sample": file_list[:100], # Limit to first 100 files "total_files": total_files, "repository_info": repo_info, "message": f"Repository cloned successfully into {target_dir}" } except Exception as e: logger.error(f"Error listing repository files: {e}") return { "success": True, "repository": repo_url, "target_directory": target_dir, "error_listing_files": str(e), "message": f"Repository cloned successfully into {target_dir}, but error listing files" } except Exception as e: logger.error(f"Error cloning repository: {e}", exc_info=True) return { "success": False, "error": str(e), "repository": repo_url } def file_uploader(file_path: str, content: str, encoding: str = "text", overwrite: bool = True): """ Upload files to Daytona workspace. Used by the file_upload tool. Args: file_path: Path where the file should be created in the workspace content: Content to write to the file (text or base64-encoded binary) encoding: Encoding of the content: 'text' (default) or 'base64' overwrite: Whether to overwrite the file if it already exists (default: True) Returns: Dict containing upload status and any error messages """ try: logger = logging.getLogger("daytona-interpreter") logger.info(f"Uploading file: {file_path}, encoding: {encoding}") # Get workspace from the tracking file try: # Initialize Daytona SDK config = Config() daytona = Daytona( config=DaytonaConfig( api_key=config.api_key, server_url=config.server_url, target=config.target ) ) # First, try to use the FileSystem from an existing interpreter instance global _interpreter_instance if _interpreter_instance and _interpreter_instance.workspace and _interpreter_instance.filesystem: logger.info("Using existing workspace from interpreter instance") workspace = _interpreter_instance.workspace else: # Get workspace ID from tracking file workspace_id, _ = get_active_workspace() if not workspace_id: return { "success": False, "error": "No workspace ID found in tracking file" } # Get the current workspace using workspace ID all_workspaces = daytona.list() workspace = None for ws in all_workspaces: if ws.id == workspace_id: workspace = ws break except Exception as e: logger.error(f"Error getting workspace: {e}") return { "success": False, "error": f"Failed to get workspace: {str(e)}" } if not workspace: return { "success": False, "error": "No workspace available" } # Get file system instance from workspace fs = workspace.fs # Check if file exists if not overwrite: try: # Try to check file info, which will raise an exception if file doesn't exist # Use get_file_info instead of stat (which doesn't exist in the FileSystem class) fs.get_file_info(file_path) return { "success": False, "error": f"File '{file_path}' already exists and overwrite=False" } except Exception: # File doesn't exist, which is good in this case pass # Prepare content based on encoding if encoding.lower() == "base64": try: # Decode base64 content binary_content = base64.b64decode(content) except Exception as e: return { "success": False, "error": f"Invalid base64 encoding: {str(e)}" } else: # Default is text encoding binary_content = content.encode('utf-8') # Create parent directories if they don't exist parent_dir = os.path.dirname(file_path) if parent_dir: try: # The SDK will create parent directories as needed, but we'll check first if not fs.dir_exists(parent_dir): fs.create_folder(parent_dir) except Exception as e: logger.warning(f"Error checking/creating parent directory: {e}") # Continue anyway, as upload_file might handle this # Upload the file fs.upload_file(file_path, binary_content) # Get file size for information file_info = fs.get_file_info(file_path) file_size = file_info.size # The size attribute in FileInfo class file_size_kb = file_size / 1024 return { "success": True, "message": f"File uploaded successfully: {file_path} ({file_size_kb:.2f} KB)", "file_path": file_path, "file_size_bytes": file_size } except Exception as e: logger.error(f"Error uploading file: {e}", exc_info=True) return { "success": False, "error": str(e), "file_path": file_path } def file_downloader(path: str, max_size_mb: float = 5.0, download_option: str = None, chunk_size_kb: int = 100): """ Download files from Daytona workspace with advanced handling for large files. Used by the file_download tool. Args: path: Path to the file in the Daytona workspace max_size_mb: Maximum file size in MB to download automatically download_option: Option to handle large files: 'download_partial', 'convert_to_text', 'compress_file', or None chunk_size_kb: Size of each chunk in KB when downloading partially Returns: Dict containing file content and metadata or download options Raises: FileNotAccessibleError: When the file cannot be accessed FileTooLargeError: When file exceeds size limits without download option FileSystemError: For filesystem-related errors NetworkError: For network-related errors """ logger = logging.getLogger("daytona-interpreter") logger.info(f"Downloading file: {path}, max_size: {max_size_mb}MB, option: {download_option}") workspace = None filesystem = None needs_cleanup = False daytona = None try: # Validate inputs if not path: raise ValueError("File path is required") if max_size_mb <= 0: logger.warning(f"Invalid max_size_mb value: {max_size_mb}, using default of 5.0") max_size_mb = 5.0 if chunk_size_kb <= 0: logger.warning(f"Invalid chunk_size_kb value: {chunk_size_kb}, using default of 100") chunk_size_kb = 100 if download_option and download_option not in ["download_partial", "convert_to_text", "compress_file", "force_download"]: logger.warning(f"Unrecognized download option: {download_option}") # Initialize Daytona using the current interpreter's instance if possible global _interpreter_instance if _interpreter_instance and _interpreter_instance.workspace and _interpreter_instance.filesystem: logger.info("Using existing workspace from interpreter instance") workspace = _interpreter_instance.workspace filesystem = _interpreter_instance.filesystem needs_cleanup = False else: try: logger.info("Creating new Daytona workspace") daytona = Daytona() try: workspace = daytona.create() filesystem = workspace.fs needs_cleanup = True except Exception as create_err: error_str = str(create_err) if "Total CPU quota exceeded" in error_str or "quota" in error_str.lower(): raise WorkspaceQuotaExceededError(f"CPU quota exceeded when creating workspace for file download: {error_str}") elif "Connection" in error_str or "Timeout" in error_str: raise NetworkError(f"Network error when creating workspace for file download: {error_str}") elif "Unauthorized" in error_str or "401" in str(create_err): raise NetworkError("Authentication failed when creating workspace for file download") else: raise WorkspaceInitializationError(f"Failed to create workspace for file download: {error_str}") except (WorkspaceQuotaExceededError, NetworkError, WorkspaceInitializationError) as specific_error: # Re-raise specific exceptions logger.error(f"Error creating workspace: {specific_error}") return { "success": False, "error": str(specific_error), "error_type": specific_error.__class__.__name__, "file_path": path } # First check if file exists and get file info using Daytona FileSystem try: # Use filesystem.get_file_info to check if file exists and get size information try: file_info = filesystem.get_file_info(path) logger.info(f"File exists: {path}") except Exception as fs_err: if "not found" in str(fs_err).lower() or "not exist" in str(fs_err).lower(): raise FileNotAccessibleError(f"File not found: {path}") elif "permission" in str(fs_err).lower(): raise FileNotAccessibleError(f"Permission denied accessing file: {path}") else: logger.warning(f"Error accessing file with FileSystem API: {fs_err}") raise FileSystemError(f"Error accessing file with FileSystem API: {fs_err}") # Get additional information with process.exec for backward compatibility try: # Get mime type mime_cmd = f"file --mime-type -b {shlex.quote(path)}" mime_result = workspace.process.exec(mime_cmd) mime_type = str(mime_result.result).strip() # Set complete file info file_info = { "name": os.path.basename(path), "size": file_info.size, # Use size from FileSystem's get_file_info "mime_type": mime_type } logger.debug(f"Enhanced file info with MIME type: {mime_type}") except Exception as e: # If getting additional info fails, create minimal file_info dict logger.warning(f"Error getting additional file info: {e}") file_info = { "name": os.path.basename(path), "size": file_info.size } logger.info(f"File info: {file_info}") except (FileNotAccessibleError, FileSystemError) as specific_error: # First try with process.exec as fallback logger.warning(f"Using process.exec fallback for file check: {specific_error}") try: # Check if file exists response = workspace.process.exec(f"test -f {shlex.quote(path)} && echo 'exists' || echo 'not exists'") if "exists" not in str(response.result): if needs_cleanup and daytona and workspace: try: daytona.remove(workspace) except Exception as cleanup_err: logger.warning(f"Error cleaning up workspace: {cleanup_err}") raise FileNotAccessibleError(f"File not found: {path}") # Get file size using stat command try: size_cmd = f"stat -f %z {shlex.quote(path)}" size_result = workspace.process.exec(size_cmd) file_size = int(str(size_result.result).strip()) except Exception: # Try Linux stat format as fallback try: size_cmd = f"stat -c %s {shlex.quote(path)}" size_result = workspace.process.exec(size_cmd) file_size = int(str(size_result.result).strip()) except Exception as stat_err: logger.error(f"Failed to get file size: {stat_err}") raise FileSystemError(f"Failed to get file size: {stat_err}") # Get mime type try: mime_cmd = f"file --mime-type -b {shlex.quote(path)}" mime_result = workspace.process.exec(mime_cmd) mime_type = str(mime_result.result).strip() except Exception: # Default mime type based on extension mime_type = get_content_type(path) file_info = { "name": os.path.basename(path), "size": file_size, "mime_type": mime_type } logger.info(f"File info (via exec): {file_info}") except (FileNotAccessibleError, FileSystemError) as exec_error: # Cleanup on error if needs_cleanup and daytona and workspace: try: daytona.remove(workspace) except Exception as cleanup_err: logger.warning(f"Error cleaning up workspace: {cleanup_err}") # Re-raise with more context return { "success": False, "error": str(exec_error), "error_type": exec_error.__class__.__name__, "file_path": path } except Exception as e: logger.error(f"File does not exist or cannot be accessed: {e}") # Cleanup on error if needs_cleanup and daytona and workspace: try: daytona.remove(workspace) except Exception as cleanup_err: logger.warning(f"Error cleaning up workspace: {cleanup_err}") return { "success": False, "error": f"File not found or inaccessible: {path}", "error_type": "FileNotAccessibleError", "file_path": path } # Calculate size in MB size_mb = file_info["size"] / (1024 * 1024) if isinstance(file_info, dict) else file_info.size / (1024 * 1024) logger.info(f"File size: {size_mb:.2f}MB") # If file is too large and no download option is specified, offer options if size_mb > max_size_mb and download_option is None: options = { "success": True, "file_too_large": True, "file_size_mb": round(size_mb, 2), "max_size_mb": max_size_mb, "file_path": path, "filename": os.path.basename(path), "content_type": get_content_type(path), "options": [ "download_partial", "convert_to_text", "compress_file", "force_download" ], "message": f"File is {round(size_mb, 2)}MB which exceeds the {max_size_mb}MB limit. Choose an option to proceed." } # Clean up if needed if needs_cleanup and daytona and workspace: try: daytona.remove(workspace) logger.info("Cleaned up temporary workspace") except Exception as e: logger.error(f"Error during cleanup: {e}") return options # Process according to download option for large files if size_mb > max_size_mb and download_option: if download_option == "download_partial": # Download first chunk of the file using combined approach chunk_size_bytes = chunk_size_kb * 1024 try: # Try to get a chunk using filesystem API if available # Since FileSystem doesn't have a direct partial download method, # we'll use process.exec to create a temporary file with the chunk temp_chunk_path = f"/tmp/chunk_{uuid.uuid4()}.tmp" try: # Create the chunk workspace.process.exec(f"head -c {chunk_size_bytes} {shlex.quote(path)} > {temp_chunk_path}") # Use filesystem to download the chunk content = filesystem.download_file(temp_chunk_path) # Remove temp file workspace.process.exec(f"rm {temp_chunk_path}") except Exception as e: logger.warning(f"Error using filesystem for partial download: {e}, falling back to process.exec") # Fallback to direct base64 encoding head_cmd = f"head -c {chunk_size_bytes} {shlex.quote(path)} | base64" head_result = workspace.process.exec(head_cmd) # Decode base64 content content_b64 = str(head_result.result).strip() content = base64.b64decode(content_b64) # Clean up if needed if needs_cleanup and daytona and workspace: try: daytona.remove(workspace) except Exception as cleanup_err: logger.warning(f"Error cleaning up workspace: {cleanup_err}") return { "success": True, "filename": os.path.basename(path), "content_type": get_content_type(path), "size": file_info["size"] if isinstance(file_info, dict) else file_info.size, "content": content, "partial": True, "downloaded_bytes": len(content), "total_bytes": file_info["size"] if isinstance(file_info, dict) else file_info.size, "message": f"Downloaded first {chunk_size_kb}KB of file." } except Exception as partial_err: logger.error(f"Error downloading partial file: {partial_err}") # Cleanup on error if needs_cleanup and daytona and workspace: try: daytona.remove(workspace) except Exception as cleanup_err: logger.warning(f"Error cleaning up workspace: {cleanup_err}") return { "success": False, "error": f"Error downloading partial file: {str(partial_err)}", "error_type": "FileSystemError" if isinstance(partial_err, FileSystemError) else "DownloadError", "file_path": path } elif download_option == "convert_to_text": # Try to convert file to text (works best for PDFs, code files, etc.) try: # Check file type and use appropriate conversion method if path.lower().endswith('.pdf'): # Try to extract text from PDF text_cmd = f"pdftotext {shlex.quote(path)} - 2>/dev/null || echo 'PDF text extraction failed'" text_result = workspace.process.exec(text_cmd) content = str(text_result.result).encode('utf-8') else: # For other files, try to extract as text text_cmd = f"cat {shlex.quote(path)} | head -c 100000" text_result = workspace.process.exec(text_cmd) content = str(text_result.result).encode('utf-8') # Clean up if needed if needs_cleanup and daytona and workspace: try: daytona.remove(workspace) except Exception as cleanup_err: logger.warning(f"Error cleaning up workspace: {cleanup_err}") return { "success": True, "filename": os.path.basename(path), "content_type": "text/plain", "size": len(content), "content": content, "converted": True, "original_size": file_info["size"] if isinstance(file_info, dict) else file_info.size, "message": "File was converted to text format." } except Exception as convert_err: logger.error(f"Error converting to text: {convert_err}") # Cleanup on error if needs_cleanup and daytona and workspace: try: daytona.remove(workspace) except Exception as cleanup_err: logger.warning(f"Error cleaning up workspace: {cleanup_err}") return { "success": False, "error": f"Error converting file to text: {str(convert_err)}", "error_type": "FileSystemError" if isinstance(convert_err, FileSystemError) else "ConversionError", "file_path": path } elif download_option == "compress_file": # Compress the file before downloading try: temp_path = f"/tmp/compressed_{uuid.uuid4().hex}.gz" compress_cmd = f"gzip -c {shlex.quote(path)} > {temp_path}" workspace.process.exec(compress_cmd) # Get compressed file size try: size_cmd = f"stat -f %z {temp_path}" size_result = workspace.process.exec(size_cmd) compressed_size = int(str(size_result.result).strip()) except Exception: # Try Linux stat format as fallback size_cmd = f"stat -c %s {temp_path}" size_result = workspace.process.exec(size_cmd) compressed_size = int(str(size_result.result).strip()) # Download the compressed file if hasattr(filesystem, 'download_file'): content = filesystem.download_file(temp_path) else: # Fallback to base64 encoding cat_cmd = f"cat {temp_path} | base64" cat_result = workspace.process.exec(cat_cmd) content = base64.b64decode(str(cat_result.result).strip()) # Clean up temporary file try: workspace.process.exec(f"rm {temp_path}") except Exception as rm_err: logger.warning(f"Error removing temporary file: {rm_err}") # Clean up workspace if needed if needs_cleanup and daytona and workspace: try: daytona.remove(workspace) except Exception as cleanup_err: logger.warning(f"Error cleaning up workspace: {cleanup_err}") return { "success": True, "filename": f"{os.path.basename(path)}.gz", "content_type": "application/gzip", "size": compressed_size, "content": content, "compressed": True, "original_size": file_info["size"] if isinstance(file_info, dict) else file_info.size, "compression_ratio": round(compressed_size / (file_info["size"] if isinstance(file_info, dict) else file_info.size), 2), "message": f"File was compressed before download. Original: {size_mb:.2f}MB, Compressed: {compressed_size/(1024*1024):.2f}MB" } except Exception as compress_err: logger.error(f"Error compressing file: {compress_err}") # Cleanup on error if needs_cleanup and daytona and workspace: try: daytona.remove(workspace) except Exception as cleanup_err: logger.warning(f"Error cleaning up workspace: {cleanup_err}") return { "success": False, "error": f"Error compressing file: {str(compress_err)}", "error_type": "FileSystemError" if isinstance(compress_err, FileSystemError) else "CompressionError", "file_path": path } elif download_option == "force_download": # Force download despite size logger.info(f"Forcing download of large file: {path}") # Fall through to regular download else: return { "success": False, "error": f"Unknown download option: {download_option}", "options": ["download_partial", "convert_to_text", "compress_file", "force_download"], "error_type": "InvalidOptionError" } # Download the file normally try: # Download using Daytona FileSystem API content = filesystem.download_file(path) logger.info(f"Successfully downloaded file: {path}, size: {len(content)} bytes") # Clean up if needed if needs_cleanup and daytona and workspace: try: daytona.remove(workspace) logger.info("Cleaned up temporary workspace") except Exception as cleanup_err: logger.warning(f"Error cleaning up workspace: {cleanup_err}") # Return metadata along with content return { "success": True, "filename": os.path.basename(path), "content_type": get_content_type(path), "size": len(content), "content": content, "message": f"Successfully downloaded file ({len(content)/1024:.1f}KB)" } except Exception as e: logger.error(f"Error downloading file: {e}") # Cleanup on error if needs_cleanup and daytona and workspace: try: daytona.remove(workspace) except Exception as cleanup_err: logger.warning(f"Error cleaning up workspace: {cleanup_err}") error_type = "FileSystemError" if "permission" in str(e).lower(): error_type = "FileNotAccessibleError" elif "network" in str(e).lower() or "connection" in str(e).lower(): error_type = "NetworkError" return { "success": False, "error": f"Error downloading file: {str(e)}", "error_type": error_type, "file_path": path } except Exception as e: logger.error(f"File download failed: {e}", exc_info=True) # Cleanup on error if needs_cleanup and daytona and workspace: try: daytona.remove(workspace) except Exception as cleanup_err: logger.warning(f"Error cleaning up workspace during exception handling: {cleanup_err}") error_type = "UnknownError" if isinstance(e, FileNotAccessibleError): error_type = "FileNotAccessibleError" elif isinstance(e, FileTooLargeError): error_type = "FileTooLargeError" elif isinstance(e, FileSystemError): error_type = "FileSystemError" elif isinstance(e, NetworkError): error_type = "NetworkError" elif isinstance(e, WorkspaceError): error_type = "WorkspaceError" return { "success": False, "error": str(e), "error_type": error_type, "file_path": path } def get_active_workspace(filesystem=None): """ Get the active workspace ID from the tracking file. Returns a tuple of (workspace_id, creation_time) or (None, None). Args: filesystem: Optional Daytona FileSystem instance to use for file operations """ logger = logging.getLogger("daytona-interpreter") # Try using Daytona FileSystem if available if filesystem: try: # Check if the file exists using Daytona response = filesystem.instance.process.exec(f"test -f {shlex.quote(WORKSPACE_TRACKING_FILE)} && echo 'exists'") if response.stdout.strip() == 'exists': # Use Daytona to read the file content = filesystem.download_file(WORKSPACE_TRACKING_FILE) if content: data = json.loads(content.decode('utf-8')) logger.debug(f"Read workspace tracking file using Daytona FileSystem") return data.get('workspace_id'), data.get('created_at') except Exception as e: logger.warning(f"Failed to use Daytona FileSystem to read workspace tracking: {e}") # Fallback to standard file operations try: if os.path.exists(WORKSPACE_TRACKING_FILE): with open(WORKSPACE_TRACKING_FILE, 'r') as f: data = json.load(f) return data.get('workspace_id'), data.get('created_at') except Exception as e: logger.error(f"Failed to read workspace tracking file: {e}") return None, None def set_active_workspace(workspace_id, filesystem=None): """ Set the active workspace ID in the tracking file. Uses Daytona FileSystem if available, falls back to standard file operations. Args: workspace_id: ID of the workspace to set as active filesystem: Optional Daytona FileSystem instance to use for file operations """ logger = logging.getLogger("daytona-interpreter") data = { 'workspace_id': workspace_id, 'created_at': int(time.time()), 'pid': os.getpid() } # Use Daytona FileSystem if available if filesystem: try: # Convert to JSON string and encode as bytes content = json.dumps(data).encode('utf-8') # Create directory if needed (using process.exec) tracking_dir = os.path.dirname(WORKSPACE_TRACKING_FILE) filesystem.instance.process.exec(f"mkdir -p {shlex.quote(tracking_dir)}") # Use filesystem to write the file filesystem.upload_file(WORKSPACE_TRACKING_FILE, content) logger.debug(f"Workspace tracking file updated using Daytona FileSystem") return except Exception as e: logger.warning(f"Failed to use Daytona FileSystem for workspace tracking: {e}") # Fallback to standard file operations try: # Create directory if needed tracking_dir = os.path.dirname(WORKSPACE_TRACKING_FILE) try: os.makedirs(tracking_dir, mode=0o777, exist_ok=True) except Exception as e: logger.warning(f"Failed to create tracking directory with permissions: {e}") # Try again without setting permissions os.makedirs(tracking_dir, exist_ok=True) with open(WORKSPACE_TRACKING_FILE, 'w') as f: json.dump(data, f) except Exception as e: logger.error(f"Failed to update workspace tracking file: {e}") def clear_active_workspace(filesystem=None): """ Clear the active workspace ID from the tracking file. Uses Daytona FileSystem if available, falls back to standard file operations. Args: filesystem: Optional Daytona FileSystem instance to use for file operations """ logger = logging.getLogger("daytona-interpreter") # Try using Daytona FileSystem if available if filesystem: try: # Check if the file exists using Daytona process.exec response = filesystem.instance.process.exec(f"test -f {shlex.quote(WORKSPACE_TRACKING_FILE)} && echo 'exists'") if response.stdout.strip() == 'exists': # Use rm command with process.exec filesystem.instance.process.exec(f"rm {shlex.quote(WORKSPACE_TRACKING_FILE)}") logger.debug(f"Workspace tracking file removed using Daytona FileSystem") return except Exception as e: logger.warning(f"Failed to use Daytona FileSystem to clear workspace tracking: {e}") # Fallback to standard file operations try: if os.path.exists(WORKSPACE_TRACKING_FILE): os.unlink(WORKSPACE_TRACKING_FILE) except Exception as e: logger.error(f"Failed to remove workspace tracking file: {e}") def cleanup_stale_workspaces(daytona_instance, max_age_days=1, logger=None): """ Utility function to clean up workspaces older than the specified age. Args: daytona_instance: Initialized Daytona SDK instance max_age_days: Maximum age in days to keep workspaces (default: 1 day) logger: Logger instance for output messages Returns: tuple: (cleaned_count, error_count, remaining_count) """ if logger is None: logger = logging.getLogger("daytona-interpreter") logger.info(f"Starting cleanup of workspaces older than {max_age_days} days") current_time = time.time() max_age_seconds = max_age_days * 24 * 60 * 60 cleaned_count = 0 error_count = 0 remaining_count = 0 try: # Get list of all workspaces workspaces = daytona_instance.list() logger.info(f"Found {len(workspaces)} total workspaces") for workspace in workspaces: try: # Check if workspace has creation time info if hasattr(workspace, 'created_at'): # Parse the timestamp (format depends on API) try: # Try parsing as Unix timestamp if isinstance(workspace.created_at, (int, float)): created_timestamp = workspace.created_at # Try parsing as ISO string elif isinstance(workspace.created_at, str): from datetime import datetime created_timestamp = datetime.fromisoformat(workspace.created_at.replace('Z', '+00:00')).timestamp() else: # Unknown format, skip this workspace logger.warning(f"Unknown timestamp format for workspace {workspace.id}") remaining_count += 1 continue # Check if workspace is older than threshold age_seconds = current_time - created_timestamp if age_seconds > max_age_seconds: logger.info(f"Removing old workspace {workspace.id} (age: {age_seconds/86400:.1f} days)") daytona_instance.remove(workspace) cleaned_count += 1 else: logger.debug(f"Keeping workspace {workspace.id} (age: {age_seconds/86400:.1f} days)") remaining_count += 1 except Exception as e: logger.warning(f"Error parsing timestamp for workspace {workspace.id}: {e}") remaining_count += 1 else: # If no creation time, just count it logger.debug(f"Workspace {workspace.id} has no creation timestamp, skipping") remaining_count += 1 except Exception as e: logger.warning(f"Error processing workspace {workspace.id}: {e}") error_count += 1 logger.info(f"Cleanup complete: {cleaned_count} removed, {error_count} errors, {remaining_count} remaining") return (cleaned_count, error_count, remaining_count) except Exception as e: logger.error(f"Error during workspace cleanup: {e}") return (cleaned_count, error_count + 1, remaining_count) async def main(): """ Application entry point: 1. Set up logging 2. Load configuration 3. Create and run interpreter instance 4. Handle interrupts and cleanup IMPORTANT: This function ensures only one interpreter instance exists per process. It also coordinates workspace usage across multiple processes to ensure a single sandbox is used for the entire session. """ global _interpreter_instance # Check if interpreter is already running in this process - prevents multiple instances if _interpreter_instance is not None: print("Server is already running in this process, reusing existing instance") return # Create tmp directory with proper permissions if it doesn't exist tmp_dir = '/tmp' try: if not os.path.exists(tmp_dir): os.makedirs(tmp_dir, mode=0o777, exist_ok=True) logger.info(f"Created {tmp_dir} directory with permissions 0o777") else: # Ensure appropriate permissions on existing directory current_mode = os.stat(tmp_dir).st_mode & 0o777 if current_mode != 0o777: os.chmod(tmp_dir, 0o777) logger.info(f"Updated {tmp_dir} permissions to 0o777") except Exception as e: fallback_tmp = os.path.join(os.path.expanduser('~'), '.daytona_tmp') logger.warning(f"Failed to create/update /tmp: {e}. Using fallback directory: {fallback_tmp}") os.makedirs(fallback_tmp, exist_ok=True) # Update log and workspace file paths to use fallback directory global LOG_FILE, WORKSPACE_TRACKING_FILE, WORKSPACE_LOCK_FILE LOG_FILE = os.path.join(fallback_tmp, 'daytona-interpreter.log') WORKSPACE_TRACKING_FILE = os.path.join(fallback_tmp, 'daytona-workspace.json') WORKSPACE_LOCK_FILE = os.path.join(fallback_tmp, 'daytona-workspace.lock') logger = setup_logging() logger.info("Initializing server...") # Enable stderr logging for better debugging console_handler = logging.StreamHandler(sys.stderr) console_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) logger.addHandler(console_handler) # Log the server address logger.info(f"MCP Server is configured for {HOST}:{PORT}") # Create a variable for the interpreter outside the try block interpreter = None try: # Load configuration with retry mechanism max_config_retries = 3 config_retry_count = 0 config_retry_delay = 1.0 while config_retry_count < max_config_retries: try: logger.debug(f"Loading configuration (attempt {config_retry_count + 1})") config = Config() logger.info("Configuration loaded successfully") break except Exception as e: config_retry_count += 1 if config_retry_count >= max_config_retries: logger.error(f"Failed to load configuration after {max_config_retries} attempts: {e}") raise logger.warning(f"Configuration loading failed: {e}, retrying in {config_retry_delay}s") await asyncio.sleep(config_retry_delay) config_retry_delay *= 1.5 # Create interpreter and store in global variable - this is the only instance for this process logger.debug("Creating interpreter instance") _interpreter_instance = DaytonaInterpreter(logger, config) logger.info("Server started and connected successfully") # Set up signal handlers for graceful shutdown def signal_handler(): logger.info("Received termination signal") if _interpreter_instance: # No await here, just schedule the cleanup asyncio.create_task(_interpreter_instance.cleanup()) # Run the server - this will handle workspace initialization/reuse await _interpreter_instance.run() except KeyboardInterrupt: logger.info("Received interrupt signal") if _interpreter_instance: await _interpreter_instance.cleanup() _interpreter_instance = None except Exception as e: logger.error(f"Fatal error: {e}", exc_info=True) if _interpreter_instance: await _interpreter_instance.cleanup() _interpreter_instance = None # Don't exit with error code to allow the service to restart # sys.exit(1) finally: logger.info("Server shutdown complete") # Always ensure the interpreter is reset on shutdown _interpreter_instance = None if __name__ == "__main__": asyncio.run(main())