Databricks MCP Server

by samhavens

list_jobs

Retrieve Databricks job listings with pagination and filtering to monitor and manage scheduled workflows.

Instructions

List Databricks jobs with pagination and filtering.

Args:
    limit: Number of jobs to return (default: 25, keeps response under token limits)
    offset: Starting position for pagination (default: 0, use pagination_info.next_offset for next page)
    created_by: Filter by creator email (e.g. 'user@company.com'), case-insensitive, optional
    include_run_status: Include latest run status and duration (default: true, set false for faster response)

Returns:
    JSON with jobs array and pagination_info. Each job includes latest_run with state, duration_minutes, etc.
    Use pagination_info.next_offset for next page. Total jobs shown in pagination_info.total_jobs.

Input Schema

Name                 Required  Description                                                    Default
limit                No        Number of jobs to return                                       25
offset               No        Starting position for pagination                               0
created_by           No        Filter by creator email (case-insensitive)                     -
include_run_status   No        Include latest run status and duration                         true
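
For reference, a response from this tool might look like the following. This is a sketch only: the job and run field values are illustrative, not real Databricks output.

```python
import json

# Illustrative response shape (values are made up for this sketch).
example = {
    "jobs": [
        {
            "job_id": 123,
            "settings": {"name": "nightly-etl"},
            "creator_user_name": "user@company.com",
            "latest_run": {
                "run_id": 456,
                "state": "TERMINATED",
                "result_state": "SUCCESS",
                "start_time": 1700000000000,  # epoch milliseconds
                "end_time": 1700000600000,
                "duration_seconds": 600,
                "duration_minutes": 10,
            },
        }
    ],
    "pagination_info": {
        "total_jobs": 1,
        "returned": 1,
        "limit": 25,
        "offset": 0,
        "has_more": False,
        "next_offset": None,
        "filtered_by": {"created_by": "user@company.com"},
    },
}
print(json.dumps(example, indent=2))
```

When `has_more` is true, pass `pagination_info.next_offset` as the `offset` of the next call to page through the full job list.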

Implementation Reference

  • The primary MCP tool handler for 'list_jobs'. Registers the tool via @mcp.tool(), implements client-side pagination, creator filtering, and enriches each job with latest run status by calling jobs.list_runs(job_id, limit=1). Returns formatted JSON.
    @mcp.tool()
    async def list_jobs(
        limit: int = 25, 
        offset: int = 0, 
        created_by: Optional[str] = None,
        include_run_status: bool = True
    ) -> str:
        """List Databricks jobs with pagination and filtering.
        
        Args:
            limit: Number of jobs to return (default: 25, keeps response under token limits)
            offset: Starting position for pagination (default: 0, use pagination_info.next_offset for next page)
            created_by: Filter by creator email (e.g. 'user@company.com'), case-insensitive, optional
            include_run_status: Include latest run status and duration (default: true, set false for faster response)
        
        Returns:
            JSON with jobs array and pagination_info. Each job includes latest_run with state, duration_minutes, etc.
            Use pagination_info.next_offset for next page. Total jobs shown in pagination_info.total_jobs.
        """
        logger.info(f"Listing jobs (limit={limit}, offset={offset}, created_by={created_by})")
        try:
            # Fetch jobs from the API (a single /jobs/list call returns at
            # most 100 jobs, so request the maximum page size)
            result = await jobs.list_jobs(limit=100)
            
            if "jobs" in result:
                all_jobs = result["jobs"]
                
                # Filter by creator if specified
                if created_by:
                    all_jobs = [job for job in all_jobs 
                               if job.get("creator_user_name", "").lower() == created_by.lower()]
                
                total_jobs = len(all_jobs)
                
                # Apply client-side pagination
                start_idx = offset
                end_idx = offset + limit
                paginated_jobs = all_jobs[start_idx:end_idx]
                
                # Enhance jobs with run status if requested
                if include_run_status and paginated_jobs:
                    enhanced_jobs = []
                    for job in paginated_jobs:
                        enhanced_job = job.copy()
                        
                        # Get most recent run for this job
                        try:
                            runs_result = await jobs.list_runs(job_id=job["job_id"], limit=1)
                            if "runs" in runs_result and runs_result["runs"]:
                                latest_run = runs_result["runs"][0]
                                
                                # Add run status info
                                enhanced_job["latest_run"] = {
                                    "run_id": latest_run.get("run_id"),
                                    "state": latest_run.get("state", {}).get("life_cycle_state"),
                                    "result_state": latest_run.get("state", {}).get("result_state"),
                                    "start_time": latest_run.get("start_time"),
                                    "end_time": latest_run.get("end_time"),
                                }
                                
                                # Calculate duration if both times available
                                start_time = latest_run.get("start_time")
                                end_time = latest_run.get("end_time")
                                if start_time and end_time:
                                    duration_ms = end_time - start_time
                                    enhanced_job["latest_run"]["duration_seconds"] = duration_ms // 1000
                                    enhanced_job["latest_run"]["duration_minutes"] = duration_ms // 60000
                            else:
                                enhanced_job["latest_run"] = {"status": "no_runs"}
                                
                        except Exception as e:
                            enhanced_job["latest_run"] = {"error": f"Failed to get run info: {str(e)}"}
                        
                        enhanced_jobs.append(enhanced_job)
                    
                    paginated_jobs = enhanced_jobs
                
                # Create paginated response
                paginated_result = {
                    "jobs": paginated_jobs,
                    "pagination_info": {
                        "total_jobs": total_jobs,
                        "returned": len(paginated_jobs),
                        "limit": limit,
                        "offset": offset,
                        "has_more": end_idx < total_jobs,
                        "next_offset": end_idx if end_idx < total_jobs else None,
                        "filtered_by": {"created_by": created_by} if created_by else None
                    }
                }
                
                return json.dumps(paginated_result)
            else:
                return json.dumps(result)
                
        except Exception as e:
            logger.error(f"Error listing jobs: {str(e)}")
            return json.dumps({"error": str(e)})
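The client-side pagination in the handler above can be isolated as a small pure function. This is a sketch that mirrors the slicing and `pagination_info` logic, with the run-status enrichment omitted:

```python
def paginate(items, limit=25, offset=0):
    """Slice a full job list into one page plus pagination metadata."""
    total = len(items)
    end = offset + limit
    page = items[offset:end]
    return {
        "jobs": page,
        "pagination_info": {
            "total_jobs": total,
            "returned": len(page),
            "limit": limit,
            "offset": offset,
            "has_more": end < total,
            "next_offset": end if end < total else None,
        },
    }

# Paging through 30 items: the first call returns 25, the second the rest.
first = paginate(list(range(30)))
second = paginate(list(range(30)), offset=first["pagination_info"]["next_offset"])
```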
  • Low-level helper function that makes the actual Databricks API call to /api/2.0/jobs/list. Called by the MCP handler.
    async def list_jobs(limit: Optional[int] = None, page_token: Optional[str] = None) -> Dict[str, Any]:
        """
        List jobs with optional pagination.
        
        Args:
            limit: Maximum number of jobs to return (1-100, default: 20)
            page_token: Token for pagination (from previous response's next_page_token)
        
        Returns:
            Response containing a list of jobs and optional next_page_token
            
        Raises:
            DatabricksAPIError: If the API request fails
        """
        params = {}
        if limit is not None:
            # Databricks API limits: 1-100 for jobs list
            if limit < 1:
                limit = 1
            elif limit > 100:
                limit = 100
            params["limit"] = limit
        if page_token is not None:
            params["page_token"] = page_token
            
        logger.info(f"Listing jobs (limit={limit}, page_token={'***' if page_token else None})")
        # make_api_request is assumed to be an async helper, hence the await
        return await make_api_request("GET", "/api/2.0/jobs/list", params=params if params else None)
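The range check above is equivalent to a single clamp expression; a minimal sketch:

```python
def clamp_limit(limit: int, lo: int = 1, hi: int = 100) -> int:
    """Clamp the jobs/list page size into the API's accepted 1-100 range."""
    return max(lo, min(limit, hi))
```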
  • Supporting helper for listing job runs (/api/2.0/jobs/runs/list), used by the handler to fetch latest run status for each job.
    async def list_runs(job_id: Optional[int] = None, limit: Optional[int] = None) -> Dict[str, Any]:
        """
        List job runs, optionally filtered by job_id.
        
        Args:
            job_id: ID of the job to list runs for (optional)
            limit: Maximum number of runs to return (optional)
            
        Returns:
            Response containing a list of job runs
            
        Raises:
            DatabricksAPIError: If the API request fails
        """
        params = {}
        if job_id is not None:
            params["job_id"] = job_id
        if limit is not None:
            params["limit"] = limit
            
        logger.info(f"Listing runs (job_id={job_id}, limit={limit})")
        return await make_api_request("GET", "/api/2.0/jobs/runs/list", params=params if params else None)
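Since a single jobs/list page is capped at 100 entries, a caller that needs every job would follow `next_page_token` until it is absent. A minimal sketch, with a hypothetical `fake_list_jobs` standing in for the real helper:

```python
import asyncio

async def fetch_all_jobs(list_jobs):
    """Drain every page of /api/2.0/jobs/list by following next_page_token."""
    jobs, token = [], None
    while True:
        page = await list_jobs(limit=100, page_token=token)
        jobs.extend(page.get("jobs", []))
        token = page.get("next_page_token")
        if not token:
            return jobs

# Hypothetical stand-in for the real list_jobs helper, returning two pages.
async def fake_list_jobs(limit=None, page_token=None):
    pages = {
        None: {"jobs": [{"job_id": 1}], "next_page_token": "p2"},
        "p2": {"jobs": [{"job_id": 2}]},
    }
    return pages[page_token]

all_jobs = asyncio.run(fetch_all_jobs(fake_list_jobs))
```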
