Daytona MCP Python Interpreter

by nkkko
Verified
  • src
  • daytona_mcp_interpreter
"""MCP server that runs Python code and shell commands in a Daytona workspace."""

import asyncio
import json
import logging
import os
import shlex
import sys
import uuid
from logging.handlers import RotatingFileHandler
from pathlib import Path
from typing import Any, List, Optional, Union

from dotenv import load_dotenv
from daytona_sdk import Daytona, DaytonaConfig, CreateWorkspaceParams
from daytona_sdk.workspace import Workspace
from daytona_sdk.process import ExecuteResponse
from daytona_sdk.filesystem import FileSystem
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent, ImageContent, EmbeddedResource

# Uncomment the following line only if api_client is necessary and correctly imported
# from daytona_sdk import api_client

# Configure logging
LOG_FILE = '/tmp/daytona-interpreter.log'
LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

# NOTE: within this module HOST and PORT are only written to the log by
# main(); the server itself communicates over stdio (see DaytonaInterpreter.run).
HOST = os.getenv("HOST", "0.0.0.0")
PORT = int(os.getenv("PORT", 8000))

# Single source of truth for the logger shared by every component in this module.
_LOGGER_NAME = "daytona-interpreter"


def setup_logging() -> logging.Logger:
    """Configure and return the shared "daytona-interpreter" logger.

    The logger is set to DEBUG and gets a rotating file handler writing to
    ``LOG_FILE`` (10 MiB per file, 5 backups). Calling this function more than
    once does not attach duplicate handlers.

    Returns:
        The configured logger instance.
    """
    logger = logging.getLogger(_LOGGER_NAME)
    logger.setLevel(logging.DEBUG)

    # Inspect only this logger's own handlers. ``hasHandlers()`` also reports
    # handlers attached to ancestor loggers, so a root logger configured
    # elsewhere would have silently prevented the file handler from being added.
    if not logger.handlers:
        file_handler = RotatingFileHandler(
            LOG_FILE,
            maxBytes=10 * 1024 * 1024,
            backupCount=5,
            encoding='utf-8',
        )
        file_handler.setFormatter(logging.Formatter(LOG_FORMAT))
        logger.addHandler(file_handler)
        # NOTE(review): no console handler is attached. Presumably this keeps
        # the process's standard streams free for the stdio MCP transport --
        # confirm before enabling one.

    return logger


class Config:
    """Server configuration loaded from environment variables (and ``.env``).

    Attributes:
        api_key: Value of ``MCP_DAYTONA_API_KEY`` (required).
        server_url: Value of ``MCP_DAYTONA_SERVER_URL``.
        target: Value of ``MCP_DAYTONA_TARGET``.
        timeout: ``MCP_DAYTONA_TIMEOUT`` parsed as a float.
        verify_ssl: True only when ``MCP_VERIFY_SSL`` equals "true"
            (case-insensitive).
    """

    def __init__(self):
        """Read configuration from the environment.

        Raises:
            ValueError: If ``MCP_DAYTONA_API_KEY`` is unset or empty, or if
                ``MCP_DAYTONA_TIMEOUT`` is not a valid float.
        """
        # Load environment variables from .env file
        load_dotenv()

        # Required API key for authentication
        self.api_key = os.getenv('MCP_DAYTONA_API_KEY')
        if not self.api_key:
            raise ValueError("MCP_DAYTONA_API_KEY environment variable is required")
        logging.getLogger(_LOGGER_NAME).info("MCP_DAYTONA_API_KEY loaded successfully.")

        # Optional configuration with defaults
        self.server_url = os.getenv('MCP_DAYTONA_SERVER_URL', 'https://daytona.work/api')  # Renamed
        self.target = os.getenv('MCP_DAYTONA_TARGET', 'local')
        self.timeout = float(os.getenv('MCP_DAYTONA_TIMEOUT', '180.0'))
        self.verify_ssl = os.getenv('MCP_VERIFY_SSL', 'false').lower() == 'true'

        # Optional debug logging
        self._log_config()

    def _log_config(self) -> None:
        """Log the current configuration settings, excluding the API key."""
        logger = logging.getLogger(_LOGGER_NAME)
        logger.debug("Configuration Loaded:")
        logger.debug(f" Server URL: {self.server_url}")
        logger.debug(f" Target: {self.target}")
        logger.debug(f" Timeout: {self.timeout}")
        logger.debug(f" Verify SSL: {self.verify_ssl}")


