Skip to main content
Glama

Command Line Interface Enhancer

by jon2allen
mcp_command_server_enh.py (11.2 kB)
"""MCP server exposing shell-command tools guarded by a configurable block list.

Tools registered on the FastMCP instance:

* ``run_command``       - run a shell command and return exit code/stdout/stderr
* ``get_current_dir``   - report the server process's working directory
* ``change_dir``        - change the server process's working directory
* ``run_expect_script`` - drive an interactive program via expect/send actions

Commands are screened against ``command_blocking.prohibited_commands`` and
``restricted_files`` from ``config.toml`` unless ``command_blocking.override``
is enabled.
"""
import argparse
import asyncio
import base64
import copy
import json
import logging
import os
import platform
import re  # Import re for robust command checking
import subprocess
import sys
from typing import Any, Dict, List, Optional

import tomli  # Import tomli to load the config
# The user's template uses FastMCP, so we'll import that.
from fastmcp import FastMCP

from pexpect_auto import PexpectAutomator

# --- Logging setup (from template) ---
LOG_FILE = "mcp_command_server.log"
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(sys.stderr),  # Log to stderr for visibility
        logging.FileHandler(LOG_FILE),
    ],
)
logger = logging.getLogger("mcp_command_server")

# --- Configuration Loading ---
CONFIG_FILE = "config.toml"
DEFAULT_CONFIG = {
    "command_blocking": {
        "prohibited_commands": ['rm ', 'mv ', 'sudo ', 'su '],
        "override": False
    },
    "restricted_files": [
        "mcp_command_server_enh.py",
        "mcp_command_server.log",
        "pexpect_auto.py",
        "config.toml"
    ]
}
# Start from the safe defaults so the restrictions are active even when the
# module is imported and served without load_config() ever being called
# (load_config() only runs under the __main__ guard below).
SERVER_CONFIG = copy.deepcopy(DEFAULT_CONFIG)

# Feedback returned to the client when a request is refused.
BLOCKED_FEEDBACK = "This server is not authorized to run these commands"
RESTRICTED_FEEDBACK = "This server is not authorized to access restricted files"


def load_config():
    """Load configuration from the TOML file into ``SERVER_CONFIG``.

    If the file is missing or cannot be parsed, the error is logged and a
    copy of ``DEFAULT_CONFIG`` is used instead.
    """
    global SERVER_CONFIG
    try:
        with open(CONFIG_FILE, "rb") as f:
            SERVER_CONFIG = tomli.load(f)
        logger.info("Configuration loaded successfully from %s", CONFIG_FILE)
    except FileNotFoundError:
        logger.error("Configuration file '%s' not found. Using safe defaults.", CONFIG_FILE)
        SERVER_CONFIG = copy.deepcopy(DEFAULT_CONFIG)
    except Exception as e:
        # Top-level boundary: any parse/IO problem falls back to the defaults.
        logger.error("Failed to parse configuration file '%s': %s. Using safe defaults.", CONFIG_FILE, e)
        SERVER_CONFIG = copy.deepcopy(DEFAULT_CONFIG)


# --- Command Execution Logic (from mcp-server-commands) ---
class ExecResult:
    """A simple data class to hold the result of a command execution."""

    def __init__(self, stdout: str, stderr: str, code: Optional[int] = None):
        self.stdout = stdout  # decoded standard output
        self.stderr = stderr  # decoded standard error
        self.code = code      # process exit code, or None if unknown


# --- FUNCTION FOR COMMAND BLOCKING ---
def is_command_blocked(command: str) -> bool:
    """Check whether a command matches a prohibited entry loaded from config.

    Matching is case-insensitive and done in two passes:

    1. Each segment of the command (split on ``;``, ``&``, ``|``) is searched
       for a prohibited command name at a word boundary, followed by
       whitespace, a dash, or the end of the segment.
    2. The whole command is searched for each configured entry as a plain
       substring (the original strict match).

    Args:
        command: The shell command line to inspect.

    Returns:
        True if the command should be refused, False otherwise.
    """
    configured = SERVER_CONFIG.get("command_blocking", {}).get("prohibited_commands", [])

    # Lower-case the entries (the command is lower-cased too) and drop
    # empty/whitespace-only ones: an empty entry is a substring of every
    # command and would block everything.
    entries = [entry.lower() for entry in configured if entry.strip()]
    if not entries:
        return False

    # Bare command names (e.g. 'rm', 'mv') for the word-boundary check.
    names = [entry.strip() for entry in entries]
    # Name followed by space or dash (e.g. 'rm -rf'), or ending the segment
    # (e.g. a bare 'su').
    pattern = re.compile(
        r'\b(' + '|'.join(re.escape(name) for name in names) + r')(?:[\s-]|$)'
    )

    command_lower = command.lower()

    # Check each part of a compound command ('command1 && command2').
    for part in re.split(r'[;&|]+', command_lower):
        if pattern.search(part.strip()):
            return True

    # The original strict substring match against the configured entries.
    return any(entry in command_lower for entry in entries)


