AiDD MCP Server
by skydeckai
import os
import stat
import subprocess
from typing import Any, Dict, List
import mcp.types as types
from .state import state
# Language configurations
LANGUAGE_CONFIGS = {
'python': {
'file_extension': '.py',
'command': ['python3'],
'comment_prefix': '#'
},
'javascript': {
'file_extension': '.js',
'command': ['node'],
'comment_prefix': '//'
},
'ruby': {
'file_extension': '.rb',
'command': ['ruby'],
'comment_prefix': '#'
},
'php': {
'file_extension': '.php',
'command': ['php'],
'comment_prefix': '//'
},
'go': {
'file_extension': '.go',
'command': ['go', 'run'],
'comment_prefix': '//',
'wrapper_start': 'package main\nfunc main() {',
'wrapper_end': '}'
},
'rust': {
'file_extension': '.rs',
'command': ['rustc', '-o'], # Special handling needed
'comment_prefix': '//',
'wrapper_start': 'fn main() {',
'wrapper_end': '}'
}
}
def execute_code_tool() -> Dict[str, Any]:
return {
"name": "execute_code",
"description": (
"Execute arbitrary code in various programming languages on the user's local machine within the current working directory. "
"Supported languages: " + ", ".join(LANGUAGE_CONFIGS.keys()) + ". "
"Always review the code carefully before execution to prevent unintended consequences. "
"Examples: "
"- Python: code='print(sum(range(10)))'. "
"- JavaScript: code='console.log(Array.from({length: 5}, (_, i) => i*2))'. "
"- Ruby: code='puts (1..5).reduce(:+)'. "
),
"inputSchema": {
"type": "object",
"properties": {
"language": {
"type": "string",
"enum": list(LANGUAGE_CONFIGS.keys()),
"description": "Programming language to use"
},
"code": {
"type": "string",
"description": "Code to execute on the user's local machine in the current working directory"
},
"timeout": {
"type": "integer",
"description": "Maximum execution time in seconds",
"default": 5,
"minimum": 1,
"maximum": 30
}
},
"required": ["language", "code"]
}
}
def execute_shell_script_tool() -> Dict[str, Any]:
return {
"name": "execute_shell_script",
"description": (
"Execute a shell script (bash/sh) on the user's local machine within the current working directory. "
"This tool can execute shell commands and scripts for system automation and management tasks. "
"It is designed to perform tasks on the user's local environment, such as opening applications, installing packages and more. "
"Always review the script carefully before execution to prevent unintended consequences. "
"Examples: "
"- script='echo \"Current directory:\" && pwd'. "
"- script='for i in {1..5}; do echo $i; done'. "
),
"inputSchema": {
"type": "object",
"properties": {
"script": {
"type": "string",
"description": "Shell script to execute on the user's local machine"
},
"timeout": {
"type": "integer",
"description": "Maximum execution time in seconds (default: 300, max: 600)",
"default": 300
}
},
"required": ["script"]
}
}
def is_command_available(command: str) -> bool:
"""Check if a command is available in the system."""
try:
subprocess.run(['which', command],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=True)
return True
except subprocess.CalledProcessError:
return False
def prepare_code(code: str, language: str) -> str:
"""Prepare code for execution based on language requirements."""
config = LANGUAGE_CONFIGS[language]
if language == 'go':
if 'package main' not in code and 'func main()' not in code:
return f"{config['wrapper_start']}\n{code}\n{config['wrapper_end']}"
elif language == 'rust':
if 'fn main()' not in code:
return f"{config['wrapper_start']}\n{code}\n{config['wrapper_end']}"
elif language == 'php':
if '<?php' not in code:
return f"<?php\n{code}"
return code
async def execute_code_in_temp_file(language: str, code: str, timeout: int) -> tuple[str, str, int]:
"""Execute code in a temporary file and return stdout, stderr, and return code."""
config = LANGUAGE_CONFIGS[language]
temp_file = f"temp_script{config['file_extension']}"
try:
# Change to allowed directory first
os.chdir(state.allowed_directory)
# Write code to temp file
with open(temp_file, 'w') as f:
# Prepare and write code
prepared_code = prepare_code(code, language)
f.write(prepared_code)
f.flush()
# Prepare command
if language == 'rust':
# Special handling for Rust
output_path = 'temp_script.exe'
compile_cmd = ['rustc', temp_file, '-o', output_path]
try:
subprocess.run(compile_cmd,
check=True,
capture_output=True,
timeout=timeout)
cmd = [output_path]
except subprocess.CalledProcessError as e:
return '', e.stderr.decode(), e.returncode
else:
cmd = config['command'] + [temp_file]
# Execute code
try:
result = subprocess.run(
cmd,
capture_output=True,
timeout=timeout,
text=True,
)
return result.stdout, result.stderr, result.returncode
except subprocess.TimeoutExpired:
return '', f'Execution timed out after {timeout} seconds', 124
finally:
# Cleanup
# Note: We stay in the allowed directory as all operations should happen there
try:
os.unlink(temp_file)
if language == 'rust' and os.path.exists(output_path):
os.unlink(output_path)
except Exception:
pass
async def handle_execute_code(arguments: dict) -> List[types.TextContent]:
"""Handle code execution in various programming languages."""
language = arguments.get("language")
code = arguments.get("code")
timeout = arguments.get("timeout", 5)
if not language or not code:
raise ValueError("Both language and code must be provided")
if language not in LANGUAGE_CONFIGS:
raise ValueError(f"Unsupported language: {language}")
# Check if required command is available
command = LANGUAGE_CONFIGS[language]['command'][0]
if not is_command_available(command):
return [types.TextContent(
type="text",
text=f"Error: {command} is not installed on the system"
)]
try:
stdout, stderr, returncode = await execute_code_in_temp_file(language, code, timeout)
result = []
if stdout:
result.append(f"=== stdout ===\n{stdout.rstrip()}")
if stderr:
result.append(f"=== stderr ===\n{stderr.rstrip()}")
if not stdout and not stderr:
result.append("Code executed successfully with no output")
if returncode != 0:
result.append(f"\nProcess exited with code {returncode}")
return [types.TextContent(
type="text",
text="\n\n".join(result)
)]
except Exception as e:
return [types.TextContent(
type="text",
text=f"Error executing code:\n{str(e)}"
)]
async def execute_shell_script_in_temp_file(script: str, timeout: int) -> tuple[str, str, int]:
"""Execute a shell script in a temporary file and return stdout, stderr, and return code."""
temp_file = "temp_script.sh"
try:
# Change to allowed directory first
os.chdir(state.allowed_directory)
# Write script to temp file
with open(temp_file, 'w') as f:
f.write("#!/bin/sh\n") # Use sh for maximum compatibility
f.write(script)
f.flush()
# Make the script executable
os.chmod(temp_file, os.stat(temp_file).st_mode | stat.S_IEXEC)
# Execute script
try:
result = subprocess.run(
["/bin/sh", temp_file], # Use sh explicitly for consistent behavior
capture_output=True,
timeout=timeout,
text=True,
)
return result.stdout, result.stderr, result.returncode
except subprocess.TimeoutExpired:
return '', f'Execution timed out after {timeout} seconds', 124
finally:
# Cleanup
try:
os.unlink(temp_file)
except Exception:
pass
async def handle_execute_shell_script(arguments: dict) -> List[types.TextContent]:
"""Handle shell script execution."""
script = arguments.get("script")
timeout = min(arguments.get("timeout", 300), 600) # Default 5 minutes, cap at 10 minutes
try:
stdout, stderr, returncode = await execute_shell_script_in_temp_file(script, timeout)
result = []
if stdout:
result.append(f"=== stdout ===\n{stdout.rstrip()}")
if stderr:
result.append(f"=== stderr ===\n{stderr.rstrip()}")
if not stdout and not stderr:
result.append("Script executed successfully with no output")
if returncode != 0:
result.append(f"\nScript exited with code {returncode}")
return [types.TextContent(
type="text",
text="\n\n".join(result)
)]
except Exception as e:
return [types.TextContent(
type="text",
text=f"Error executing shell script:\n{str(e)}"
)]