Skip to main content
Glama

MCP Terminal

by sichang824
MIT License
17
  • Apple
  • Linux
server.py11.6 kB
#!/usr/bin/env python3 """ MCP Terminal Server This module implements a terminal control server using the Model Context Protocol (MCP). It supports both standard input/output (stdio) and Server-Sent Events (SSE) transports. """ import argparse import asyncio import logging import platform import signal import sys from enum import Enum from typing import Optional from mcp.server.fastmcp import FastMCP from mcp_terminal.tools.file import FileTool from mcp_terminal.tools.terminal import TerminalTool # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger("MCP:Terminal:Server") class ServerMode(str, Enum): """Server transport modes.""" STDIO = "stdio" SSE = "sse" class MCPTerminalServer: """ MCP Terminal Server that registers and exposes terminal tools. """ def __init__( self, controller_type: Optional[str] = None, mode: ServerMode = ServerMode.STDIO, host: str = "127.0.0.1", port: int = 3000, log_level: str = "INFO", whitelist_file: Optional[str] = None, blacklist_file: Optional[str] = None, whitelist_mode: bool = False, ): """ Initialize the MCP Terminal Server. Args: controller_type: Type of terminal controller to use ("iterm", "applescript", "subprocess") mode: Server transport mode (stdio or sse) host: Host to bind the server to (for SSE mode) port: Port to bind the server to (for SSE mode) log_level: Logging level whitelist_file: Path to command whitelist file blacklist_file: Path to command blacklist file whitelist_mode: If True, only whitelisted commands are allowed """ self.controller_type = controller_type self.mode = mode self.host = host self.port = port self.whitelist_file = whitelist_file self.blacklist_file = blacklist_file self.whitelist_mode = whitelist_mode # Set up logging logging.getLogger().setLevel(getattr(logging, log_level)) # Create MCP server self.mcp = FastMCP("mcp-terminal", host=host, port=port, log_level=log_level) self.tools = {} self.tools_registered = False def register_tools(self) -> None: """ Register all tools with the MCP server. """ if self.tools_registered: return logger.info("Registering terminal tool with MCP server") try: # Create and register the terminal tool terminal_tool = TerminalTool( self.controller_type, whitelist_file=self.whitelist_file, blacklist_file=self.blacklist_file, whitelist_mode=self.whitelist_mode, ) file_tool = FileTool() terminal_tool.register_mcp(self.mcp) file_tool.register_mcp(self.mcp) self.tools["terminal"] = terminal_tool self.tools["file"] = file_tool self.tools_registered = True logger.info("Terminal tool registered with MCP server") except Exception as e: logger.error(f"Failed to register terminal tool: {str(e)}", exc_info=True) raise async def start(self) -> None: """ Start the MCP Terminal Server. """ # Register all tools self.register_tools() # Start the server using the appropriate transport if self.mode == ServerMode.SSE: logger.info( f"Starting MCP Terminal Server in SSE mode on {self.host}:{self.port}" ) await self.mcp.run_sse_async() else: # STDIO mode logger.info("Starting MCP Terminal Server in stdio mode") await self.mcp.run_stdio_async() async def cleanup(self) -> None: """ Clean up resources before shutting down. """ logger.info("Starting cleanup process") # First ensure MCP resources are properly cleaned up if hasattr(self.mcp, "_mcp_server") and hasattr( self.mcp._mcp_server, "_task_group" ): logger.info("Ensuring MCP task groups are properly closed") try: # Give in-flight requests a chance to complete await asyncio.sleep(0.1) except Exception as e: logger.warning(f"Error during MCP cleanup delay: {e}") # Then clean up tool controllers for tool_name, tool in self.tools.items(): if hasattr(tool, "controller") and hasattr(tool.controller, "cleanup"): logger.info(f"Cleaning up {tool_name} controller") try: await tool.controller.cleanup() except Exception as e: logger.warning(f"Error cleaning up {tool_name} controller: {e}") logger.info("Cleanup process completed") def main() -> None: """ Main entry point for the MCP Terminal Server. """ # Parse command line arguments parser = argparse.ArgumentParser(description="MCP Terminal Server") # Terminal controller options controller_group = parser.add_argument_group("Terminal Controller Options") controller_group.add_argument( "--controller", "-c", choices=["auto", "iterm", "applescript", "subprocess"], default="auto", help="Terminal controller to use (default: auto-detect)", ) # Server mode options server_group = parser.add_argument_group("Server Options") server_group.add_argument( "--mode", "-m", choices=[mode.value for mode in ServerMode], default=ServerMode.STDIO.value, help=f"Server mode (default: {ServerMode.STDIO.value})", ) server_group.add_argument( "--host", default="127.0.0.1", help="Host to bind the server to in SSE mode (default: 127.0.0.1)", ) server_group.add_argument( "--port", "-p", type=int, default=3000, help="Port to bind the server to in SSE mode (default: 3000)", ) # Command filtering options security_group = parser.add_argument_group("Security Options") security_group.add_argument( "--whitelist-file", type=str, help="Path to whitelist file with allowed commands (one per line)", ) security_group.add_argument( "--blacklist-file", type=str, help="Path to blacklist file with blocked commands (one per line)", ) security_group.add_argument( "--whitelist-mode", action="store_true", help="Enable whitelist mode (only allow commands in whitelist)", ) # Logging options logging_group = parser.add_argument_group("Logging Options") logging_group.add_argument( "--log-level", "-l", choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], default="INFO", help="Logging level (default: INFO)", ) args = parser.parse_args() # Determine controller type controller_type = None if args.controller == "auto" else args.controller # Adjust controller based on platform if controller_type in ["iterm", "applescript"] and platform.system() != "Darwin": logger.warning( f"{controller_type} controller not available on {platform.system()}. Falling back to subprocess." ) controller_type = "subprocess" # Create the server server = MCPTerminalServer( controller_type=controller_type, mode=args.mode, host=args.host, port=args.port, log_level=args.log_level, whitelist_file=args.whitelist_file, blacklist_file=args.blacklist_file, whitelist_mode=args.whitelist_mode, ) # Run the server loop = asyncio.get_event_loop() shutdown_in_progress = False # Define a shutdown handler that ensures we only run shutdown once async def handle_shutdown(): nonlocal shutdown_in_progress if shutdown_in_progress: logger.info("Shutdown already in progress, ignoring additional signal") return shutdown_in_progress = True await shutdown(loop, server) try: # Set up signal handlers for graceful shutdown if ( sys.platform != "win32" ): # Windows doesn't support SIGTERM/SIGINT handling in the same way for signal_name in ["SIGTERM", "SIGINT"]: loop.add_signal_handler( getattr(signal, signal_name), lambda: asyncio.create_task(handle_shutdown()), ) # Run the server loop.run_until_complete(server.start()) except KeyboardInterrupt: logger.info("Received keyboard interrupt") # Handle KeyboardInterrupt manually by running the shutdown coroutine if not shutdown_in_progress: try: # Make sure we properly shut down even if interrupted loop.run_until_complete(handle_shutdown()) except Exception as e: logger.error(f"Error during shutdown after keyboard interrupt: {e}") except Exception as e: logger.error(f"Error during server execution: {e}", exc_info=True) finally: logger.info("Server shutting down") # Close the event loop try: # Gather any remaining tasks pending = asyncio.all_tasks(loop) if pending: # Cancel all remaining tasks for task in pending: task.cancel() # Use a separate try/except block to avoid suppressing the original exception try: # Wait for a short time for tasks to cancel loop.run_until_complete(asyncio.wait(pending, timeout=0.5)) except (asyncio.CancelledError, RuntimeError): # Ignore cancelled errors and "Event loop stopped before Future completed" pass except Exception as e: logger.error(f"Error during final cleanup: {e}") finally: # Close the event loop try: loop.close() except Exception as e: logger.error(f"Error closing event loop: {e}") # Exit with a success code sys.exit(0) async def shutdown(loop, server): """Handle graceful shutdown.""" logger.info("Shutdown signal received") # Give pending tasks a chance to complete await asyncio.sleep(0.2) # Perform cleanup await server.cleanup() # Cancel all running tasks tasks = [t for t in asyncio.all_tasks(loop) if t is not asyncio.current_task()] if tasks: logger.info(f"Cancelling {len(tasks)} pending tasks") for task in tasks: task.cancel() # Wait for tasks to cancel with a timeout try: await asyncio.wait_for( asyncio.gather(*tasks, return_exceptions=True), timeout=3.0, # Increased timeout ) except asyncio.TimeoutError: logger.warning("Some tasks did not cancel within the timeout period") except Exception as e: logger.warning(f"Error during task cancellation: {e}") # Stop the loop loop.stop() if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sichang824/mcp-terminal'

If you have feedback or need assistance with the MCP directory API, please join our Discord server