def is_restricted_file_access(command: str) -> bool:
    """Check whether a command mentions any restricted file name.

    Args:
        command: The shell command line to inspect.

    Returns:
        True if any entry of the ``restricted_files`` config list occurs as a
        substring of ``command``.
    """
    restricted_files = SERVER_CONFIG.get("restricted_files", [])
    return any(restricted_file in command for restricted_file in restricted_files)


def check_override() -> bool:
    """Check if the command execution override is enabled in the config."""
    # Returns False if 'command_blocking' or 'override' key is missing,
    # maintaining default security.
    return SERVER_CONFIG.get("command_blocking", {}).get("override", False)


def _restriction_violation(command: str) -> Optional[str]:
    """Return the refusal message for ``command``, or None if it may run."""
    if check_override():
        logger.warning("Command restrictions bypassed via config override.")
        return None
    if is_command_blocked(command):
        logger.warning("Blocked command denied: '%s'", command)
        return BLOCKED_FEEDBACK
    if is_restricted_file_access(command):
        logger.warning("Restricted file access denied: '%s'", command)
        return RESTRICTED_FEEDBACK
    return None


# ----------------------------------------------------
async def fish_workaround(interpreter: str, stdin: str, options: Dict[str, Any]) -> ExecResult:
    """A specific workaround for piping stdin to the fish shell.

    The stdin text is base64-encoded and decoded again inside the shell
    pipeline so it reaches ``fish`` without quoting problems.

    Args:
        interpreter: The command line that invokes the shell.
        stdin: Text to feed to fish.
        options: Extra keyword arguments for ``asyncio.create_subprocess_shell``.

    Returns:
        The captured output and exit code.
    """
    base64_stdin = base64.b64encode(stdin.encode('utf-8')).decode('utf-8')
    command = f'{interpreter} -c "echo {base64_stdin} | base64 -d | fish"'
    logger.info("Using fish workaround command: %s", interpreter)
    proc = await asyncio.create_subprocess_shell(
        command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        **options
    )
    stdout, stderr = await proc.communicate()
    # errors="replace": non-UTF-8 output must not turn into a decode failure.
    return ExecResult(
        stdout.decode('utf-8', errors='replace'),
        stderr.decode('utf-8', errors='replace'),
        proc.returncode,
    )


async def exec_command(command: str, stdin: Optional[str] = None, options: Optional[Dict[str, Any]] = None) -> ExecResult:
    """Execute a shell command asynchronously, capturing its output.

    Args:
        command: The shell command line to run.
        stdin: Optional text piped to the command's standard input.
        options: Extra keyword arguments for ``asyncio.create_subprocess_shell``
            (for example ``{"cwd": ...}``).

    Returns:
        An ``ExecResult``. A ``FileNotFoundError`` yields exit code 127; any
        other unexpected failure is logged and yields exit code 1 with the
        exception text as stderr.
    """
    options = options or {}
    try:
        # Apply the fish shell workaround if needed
        if command.split(" ")[0] == "fish" and stdin:
            return await fish_workaround(command, stdin, options)

        proc = await asyncio.create_subprocess_shell(
            command,
            stdin=asyncio.subprocess.PIPE if stdin else None,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            **options
        )
        stdout_bytes, stderr_bytes = await proc.communicate(
            input=stdin.encode('utf-8') if stdin else None
        )
        # Decode leniently: a command that emits binary or non-UTF-8 output
        # should still report its real output and exit code instead of being
        # turned into a generic failure by a UnicodeDecodeError.
        return ExecResult(
            stdout_bytes.decode('utf-8', errors='replace'),
            stderr_bytes.decode('utf-8', errors='replace'),
            proc.returncode,
        )
    except FileNotFoundError:
        return ExecResult("", f"Command not found: {command}\n", 127)
    except Exception as e:
        logger.error("exec_command failed unexpectedly for command '%s': %s", command, e)
        return ExecResult("", str(e), 1)


def format_result_messages(result: ExecResult) -> List[Dict[str, Any]]:
    """Format the execution result into a list of MCP content dictionaries.

    Entries are emitted in the order EXIT_CODE, STDOUT, STDERR; an entry is
    omitted when the exit code is None or the corresponding stream is empty.
    """
    messages = []
    if result.code is not None:
        messages.append({"type": "text", "text": str(result.code), "name": "EXIT_CODE"})
    if result.stdout:
        messages.append({"type": "text", "text": result.stdout, "name": "STDOUT"})
    if result.stderr:
        messages.append({"type": "text", "text": result.stderr, "name": "STDERR"})
    return messages


