Terminal Controller for MCP
by GongRzhe
import asyncio
import os
import subprocess
import platform
from typing import List, Dict, Optional
from datetime import datetime
from mcp.server.fastmcp import FastMCP
# Initialize MCP server
mcp = FastMCP("terminal-controller")
# List to store command history
command_history = []
# Maximum history size
MAX_HISTORY_SIZE = 50
async def run_command(cmd: str, timeout: int = 30) -> Dict:
"""
Execute command and return results
Args:
cmd: Command to execute
timeout: Command timeout in seconds
Returns:
Dictionary containing command execution results
"""
start_time = datetime.now()
try:
# Create command appropriate for current OS
if platform.system() == "Windows":
process = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
shell=True
)
else:
process = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
shell=True,
executable="/bin/bash"
)
try:
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout)
stdout = stdout.decode('utf-8', errors='replace')
stderr = stderr.decode('utf-8', errors='replace')
return_code = process.returncode
except asyncio.TimeoutError:
try:
process.kill()
except:
pass
return {
"success": False,
"stdout": "",
"stderr": f"Command timed out after {timeout} seconds",
"return_code": -1,
"duration": str(datetime.now() - start_time),
"command": cmd
}
duration = datetime.now() - start_time
result = {
"success": return_code == 0,
"stdout": stdout,
"stderr": stderr,
"return_code": return_code,
"duration": str(duration),
"command": cmd
}
# Add to history
command_history.append({
"timestamp": datetime.now().isoformat(),
"command": cmd,
"success": return_code == 0
})
# If history is too long, remove oldest record
if len(command_history) > MAX_HISTORY_SIZE:
command_history.pop(0)
return result
except Exception as e:
return {
"success": False,
"stdout": "",
"stderr": f"Error executing command: {str(e)}",
"return_code": -1,
"duration": str(datetime.now() - start_time),
"command": cmd
}
@mcp.tool()
async def execute_command(command: str, timeout: int = 30) -> str:
"""
Execute terminal command and return results
Args:
command: Command line command to execute
timeout: Command timeout in seconds, default is 30 seconds
Returns:
Output of the command execution
"""
# Check for dangerous commands (can add more security checks)
dangerous_commands = ["rm -rf /", "mkfs"]
if any(dc in command.lower() for dc in dangerous_commands):
return "For security reasons, this command is not allowed."
result = await run_command(command, timeout)
if result["success"]:
output = f"Command executed successfully (duration: {result['duration']})\n\n"
if result["stdout"]:
output += f"Output:\n{result['stdout']}\n"
else:
output += "Command had no output.\n"
if result["stderr"]:
output += f"\nWarnings/Info:\n{result['stderr']}"
return output
else:
output = f"Command execution failed (duration: {result['duration']})\n"
if result["stdout"]:
output += f"\nOutput:\n{result['stdout']}\n"
if result["stderr"]:
output += f"\nError:\n{result['stderr']}"
output += f"\nReturn code: {result['return_code']}"
return output
@mcp.tool()
async def get_command_history(count: int = 10) -> str:
"""
Get recent command execution history
Args:
count: Number of recent commands to return
Returns:
Formatted command history record
"""
if not command_history:
return "No command execution history."
count = min(count, len(command_history))
recent_commands = command_history[-count:]
output = f"Recent {count} command history:\n\n"
for i, cmd in enumerate(recent_commands):
status = "✓" if cmd["success"] else "✗"
output += f"{i+1}. [{status}] {cmd['timestamp']}: {cmd['command']}\n"
return output
@mcp.tool()
async def get_current_directory() -> str:
"""
Get current working directory
Returns:
Path of current working directory
"""
return os.getcwd()
@mcp.tool()
async def change_directory(path: str) -> str:
"""
Change current working directory
Args:
path: Directory path to switch to
Returns:
Operation result information
"""
try:
os.chdir(path)
return f"Switched to directory: {os.getcwd()}"
except FileNotFoundError:
return f"Error: Directory '{path}' does not exist"
except PermissionError:
return f"Error: No permission to access directory '{path}'"
except Exception as e:
return f"Error changing directory: {str(e)}"
@mcp.tool()
async def list_directory(path: Optional[str] = None) -> str:
"""
List files and subdirectories in the specified directory
Args:
path: Directory path to list contents, default is current directory
Returns:
List of directory contents
"""
if path is None:
path = os.getcwd()
try:
items = os.listdir(path)
dirs = []
files = []
for item in items:
full_path = os.path.join(path, item)
if os.path.isdir(full_path):
dirs.append(f"📁 {item}/")
else:
files.append(f"📄 {item}")
# Sort directories and files
dirs.sort()
files.sort()
if not dirs and not files:
return f"Directory '{path}' is empty"
output = f"Contents of directory '{path}':\n\n"
if dirs:
output += "Directories:\n"
output += "\n".join(dirs) + "\n\n"
if files:
output += "Files:\n"
output += "\n".join(files)
return output
except FileNotFoundError:
return f"Error: Directory '{path}' does not exist"
except PermissionError:
return f"Error: No permission to access directory '{path}'"
except Exception as e:
return f"Error listing directory contents: {str(e)}"
if __name__ == "__main__":
# Initialize and run server
mcp.run(transport='stdio')