MCP Background Job Server

by dylan-gluck
SPEC.md
# MCP Background Job Server - Technical Specification

## Overview

The MCP Background Job Server enables coding agents to execute long-running shell commands asynchronously. This server provides process management capabilities through the Model Context Protocol, allowing clients to start, monitor, control, and interact with background processes.

## Architecture

### Core Components

1. **Job Manager Service** - Central service for managing background processes
2. **Process Wrapper** - Abstraction layer for child processes with I/O handling
3. **MCP Server** - FastMCP server exposing tools to clients
4. **Storage Layer** - In-memory job registry with optional persistence

### Technology Stack

- **Runtime**: Node.js / Python (recommend Python for consistency with repo)
- **MCP Framework**: FastMCP (Python)
- **Process Management**: subprocess (Python) or child_process (Node.js)
- **Data Validation**: Pydantic (Python) or Zod (Node.js)
- **Testing**: pytest + testcontainers (Python) or Jest (Node.js)

## Data Models

### Job Model

```python
from pydantic import BaseModel
from datetime import datetime
from enum import Enum
from typing import Optional, List

class JobStatus(str, Enum):
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    KILLED = "killed"

class BackgroundJob(BaseModel):
    job_id: str  # UUID v4
    command: str
    status: JobStatus
    started: datetime  # UTC timestamp
    completed: Optional[datetime] = None
    exit_code: Optional[int] = None
    pid: Optional[int] = None

class JobSummary(BaseModel):
    job_id: str
    status: JobStatus
    command: str
    started: datetime

class ProcessOutput(BaseModel):
    stdout: str
    stderr: str

class JobInteractionResult(BaseModel):
    stdout: str
    stderr: str
```

### Tool Input/Output Models

```python
class ExecuteInput(BaseModel):
    command: str

class ExecuteOutput(BaseModel):
    job_id: str

class TailInput(BaseModel):
    job_id: str
    lines: int = 50  # Default to last 50 lines

class StatusInput(BaseModel):
    job_id: str

class StatusOutput(BaseModel):
    status: JobStatus

class KillInput(BaseModel):
    job_id: str

class KillOutput(BaseModel):
    status: str  # "killed", "already_terminated", "not_found"

class OutputInput(BaseModel):
    job_id: str

class InteractInput(BaseModel):
    job_id: str
    input: str

class ListOutput(BaseModel):
    jobs: List[JobSummary]
```

## Service Architecture

### JobManager Class

```python
import subprocess
from typing import Dict, List

class JobManager:
    def __init__(self):
        self._jobs: Dict[str, BackgroundJob] = {}
        self._processes: Dict[str, subprocess.Popen] = {}
        self._output_buffers: Dict[str, Dict[str, List[str]]] = {}

    async def execute_command(self, command: str) -> str:
        """Execute command as background job, return job_id"""

    async def get_job_status(self, job_id: str) -> JobStatus:
        """Get current status of job"""

    async def kill_job(self, job_id: str) -> str:
        """Kill running job"""

    async def get_job_output(self, job_id: str) -> ProcessOutput:
        """Get full stdout/stderr output"""

    async def tail_job_output(self, job_id: str, lines: int) -> ProcessOutput:
        """Get last N lines of output"""

    async def interact_with_job(self, job_id: str, input_text: str) -> ProcessOutput:
        """Send input to job stdin, return immediate output"""

    async def list_jobs(self) -> List[JobSummary]:
        """List all jobs"""

    def _cleanup_completed_jobs(self):
        """Clean up terminated processes"""
```

### Process Wrapper

```python
class ProcessWrapper:
    def __init__(self, job_id: str, command: str):
        self.job_id = job_id
        self.command = command
        self.process: Optional[subprocess.Popen] = None
        self.stdout_buffer: List[str] = []
        self.stderr_buffer: List[str] = []

    async def start(self):
        """Start the process with proper I/O handling"""

    async def send_input(self, text: str) -> ProcessOutput:
        """Send input to process stdin"""

    def get_status(self) -> JobStatus:
        """Get current process status"""

    def kill(self) -> bool:
        """Kill the process"""

    def get_output(self) -> ProcessOutput:
        """Get all captured output"""

    def tail_output(self, lines: int) -> ProcessOutput:
        """Get last N lines of output"""
```
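The `start()` stub above leaves the I/O plumbing open. As one possible reading (a minimal sketch assuming `asyncio` subprocesses rather than the blocking `subprocess` module; the class name `ProcessWrapperSketch` and the `_pump` helper are illustrative, not part of the spec), output from both streams could be drained into the line buffers by background tasks:

```python
import asyncio
from typing import List, Optional

class ProcessWrapperSketch:
    """Illustrative only: one way the start()/tail_output() stubs could behave."""

    def __init__(self, job_id: str, command: str):
        self.job_id = job_id
        self.command = command
        self.process: Optional[asyncio.subprocess.Process] = None
        self.stdout_buffer: List[str] = []
        self.stderr_buffer: List[str] = []

    async def start(self) -> None:
        # Spawn the command through the shell with piped stdin/stdout/stderr.
        self.process = await asyncio.create_subprocess_shell(
            self.command,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        # Drain both streams concurrently so the child never blocks on a full pipe.
        asyncio.create_task(self._pump(self.process.stdout, self.stdout_buffer))
        asyncio.create_task(self._pump(self.process.stderr, self.stderr_buffer))

    async def _pump(self, stream: asyncio.StreamReader, buffer: List[str]) -> None:
        # Append decoded lines as they arrive; a real implementation would also
        # enforce the per-job output size limit discussed under Performance.
        async for raw in stream:
            buffer.append(raw.decode(errors="replace"))

    def tail_output(self, lines: int) -> tuple:
        # Last N lines of each stream, corresponding to the ProcessOutput fields.
        return ("".join(self.stdout_buffer[-lines:]),
                "".join(self.stderr_buffer[-lines:]))
```

A full implementation would also record `process.pid` on the `BackgroundJob` record and update its status and exit code once the process exits.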
## MCP Tool Definitions

### 1. execute

**Purpose**: Start a new background job

**Annotations**:
- `readOnlyHint: false`
- `destructiveHint: false`
- `idempotentHint: false`

```python
@mcp.tool()
async def execute(command: str = Field(..., description="Shell command to execute")) -> ExecuteOutput:
    """Execute a command as a background job and return job ID."""
```

### 2. list

**Purpose**: List all background jobs

**Annotations**:
- `readOnlyHint: true`
- `destructiveHint: false`
- `idempotentHint: true`

```python
@mcp.tool()
async def list() -> ListOutput:
    """List all background jobs with their status."""
```

### 3. status

**Purpose**: Get status of specific job

**Annotations**:
- `readOnlyHint: true`
- `destructiveHint: false`
- `idempotentHint: true`

```python
@mcp.tool()
async def status(job_id: str = Field(..., description="Job ID to check")) -> StatusOutput:
    """Get the current status of a background job."""
```

### 4. output

**Purpose**: Get full output of a job

**Annotations**:
- `readOnlyHint: true`
- `destructiveHint: false`
- `idempotentHint: true`

```python
@mcp.tool()
async def output(job_id: str = Field(..., description="Job ID to get output from")) -> ProcessOutput:
    """Get the complete stdout and stderr output of a job."""
```

### 5. tail

**Purpose**: Get recent output lines from a job

**Annotations**:
- `readOnlyHint: true`
- `destructiveHint: false`
- `idempotentHint: true`

```python
@mcp.tool()
async def tail(
    job_id: str = Field(..., description="Job ID to tail"),
    lines: int = Field(50, description="Number of lines to return", ge=1, le=1000)
) -> ProcessOutput:
    """Get the last N lines of stdout and stderr from a job."""
```

### 6. kill

**Purpose**: Terminate a running job

**Annotations**:
- `readOnlyHint: false`
- `destructiveHint: true`
- `idempotentHint: false`

```python
@mcp.tool()
async def kill(job_id: str = Field(..., description="Job ID to kill")) -> KillOutput:
    """Kill a running background job."""
```

### 7. interact

**Purpose**: Send input to a job and get response

**Annotations**:
- `readOnlyHint: false`
- `destructiveHint: false`
- `idempotentHint: false`

```python
@mcp.tool()
async def interact(
    job_id: str = Field(..., description="Job ID to interact with"),
    input: str = Field(..., description="Input to send to the job's stdin")
) -> ProcessOutput:
    """Send input to a job's stdin and return any immediate output."""
```

## Error Handling

### Error Types

1. **JobNotFoundError** - When job_id doesn't exist
2. **JobAlreadyTerminatedError** - When trying to interact with completed job
3. **ProcessExecutionError** - When command fails to start
4. **InvalidCommandError** - When command is malformed or forbidden

### Error Responses

All tools should return structured error information:

```python
from fastmcp.exceptions import ToolError

# Example error handling
if job_id not in self._jobs:
    raise ToolError(f"Job {job_id} not found")
```
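Putting the tool layer and the error model together, a minimal wiring sketch might look like the following. This is an assumption-laden illustration, not part of the spec: it presumes FastMCP's `FastMCP` class, a module-level `JobManager` instance named `manager`, the `StatusOutput` model defined under Data Models, and that the manager raises `KeyError` for unknown job IDs.

```python
from fastmcp import FastMCP
from fastmcp.exceptions import ToolError
from pydantic import Field

mcp = FastMCP("background-job-server")
manager = JobManager()  # the service class sketched under "Service Architecture"

@mcp.tool()
async def status(job_id: str = Field(..., description="Job ID to check")) -> StatusOutput:
    """Get the current status of a background job."""
    try:
        return StatusOutput(status=await manager.get_job_status(job_id))
    except KeyError:
        # Surface a JobNotFoundError-style failure as a structured tool error.
        raise ToolError(f"Job {job_id} not found")

if __name__ == "__main__":
    mcp.run()  # stdio transport by default, matching the deployment notes below
```

The remaining tools would follow the same pattern: validate the `job_id`, delegate to `JobManager`, and translate internal exceptions into `ToolError`s.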
## Security Considerations

### Command Restrictions

1. **Allowlist approach**: Define allowed commands/patterns
2. **Path restrictions**: Restrict execution to specific directories
3. **Environment isolation**: Clean environment variables
4. **Resource limits**: CPU/memory/time limits per process

### Input Validation

1. **Command sanitization**: Prevent shell injection
2. **Job ID validation**: Ensure proper UUID format
3. **Input length limits**: Prevent buffer overflow attacks

### Output Safety

1. **Output size limits**: Prevent memory exhaustion
2. **Sensitive data filtering**: Remove potential secrets from logs

## Performance Considerations

### Scalability

1. **Job limit**: Maximum concurrent jobs (default: 10)
2. **Output buffering**: Ring buffer for stdout/stderr (max 10MB per job)
3. **Cleanup strategy**: Automatic cleanup of old completed jobs
4. **Memory management**: Periodic cleanup of terminated processes

### Resource Management

1. **Process monitoring**: Track CPU/memory usage
2. **Timeout handling**: Optional job timeouts
3. **Graceful shutdown**: Clean termination of all jobs on server shutdown

## Testing Strategy

### Unit Tests

1. **JobManager tests**: Core job management functionality
2. **ProcessWrapper tests**: Process lifecycle management
3. **Tool tests**: MCP tool input/output validation
4. **Error handling tests**: Edge cases and error conditions

### Integration Tests

1. **End-to-end workflow**: Execute → monitor → interact → kill
2. **Concurrent job handling**: Multiple simultaneous jobs
3. **Long-running process tests**: Jobs that run for extended periods
4. **Resource cleanup tests**: Memory and process cleanup verification

### Test Tools and Commands

```python
# Example test commands
SAFE_TEST_COMMANDS = [
    "echo 'hello world'",
    "sleep 5",
    "python -c 'import time; time.sleep(2); print(\"done\")'",
    "ls -la",
    "pwd"
]

# Interactive test commands
INTERACTIVE_TEST_COMMANDS = [
    "python -i",  # Python REPL
    "node",       # Node.js REPL
    "cat"         # Read from stdin
]
```

## Configuration

### Environment Variables

```bash
MCP_BG_MAX_JOBS=10            # Maximum concurrent jobs
MCP_BG_MAX_OUTPUT_SIZE=10MB   # Maximum output buffer per job
MCP_BG_JOB_TIMEOUT=3600       # Default job timeout in seconds
MCP_BG_CLEANUP_INTERVAL=300   # Cleanup interval in seconds
MCP_BG_ALLOWED_COMMANDS=""    # Comma-separated allowed command patterns
```

### Configuration File Support

```python
class BackgroundJobConfig(BaseModel):
    max_concurrent_jobs: int = 10
    max_output_size_bytes: int = 10 * 1024 * 1024  # 10MB
    default_job_timeout: Optional[int] = None
    cleanup_interval_seconds: int = 300
    allowed_command_patterns: List[str] = []
    working_directory: str = "."
```

## Deployment Considerations

### Transport Support

- **stdio**: Primary transport for local development
- **HTTP**: For remote access with proper CORS handling
- **SSE**: For real-time updates (future enhancement)

### Logging Strategy

- Use structured logging to stderr
- Include job_id in all log entries
- Log process start/stop events
- Log resource usage periodically

### Monitoring

- Process count metrics
- Memory usage metrics
- Job completion rates
- Error rates by command type
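The end-to-end workflow from the testing strategy could be exercised directly against `JobManager`. A sketch, assuming `pytest-asyncio` and the manager API from "Service Architecture" (neither the test name nor the polling loop is prescribed by the spec):

```python
import asyncio
import pytest

@pytest.mark.asyncio
async def test_execute_then_read_output():
    manager = JobManager()

    # execute -> monitor -> read output, mirroring the integration-test workflow above.
    job_id = await manager.execute_command("echo 'hello world'")
    assert job_id  # a UUID string per the Job model

    # Poll until the job leaves RUNNING, bounded so a regression cannot hang the suite.
    for _ in range(50):
        if await manager.get_job_status(job_id) != JobStatus.RUNNING:
            break
        await asyncio.sleep(0.1)

    output = await manager.get_job_output(job_id)
    assert "hello world" in output.stdout
```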
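As a usage sketch for the configuration section (illustrative: the spec does not define how the `MCP_BG_*` environment variables map onto `BackgroundJobConfig`, so the field mapping, the hypothetical `_parse_size` helper, and the "MB" parsing below are assumptions), a loader could look like:

```python
import os

def _parse_size(value: str) -> int:
    # Accepts "10MB"-style values (as in MCP_BG_MAX_OUTPUT_SIZE) or raw byte counts.
    value = value.strip().upper()
    if value.endswith("MB"):
        return int(value[:-2]) * 1024 * 1024
    return int(value)

def config_from_env() -> "BackgroundJobConfig":
    # Builds the BackgroundJobConfig model defined under "Configuration File Support".
    env = os.environ
    return BackgroundJobConfig(
        max_concurrent_jobs=int(env.get("MCP_BG_MAX_JOBS", "10")),
        max_output_size_bytes=_parse_size(env.get("MCP_BG_MAX_OUTPUT_SIZE", "10MB")),
        default_job_timeout=int(env["MCP_BG_JOB_TIMEOUT"]) if "MCP_BG_JOB_TIMEOUT" in env else None,
        cleanup_interval_seconds=int(env.get("MCP_BG_CLEANUP_INTERVAL", "300")),
        allowed_command_patterns=[p for p in env.get("MCP_BG_ALLOWED_COMMANDS", "").split(",") if p],
    )
```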
