Skip to main content
Glama

Command-Line MCP Server

by andresthor
server.py41.3 kB
""" Command-line MCP server that safely executes Unix/macOS terminal commands. """ import argparse import asyncio import logging import os from typing import Any, Dict, List, Optional from mcp.server.fastmcp import FastMCP from cmd_line_mcp.config import Config from cmd_line_mcp.security import ( validate_command, extract_directory_from_command, is_directory_whitelisted, normalize_path, ) from cmd_line_mcp.session import SessionManager # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) logger = logging.getLogger(__name__) class CommandLineMCP: """Command-line MCP server for Unix/macOS terminal commands.""" def __init__( self, config_path: Optional[str] = None, env_file_path: Optional[str] = None, ): """Initialize the MCP server. Args: config_path: Optional path to a configuration file env_file_path: Optional path to a .env file """ self.config = Config(config_path, env_file_path) self.session_manager = SessionManager() # Create a fixed persistent session for Claude Desktop mode - FIXED ID self.claude_desktop_session_id = "claude_desktop_fixed_session" # NOTE: We don't pre-approve any directories - let the user explicitly approve them # or configure them in the whitelist logger.info( f"Created persistent Claude Desktop session: {self.claude_desktop_session_id}" ) # Set up logging log_level = self.config.get("server", "log_level", "INFO") logger.setLevel(getattr(logging, log_level)) # Load command lists from config command_lists = self.config.get_effective_command_lists() self.read_commands = command_lists["read"] self.write_commands = command_lists["write"] self.system_commands = command_lists["system"] self.blocked_commands = command_lists["blocked"] self.dangerous_patterns = command_lists["dangerous_patterns"] # Get separator support status self.separator_support = self.config.has_separator_support() # Get whitelisted directories from config self.whitelisted_directories = self.config.get_section("security").get( "whitelisted_directories", ["/home", "/tmp"] ) # Initialize MCP app server_config = self.config.get_section("server") version = server_config.get("version", "0.5.1") description = server_config.get( "description", "MCP server for safely executing command-line tools", ) instructions = f"{description} (v{version})" self.app = FastMCP( server_config.get("name", "cmd-line-mcp"), instructions=instructions, ) # Store capabilities data to use in get_command_help tool self.command_capabilities = { "supported_commands": { "read": self.read_commands, "write": self.write_commands, "system": self.system_commands, }, "blocked_commands": self.blocked_commands, "command_chaining": { "pipe": ( "Supported" if self.separator_support["pipe"] else "Not supported" ), "semicolon": ( "Supported" if self.separator_support["semicolon"] else "Not supported" ), "ampersand": ( "Supported" if self.separator_support["ampersand"] else "Not supported" ), }, "command_restrictions": "Special characters like $(), ${}, backticks, and I/O redirection are blocked", } self.usage_examples = [ { "command": "ls ~/Downloads", "description": "List files in downloads directory", }, { "command": "cat ~/.bashrc", "description": "View bash configuration", }, { "command": "du -h ~/Downloads/* | grep G", "description": "Find large files in downloads folder", }, { "command": 'find ~/Downloads -type f -name "*.pdf"', "description": "Find all PDF files in downloads", }, { "command": "head -n 20 ~/Documents/notes.txt", "description": "View the first 20 lines of a file", }, { "command": "ls -la | awk '{print $1, $9}'", "description": "List files showing permissions and names using awk", }, { "command": "cat file.txt | awk '{if($1>10) print $0}'", "description": "Filter lines where first column is greater than 10", }, ] # Register tools execute_command_tool = self.app.tool() @execute_command_tool # Keep decorator reference to satisfy linters async def execute_command( command: str, session_id: Optional[str] = None ) -> Dict[str, Any]: """ Execute a Unix/macOS terminal command. Args: command: The command to execute session_id: Optional session ID for permission management Returns: A dictionary with command output and status """ # For Claude Desktop compatibility, use the fixed session ID require_session_id = self.config.get( "security", "require_session_id", False ) if not session_id or not require_session_id: session_id = self.claude_desktop_session_id logger.info(f"Using persistent Claude Desktop session: {session_id}") return await self._execute_command(command, session_id=session_id) # Store reference to silence linter warnings self._execute_command_func = execute_command execute_read_command_tool = self.app.tool() @execute_read_command_tool # Keep decorator reference to satisfy linters async def execute_read_command( command: str, session_id: Optional[str] = None ) -> Dict[str, Any]: """ Execute a read-only Unix/macOS terminal command (ls, cat, grep, etc.). Args: command: The read-only command to execute session_id: Optional session ID for permission management Returns: A dictionary with command output and status """ # For Claude Desktop compatibility, use the fixed session ID when no session ID provided if not session_id: session_id = self.claude_desktop_session_id logger.info( f"Using persistent Claude Desktop session for read command: {session_id}" ) # Validate command and check directory permissions in one go # Get the latest command lists command_lists = self.config.get_effective_command_lists() allow_separators = self.config.get( "security", "allow_command_separators", True ) validation = validate_command( command, command_lists["read"], command_lists["write"], command_lists["system"], command_lists["blocked"], command_lists["dangerous_patterns"], allow_command_separators=allow_separators, ) if not validation["is_valid"]: return { "success": False, "output": "", "error": validation["error"], } if validation["command_type"] != "read": return { "success": False, "output": "", "error": "This tool only supports read commands. Use execute_command for other command types.", } # Extract directory and check permissions (apply same directory checks as in _execute_command) working_dir = extract_directory_from_command(command) logger.info(f"Read command - extracted working directory: {working_dir}") # Check if directory is whitelisted or has session approval directory_allowed = False if working_dir: # Check global whitelist first if is_directory_whitelisted(working_dir, self.whitelisted_directories): directory_allowed = True logger.info( f"Read command - directory '{working_dir}' is globally whitelisted" ) # Check session approvals if we have a session ID elif session_id and self.session_manager.has_directory_approval( session_id, working_dir ): directory_allowed = True logger.info( f"Read command - directory '{working_dir}' is approved for session {session_id}" ) else: logger.warning( f"Read command - directory '{working_dir}' is not whitelisted or approved" ) # For Claude Desktop compatibility mode (require_session_id = False) require_session_id = self.config.get( "security", "require_session_id", False ) auto_approve_in_desktop = self.config.get_section("security").get( "auto_approve_directories_in_desktop_mode", False ) if not require_session_id: # Check if the directory is approved in the persistent desktop session if self.session_manager.has_directory_approval( self.claude_desktop_session_id, working_dir ): directory_allowed = True logger.info( f"Read command - directory '{working_dir}' is approved in persistent desktop session" ) elif auto_approve_in_desktop: # Auto-approve directories in desktop mode if configured directory_allowed = True # Also add to persistent session for future requests self.session_manager.approve_directory( self.claude_desktop_session_id, working_dir ) logger.warning( f"Read command - auto-approving directory access in desktop mode: {working_dir}" ) else: # Only allow whitelisted directories if auto-approve is off directory_allowed = False logger.warning( f"Read command - directory '{working_dir}' is not whitelisted - restricting access" ) else: # If we couldn't extract a directory, default to requiring permission logger.warning( "Read command - could not extract working directory from command" ) working_dir = os.getcwd() # Default to current directory # Check whitelist for current directory if is_directory_whitelisted(working_dir, self.whitelisted_directories): directory_allowed = True elif session_id and self.session_manager.has_directory_approval( session_id, working_dir ): directory_allowed = True else: # For Claude Desktop compatibility mode require_session_id = self.config.get( "security", "require_session_id", False ) auto_approve_in_desktop = self.config.get_section("security").get( "auto_approve_directories_in_desktop_mode", False ) if not require_session_id: # Check if the directory is approved in the persistent desktop session if self.session_manager.has_directory_approval( self.claude_desktop_session_id, working_dir ): directory_allowed = True logger.info( f"Read command - directory '{working_dir}' is approved in persistent desktop session" ) elif auto_approve_in_desktop: # Auto-approve directories in desktop mode if configured directory_allowed = True # Also add to persistent session for future requests self.session_manager.approve_directory( self.claude_desktop_session_id, working_dir ) logger.warning( f"Read command - auto-approving directory access in desktop mode: {working_dir}" ) else: # Only allow whitelisted directories if auto-approve is off directory_allowed = False # If directory is not allowed if not directory_allowed: # Check if we're in Claude Desktop mode (no session ID or require_session_id=false) require_session_id = self.config.get( "security", "require_session_id", False ) if not session_id or not require_session_id: # Always use the fixed persistent session ID for Claude Desktop desktop_session_id = self.claude_desktop_session_id # Include approval request information for Claude Desktop return { "success": False, "output": "", "error": f"Read command - access to directory '{working_dir}' is not allowed. Only whitelisted directories can be accessed.\n" + f"Whitelisted directories include: {', '.join(self.whitelisted_directories)}\n" + "Note: To request access to this directory, use the approve_directory tool with:\n" + f' approve_directory(directory="{working_dir}", session_id="{desktop_session_id}", remember=True)', "directory": working_dir, "session_id": desktop_session_id, "requires_directory_approval": True, # Signal that approval is needed } else: # For normal mode, request approval return { "success": False, "output": "", "error": f"Read command - directory '{working_dir}' requires approval. Use approve_directory tool with session_id '{session_id}'.", "requires_directory_approval": True, "directory": working_dir, "session_id": session_id, } # Now that we've validated both the command and directory permissions, execute the command return await self._execute_command( command, command_type="read", session_id=session_id ) # Store reference to silence linter warnings self._execute_read_command_func = execute_read_command list_available_commands_tool = self.app.tool() @list_available_commands_tool # Keep decorator reference to satisfy linters async def list_available_commands() -> Dict[str, List[str]]: """ List all available commands by category. Returns: A dictionary with commands grouped by category """ # Get the latest command lists command_lists = self.config.get_effective_command_lists() return { "read_commands": command_lists["read"], "write_commands": command_lists["write"], "system_commands": command_lists["system"], "blocked_commands": command_lists["blocked"], } # Store reference to silence linter warnings self._list_available_commands_func = list_available_commands get_command_help_tool = self.app.tool() @get_command_help_tool # Keep decorator reference to satisfy linters async def get_command_help() -> Dict[str, Any]: """ Get detailed help about command capabilities and usage. This tool provides comprehensive information about: - Supported commands in each category (read, write, system) - Blocked commands for security reasons - Command chaining capabilities (pipes, semicolons, ampersands) - Usage restrictions and examples Returns: A dictionary with detailed information about command capabilities and usage """ # Get the latest command lists and separator support command_lists = self.config.get_effective_command_lists() separator_support = self.config.has_separator_support() # Log the separator support for debugging logger.info(f"Separator support status: {separator_support}") logger.info( f"allow_command_separators setting: {self.config.get('security', 'allow_command_separators')}" ) # Extra check for pipe character in dangerous patterns pipe_in_patterns = any( "|" in p or r"\|" in p for p in command_lists["dangerous_patterns"] ) logger.info( f"Pipe character found in dangerous patterns: {pipe_in_patterns}" ) # Update capabilities updated_capabilities = { "supported_commands": { "read": command_lists["read"], "write": command_lists["write"], "system": command_lists["system"], }, "blocked_commands": command_lists["blocked"], "command_chaining": { "pipe": ( "Supported" if separator_support["pipe"] else "Not supported" ), "semicolon": ( "Supported" if separator_support["semicolon"] else "Not supported" ), "ampersand": ( "Supported" if separator_support["ampersand"] else "Not supported" ), }, "command_restrictions": "Special characters like $(), ${}, backticks, and I/O redirection are blocked", } # Provide helpful information for Claude to understand command usage return { "capabilities": updated_capabilities, "examples": self.usage_examples, "recommended_approach": { "finding_large_files": "Use 'du -h <directory>/* | sort -hr | head -n 10' to find the 10 largest files", "file_searching": "Use 'find <directory> -type f -name \"pattern\"' for file searches", "text_searching": "Use 'grep \"pattern\" <file>' to search in files", "file_viewing": "Use 'cat', 'head', or 'tail' for viewing files", "sorting": "Use 'sort' with options like -n (numeric), -r (reverse), -h (human readable sizes)", "text_processing": "Use 'awk' for advanced text processing. For example: 'ls -la | awk \"{print $1, $9}\"' to show permissions and filenames", "column_filtering": "Use 'awk' to filter by column values: 'cat data.txt | awk \"{if($3 > 100) print}\"' to show lines where column 3 exceeds 100", }, "permissions": { "read_commands": "Can be executed without confirmation", "write_commands": "Require approval for first use in a session", "system_commands": "Require approval for first use in a session", }, } # Store reference to silence linter warnings self._get_command_help_func = get_command_help approve_command_type_tool = self.app.tool() @approve_command_type_tool # Keep decorator reference to satisfy linters async def approve_command_type( command_type: str, session_id: str, remember: bool = False ) -> Dict[str, Any]: """ Approve a command type for the current session. Args: command_type: The command type to approve (read, write, system) session_id: The session ID remember: Whether to remember this approval for the session Returns: A dictionary with approval status """ if command_type not in ["read", "write", "system"]: return { "success": False, "message": f"Invalid command type: {command_type}", } if remember: self.session_manager.approve_command_type(session_id, command_type) return { "success": True, "message": f"Command type '{command_type}' approved for this session", } else: return { "success": True, "message": f"Command type '{command_type}' approved for one-time use", } # Store reference to silence linter warnings self._approve_command_type_func = approve_command_type # Add tool for directory approval approve_directory_tool = self.app.tool() @approve_directory_tool # Keep decorator reference to satisfy linters async def approve_directory( directory: str, session_id: str, remember: bool = True ) -> Dict[str, Any]: """ Approve access to a directory for the current session. Args: directory: The directory to approve access to session_id: The session ID remember: Whether to remember this approval for the session Returns: A dictionary with approval status """ # Normalize the directory path normalized_dir = normalize_path(directory) # Check if directory is already whitelisted globally if is_directory_whitelisted(normalized_dir, self.whitelisted_directories): return { "success": True, "message": f"Directory '{normalized_dir}' is already globally whitelisted", "directory": normalized_dir, } # Check if directory is already approved for this session if self.session_manager.has_directory_approval(session_id, normalized_dir): return { "success": True, "message": f"Directory '{normalized_dir}' is already approved for this session", "directory": normalized_dir, } # Approve the directory for this session if remember: self.session_manager.approve_directory(session_id, normalized_dir) return { "success": True, "message": f"Directory '{normalized_dir}' approved for this session", "directory": normalized_dir, } else: return { "success": True, "message": f"Directory '{normalized_dir}' approved for one-time use", "directory": normalized_dir, } # Store reference to silence linter warnings self._approve_directory_func = approve_directory # Tool to list whitelisted and approved directories list_directories_tool = self.app.tool() @list_directories_tool # Keep decorator reference to satisfy linters async def list_directories(session_id: Optional[str] = None) -> Dict[str, Any]: """ List all whitelisted and approved directories. Args: session_id: Optional session ID to get session-specific approvals Returns: A dictionary with globally whitelisted and session-approved directories """ result = { "whitelisted_directories": self.whitelisted_directories, "session_approved_directories": [], } if session_id: result["session_approved_directories"] = list( self.session_manager.get_approved_directories(session_id) ) return result # Store reference to silence linter warnings self._list_directories_func = list_directories # Register new tools for configuration get_configuration_tool = self.app.tool() @get_configuration_tool # Keep decorator reference to satisfy linters async def get_configuration() -> Dict[str, Any]: """ Get the current configuration settings. Returns: The current configuration settings """ # Get a safe copy of the configuration config_copy = self.config.get_all() # Format it for display return { "server": config_copy["server"], "security": config_copy["security"], "commands": { "read_count": len(config_copy["commands"]["read"]), "write_count": len(config_copy["commands"]["write"]), "system_count": len(config_copy["commands"]["system"]), "blocked_count": len(config_copy["commands"]["blocked"]), "dangerous_patterns_count": len( config_copy["commands"]["dangerous_patterns"] ), "full_command_lists": config_copy["commands"], }, "output": config_copy["output"], "separator_support": self.config.has_separator_support(), "directory_whitelisting": { "enabled": True, "whitelisted_directories": self.whitelisted_directories, "note": "Directories not in this list will require session approval", }, } # Store reference to silence linter warnings self._get_configuration_func = get_configuration # update_configuration tool removed async def _execute_command( self, command: str, command_type: Optional[str] = None, session_id: Optional[str] = None, ) -> Dict[str, Any]: """Execute a Unix/macOS terminal command. Args: command: The command to execute command_type: Optional type of command (read, write, system) session_id: Optional session ID for permission management Returns: A dictionary with command output and status """ # Get the latest command lists and separator settings command_lists = self.config.get_effective_command_lists() allow_separators = self.config.get("security", "allow_command_separators", True) # Validate the command validation = validate_command( command, command_lists["read"], command_lists["write"], command_lists["system"], command_lists["blocked"], command_lists["dangerous_patterns"], allow_command_separators=allow_separators, ) if not validation["is_valid"]: return { "success": False, "output": "", "error": validation["error"], } # If command_type is specified, ensure it matches the validated type if command_type and validation["command_type"] != command_type: return { "success": False, "output": "", "error": f"Command type mismatch. Expected {command_type}, got {validation['command_type']}", } actual_command_type = validation["command_type"] # Extract the working directory from the command working_dir = extract_directory_from_command(command) logger.info(f"Extracted working directory from command: {working_dir}") # Check if directory is whitelisted or has session approval directory_allowed = False if working_dir: # Check global whitelist first if is_directory_whitelisted(working_dir, self.whitelisted_directories): directory_allowed = True logger.info(f"Directory '{working_dir}' is globally whitelisted") # Check session approvals if we have a session ID elif session_id and self.session_manager.has_directory_approval( session_id, working_dir ): directory_allowed = True logger.info( f"Directory '{working_dir}' is approved for session {session_id}" ) else: logger.warning( f"Directory '{working_dir}' is not whitelisted or approved" ) # For Claude Desktop compatibility mode (require_session_id = False) require_session_id = self.config.get( "security", "require_session_id", False ) auto_approve_in_desktop = self.config.get_section("security").get( "auto_approve_directories_in_desktop_mode", False ) if not require_session_id: # Check if the directory is approved in the persistent desktop session if self.session_manager.has_directory_approval( self.claude_desktop_session_id, working_dir ): directory_allowed = True logger.info( f"Directory '{working_dir}' is approved in persistent desktop session" ) elif auto_approve_in_desktop: # Auto-approve directories in desktop mode if configured directory_allowed = True # Also add to persistent session for future requests self.session_manager.approve_directory( self.claude_desktop_session_id, working_dir ) logger.warning( f"Auto-approving directory access in desktop mode: {working_dir}" ) else: # Only allow whitelisted directories if auto-approve is off directory_allowed = False logger.warning( f"Directory '{working_dir}' is not whitelisted - restricting access" ) else: # If we couldn't extract a directory, default to requiring permission logger.warning("Could not extract working directory from command") working_dir = os.getcwd() # Default to current directory # Check whitelist for current directory if is_directory_whitelisted(working_dir, self.whitelisted_directories): directory_allowed = True elif session_id and self.session_manager.has_directory_approval( session_id, working_dir ): directory_allowed = True else: # For Claude Desktop compatibility mode require_session_id = self.config.get( "security", "require_session_id", False ) auto_approve_in_desktop = self.config.get_section("security").get( "auto_approve_directories_in_desktop_mode", False ) if not require_session_id: # Check if the directory is approved in the persistent desktop session if self.session_manager.has_directory_approval( self.claude_desktop_session_id, working_dir ): directory_allowed = True logger.info( f"Directory '{working_dir}' is approved in persistent desktop session" ) elif auto_approve_in_desktop: # Auto-approve directories in desktop mode if configured directory_allowed = True # Also add to persistent session for future requests self.session_manager.approve_directory( self.claude_desktop_session_id, working_dir ) logger.warning( f"Auto-approving directory access in desktop mode: {working_dir}" ) else: # Only allow whitelisted directories if auto-approve is off directory_allowed = False # If directory is not allowed if not directory_allowed: # Check if we're in Claude Desktop mode (no session ID or require_session_id=false) require_session_id = self.config.get( "security", "require_session_id", False ) if not session_id or not require_session_id: # Always use the fixed persistent session ID for Claude Desktop desktop_session_id = self.claude_desktop_session_id # Include approval request information for Claude Desktop return { "success": False, "output": "", "error": f"Access to directory '{working_dir}' is not allowed. Only whitelisted directories can be accessed.\n" + f"Whitelisted directories include: {', '.join(self.whitelisted_directories)}\n" + "Note: To request access to this directory, use the approve_directory tool with:\n" + f' approve_directory(directory="{working_dir}", session_id="{desktop_session_id}", remember=True)', "directory": working_dir, "session_id": desktop_session_id, "requires_directory_approval": True, # Signal that approval is needed } else: # For normal mode, request approval return { "success": False, "output": "", "error": f"Directory '{working_dir}' requires approval. Use approve_directory tool with session_id '{session_id}'.", "requires_directory_approval": True, "directory": working_dir, "session_id": session_id, } # For read commands, bypass command type approval if actual_command_type == "read": # No approval needed for read commands pass # For write and system commands with user confirmation enabled elif actual_command_type in ["write", "system"] and self.config.get( "security", "allow_user_confirmation", True ): # Check if we require a session ID (turn off for Claude Desktop compatibility) require_session_id = self.config.get( "security", "require_session_id", False ) # WORKAROUND FOR CLAUDE DESKTOP: # Either auto-approve if no session_id provided OR if require_session_id is False if not session_id or not require_session_id: if not session_id: logger.warning( "No session ID provided, auto-approving command: %s", command, ) else: logger.warning( "Session validation disabled, auto-approving command: %s", command, ) # Auto-approve without requiring explicit permission pass # Normal session-based approval when require_session_id is True elif not self.session_manager.has_command_approval( session_id, command ) and not self.session_manager.has_command_type_approval( session_id, actual_command_type ): return { "success": False, "output": "", "error": f"Command '{command}' requires approval. Use approve_command_type tool with session_id '{session_id}'.", "requires_approval": True, "command_type": actual_command_type, "session_id": session_id, } # Execute the command try: logger.info(f"Executing command: {command}") # Using subprocess with shell=True is necessary for many shell commands, # but we've validated the command for safety already process = await asyncio.create_subprocess_shell( command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await process.communicate() output = stdout.decode("utf-8", errors="replace") error = stderr.decode("utf-8", errors="replace") # Limit output size to prevent huge responses max_output_size = self.config.get( "output", "max_size", 100 * 1024 ) # 100KB default if len(output) > max_output_size: output = ( output[:max_output_size] + "\n... [output truncated due to size]" ) return { "success": process.returncode == 0, "output": output, "error": error, "exit_code": process.returncode, "command_type": actual_command_type, } except Exception as e: logger.error(f"Error executing command: {str(e)}") return {"success": False, "output": "", "error": str(e)} async def run_async(self): """Run the MCP server asynchronously.""" # Clean old sessions periodically async def clean_sessions(): while True: session_timeout = self.config.get( "security", "session_timeout", 3600 ) # 1 hour default await asyncio.sleep(session_timeout) self.session_manager.clean_old_sessions(session_timeout) # Start session cleaning task asyncio.create_task(clean_sessions()) # Run the MCP server using its internal method # This is used when we want to run within an existing event loop await self.app.run_stdio_async() def run(self): """Run the MCP server with its own event loop.""" # Let the app handle the event loop directly self.app.run() def main(): """Run the command-line MCP server.""" parser = argparse.ArgumentParser(description="Command-line MCP server") parser.add_argument("--config", "-c", help="Path to configuration file") parser.add_argument("--env", "-e", help="Path to .env file") args = parser.parse_args() server = CommandLineMCP(config_path=args.config, env_file_path=args.env) server.run() if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/andresthor/cmd-line-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server