execute_command

Run shell commands as background jobs on the MCP Background Job Server. Each job is identified by a job ID for tracking and management, which enables asynchronous execution and monitoring of long-running processes.

Instructions

Execute a command as a background job and return job ID.

Args:
    command: Shell command to execute in the background

Returns:
    ExecuteOutput containing the job ID (UUID) of the started job

Input Schema

Name      Required   Description                Default
command   Yes        Shell command to execute   (none)
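
As a rough illustration of the schema above, a client could wrap the single `command` argument in a JSON-RPC 2.0 `tools/call` request, which is the request shape MCP uses for tool invocation. The helper name `build_execute_request`, the request ID, and the sample command are assumptions made for this sketch; transport and session setup are left out.

```python
import json


def build_execute_request(command: str, request_id: int = 1) -> dict:
    """Build a JSON-RPC 2.0 `tools/call` request for the execute_command tool.

    Hypothetical helper: only the tool name and the `command` argument
    come from the schema above.
    """
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "execute_command",
            # `command` is the only input and it is required.
            "arguments": {"command": command},
        },
    }


request = build_execute_request("sleep 30 && echo done")
print(json.dumps(request, indent=2))
```

A successful call should carry the job ID back in the tool result; keep it, since it is the handle for tracking the job afterwards.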

Implementation Reference

  • MCP tool handler for 'execute_command'. Validates input, delegates to JobManager.execute_command(), and returns ExecuteOutput with job_id or raises ToolError on failure.
    @mcp.tool()
    async def execute_command(
        command: str = Field(..., description="Shell command to execute"),
    ) -> ExecuteOutput:
        """Execute a command as a background job and return job ID.

        Args:
            command: Shell command to execute in the background

        Returns:
            ExecuteOutput containing the job ID (UUID) of the started job
        """
        try:
            job_manager = get_job_manager()
            job_id = await job_manager.execute_command(command)
            return ExecuteOutput(job_id=job_id)
        except ValueError as e:
            raise ToolError(f"Invalid command: {str(e)}")
        except RuntimeError as e:
            if "Maximum concurrent jobs limit" in str(e):
                raise ToolError(f"Job limit reached: {str(e)}")
            else:
                raise ToolError(f"Failed to start job: {str(e)}")
        except Exception as e:
            logger.error(f"Error executing command '{command}': {e}")
            raise ToolError(f"Failed to execute command: {str(e)}")
  • JobManager.execute_command: Core logic that performs security validation, checks concurrent job limits, creates and starts a ProcessWrapper for the command, stores the job, and returns the job_id.
    async def execute_command(self, command: str) -> str:
        """Execute command as background job, return job_id.

        Args:
            command: Shell command to execute

        Returns:
            UUID v4 job identifier

        Raises:
            RuntimeError: If maximum concurrent jobs limit is reached
            ValueError: If command is empty or invalid
        """
        if not command or not command.strip():
            raise ValueError("Command cannot be empty")

        # Validate command security
        self._validate_command_security(command.strip())

        # Check job limit
        running_jobs = sum(
            1 for job in self._jobs.values() if job.status == JobStatus.RUNNING
        )
        if running_jobs >= self.config.max_concurrent_jobs:
            raise RuntimeError(
                f"Maximum concurrent jobs limit ({self.config.max_concurrent_jobs}) reached"
            )

        # Generate unique job ID
        job_id = str(uuid.uuid4())

        # Create job record
        job = BackgroundJob(
            job_id=job_id,
            command=command.strip(),
            status=JobStatus.RUNNING,
            started=datetime.now(timezone.utc),
        )

        # Create process wrapper
        process_wrapper = ProcessWrapper(
            job_id=job_id,
            command=command.strip(),
            max_output_size=self.config.max_output_size_bytes,
        )

        try:
            # Start the process
            await process_wrapper.start()

            # Update job with process info
            job.pid = process_wrapper.get_pid()

            # Store job and process
            self._jobs[job_id] = job
            self._processes[job_id] = process_wrapper

            logger.info(f"Started job {job_id}: {command.strip()}")
            return job_id

        except Exception as e:
            logger.error(f"Failed to start job {job_id}: {e}")
            # Clean up on failure
            try:
                process_wrapper.cleanup()
            except Exception:
                pass
            raise
  • Pydantic model defining the output schema for the execute_command tool, containing the job_id.
    class ExecuteOutput(BaseModel):
        """Output from execute tool."""

        job_id: str = Field(..., description="UUID v4 job identifier")
  • FastMCP decorator registering the execute_command function as an MCP tool.
    @mcp.tool()
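
The admission checks and error mapping described in the list above can be tried in isolation with a small stand-in. This is only a sketch under stated assumptions: `MiniJobManager`, `admit`, and `to_tool_message` are hypothetical names, no process is started, and the security validation step is omitted because its rules are not shown in the excerpts.

```python
import uuid
from dataclasses import dataclass, field
from enum import Enum


class JobStatus(Enum):
    RUNNING = "running"
    COMPLETED = "completed"


@dataclass
class MiniJobManager:
    """Hypothetical stand-in for JobManager; tracks jobs but starts no process."""

    max_concurrent_jobs: int = 2
    jobs: dict = field(default_factory=dict)  # job_id -> JobStatus

    def admit(self, command: str) -> str:
        # Reject empty or whitespace-only commands, as the excerpt does.
        if not command or not command.strip():
            raise ValueError("Command cannot be empty")

        # Only jobs still marked RUNNING count against the limit.
        running = sum(1 for s in self.jobs.values() if s is JobStatus.RUNNING)
        if running >= self.max_concurrent_jobs:
            raise RuntimeError(
                f"Maximum concurrent jobs limit ({self.max_concurrent_jobs}) reached"
            )

        # Random UUID v4, returned to the caller as the job handle.
        job_id = str(uuid.uuid4())
        self.jobs[job_id] = JobStatus.RUNNING
        return job_id


def to_tool_message(exc: Exception) -> str:
    """Mirror the handler's mapping from exception type to error text."""
    if isinstance(exc, ValueError):
        return f"Invalid command: {exc}"
    if isinstance(exc, RuntimeError):
        if "Maximum concurrent jobs limit" in str(exc):
            return f"Job limit reached: {exc}"
        return f"Failed to start job: {exc}"
    return f"Failed to execute command: {exc}"


manager = MiniJobManager(max_concurrent_jobs=1)
first_id = manager.admit("sleep 30")
try:
    manager.admit("sleep 30")
except RuntimeError as exc:
    print(to_tool_message(exc))
```

Note that the limit counts only jobs whose status is still RUNNING, so a finished job frees a slot even while its record remains stored.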

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dylan-gluck/mcp-background-job'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.
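
The curl request above can also be issued from Python using only the standard library. This is a minimal sketch: the helper names `server_url` and `fetch_server` are hypothetical, and it assumes the endpoint returns a JSON body, which the page does not state explicitly.

```python
import json
import urllib.request

# Base path taken from the curl example above.
BASE_URL = "https://glama.ai/api/mcp/v1/servers"


def server_url(owner: str, name: str) -> str:
    """Build the directory API URL for one server entry."""
    return f"{BASE_URL}/{owner}/{name}"


def fetch_server(owner: str, name: str, timeout: float = 10.0) -> dict:
    """GET the server entry and decode it, assuming a JSON response."""
    with urllib.request.urlopen(server_url(owner, name), timeout=timeout) as resp:
        return json.load(resp)


if __name__ == "__main__":
    # Performs a real network request when run as a script.
    info = fetch_server("dylan-gluck", "mcp-background-job")
    print(json.dumps(info, indent=2))
```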