kill_job
Terminate a running background job by its UUID to stop processes and manage system resources.
Instructions
Kill a running background job.
Args: job_id: The UUID of the job to terminate
Returns: KillOutput indicating the result of the kill operation
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| job_id | Yes | Job ID to kill |
Implementation Reference
- src/mcp_background_job/server.py:181-200 (handler)FastMCP tool handler for 'kill_job': validates job_id input using Pydantic Field, calls JobManager.kill_job, returns KillOutput with result status.@mcp.tool() async def kill_job( job_id: str = Field(..., description="Job ID to kill"), ) -> KillOutput: """Kill a running background job. Args: job_id: The UUID of the job to terminate Returns: KillOutput indicating the result of the kill operation """ try: job_manager = get_job_manager() kill_result = await job_manager.kill_job(job_id) return KillOutput(status=kill_result) except Exception as e: logger.error(f"Error killing job {job_id}: {e}") raise ToolError(f"Failed to kill job: {str(e)}")
- Pydantic schemas for kill_job tool: KillInput defines the job_id parameter, KillOutput defines the status response field with possible values.class KillInput(BaseModel): """Input for kill tool.""" job_id: str = Field(..., description="Job ID to kill") class KillOutput(BaseModel): """Output from kill tool.""" status: str = Field( ..., description="Kill result: 'killed', 'already_terminated', or 'not_found'" )
- Core JobManager.kill_job implementation: checks if job exists and is running, terminates the process using ProcessWrapper.kill(), updates job status and metadata, returns result status string.async def kill_job(self, job_id: str) -> str: """Kill running job. Args: job_id: Job identifier Returns: Kill result: 'killed', 'already_terminated', or 'not_found' """ if job_id not in self._jobs: return "not_found" job = self._jobs[job_id] process_wrapper = self._processes.get(job_id) # Update status first await self._update_job_status(job_id) if job.status in [JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.KILLED]: return "already_terminated" if process_wrapper is None: job.status = JobStatus.FAILED return "already_terminated" # Kill the process if process_wrapper.kill(): job.status = JobStatus.KILLED job.completed = datetime.now(timezone.utc) job.exit_code = process_wrapper.get_exit_code() logger.info(f"Killed job {job_id}") return "killed" else: return "already_terminated"