MCP Background Job Server

by dylan-gluck

execute_command

Run shell commands as background jobs to handle long-running processes asynchronously, returning a job ID for monitoring and management.

Instructions

Execute a command as a background job and return job ID.

Args:
    command: Shell command to execute in the background

Returns:
    ExecuteOutput containing the job ID (UUID) of the started job

Input Schema

Name     Required  Description               Default
command  Yes       Shell command to execute
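
As a rough illustration of the schema above, the sketch below builds the JSON-RPC 2.0 `tools/call` request an MCP client would send to invoke this tool. The helper name `build_tool_call` and the `request_id` parameter are hypothetical and not part of the server; in practice a client library normally constructs and sends this message over the transport for you.

```python
import json


def build_tool_call(command: str, request_id: int = 1) -> dict:
    """Build a JSON-RPC 2.0 `tools/call` request for execute_command.

    Hypothetical helper for illustration; not part of the server.
    """
    # Mirror the server-side rule that the command must not be empty.
    if not command or not command.strip():
        raise ValueError("command must be a non-empty string")
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "execute_command",
            # `command` is the only field in the input schema.
            "arguments": {"command": command},
        },
    }


if __name__ == "__main__":
    print(json.dumps(build_tool_call("sleep 30 && echo done"), indent=2))
```

The server's reply should carry the job ID described under Returns, which is then passed to the server's other tools for monitoring.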

Implementation Reference

  • MCP tool handler for execute_command. Validates input, calls JobManager.execute_command, handles errors, returns ExecuteOutput with job_id.
    @mcp.tool()
    async def execute_command(
        command: str = Field(..., description="Shell command to execute"),
    ) -> ExecuteOutput:
        """Execute a command as a background job and return job ID.
    
        Args:
            command: Shell command to execute in the background
    
        Returns:
            ExecuteOutput containing the job ID (UUID) of the started job
        """
        try:
            job_manager = get_job_manager()
            job_id = await job_manager.execute_command(command)
            return ExecuteOutput(job_id=job_id)
        except ValueError as e:
            # Chain the original exception so the root cause stays in the traceback.
            raise ToolError(f"Invalid command: {e}") from e
        except RuntimeError as e:
            if "Maximum concurrent jobs limit" in str(e):
                raise ToolError(f"Job limit reached: {e}") from e
            else:
                raise ToolError(f"Failed to start job: {e}") from e
        except Exception as e:
            logger.error(f"Error executing command '{command}': {e}")
            raise ToolError(f"Failed to execute command: {e}") from e
  • Core JobManager method implementing the command execution logic: security validation, job limits check, job creation, process startup, and storage.
    async def execute_command(self, command: str) -> str:
        """Execute command as background job, return job_id.
    
        Args:
            command: Shell command to execute
    
        Returns:
            UUID v4 job identifier
    
        Raises:
            RuntimeError: If maximum concurrent jobs limit is reached
            ValueError: If command is empty or invalid
        """
        if not command or not command.strip():
            raise ValueError("Command cannot be empty")
    
        # Validate command security
        self._validate_command_security(command.strip())
    
        # Check job limit
        running_jobs = sum(
            1 for job in self._jobs.values() if job.status == JobStatus.RUNNING
        )
        if running_jobs >= self.config.max_concurrent_jobs:
            raise RuntimeError(
                f"Maximum concurrent jobs limit ({self.config.max_concurrent_jobs}) reached"
            )
    
        # Generate unique job ID
        job_id = str(uuid.uuid4())
    
        # Create job record
        job = BackgroundJob(
            job_id=job_id,
            command=command.strip(),
            status=JobStatus.RUNNING,
            started=datetime.now(timezone.utc),
        )
    
        # Create process wrapper
        process_wrapper = ProcessWrapper(
            job_id=job_id,
            command=command.strip(),
            max_output_size=self.config.max_output_size_bytes,
        )
    
        try:
            # Start the process
            await process_wrapper.start()
    
            # Update job with process info
            job.pid = process_wrapper.get_pid()
    
            # Store job and process
            self._jobs[job_id] = job
            self._processes[job_id] = process_wrapper
    
            logger.info(f"Started job {job_id}: {command.strip()}")
            return job_id
    
        except Exception as e:
            logger.error(f"Failed to start job {job_id}: {e}")
            # Clean up on failure
            try:
                process_wrapper.cleanup()
            except Exception:
                pass
            raise
  • Pydantic model for the output schema of the execute_command tool, containing the job_id.
    class ExecuteOutput(BaseModel):
        """Output from execute tool."""
    
        job_id: str = Field(..., description="UUID v4 job identifier")
  • FastMCP @mcp.tool() decorator registering the execute_command function as an MCP tool.
    @mcp.tool()
  • Pydantic model defining the input schema for execute_command. The tool handler declares the same field inline rather than using this model.
    class ExecuteInput(BaseModel):
        """Input for execute tool."""
    
        command: str = Field(..., description="Shell command to execute")
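
The flow in the excerpts above (empty-command check, concurrent-job limit, UUID generation, process startup, storage) can be sketched in a self-contained form. This is a minimal sketch, not the server's code: `MiniJob` and `MiniJobManager` are hypothetical names, `asyncio.create_subprocess_shell` stands in for the `ProcessWrapper` class whose internals are not shown, and the security validation step is omitted because `_validate_command_security` does not appear in the excerpts.

```python
import asyncio
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class MiniJob:
    """Hypothetical stand-in for BackgroundJob."""

    job_id: str
    command: str
    started: datetime
    process: asyncio.subprocess.Process


class MiniJobManager:
    """Illustrative only; not the server's JobManager."""

    def __init__(self, max_concurrent_jobs: int = 2) -> None:
        self.max_concurrent_jobs = max_concurrent_jobs
        self._jobs: dict[str, MiniJob] = {}

    def _running(self) -> int:
        # A process counts as running while its returncode is still None.
        return sum(1 for j in self._jobs.values() if j.process.returncode is None)

    async def execute_command(self, command: str) -> str:
        command = command.strip() if command else ""
        if not command:
            raise ValueError("Command cannot be empty")
        if self._running() >= self.max_concurrent_jobs:
            raise RuntimeError(
                f"Maximum concurrent jobs limit ({self.max_concurrent_jobs}) reached"
            )
        job_id = str(uuid.uuid4())
        # Start the process; store the job only after startup succeeds.
        process = await asyncio.create_subprocess_shell(
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        self._jobs[job_id] = MiniJob(
            job_id, command, datetime.now(timezone.utc), process
        )
        return job_id

    async def wait(self, job_id: str) -> tuple[int, str]:
        """Wait for a job to finish and return (exit code, stdout text)."""
        process = self._jobs[job_id].process
        stdout, _ = await process.communicate()
        return process.returncode, stdout.decode()
```

Calling `await manager.execute_command("echo hi")` returns as soon as the process has started, which is what lets a caller poll for status or output later using the job ID.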

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dylan-gluck/mcp-background-job'
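
The same request can be issued from Python using only the standard library. This is a sketch under assumptions: the helper names `build_request` and `fetch_server_info` are hypothetical, and the code assumes the endpoint returns a JSON body, which the text above does not state.

```python
import json
import urllib.request

API_URL = "https://glama.ai/api/mcp/v1/servers/dylan-gluck/mcp-background-job"


def build_request(url: str = API_URL) -> urllib.request.Request:
    """Build the GET request equivalent to the curl command."""
    return urllib.request.Request(
        url, method="GET", headers={"Accept": "application/json"}
    )


def fetch_server_info(url: str = API_URL, timeout: float = 10.0) -> dict:
    """Send the request and decode the body (assumed to be JSON)."""
    with urllib.request.urlopen(build_request(url), timeout=timeout) as response:
        return json.load(response)
```

Calling `fetch_server_info()` performs the network request; `build_request()` alone is useful for inspecting the request without sending it.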

If you have feedback or need assistance with the MCP directory API, please join our Discord server.