interact_with_job
Send input to a job's stdin via its ID and retrieve immediate stdout/stderr output. Designed for asynchronous command execution and interaction within the MCP Background Job Server.
Instructions
Send input to a job's stdin and return any immediate output.
Args:
- job_id: The UUID of the job to interact with
- input: Text to send to the job's stdin
Returns: ProcessOutput containing any immediate stdout/stderr output after sending input
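
To make "immediate output" concrete, here is a minimal sketch of the write-then-read pattern the description implies, using only Python's standard `asyncio` subprocess API. It is not the server's implementation, which this page does not show in full. The helper name `send_and_read`, the 5-second timeout, and the uppercase-echo child process are all assumptions made for illustration.

```python
import asyncio
import sys

# Hypothetical child process: echoes each stdin line back in upper case.
CHILD_CODE = (
    "import sys\n"
    "for line in sys.stdin:\n"
    "    sys.stdout.write(line.upper())\n"
    "    sys.stdout.flush()\n"
)


async def send_and_read(proc, text, timeout=5.0):
    """Write text to the child's stdin, then return whatever stdout
    produces within `timeout` seconds (empty string if nothing arrives)."""
    proc.stdin.write(text.encode())
    await proc.stdin.drain()
    try:
        chunk = await asyncio.wait_for(proc.stdout.read(4096), timeout)
    except asyncio.TimeoutError:
        chunk = b""
    return chunk.decode()


async def demo():
    """Start the echo child, send one line, and return its reply."""
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-u", "-c", CHILD_CODE,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )
    try:
        return await send_and_read(proc, "hello\n")
    finally:
        proc.stdin.close()  # EOF lets the child's read loop finish
        await proc.wait()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The timeout is the design choice that matters here: a tool that returns "immediate" output cannot wait for the process to exit, so it reads what is available within a short window and returns, possibly with nothing.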
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| input | Yes | Input to send to the job's stdin | |
| job_id | Yes | Job ID to interact with | |
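
For orientation, an MCP client invokes a tool with a `tools/call` JSON-RPC request whose `arguments` object carries the fields from the table above. The sketch below builds such a message with the standard library only. The helper name `build_call`, the request `id`, and the sample UUID are placeholders, not values taken from the server.

```python
import json


def build_call(job_id, text, request_id=1):
    """Return a JSON-RPC 2.0 `tools/call` message for interact_with_job."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "interact_with_job",
            "arguments": {"job_id": job_id, "input": text},
        },
    }


# Placeholder UUID; a real one comes from whatever call started the job.
message = build_call("123e4567-e89b-12d3-a456-426614174000", "print('hi')\n")
print(json.dumps(message, indent=2))
```

Many interactive programs read one line at a time, so the input usually needs a trailing newline. Whether the server appends one itself is not stated on this page.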
Implementation Reference
- src/mcp_background_job/server.py:151-178 (handler): MCP tool handler for 'interact_with_job'. Decorated with @mcp.tool() for registration. Defines the input schema with Pydantic Fields for job_id and input. Handles errors and delegates execution to JobManager.interact_with_job.

      @mcp.tool()
      async def interact_with_job(
          job_id: str = Field(..., description="Job ID to interact with"),
          input: str = Field(..., description="Input to send to the job's stdin"),
      ) -> ProcessOutput:
          """Send input to a job's stdin and return any immediate output.

          Args:
              job_id: The UUID of the job to interact with
              input: Text to send to the job's stdin

          Returns:
              ProcessOutput containing any immediate stdout/stderr output after sending input
          """
          try:
              job_manager = get_job_manager()
              interaction_result = await job_manager.interact_with_job(job_id, input)
              return interaction_result
          except KeyError:
              raise ToolError(f"Job {job_id} not found")
          except RuntimeError as e:
              if "not running" in str(e):
                  raise ToolError(f"Job {job_id} is not running and cannot accept input")
              else:
                  raise ToolError(f"Failed to interact with job: {str(e)}")
          except Exception as e:
              logger.error(f"Error interacting with job {job_id}: {e}")
              raise ToolError(f"Failed to interact with job: {str(e)}")
- Core implementation logic in JobManager.interact_with_job. Validates job existence and running status, then sends input to the process wrapper and returns the output.

      async def interact_with_job(self, job_id: str, input_text: str) -> ProcessOutput:
          """Send input to job stdin, return immediate output.

          Args:
              job_id: Job identifier
              input_text: Text to send to stdin

          Returns:
              ProcessOutput with any immediate stdout/stderr output

          Raises:
              KeyError: If job_id doesn't exist
              RuntimeError: If job is not running or stdin not available
          """
          if job_id not in self._jobs:
              raise KeyError(f"Job {job_id} not found")

          # Update job status first
          await self._update_job_status(job_id)
          job = self._jobs[job_id]

          if job.status != JobStatus.RUNNING:
              raise RuntimeError(f"Job {job_id} is not running (status: {job.status})")

          process_wrapper = self._processes.get(job_id)
          if process_wrapper is None:
              raise RuntimeError(f"Process wrapper for job {job_id} not found")

          return await process_wrapper.send_input(input_text)
- src/mcp_background_job/server.py:151-151 (registration): The @mcp.tool() decorator registers the interact_with_job function as an MCP tool.

      @mcp.tool()
- Input schema defined using Pydantic Field for job_id (required string) and input (required string). Output is the ProcessOutput model.

      job_id: str = Field(..., description="Job ID to interact with"),
      input: str = Field(..., description="Input to send to the job's stdin"),
      ) -> ProcessOutput:
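
The control flow described in the list above can be exercised without the server installed. The following sketch mirrors the existence and status checks in `JobManager.interact_with_job` and the handler's translation of exceptions into `ToolError`. `FakeManager`, the string statuses, and the local `ToolError` class are stand-ins invented for this example; they only imitate the behavior quoted above.

```python
import asyncio


class ToolError(Exception):
    """Local stand-in for the ToolError the real handler raises."""


class FakeManager:
    """Stand-in for JobManager: maps job IDs to a status string."""

    def __init__(self, jobs):
        self._jobs = jobs

    async def interact_with_job(self, job_id, input_text):
        # Same order of checks as the quoted method: existence, then status.
        if job_id not in self._jobs:
            raise KeyError(f"Job {job_id} not found")
        status = self._jobs[job_id]
        if status != "running":
            raise RuntimeError(f"Job {job_id} is not running (status: {status})")
        return f"echo: {input_text}"


async def handle(manager, job_id, text):
    """Translate manager exceptions into ToolError, as the handler does."""
    try:
        return await manager.interact_with_job(job_id, text)
    except KeyError:
        raise ToolError(f"Job {job_id} not found")
    except RuntimeError as e:
        if "not running" in str(e):
            raise ToolError(f"Job {job_id} is not running and cannot accept input")
        raise ToolError(f"Failed to interact with job: {e}")


def outcome(manager, job_id, text):
    """Run one call and return either the result or the error message."""
    try:
        return asyncio.run(handle(manager, job_id, text))
    except ToolError as e:
        return f"ToolError: {e}"


manager = FakeManager({"a": "running", "b": "completed"})
for job_id in ("a", "b", "missing"):
    print(job_id, "->", outcome(manager, job_id, "ls\n"))
```

Note that the handler distinguishes the two `RuntimeError` cases by matching the substring "not running" in the message, so the wording of the manager's error text is effectively part of the contract between the two layers.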