kill_job
Terminate a running background job on the MCP Background Job Server by specifying its UUID. The tool reports whether the job was killed, had already terminated, or could not be found.
Instructions
Kill a running background job.
Args: job_id: The UUID of the job to terminate
Returns: KillOutput indicating the result of the kill operation
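As a rough illustration of how a caller might consume the result, the sketch below maps each documented `status` string to a readable message. The helper `describe_kill_status` and the message wording are hypothetical and not part of the server; only the three status strings come from the `KillOutput` field description.

```python
# Hypothetical client-side helper. Only the three status strings are taken
# from the KillOutput field description; the messages are illustrative.
KILL_STATUS_MESSAGES = {
    "killed": "The job was running and has been terminated.",
    "already_terminated": "The job had already finished, failed, or been killed.",
    "not_found": "No job with that ID is known to the server.",
}


def describe_kill_status(status: str) -> str:
    """Return a human-readable message for a kill_job status string."""
    try:
        return KILL_STATUS_MESSAGES[status]
    except KeyError:
        # Fail loudly on a value the documentation does not list.
        raise ValueError(f"Unexpected kill status: {status!r}") from None
```

Treating an unlisted status as an error, rather than silently ignoring it, keeps the caller honest if the server ever adds a new result value.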
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| job_id | Yes | Job ID to kill | |
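
To make the schema concrete, here is a minimal sketch of building the arguments object for a `kill_job` call. The function name `build_kill_job_arguments` and the sample UUID are hypothetical, and the UUID check is a client-side assumption: the schema itself only requires `job_id` to be a string.

```python
import json
import uuid


def build_kill_job_arguments(job_id: str) -> dict:
    """Build the arguments object for a kill_job call.

    The UUID check is a client-side assumption; the tool schema only
    requires job_id to be present.
    """
    # uuid.UUID raises ValueError for strings that are not valid UUIDs.
    uuid.UUID(job_id)
    # Pass the ID through unchanged so it matches what the server issued.
    return {"job_id": job_id}


arguments = build_kill_job_arguments("123e4567-e89b-12d3-a456-426614174000")
print(json.dumps(arguments))  # {"job_id": "123e4567-e89b-12d3-a456-426614174000"}
```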
Implementation Reference
- src/mcp_background_job/server.py:181-199 (handler): MCP tool handler for kill_job. It is decorated with @mcp.tool(), validates input with Field, delegates to JobManager.kill_job(), and returns KillOutput(status=result). Exceptions are handled by raising ToolError.

      @mcp.tool()
      async def kill_job(
          job_id: str = Field(..., description="Job ID to kill"),
      ) -> KillOutput:
          """Kill a running background job.

          Args:
              job_id: The UUID of the job to terminate

          Returns:
              KillOutput indicating the result of the kill operation
          """
          try:
              job_manager = get_job_manager()
              kill_result = await job_manager.kill_job(job_id)
              return KillOutput(status=kill_result)
          except Exception as e:
              logger.error(f"Error killing job {job_id}: {e}")
              raise ToolError(f"Failed to kill job: {str(e)}")
- Pydantic BaseModel defining the output schema for the kill_job tool, with a 'status' field describing the possible results.

      class KillOutput(BaseModel):
          """Output from kill tool."""

          status: str = Field(
              ..., description="Kill result: 'killed', 'already_terminated', or 'not_found'"
          )
- JobManager.kill_job method: core logic that checks job existence and status, updates the status, kills the process if it is running, and returns the appropriate status string ('not_found', 'already_terminated', or 'killed').

      async def kill_job(self, job_id: str) -> str:
          """Kill running job.

          Args:
              job_id: Job identifier

          Returns:
              Kill result: 'killed', 'already_terminated', or 'not_found'
          """
          if job_id not in self._jobs:
              return "not_found"

          job = self._jobs[job_id]
          process_wrapper = self._processes.get(job_id)

          # Update status first
          await self._update_job_status(job_id)

          if job.status in [JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.KILLED]:
              return "already_terminated"

          if process_wrapper is None:
              job.status = JobStatus.FAILED
              return "already_terminated"

          # Kill the process
          if process_wrapper.kill():
              job.status = JobStatus.KILLED
              job.completed = datetime.now(timezone.utc)
              job.exit_code = process_wrapper.get_exit_code()
              logger.info(f"Killed job {job_id}")
              return "killed"
          else:
              return "already_terminated"
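The decision flow in JobManager.kill_job can be exercised in isolation with in-memory stand-ins. The sketch below is not the project's code: `Job`, `FakeProcess`, `SketchJobManager`, and the `-9` exit code are hypothetical, the `JobStatus` members are assumed from the names used in the method, and the status refresh step (`_update_job_status`) is omitted.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Dict, List, Optional


class JobStatus(Enum):
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    KILLED = "killed"


@dataclass
class Job:
    status: JobStatus = JobStatus.RUNNING
    completed: Optional[datetime] = None
    exit_code: Optional[int] = None


class FakeProcess:
    """Hypothetical stand-in for the real process wrapper."""

    def __init__(self) -> None:
        self._exit_code: Optional[int] = None

    def kill(self) -> bool:
        # Succeeds only if the fake process has not exited yet.
        if self._exit_code is not None:
            return False
        self._exit_code = -9  # illustrative exit code, an assumption
        return True

    def get_exit_code(self) -> Optional[int]:
        return self._exit_code


class SketchJobManager:
    def __init__(self) -> None:
        self._jobs: Dict[str, Job] = {}
        self._processes: Dict[str, FakeProcess] = {}

    def add_running_job(self, job_id: str) -> None:
        self._jobs[job_id] = Job()
        self._processes[job_id] = FakeProcess()

    async def kill_job(self, job_id: str) -> str:
        if job_id not in self._jobs:
            return "not_found"
        job = self._jobs[job_id]
        process = self._processes.get(job_id)
        # The real method refreshes the job status here; omitted in this sketch.
        if job.status in (JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.KILLED):
            return "already_terminated"
        if process is None:
            job.status = JobStatus.FAILED
            return "already_terminated"
        if process.kill():
            job.status = JobStatus.KILLED
            job.completed = datetime.now(timezone.utc)
            job.exit_code = process.get_exit_code()
            return "killed"
        return "already_terminated"


async def demo() -> List[str]:
    manager = SketchJobManager()
    manager.add_running_job("job-1")
    return [
        await manager.kill_job("missing"),  # unknown ID
        await manager.kill_job("job-1"),    # running job
        await manager.kill_job("job-1"),    # same job, now killed
    ]


results = asyncio.run(demo())
print(results)  # ['not_found', 'killed', 'already_terminated']
```

Killing the same job twice returns 'already_terminated' on the second call, so the operation is safe to retry.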