# --- MCP Server Initialization ---
mcp = FastMCP("mcp-server-commands")


@mcp.resource("resource://system_info")
def system_info() -> Dict[str, str]:
    """Provides basic information about the host operating system."""
    logger.info("system_info() called")
    return {
        "os_name": platform.system(),
        "os_release": platform.release(),
        "architecture": platform.machine()
    }


@mcp.tool
async def run_command(
    command: str,
    workdir: Optional[str] = None,
    stdin: Optional[str] = None
) -> Dict[str, Any]:
    """
    Run a shell command on the local machine and get the output.

    Args:
        command: The shell command to execute.
        workdir: The working directory for the command. If None, uses the current directory.
        stdin: Optional stdin to pipe into the command.

    Returns:
        A dictionary containing the command's output, exit code, and error status.
    """
    logger.info("Received run_command request: command='%s', workdir='%s'", command, workdir)

    # --- RESTRICTION BYPASS LOGIC ---
    feedback = _restriction_violation(command)
    if feedback is not None:
        return {
            "content": [
                {"type": "text", "text": "1", "name": "EXIT_CODE"},
                {"type": "text", "text": feedback + "\n", "name": "STDERR"},
            ],
            "is_error": True
        }
    # --------------------------------

    options = {"cwd": workdir} if workdir else {}
    exec_result = await exec_command(command, stdin, options)

    is_error = exec_result.code != 0
    if is_error:
        # %s rather than %d: the exit code is typed Optional[int].
        logger.warning("Command '%s' failed with exit code %s", command, exec_result.code)

    return {
        "content": format_result_messages(exec_result),
        "is_error": is_error
    }


@mcp.tool
async def get_current_dir() -> str:
    """
    Get the current working directory
    returns str -> directory ( ex "/home/user1" )
    """
    current_dir = os.getcwd()
    logger.info("Received get_current_dir requst: current dir is '%s'", current_dir)
    return current_dir


@mcp.tool
async def change_dir(c_dir: str) -> str:
    """Change the directory to specified string
    relative and absolute paths are supported
    If error - will return string "error: invalid directory"
    """
    logger.info("Received change_dir request: '%s'", c_dir)

    # Expand $HOME to the full home directory path
    if "$HOME" in c_dir:
        home_dir = os.environ.get('HOME', '')
        expanded_dir = c_dir.replace("$HOME", home_dir)
    else:
        expanded_dir = os.path.expanduser(c_dir)

    try:
        # Attempt to change the directory
        os.chdir(expanded_dir)
    except (OSError, ValueError) as e:
        # OSError covers missing/not-a-directory/permission failures;
        # ValueError covers paths with embedded NUL bytes. Log the cause and
        # return the documented error string.
        logger.info("invalid dir: '%s' (%s)", expanded_dir, e)
        return "error: invalid directory"
    return expanded_dir


@mcp.tool()
def run_expect_script(
    program: str,
    actions: list[dict[str, str]]
) -> str:
    """
    Run a program with a sequence of expect/send actions for programs that are interactive.
    Programs that require inputs.
    important: do not send carriage return or line feed with text on send.

    Args:
        program: The command to run (e.g. "python3 myscript.py"). Can be any command
        actions: A list of dicts, e.g. [{"action": "expect", "text": "foo"}, {"action":"send","text":"bar"}]

    Returns:
        The output from the interaction.

    Raises:
        PermissionError: If the program is prohibited or references a restricted file.
        ValueError: If an action is neither "expect" nor "send".
        RuntimeError: If the automation produced no output.
    """
    logger.info("running pexpect for pgm : '%s'", program)
    logger.info("Actions: %s", json.dumps(actions, indent=2))

    # Apply the same restrictions as run_command so this tool cannot be used
    # to launch a prohibited command or touch a restricted file.
    # NOTE(review): only `program` is screened; text passed via "send"
    # actions is not inspected.
    feedback = _restriction_violation(program)
    if feedback is not None:
        raise PermissionError(feedback)

    # Translate from dicts to the (action, text) tuples the automator takes.
    tuple_actions = []
    for act_d in actions:
        act = act_d.get("action")
        text = act_d.get("text")
        if act not in ("expect", "send"):
            raise ValueError(f"Invalid action {act}")
        tuple_actions.append((act, text))

    autom = PexpectAutomator(program, tuple_actions)
    output = autom.run()
    if output is None:
        raise RuntimeError("PexpectAutomator failed")
    return output


if __name__ == "__main__":
    # Load configuration before starting the server
    load_config()
    logger.info("Starting MCP Command Server")
    mcp.run()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jon2allen/mcp_command_serv'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.