execute_command
Run shell commands as background jobs, each identified by a job ID used for tracking and management. Enables asynchronous execution and monitoring of long-running processes on the MCP Background Job Server.
Instructions
Execute a command as a background job and return job ID.
Args: command: Shell command to execute in the background
Returns: ExecuteOutput containing the job ID (UUID) of the started job
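Since the returned job ID is described as a UUID, a caller may want to sanity-check it before using it with other tools. The following is a minimal sketch using only the standard library; `is_uuid4` and `sample_job_id` are hypothetical names introduced for illustration and are not part of the server.

```python
import uuid


def is_uuid4(value: str) -> bool:
    """Return True if value parses as a version-4 UUID string."""
    try:
        parsed = uuid.UUID(value)
    except ValueError:
        # Raised by uuid.UUID for malformed strings.
        return False
    return parsed.version == 4


# Stand-in for a job_id returned by execute_command (assumption: the
# real value comes from the tool's ExecuteOutput.job_id field).
sample_job_id = str(uuid.uuid4())
print(is_uuid4(sample_job_id))
```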
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| command | Yes | Shell command to execute | |
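As a rough illustration of the schema above, the sketch below builds the JSON-RPC 2.0 `tools/call` message that an MCP client would send to invoke this tool with its single required `command` argument. The helper name `build_execute_command_request` and the example command are assumptions made for this sketch; in practice an MCP client library would normally construct and send this message.

```python
import json


def build_execute_command_request(command: str, request_id: int = 1) -> dict:
    """Build a JSON-RPC 2.0 'tools/call' message for execute_command."""
    # Mirror the required-argument rule client-side (hypothetical check).
    if not command or not command.strip():
        raise ValueError("command is required and must not be empty")
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "execute_command",
            "arguments": {"command": command},
        },
    }


request = build_execute_command_request("sleep 30 && echo done")
print(json.dumps(request, indent=2))
```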
Implementation Reference
- src/mcp_background_job/server.py:123-149 (handler): MCP tool handler for 'execute_command'. Validates input, delegates to JobManager.execute_command(), and returns ExecuteOutput with job_id or raises ToolError on failure.

  ```python
  @mcp.tool()
  async def execute_command(
      command: str = Field(..., description="Shell command to execute"),
  ) -> ExecuteOutput:
      """Execute a command as a background job and return job ID.

      Args:
          command: Shell command to execute in the background

      Returns:
          ExecuteOutput containing the job ID (UUID) of the started job
      """
      try:
          job_manager = get_job_manager()
          job_id = await job_manager.execute_command(command)
          return ExecuteOutput(job_id=job_id)
      except ValueError as e:
          raise ToolError(f"Invalid command: {str(e)}")
      except RuntimeError as e:
          if "Maximum concurrent jobs limit" in str(e):
              raise ToolError(f"Job limit reached: {str(e)}")
          else:
              raise ToolError(f"Failed to start job: {str(e)}")
      except Exception as e:
          logger.error(f"Error executing command '{command}': {e}")
          raise ToolError(f"Failed to execute command: {str(e)}")
  ```
- JobManager.execute_command: Core logic that performs security validation, checks concurrent job limits, creates and starts a ProcessWrapper for the command, stores the job, and returns the job_id.

  ```python
  async def execute_command(self, command: str) -> str:
      """Execute command as background job, return job_id.

      Args:
          command: Shell command to execute

      Returns:
          UUID v4 job identifier

      Raises:
          RuntimeError: If maximum concurrent jobs limit is reached
          ValueError: If command is empty or invalid
      """
      if not command or not command.strip():
          raise ValueError("Command cannot be empty")

      # Validate command security
      self._validate_command_security(command.strip())

      # Check job limit
      running_jobs = sum(
          1 for job in self._jobs.values() if job.status == JobStatus.RUNNING
      )
      if running_jobs >= self.config.max_concurrent_jobs:
          raise RuntimeError(
              f"Maximum concurrent jobs limit ({self.config.max_concurrent_jobs}) reached"
          )

      # Generate unique job ID
      job_id = str(uuid.uuid4())

      # Create job record
      job = BackgroundJob(
          job_id=job_id,
          command=command.strip(),
          status=JobStatus.RUNNING,
          started=datetime.now(timezone.utc),
      )

      # Create process wrapper
      process_wrapper = ProcessWrapper(
          job_id=job_id,
          command=command.strip(),
          max_output_size=self.config.max_output_size_bytes,
      )

      try:
          # Start the process
          await process_wrapper.start()

          # Update job with process info
          job.pid = process_wrapper.get_pid()

          # Store job and process
          self._jobs[job_id] = job
          self._processes[job_id] = process_wrapper

          logger.info(f"Started job {job_id}: {command.strip()}")
          return job_id

      except Exception as e:
          logger.error(f"Failed to start job {job_id}: {e}")
          # Clean up on failure
          try:
              process_wrapper.cleanup()
          except Exception:
              pass
          raise
  ```
- Pydantic model defining the output schema for the execute_command tool, containing the job_id.

  ```python
  class ExecuteOutput(BaseModel):
      """Output from execute tool."""

      job_id: str = Field(..., description="UUID v4 job identifier")
  ```
- src/mcp_background_job/server.py:123-123 (registration): FastMCP decorator registering the execute_command function as an MCP tool.

  ```python
  @mcp.tool()
  ```
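To make the control flow in the references above easier to follow, here is a simplified, self-contained sketch of two of its steps: the concurrent-job limit check and the mapping from exceptions to tool error messages. It is not the project's code. `SketchJobManager` and `to_tool_error_message` are hypothetical stand-ins, the method is synchronous rather than async, no process is started, and the security validation step is omitted.

```python
import uuid
from dataclasses import dataclass, field
from enum import Enum


class JobStatus(Enum):
    RUNNING = "running"
    COMPLETED = "completed"


@dataclass
class SketchJobManager:
    """Hypothetical in-memory stand-in; tracks status only, runs nothing."""

    max_concurrent_jobs: int = 2
    jobs: dict = field(default_factory=dict)  # job_id -> JobStatus

    def execute_command(self, command: str) -> str:
        # Reject empty or whitespace-only commands.
        if not command or not command.strip():
            raise ValueError("Command cannot be empty")
        # Count only jobs that are still running against the limit.
        running = sum(1 for s in self.jobs.values() if s == JobStatus.RUNNING)
        if running >= self.max_concurrent_jobs:
            raise RuntimeError(
                f"Maximum concurrent jobs limit ({self.max_concurrent_jobs}) reached"
            )
        job_id = str(uuid.uuid4())
        self.jobs[job_id] = JobStatus.RUNNING
        return job_id


def to_tool_error_message(exc: Exception) -> str:
    """Map an exception to the message prefixes used by the handler."""
    if isinstance(exc, ValueError):
        return f"Invalid command: {exc}"
    if isinstance(exc, RuntimeError):
        if "Maximum concurrent jobs limit" in str(exc):
            return f"Job limit reached: {exc}"
        return f"Failed to start job: {exc}"
    return f"Failed to execute command: {exc}"


manager = SketchJobManager(max_concurrent_jobs=1)
first_id = manager.execute_command("sleep 10")
try:
    manager.execute_command("sleep 10")  # second job exceeds the limit of 1
except RuntimeError as exc:
    limit_message = to_tool_error_message(exc)
    print(limit_message)
```

Because the limit counts only jobs whose status is RUNNING, marking a job as completed in this sketch frees a slot for the next call.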