list_job_runs
Retrieve recent Databricks job runs with status, duration, and results. Filter by job ID or view all jobs to monitor execution history.
Instructions
List recent job runs with detailed status and duration information.
Args:
- job_id: Specific job ID to list runs for (optional; omit to see runs across all jobs)
- limit: Number of runs to return (default: 10, most recent first)

Returns:
JSON with a runs array. Each run includes state (RUNNING/SUCCESS/FAILED), result_state,
duration_minutes for completed runs, and current_duration_minutes for running jobs.
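For orientation, here is a trimmed, hypothetical example of what the returned JSON can look like after parsing. The values are invented, and real responses carry the full set of run fields reported by the Databricks Jobs API plus the duration fields this tool adds:

```python
# Hypothetical, trimmed response (values invented for illustration).
example = {
    "runs": [
        {   # completed run: the tool adds duration_seconds / duration_minutes
            "run_id": 1001,
            "state": {"life_cycle_state": "TERMINATED", "result_state": "SUCCESS"},
            "start_time": 1717410000000,   # epoch milliseconds, as reported by Databricks
            "end_time": 1717410184000,
            "duration_seconds": 184,
            "duration_minutes": 3,
        },
        {   # run still executing: the tool adds current_duration_* instead
            "run_id": 1002,
            "state": {"life_cycle_state": "RUNNING"},
            "start_time": 1717410500000,
            "current_duration_seconds": 95,
            "current_duration_minutes": 1,
        },
    ],
}
```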
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| job_id | No | Specific job ID to list runs for; omit to see runs across all jobs | |
| limit | No | Number of runs to return, most recent first | 10 |
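Both fields are optional. A few illustrative argument payloads a client might send (the job ID value is made up):

```python
# Illustrative tool arguments (job ID is made up).
recent_across_all_jobs = {}                              # last 10 runs across every job
recent_for_one_job = {"job_id": 123}                     # last 10 runs of job 123
more_history_for_one_job = {"job_id": 123, "limit": 25}  # last 25 runs of job 123
```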
Implementation Reference
- The primary handler for the `list_job_runs` MCP tool. Decorated with `@mcp.tool()` for automatic registration. Fetches runs via `jobs.list_runs()`, enhances each run with duration calculations, and returns JSON. (A usage sketch follows the two listings below.)

```python
@mcp.tool()
async def list_job_runs(job_id: Optional[int] = None, limit: int = 10) -> str:
    """List recent job runs with detailed status and duration information.

    Args:
        job_id: Specific job ID to list runs for (optional, omit to see runs across all jobs)
        limit: Number of runs to return (default: 10, most recent first)

    Returns:
        JSON with runs array. Each run includes state (RUNNING/SUCCESS/FAILED), result_state,
        duration_minutes for completed runs, current_duration_minutes for running jobs.
    """
    logger.info(f"Listing job runs (job_id={job_id}, limit={limit})")
    try:
        result = await jobs.list_runs(job_id=job_id, limit=limit)

        if "runs" in result:
            enhanced_runs = []
            for run in result["runs"]:
                enhanced_run = run.copy()

                # Calculate duration if both times available
                start_time = run.get("start_time")
                end_time = run.get("end_time")

                if start_time and end_time:
                    duration_ms = end_time - start_time
                    enhanced_run["duration_seconds"] = duration_ms // 1000
                    enhanced_run["duration_minutes"] = duration_ms // 60000
                elif start_time and not end_time:
                    # Running job - calculate current duration
                    import time

                    current_time = int(time.time() * 1000)
                    duration_ms = current_time - start_time
                    enhanced_run["current_duration_seconds"] = duration_ms // 1000
                    enhanced_run["current_duration_minutes"] = duration_ms // 60000

                enhanced_runs.append(enhanced_run)

            result["runs"] = enhanced_runs

        return json.dumps(result)
    except Exception as e:
        logger.error(f"Error listing job runs: {str(e)}")
        return json.dumps({"error": str(e)})
```
- src/api/jobs.py:175-196 (helper): API helper that performs the actual Databricks Jobs API call to list runs (`GET /api/2.0/jobs/runs/list`). Called by the main handler.

```python
async def list_runs(job_id: Optional[int] = None, limit: Optional[int] = None) -> Dict[str, Any]:
    """
    List job runs, optionally filtered by job_id.

    Args:
        job_id: ID of the job to list runs for (optional)
        limit: Maximum number of runs to return (optional)

    Returns:
        Response containing a list of job runs

    Raises:
        DatabricksAPIError: If the API request fails
    """
    params = {}
    if job_id is not None:
        params["job_id"] = job_id
    if limit is not None:
        params["limit"] = limit

    logger.info(f"Listing runs (job_id={job_id}, limit={limit})")
    return make_api_request("GET", "/api/2.0/jobs/runs/list", params=params if params else None)
```
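As a usage sketch only, the snippet below exercises the helper outside the MCP server and recomputes the same duration fields the handler adds. It assumes the `src.api.jobs` import path shown above and that the workspace host/token configuration `make_api_request` relies on is already in place; the `job_id` value is illustrative.

```python
import asyncio
import time

from src.api import jobs  # helper module shown above


async def main() -> None:
    # Fetch the five most recent runs of a hypothetical job.
    result = await jobs.list_runs(job_id=123, limit=5)

    for run in result.get("runs", []):
        start_time = run.get("start_time")  # epoch milliseconds
        end_time = run.get("end_time")

        if start_time and end_time:
            # Completed run: same integer-division math as the handler.
            minutes = (end_time - start_time) // 60_000
            label = f"finished in ~{minutes} min"
        elif start_time:
            # Still running: measure against the current wall clock.
            minutes = (int(time.time() * 1000) - start_time) // 60_000
            label = f"running for ~{minutes} min"
        else:
            label = "not started"

        print(run.get("run_id"), run.get("state", {}), label)


asyncio.run(main())
```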