list_jobs
View all background jobs with their current status, command, and start time to monitor ongoing processes.
Instructions
List all background jobs with their status.
Returns a list of all background jobs, including their job ID, status, command, and start time. Jobs are sorted by start time (newest first).
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| No arguments | | | |
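Because the tool takes no arguments, a call passes an empty `arguments` object. The following is a minimal sketch of the request payload, assuming the client sends MCP's JSON-RPC `tools/call` method directly rather than going through an SDK. The helper name `build_list_jobs_request` is hypothetical and not part of this server.

```python
import json


def build_list_jobs_request(request_id: int = 1) -> str:
    """Serialize a JSON-RPC 2.0 `tools/call` request for the list_jobs tool.

    Hypothetical helper for illustration; an MCP client SDK would normally
    build and send this message for you.
    """
    payload = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        # list_jobs declares no parameters, so the arguments object is empty.
        "params": {"name": "list_jobs", "arguments": {}},
    }
    return json.dumps(payload)


if __name__ == "__main__":
    print(build_list_jobs_request())
```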
Implementation Reference
- src/mcp_background_job/server.py:34-48 (handler): MCP tool handler for 'list_jobs', decorated with @mcp.tool(), which registers it. Calls JobManager.list_jobs() and returns ListOutput.

  ```python
  @mcp.tool()
  async def list_jobs() -> ListOutput:
      """List all background jobs with their status.

      Returns a list of all background jobs, including their job ID, status,
      command, and start time. Jobs are sorted by start time (newest first).
      """
      try:
          job_manager = get_job_manager()
          jobs = await job_manager.list_jobs()
          return ListOutput(jobs=jobs)
      except Exception as e:
          logger.error(f"Error listing jobs: {e}")
          raise ToolError(f"Failed to list jobs: {str(e)}")
  ```
- Pydantic model for the output schema of the list_jobs tool, containing a list of JobSummary.

  ```python
  class ListOutput(BaseModel):
      """Output from list tool."""

      jobs: List[JobSummary] = Field(..., description="List of all background jobs")
  ```
- Pydantic model for the individual job summary used in ListOutput.

  ```python
  class JobSummary(BaseModel):
      """Minimal job information for listing operations."""

      job_id: str = Field(..., description="UUID v4 job identifier")
      status: JobStatus = Field(..., description="Current job status")
      command: str = Field(..., description="Shell command being executed")
      started: datetime = Field(..., description="UTC timestamp when job started")
  ```
- Core logic in the JobManager service class that implements job listing: it updates statuses, creates summaries, and sorts by start time.

  ```python
  async def list_jobs(self) -> List[JobSummary]:
      """List all jobs.

      Returns:
          List of JobSummary objects for all jobs
      """
      # Update all job statuses
      for job_id in list(self._jobs.keys()):
          try:
              await self._update_job_status(job_id)
          except Exception as e:
              logger.warning(f"Failed to update status for job {job_id}: {e}")

      # Create summaries
      summaries = []
      for job in self._jobs.values():
          summaries.append(
              JobSummary(
                  job_id=job.job_id,
                  status=job.status,
                  command=job.command,
                  started=job.started,
              )
          )

      # Sort by start time (newest first)
      summaries.sort(key=lambda x: x.started, reverse=True)
      return summaries
  ```
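The listing flow described above (refresh each status, tolerate per-job failures, sort newest first) can be tried in isolation with a small stand-in. This is only a sketch under stated assumptions: `SketchJobManager` is hypothetical, a dataclass replaces the Pydantic `JobSummary`, `status` is simplified to a plain string with made-up values, and `_update_job_status` is stubbed to fail for one job so the "log and continue" path is exercised.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Dict, List


@dataclass
class JobSummary:
    """Stand-in for the Pydantic model; status is a plain string here."""
    job_id: str
    status: str
    command: str
    started: datetime


class SketchJobManager:
    """Hypothetical, simplified manager; not the real JobManager."""

    def __init__(self) -> None:
        self._jobs: Dict[str, JobSummary] = {}

    def add(self, job: JobSummary) -> None:
        self._jobs[job.job_id] = job

    async def _update_job_status(self, job_id: str) -> None:
        # Assumption: the real method inspects the underlying process.
        # Here job "b" always fails to refresh, to show it is still listed.
        if job_id == "b":
            raise RuntimeError("process handle lost")

    async def list_jobs(self) -> List[JobSummary]:
        # Iterate over a snapshot of the keys so the dict can change safely.
        for job_id in list(self._jobs.keys()):
            try:
                await self._update_job_status(job_id)
            except Exception as exc:
                # A failed refresh is logged; the job keeps its last status.
                print(f"warning: could not update {job_id}: {exc}")
        # Newest first, matching the tool's documented ordering.
        return sorted(self._jobs.values(), key=lambda j: j.started, reverse=True)


def demo() -> List[str]:
    base = datetime(2024, 1, 1, tzinfo=timezone.utc)
    manager = SketchJobManager()
    manager.add(JobSummary("a", "running", "sleep 60", base))
    manager.add(JobSummary("b", "running", "make build", base + timedelta(minutes=5)))
    manager.add(JobSummary("c", "completed", "echo hi", base + timedelta(minutes=2)))
    return [job.job_id for job in asyncio.run(manager.list_jobs())]


if __name__ == "__main__":
    print(demo())  # ['b', 'c', 'a']
```

Note that a job whose status refresh fails is not dropped from the result; it is returned with whatever status was last recorded, which is the behavior the try/except in the reference implementation implies.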