"""MCP server that exposes a terminal tool for running shell commands."""
import asyncio
import shlex
from pathlib import Path
from typing import Optional
from mcp.server.fastmcp import FastMCP, Context
# Default timeout, in seconds, for a single command execution.
# Used as the default value of the `timeout` parameter of the `terminal` tool.
DEFAULT_TIMEOUT = 30.0
# Module-level FastMCP server instance; tools are registered on it through
# the @mcp.tool() decorator and it is started from the __main__ guard.
mcp = FastMCP(
    name="shellserver",
    instructions="A simple MCP server that provides a terminal tool for executing shell commands.",
)
@mcp.tool()
async def terminal(
    command: str,
    working_directory: Optional[str] = None,
    timeout: Optional[float] = DEFAULT_TIMEOUT,
    ctx: Optional[Context] = None,
) -> str:
    """
    Execute a command and return its output.

    The command line is split with ``shlex.split`` and executed directly
    (no shell is involved), so shell features such as pipes, redirections,
    globbing and variable expansion are NOT interpreted.

    Args:
        command: The command line to execute (e.g., "ls -la", "pwd", "echo hello").
        working_directory: Optional working directory to run the command in.
            If not provided, the child inherits the server's current directory.
        timeout: Optional timeout in seconds (default: DEFAULT_TIMEOUT).
            ``None`` disables the timeout; zero or negative values are rejected.
        ctx: MCP context (automatically provided); used only for info logging.

    Returns:
        A string containing the command output, or an error message if
        execution fails. A non-zero exit status is reported as
        "Exit code: N"; non-empty streams are reported under "Stdout:" and
        "Stderr:" headings, separated by blank lines. All failures are
        returned as "Error: ..." strings rather than raised.
    """
    if not command or not command.strip():
        return "Error: Empty command provided"

    # A non-positive timeout would spawn the process only to kill it at once.
    if timeout is not None and timeout <= 0:
        return "Error: Timeout must be a positive number of seconds"

    if ctx is not None:
        await ctx.info(f"Executing command: {command}")
        if working_directory:
            await ctx.info(f"Working directory: {working_directory}")

    # Parse command safely using shlex to handle quoted arguments
    try:
        parts = shlex.split(command)
    except ValueError as e:
        # Raised by shlex for e.g. an unterminated quote.
        return f"Error: Failed to parse command - {e}"
    if not parts:
        return "Error: No command provided after parsing"

    # Determine working directory
    cwd = None
    if working_directory:
        try:
            cwd_path = Path(working_directory).resolve()
            if not cwd_path.exists():
                return f"Error: Working directory does not exist: {working_directory}"
            if not cwd_path.is_dir():
                return f"Error: Working directory is not a directory: {working_directory}"
        except (OSError, RuntimeError, ValueError) as e:
            return f"Error: Invalid working directory - {e}"
        cwd = str(cwd_path)

    # Spawn the process; only the spawn itself is guarded by these handlers.
    try:
        proc = await asyncio.create_subprocess_exec(
            *parts,
            cwd=cwd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
    except FileNotFoundError:
        return f"Error: Command not found: {parts[0]}"
    except PermissionError:
        return f"Error: Permission denied executing command: {command}"
    except Exception as e:
        return f"Error: Failed to execute command - {e}"

    # Collect output, making sure the child never outlives this call.
    try:
        stdout, stderr = await asyncio.wait_for(
            proc.communicate(),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        return f"Error: Command timed out after {timeout} seconds"
    except Exception as e:
        return f"Error: Failed to execute command - {e}"
    finally:
        # Runs on timeout, on unexpected errors and on task cancellation:
        # if the child is still alive, kill and reap it so it is not leaked.
        if proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                # The process exited between the check and the kill.
                pass
            await proc.wait()

    # Decode output; errors='replace' keeps non-UTF-8 bytes from raising.
    stdout_text = stdout.decode('utf-8', errors='replace')
    stderr_text = stderr.decode('utf-8', errors='replace')

    # Build result message
    result_parts = []
    if proc.returncode != 0:
        result_parts.append(f"Exit code: {proc.returncode}")
    if stdout_text:
        result_parts.append(f"Stdout:\n{stdout_text}")
    if stderr_text:
        result_parts.append(f"Stderr:\n{stderr_text}")

    if not result_parts:
        return "(Command executed successfully with no output)"
    return "\n\n".join(result_parts)
if __name__ == "__main__":
# Run the server using stdio transport (default for MCP)
mcp.run()