MCP Command Server
- src
- mcp_command_server
"""MCP server for secure command execution with full security and audit capabilities"""
import asyncio
import subprocess
from pathlib import Path
from typing import List, Dict, Any, Optional
from mcp.server.fastmcp import FastMCP, Context
from mcp_command_server.security.validator import CommandValidator, ValidationError
from mcp_command_server.security.sanitizer import CommandSanitizer
from mcp_command_server.security.confirmation import UserConfirmationHandler
from mcp_command_server.security.audit import AuditLogger
class CommandServer:
"""MCP server for secure command execution"""
DEFAULT_ALLOWED_COMMANDS = {
"ls": {"args": ["-l", "-a", "-h"], "paths": ["/"]},
"cd": {"args": [], "paths": ["/"]},
"npm": {"args": ["run", "dev", "install", "test"], "paths": ["./"]},
"python": {"args": ["-m", "pytest", "run"], "paths": ["./"]},
"pip": {"args": ["install", "list", "freeze"], "paths": ["./"]},
"git": {"args": ["status", "pull", "push", "checkout"], "paths": ["./"]},
}
def __init__(
self,
allowed_commands: Optional[Dict[str, Dict[str, Any]]] = None,
audit_log_path: str = "logs/audit.log",
user: str = "default_user"
):
"""
Initialize the command execution server.
Args:
allowed_commands: Dictionary of allowed commands and their configurations
audit_log_path: Path to audit log file
user: Username for audit logging
"""
self.mcp = FastMCP("CommandServer", dependencies=["asyncio", "subprocess"])
self.allowed_commands = allowed_commands or self.DEFAULT_ALLOWED_COMMANDS
self.user = user
# Initialize security components
self.validator = CommandValidator(self.allowed_commands)
self.sanitizer = CommandSanitizer()
self.confirmation = UserConfirmationHandler()
self.audit = AuditLogger(audit_log_path)
# Register MCP tools
self._register_tools()
def _register_tools(self) -> None:
"""Register all command execution tools with MCP server"""
@self.mcp.tool()
async def execute_command(
command: str,
args: List[str],
path: str,
ctx: Context
) -> str:
"""
Execute a system command with full security checks.
Args:
command: Command to execute
args: Command arguments
path: Working directory
ctx: MCP context for progress reporting
Returns:
str: Command output or error message
"""
try:
# Security validation
ctx.info(f"Validating command: {command} {' '.join(args)}")
await ctx.report_progress(10, 100)
# Sanitize inputs
command = self.sanitizer.sanitize_command(command)
args = self.sanitizer.sanitize_arguments(args)
path = self.sanitizer.sanitize_path(path)
await ctx.report_progress(30, 100)
# Validate against allowed commands
self.validator.validate_command(command, args, path)
await ctx.report_progress(50, 100)
# Get user confirmation
if not self.confirmation.require_confirmation(command, args, path, self.user):
raise PermissionError("Command execution not confirmed by user")
await ctx.report_progress(70, 100)
# Execute command
result = await self._execute_command_async(command, args, path)
# Log successful execution
self.audit.log_command_execution(
command=command,
arguments=args,
path=path,
status="success",
user=self.user
)
await ctx.report_progress(100, 100)
return result
except Exception as e:
# Log failure
self.audit.log_command_execution(
command=command,
arguments=args,
path=path,
status="failed",
user=self.user,
error_message=str(e)
)
raise
@self.mcp.tool()
async def check_command_allowed(
command: str,
args: List[str],
path: str
) -> bool:
"""
Check if a command would be allowed without executing it.
Args:
command: Command to check
args: Command arguments
path: Working directory
Returns:
bool: True if command would be allowed
"""
try:
# Sanitize and validate
command = self.sanitizer.sanitize_command(command)
args = self.sanitizer.sanitize_arguments(args)
path = self.sanitizer.sanitize_path(path)
self.validator.validate_command(command, args, path)
return True
except (ValueError, ValidationError):
return False
async def _execute_command_async(
self,
command: str,
args: List[str],
path: str
) -> str:
"""
Execute a command asynchronously.
Args:
command: Command to execute
args: Command arguments
path: Working directory
Returns:
str: Command output
Raises:
subprocess.CalledProcessError: If command execution fails
"""
cmd = [command] + args
process = await asyncio.create_subprocess_exec(
*cmd,
cwd=path,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
error_msg = stderr.decode() if stderr else "Command execution failed"
raise subprocess.CalledProcessError(
process.returncode,
cmd,
output=stdout,
stderr=stderr
)
return stdout.decode()
def run(self) -> None:
"""Start the MCP server"""
self.mcp.run()