import logging
import time
import uuid
from typing import Any
import pexpect
from fastmcp import FastMCP
# FastMCP server instance; the tools and resource below are registered on it
mcp = FastMCP('Interactive-Terminal-MCP')

# In-memory session registry, keyed by session ID.
# Each entry is a dict shaped like:
#   {'process': pexpect.spawn, 'command': list[str], 'history': str}
SESSIONS: dict[str, Any] = {}

# Logging setup: INFO level with timestamped "time - name - level - message" records
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
logger = logging.getLogger('mcp-cli')
@mcp.tool()
def spawn_process(command: list[str]) -> str:
    """
    Spawns a new interactive CLI process and registers it as a session.

    Args:
        command: A list containing the command and its arguments
            (e.g., ["bash"] or ["python3", "-i"]).

    Returns:
        str: "Session started. ID: <id>" on success, where <id> is the
        8-character session ID to pass to the other tools, or an
        "Error ..." message if the command list is empty or the process
        could not be started.
    """
    if not command:
        logger.error('Attempted to spawn process with empty command list.')
        return 'Error: Command list cannot be empty.'

    # The ID is a truncated UUID, so collisions are possible; regenerate until
    # it is unused so an existing session is never silently overwritten.
    session_id = str(uuid.uuid4())[:8]
    while session_id in SESSIONS:
        session_id = str(uuid.uuid4())[:8]

    cmd_executable, *cmd_args = command
    child = None
    try:
        # Spawn the process using utf-8 encoding to handle text I/O
        child = pexpect.spawn(cmd_executable, cmd_args, encoding='utf-8')
        # Set terminal window size to prevent weird line wrapping issues in output
        child.setwinsize(24, 120)
    except Exception as e:
        logger.exception('Failed to spawn process: %s', cmd_executable)
        if child is not None:
            # The process started but setup failed afterwards: close it so it
            # is not left running without a session that could reach it.
            try:
                child.close(force=True)
            except Exception as close_error:
                logger.warning(
                    'Could not clean up partially started process %s: %s',
                    cmd_executable,
                    close_error,
                )
        return f"Error starting process: {str(e)}"

    SESSIONS[session_id] = {
        'process': child,
        'command': command,
        'history': '',
    }
    logger.info('Session %s started: %s', session_id, ' '.join(command))
    return f"Session started. ID: {session_id}"
@mcp.tool()
def send_command(
    session_id: str,
    cmd: str,
    wait_for: str | None = None,
    timeout: float = 2.0,
) -> str:
    """
    Sends a command to a specific session and reads the output.

    Args:
        session_id: The ID returned by spawn_process.
        cmd: The command string to send to the process (a line ending is
            added by sendline).
        wait_for: (Optional) A regex pattern indicating the shell prompt or
            expected end of output.
            If provided, reads until the pattern matches, the process ends,
            or the timeout expires.
            If None (or empty), waits for a short fixed duration and reads
            whatever is available in the buffer.
        timeout: Time in seconds to wait for the pattern (default: 2.0s).
            Only used when wait_for is provided.

    Returns:
        str: The output captured from the process, with a trailing
        "[Process finished: EOF]" or "[Timeout: ...]" marker when applicable,
        or an "Error ..." message if the session is unknown or the
        interaction failed.
    """
    if session_id not in SESSIONS:
        return 'Error: Session ID not found.'
    session = SESSIONS[session_id]
    child = session['process']
    try:
        logger.debug('Sending command to %s: %s', session_id, cmd)
        child.sendline(cmd)
        if wait_for:
            # Strategy A: Expect Pattern (Reliable)
            # Blocks until the regex matches, the process ends, or timeout occurs
            index = child.expect(
                [wait_for, pexpect.EOF, pexpect.TIMEOUT], timeout=timeout,
            )
            # 'before' holds everything read up to the match (or up to EOF/timeout)
            output = child.before or ''
            if index == 0:
                # Only a real pattern match leaves matched text in 'after'.
                # For EOF/TIMEOUT pexpect stores the exception class there,
                # which must not be concatenated onto the output string.
                output += child.after
            elif index == 1:
                output += '\n[Process finished: EOF]'
            else:
                output += f"\n[Timeout: Pattern '{wait_for}' not found]"
        else:
            # Strategy B: Blind Read (Fallback)
            # Useful when the prompt is unknown.
            # Brief sleep allows the subprocess to process the command and fill the buffer.
            time.sleep(0.5)
            chunks = []
            try:
                while True:
                    # Read chunks until a short read timeout signals an empty buffer
                    chunks.append(child.read_nonblocking(size=1024, timeout=0.1))
            except pexpect.TIMEOUT:
                # Buffer is empty, stop reading
                pass
            except pexpect.EOF:
                chunks.append('\n[Process finished: EOF]')
            output = ''.join(chunks)
        # Update session history
        session['history'] += f"\n> {cmd}\n{output}"
        return output
    except Exception as e:
        logger.error('Error during interaction in session %s: %s', session_id, e)
        return f"Error during interaction: {str(e)}"
@mcp.tool()
def read_buffer(session_id: str) -> str:
    """
    Reads remaining content from the buffer without sending a command.
    Useful for checking asynchronous logs or delayed output.

    Args:
        session_id: The ID returned by spawn_process.

    Returns:
        str: The output that was available, followed by
        "[Process finished: EOF]" if the process has ended;
        "[No new output]" if nothing was available and the process is still
        running; or an error message if the session is unknown.
    """
    if session_id not in SESSIONS:
        return 'Error: Session ID not found.'
    child = SESSIONS[session_id]['process']
    chunks = []
    try:
        while True:
            chunks.append(child.read_nonblocking(size=1024, timeout=0.1))
    except pexpect.TIMEOUT:
        # Nothing more to read right now
        pass
    except pexpect.EOF:
        # Report process termination the same way send_command does instead of
        # letting a finished process look like an idle one.
        chunks.append('\n[Process finished: EOF]')
    output = ''.join(chunks)
    if not output:
        return '[No new output]'
    return output
@mcp.tool()
def kill_session(session_id: str) -> str:
    """
    Terminates the session's process and removes the session from the registry.

    Args:
        session_id: The ID returned by spawn_process.

    Returns:
        str: A confirmation message, or an error message if the session is
        unknown or closing the process failed (in which case the session
        stays registered).
    """
    if session_id not in SESSIONS:
        return 'Error: Session not found.'
    try:
        entry = SESSIONS[session_id]
        entry['process'].close()
        # Only forget the session once the process was closed successfully
        del SESSIONS[session_id]
    except Exception as exc:
        logger.error(f"Error closing session {session_id}: {exc}")
        return f"Error closing session: {exc}"
    logger.info(f"Session {session_id} killed.")
    return f"Session {session_id} killed."
@mcp.resource('cli://{session_id}/history')
def get_history(session_id: str) -> str:
    """
    Retrieves the full interaction history for a session.

    Args:
        session_id: The ID returned by spawn_process.

    Returns:
        str: The accumulated history text stored for the session, or an
        error message if the session is unknown.
    """
    if session_id not in SESSIONS:
        return 'Error: Session not found.'
    entry = SESSIONS[session_id]
    return entry['history']
def main():
    """Entry point: starts the MCP server by calling mcp.run()."""
    mcp.run()
# Start the server only when this file is executed as a script, not on import
if __name__ == '__main__':
    main()