list_jobs
Retrieve Databricks job listings with pagination and filtering options to manage and monitor scheduled workflows efficiently.
Instructions
List Databricks jobs with pagination and filtering.
Args:
limit: Number of jobs to return (default: 25, keeps response under token limits)
offset: Starting position for pagination (default: 0, use pagination_info.next_offset for next page)
created_by: Filter by creator email (e.g. 'user@company.com'), case-insensitive, optional
include_run_status: Include latest run status and duration (default: true, set false for faster response)
Returns:
JSON with jobs array and pagination_info. Each job includes latest_run with state, duration_minutes, etc.
Use pagination_info.next_offset for next page. Total jobs shown in pagination_info.total_jobs.
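For illustration, here is a sketch of the response shape, reconstructed from the handler code in the Implementation Reference below; all values are invented placeholders, not real output:

```json
{
  "jobs": [
    {
      "job_id": 123,
      "creator_user_name": "user@company.com",
      "latest_run": {
        "run_id": 456,
        "state": "TERMINATED",
        "result_state": "SUCCESS",
        "start_time": 1700000000000,
        "end_time": 1700000300000,
        "duration_seconds": 300,
        "duration_minutes": 5
      }
    }
  ],
  "pagination_info": {
    "total_jobs": 60,
    "returned": 25,
    "limit": 25,
    "offset": 0,
    "has_more": true,
    "next_offset": 25,
    "filtered_by": null
  }
}
```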
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| limit | No | Number of jobs to return; keeps the response under token limits | 25 |
| offset | No | Starting position for pagination; use pagination_info.next_offset for the next page | 0 |
| created_by | No | Filter by creator email (e.g. 'user@company.com'), case-insensitive | none |
| include_run_status | No | Include latest run status and duration; set to false for a faster response | true |
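For reference, a hypothetical arguments payload for a `tools/call` request against this schema; every field is optional and the values below are illustrative, not defaults:

```json
{
  "limit": 10,
  "offset": 0,
  "created_by": "user@company.com",
  "include_run_status": false
}
```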
Implementation Reference
- The primary MCP tool handler for `list_jobs`. Registers the tool via `@mcp.tool()`, implements client-side pagination and creator filtering, and enriches each job with its latest run status by calling `jobs.list_runs(job_id, limit=1)`. Returns formatted JSON.

```python
@mcp.tool()
async def list_jobs(
    limit: int = 25,
    offset: int = 0,
    created_by: Optional[str] = None,
    include_run_status: bool = True
) -> str:
    """List Databricks jobs with pagination and filtering.

    Args:
        limit: Number of jobs to return (default: 25, keeps response under token limits)
        offset: Starting position for pagination (default: 0, use pagination_info.next_offset for next page)
        created_by: Filter by creator email (e.g. 'user@company.com'), case-insensitive, optional
        include_run_status: Include latest run status and duration (default: true, set false for faster response)

    Returns:
        JSON with jobs array and pagination_info. Each job includes latest_run with state, duration_minutes, etc.
        Use pagination_info.next_offset for next page. Total jobs shown in pagination_info.total_jobs.
    """
    logger.info(f"Listing jobs (limit={limit}, offset={offset}, created_by={created_by})")
    try:
        # Fetch jobs from the API (a single page; the endpoint itself paginates via page_token)
        result = await jobs.list_jobs()

        if "jobs" in result:
            all_jobs = result["jobs"]

            # Filter by creator if specified
            if created_by:
                all_jobs = [job for job in all_jobs
                            if job.get("creator_user_name", "").lower() == created_by.lower()]

            total_jobs = len(all_jobs)

            # Apply client-side pagination
            start_idx = offset
            end_idx = offset + limit
            paginated_jobs = all_jobs[start_idx:end_idx]

            # Enhance jobs with run status if requested
            if include_run_status and paginated_jobs:
                enhanced_jobs = []
                for job in paginated_jobs:
                    enhanced_job = job.copy()
                    # Get most recent run for this job
                    try:
                        runs_result = await jobs.list_runs(job_id=job["job_id"], limit=1)
                        if "runs" in runs_result and runs_result["runs"]:
                            latest_run = runs_result["runs"][0]
                            # Add run status info
                            enhanced_job["latest_run"] = {
                                "run_id": latest_run.get("run_id"),
                                "state": latest_run.get("state", {}).get("life_cycle_state"),
                                "result_state": latest_run.get("state", {}).get("result_state"),
                                "start_time": latest_run.get("start_time"),
                                "end_time": latest_run.get("end_time"),
                            }
                            # Calculate duration if both times available
                            start_time = latest_run.get("start_time")
                            end_time = latest_run.get("end_time")
                            if start_time and end_time:
                                duration_ms = end_time - start_time
                                enhanced_job["latest_run"]["duration_seconds"] = duration_ms // 1000
                                enhanced_job["latest_run"]["duration_minutes"] = duration_ms // 60000
                        else:
                            enhanced_job["latest_run"] = {"status": "no_runs"}
                    except Exception as e:
                        enhanced_job["latest_run"] = {"error": f"Failed to get run info: {str(e)}"}
                    enhanced_jobs.append(enhanced_job)
                paginated_jobs = enhanced_jobs

            # Create paginated response
            paginated_result = {
                "jobs": paginated_jobs,
                "pagination_info": {
                    "total_jobs": total_jobs,
                    "returned": len(paginated_jobs),
                    "limit": limit,
                    "offset": offset,
                    "has_more": end_idx < total_jobs,
                    "next_offset": end_idx if end_idx < total_jobs else None,
                    "filtered_by": {"created_by": created_by} if created_by else None
                }
            }
            return json.dumps(paginated_result)
        else:
            return json.dumps(result)
    except Exception as e:
        logger.error(f"Error listing jobs: {str(e)}")
        return json.dumps({"error": str(e)})
```
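A minimal sketch of how a caller might page through every job by following pagination_info.next_offset; `call_list_jobs` is a hypothetical stand-in for however your MCP client invokes the tool:

```python
import json

async def fetch_all_jobs(call_list_jobs, page_size: int = 25) -> list:
    """Collect every job by following pagination_info.next_offset.

    call_list_jobs is a hypothetical async callable that invokes the
    'list_jobs' tool and returns its JSON string result.
    """
    jobs, offset = [], 0
    while offset is not None:
        page = json.loads(await call_list_jobs(limit=page_size, offset=offset))
        jobs.extend(page.get("jobs", []))
        # next_offset is None on the last page, which ends the loop
        offset = page.get("pagination_info", {}).get("next_offset")
    return jobs
```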
- `src/api/jobs.py:54-81` (helper): Low-level helper function that makes the actual Databricks API call to `/api/2.0/jobs/list`. Called by the MCP handler.

```python
async def list_jobs(limit: Optional[int] = None, page_token: Optional[str] = None) -> Dict[str, Any]:
    """
    List jobs with optional pagination.

    Args:
        limit: Maximum number of jobs to return (1-100, default: 20)
        page_token: Token for pagination (from previous response's next_page_token)

    Returns:
        Response containing a list of jobs and optional next_page_token

    Raises:
        DatabricksAPIError: If the API request fails
    """
    params = {}
    if limit is not None:
        # Databricks API limits: 1-100 for jobs list
        if limit < 1:
            limit = 1
        elif limit > 100:
            limit = 100
        params["limit"] = limit
    if page_token is not None:
        params["page_token"] = page_token

    logger.info(f"Listing jobs (limit={limit}, page_token={'***' if page_token else None})")
    return make_api_request("GET", "/api/2.0/jobs/list", params=params if params else None)
```
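For contrast with the handler's client-side slicing, a sketch of exhausting the endpoint's own pagination by following next_page_token, assuming this helper returns the parsed response dict as documented:

```python
async def fetch_all_pages() -> list:
    """Drain /api/2.0/jobs/list by following next_page_token."""
    all_jobs, token = [], None
    while True:
        result = await list_jobs(limit=100, page_token=token)
        all_jobs.extend(result.get("jobs", []))
        token = result.get("next_page_token")
        # An absent or empty token means the last page was reached
        if not token:
            return all_jobs
```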
- `src/api/jobs.py:175-196` (helper): Supporting helper for listing job runs (`/api/2.0/jobs/runs/list`), used by the handler to fetch the latest run status for each job.

```python
async def list_runs(job_id: Optional[int] = None, limit: Optional[int] = None) -> Dict[str, Any]:
    """
    List job runs, optionally filtered by job_id.

    Args:
        job_id: ID of the job to list runs for (optional)
        limit: Maximum number of runs to return (optional)

    Returns:
        Response containing a list of job runs

    Raises:
        DatabricksAPIError: If the API request fails
    """
    params = {}
    if job_id is not None:
        params["job_id"] = job_id
    if limit is not None:
        params["limit"] = limit

    logger.info(f"Listing runs (job_id={job_id}, limit={limit})")
    return make_api_request("GET", "/api/2.0/jobs/runs/list", params=params if params else None)
```
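A hedged usage sketch mirroring the handler's limit=1 call to fetch only the most recent run; `latest_run_state` is a hypothetical convenience wrapper, not part of the codebase:

```python
from typing import Optional

async def latest_run_state(job_id: int) -> Optional[str]:
    """Return the life_cycle_state of the newest run, or None if the
    job has never run."""
    result = await list_runs(job_id=job_id, limit=1)
    runs = result.get("runs", [])
    return runs[0].get("state", {}).get("life_cycle_state") if runs else None
```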