class DaytonaInterpreter:
    """
    MCP Server implementation for executing Python code and shell commands
    in Daytona workspaces using the Daytona SDK.
    Handles workspace creation, file operations, and command execution.
    """

    def __init__(self, logger: logging.Logger, config: Config):
        """Create the Daytona client and MCP server and register handlers.

        Args:
            logger: Logger used for all diagnostics emitted by this instance.
            config: Loaded configuration supplying API key, server URL and target.
        """
        # Initialize core components
        self.logger = logger
        self.config = config

        # Initialize Daytona SDK client
        self.daytona = Daytona(
            config=DaytonaConfig(
                api_key=self.config.api_key,
                server_url=self.config.server_url,
                target=self.config.target,
            )
        )
        self.workspace: Optional[Workspace] = None  # Current workspace instance

        # Initialize MCP server
        self.server = Server("daytona-interpreter")

        # Setup MCP handlers
        self.setup_handlers()
        self.logger.info("Initialized DaytonaInterpreter with Daytona SDK and MCP Server")

    def setup_notification_handlers(self):
        """
        Configure handlers for various MCP protocol notifications.
        Cancellation-type notifications remove the workspace; the remaining
        handlers only log.
        """

        async def handle_cancel_request(params: dict[str, Any]) -> None:
            self.logger.info("Received cancellation request")
            await self.cleanup_workspace()

        async def handle_progress(params: dict[str, Any]) -> None:
            if 'progressToken' in params and 'progress' in params:
                self.logger.debug(f"Progress update: {params}")

        async def handle_initialized(params: dict[str, Any]) -> None:
            self.logger.debug("Received initialized notification")

        async def handle_roots_list_changed(params: dict[str, Any]) -> None:
            self.logger.debug("Received roots list changed notification")

        async def handle_cancelled(params: dict[str, Any]) -> None:
            self.logger.info(f"Received cancelled notification: {params}")
            await self.cleanup_workspace()

        # Register notification handlers
        self.server.notification_handlers.update({
            "$/cancelRequest": handle_cancel_request,
            "notifications/progress": handle_progress,
            "notifications/initialized": handle_initialized,
            "notifications/roots/list_changed": handle_roots_list_changed,
            "cancelled": handle_cancelled,  # Added handler for 'cancelled' method
        })

    def setup_handlers(self):
        """
        Configure server request handlers for tool listing and execution.
        Defines available tools and their execution logic using the Daytona SDK.
        """
        self.setup_notification_handlers()

        @self.server.list_tools()
        async def list_tools() -> List[Tool]:
            """
            Define available tools:
            1. python_interpreter: Executes Python code in workspace
            2. command_executor: Executes shell commands in workspace
            """
            return [
                Tool(
                    name="python_interpreter",
                    description="Execute Python code in a Daytona workspace",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "code": {"type": "string", "description": "Python code to execute"}
                        },
                        "required": ["code"]
                    }
                ),
                Tool(
                    name="command_executor",
                    description="Execute a single-line shell command in a Daytona workspace",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "command": {"type": "string", "description": "Shell command to execute"}
                        },
                        "required": ["command"]
                    }
                )
            ]

        @self.server.call_tool()
        async def call_tool(name: str, arguments: dict) -> List[Union[TextContent, ImageContent, EmbeddedResource]]:
            """
            Handle tool execution requests from MCP.
            Uses Daytona SDK to execute Python code or shell commands within the workspace.

            Raises:
                RuntimeError: If no workspace has been initialized.
                ValueError: If the tool name is unknown or its required
                    argument is missing or empty.
            """
            if not self.workspace:
                self.logger.error("Workspace is not initialized.")
                raise RuntimeError("Workspace is not initialized.")

            # Resolve the tool to (argument key, missing-argument message, runner)
            # so both tools share one validate/execute/report path.
            if name == "python_interpreter":
                argument_key = "code"
                missing_message = "Code argument is required"
                runner = self.execute_python_code
            elif name == "command_executor":
                argument_key = "command"
                missing_message = "Command argument is required"
                runner = self.execute_command
            else:
                self.logger.error(f"Unknown tool: {name}")
                raise ValueError(f"Unknown tool: {name}")

            payload = arguments.get(argument_key)
            if not payload:
                raise ValueError(missing_message)

            try:
                result = await runner(payload)
                return [TextContent(type="text", text=result)]
            except Exception as e:
                # Execution failures are reported back to the client as text
                # rather than raised.
                self.logger.error(f"Error executing tool '{name}': {e}", exc_info=True)
                return [TextContent(type="text", text=f"Error executing tool: {e}")]

    @staticmethod
    def _format_result(stdout: str, stderr: str, exit_code: Any) -> str:
        """Serialize an execution outcome as the JSON envelope returned to clients."""
        return json.dumps({
            "stdout": stdout,
            "stderr": stderr,
            "exit_code": exit_code
        }, indent=2)

    async def initialize_workspace(self) -> None:
        """
        Initialize the Daytona workspace using the SDK.
        Creates a new workspace if it doesn't exist.

        Raises:
            Exception: Re-raises whatever the SDK raises when creation fails.
        """
        if self.workspace:
            self.logger.info("Workspace already exists")
            return

        self.logger.info("Creating a new Daytona workspace")
        params = CreateWorkspaceParams(
            language="python"
            # image="python:3.13.1-bullseye"
            # Additional parameters can be defined here
        )
        try:
            # NOTE(review): this SDK call is not awaited, so it runs
            # synchronously inside the coroutine.
            self.workspace = self.daytona.create(params)
            self.logger.info(f"Created Workspace ID: {self.workspace.id}")
        except Exception as e:
            self.logger.error(f"Failed to create workspace: {e}", exc_info=True)
            raise

    async def execute_python_code(self, code: str) -> str:
        """
        Execute Python code in the Daytona workspace using the SDK.

        Args:
            code: Python source to run in the workspace.

        Returns:
            A JSON string with "stdout", "stderr" and "exit_code" keys. SDK
            errors are reported in "stderr" with exit code -1 instead of raised.

        Raises:
            RuntimeError: If no workspace has been initialized.
        """
        if not self.workspace:
            self.logger.error("Workspace is not initialized.")
            raise RuntimeError("Workspace is not initialized.")

        try:
            # Execute Python code using the SDK
            response: ExecuteResponse = self.workspace.process.code_run(code)
            self.logger.debug(f"ExecuteResponse: {response}")

            # Handle the response result
            result = str(response.result).strip() if response.result else ""
            # Fall back to -1 when the response carries no exit code.
            exit_code = getattr(response, 'exit_code', -1)
            self.logger.info(f"Execution Output:\n{result}")

            # Return the execution output as JSON
            return self._format_result(result, "", exit_code)
        except Exception as e:
            self.logger.error(f"Error executing Python code: {e}", exc_info=True)
            return self._format_result("", str(e), -1)

    async def execute_command(self, command: str) -> str:
        """
        Execute a shell command in the Daytona workspace using the SDK.

        Args:
            command: Shell command line to run in the workspace.

        Returns:
            A JSON string with "stdout", "stderr" and "exit_code" keys. SDK
            errors are reported in "stderr" with exit code -1 instead of raised.

        Raises:
            RuntimeError: If no workspace has been initialized.
        """
        if not self.workspace:
            self.logger.error("Workspace is not initialized.")
            raise RuntimeError("Workspace is not initialized.")

        try:
            if '&&' in command:
                # Chained commands are wrapped in /bin/sh -c so the whole
                # chain is handed to a shell as a single quoted argument.
                command = f'/bin/sh -c {shlex.quote(command)}'
            else:
                # Simple commands are passed through with surrounding
                # whitespace removed.
                command = command.strip()
            self.logger.debug(f"Executing command: {command}")

            # Execute shell command using the SDK
            response: ExecuteResponse = self.workspace.process.exec(command)
            self.logger.debug(f"ExecuteResponse: {response}")

            # Handle the response result
            result = str(response.result).strip() if response.result else ""
            self.logger.info(f"Command Output:\n{result}")

            # Return the execution output as JSON
            return self._format_result(result, "", getattr(response, 'exit_code', -1))
        except Exception as e:
            self.logger.error(f"Error executing command: {e}", exc_info=True)
            return self._format_result("", str(e), -1)

    async def cleanup_workspace(self) -> None:
        """
        Clean up the Daytona workspace by removing it using the SDK.

        Does nothing when no workspace is set. Removal failures are logged and
        not raised; in that case the workspace reference is kept.
        """
        if not self.workspace:
            return

        try:
            self.daytona.remove(self.workspace)
            self.logger.info(f"Removed Workspace ID: {self.workspace.id}")
            self.workspace = None
        except Exception as e:
            self.logger.error(f"Failed to remove workspace: {e}", exc_info=True)

    async def cleanup(self) -> None:
        """
        Perform full cleanup of resources:
        1. Clean up workspace if it exists
        2. Close Daytona SDK client connection if necessary
        """
        await self.cleanup_workspace()
        # Additional cleanup steps can be added here if the SDK requires

    async def run(self) -> None:
        """
        Main server execution loop:
        1. Initialize workspace
        2. Run MCP server with stdio communication
        3. Handle cleanup on shutdown

        Raises:
            Exception: Re-raises errors from workspace initialization or from
                opening the stdio transport, after logging and cleanup.
        """
        try:
            await self.initialize_workspace()
            async with stdio_server() as streams:
                try:
                    await self.server.run(
                        streams[0],
                        streams[1],
                        self.server.create_initialization_options()
                    )
                except BaseExceptionGroup as e:
                    # Handle ExceptionGroup (introduced in Python 3.11)
                    if any(isinstance(exc, asyncio.CancelledError) for exc in e.exceptions):
                        self.logger.info("Server was cancelled")
                    else:
                        self.logger.error(f"Unhandled exception in TaskGroup: {e}", exc_info=True)
                except asyncio.CancelledError:
                    self.logger.info("Server task was cancelled")
                except Exception as e:
                    self.logger.error(f"Unhandled exception in MCP server: {e}", exc_info=True)
                finally:
                    await self.cleanup()
        except Exception as e:
            self.logger.error(f"Server error during run: {e}", exc_info=True)
            # cleanup_workspace() returns immediately when no workspace is
            # set, so a second call after the inner finally is harmless.
            await self.cleanup()
            raise


async def main():
    """
    Application entry point:
    1. Set up logging
    2. Load configuration
    3. Create and run interpreter instance
    4. Handle interrupts and cleanup

    Exits the process with status 1 on configuration or fatal runtime errors.
    """
    logger = setup_logging()
    logger.info("Starting Daytona MCP interpreter")

    # Log the server address
    logger.info(f"MCP Server is running on {HOST}:{PORT}")

    try:
        config = Config()
    except Exception as e:
        logger.error(f"Configuration error: {e}")
        sys.exit(1)

    interpreter = DaytonaInterpreter(logger, config)

    try:
        await interpreter.run()
    except KeyboardInterrupt:
        logger.info("Received interrupt signal")
        await interpreter.cleanup()
    except Exception as e:
        logger.error(f"Fatal error: {e}", exc_info=True)
        await interpreter.cleanup()
        sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())