Skip to main content
Glama
PulkitXChadha

Databricks MCP Server

jobs_pipelines.py (20.1 kB)
"""Jobs and pipelines MCP tools for Databricks.""" import os from databricks.sdk import WorkspaceClient def load_job_tools(mcp_server): """Register jobs and pipelines MCP tools with the server. Args: mcp_server: The FastMCP server instance to register tools with """ @mcp_server.tool() def list_jobs() -> dict: """List all jobs in the Databricks workspace. Returns: Dictionary containing list of jobs with their details """ try: # Initialize Databricks SDK w = WorkspaceClient( host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN') ) # List all jobs jobs = w.jobs.list() job_list = [] for job in jobs: job_list.append( { 'job_id': job.job_id, 'name': job.settings.name, 'creator_user_name': job.creator_user_name, 'created_time': job.created_time, 'settings': { 'timeout_seconds': job.settings.timeout_seconds, 'max_concurrent_runs': job.settings.max_concurrent_runs, 'email_notifications': job.settings.email_notifications, } if hasattr(job, 'settings') else {}, } ) return { 'success': True, 'jobs': job_list, 'count': len(job_list), 'message': f'Found {len(job_list)} job(s)', } except Exception as e: print(f'❌ Error listing jobs: {str(e)}') return {'success': False, 'error': f'Error: {str(e)}', 'jobs': [], 'count': 0} @mcp_server.tool() def get_job(job_id: str) -> dict: """Get detailed information about a specific job. 
Args: job_id: The ID of the job to retrieve Returns: Dictionary with detailed job information or error message """ try: # Initialize Databricks SDK w = WorkspaceClient( host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN') ) # Get job details job = w.jobs.get(job_id) return { 'success': True, 'job': { 'job_id': job.job_id, 'name': job.settings.name, 'creator_user_name': job.creator_user_name, 'created_time': job.created_time, 'settings': { 'timeout_seconds': job.settings.timeout_seconds, 'max_concurrent_runs': job.settings.max_concurrent_runs, 'email_notifications': job.settings.email_notifications, 'tasks': job.settings.tasks, } if hasattr(job, 'settings') else {}, }, 'message': f'Job {job_id} details retrieved successfully', } except Exception as e: print(f'❌ Error getting job details: {str(e)}') return {'success': False, 'error': f'Error: {str(e)}'} @mcp_server.tool() def create_job(job_config: dict) -> dict: """Create a new job in the Databricks workspace. Args: job_config: Dictionary containing job configuration Returns: Dictionary with operation result or error message """ try: # Initialize Databricks SDK w = WorkspaceClient( host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN') ) # Create job job = w.jobs.create( name=job_config.get('name'), tasks=job_config.get('tasks', []), timeout_seconds=job_config.get('timeout_seconds', 3600), max_concurrent_runs=job_config.get('max_concurrent_runs', 1), email_notifications=job_config.get('email_notifications'), ) return { 'success': True, 'job_id': job.job_id, 'job_name': job.settings.name, 'message': f'Job {job.settings.name} created successfully with ID {job.job_id}', } except Exception as e: print(f'❌ Error creating job: {str(e)}') return {'success': False, 'error': f'Error: {str(e)}'} @mcp_server.tool() def update_job(job_id: str, updates: dict) -> dict: """Update an existing job in the Databricks workspace. 
Args: job_id: The ID of the job to update updates: Dictionary containing the updates to apply Returns: Dictionary with operation result or error message """ try: # Initialize Databricks SDK w = WorkspaceClient( host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN') ) # Update job w.jobs.update( job_id=job_id, new_settings=updates, ) return { 'success': True, 'job_id': job_id, 'message': f'Job {job_id} updated successfully', } except Exception as e: print(f'❌ Error updating job: {str(e)}') return {'success': False, 'error': f'Error: {str(e)}'} @mcp_server.tool() def delete_job(job_id: str) -> dict: """Delete a job from the Databricks workspace. Args: job_id: The ID of the job to delete Returns: Dictionary with operation result or error message """ try: # Initialize Databricks SDK w = WorkspaceClient( host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN') ) # Delete job w.jobs.delete(job_id) return { 'success': True, 'job_id': job_id, 'message': f'Job {job_id} deleted successfully', } except Exception as e: print(f'❌ Error deleting job: {str(e)}') return {'success': False, 'error': f'Error: {str(e)}'} @mcp_server.tool() def list_job_runs(job_id: str = None) -> dict: """List job runs, either all runs or runs for a specific job. 
Args: job_id: Optional job ID to filter runs for a specific job Returns: Dictionary containing list of job runs with their details """ try: # Initialize Databricks SDK w = WorkspaceClient( host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN') ) # List job runs if job_id: runs = w.jobs.list_runs(job_id=job_id) else: runs = w.jobs.list_runs() run_list = [] for run in runs: run_list.append( { 'run_id': run.run_id, 'job_id': run.job_id, 'run_name': run.run_name, 'state': run.state, 'start_time': run.start_time, 'end_time': run.end_time, 'creator_user_name': run.creator_user_name, } ) return { 'success': True, 'runs': run_list, 'count': len(run_list), 'job_id': job_id, 'message': f'Found {len(run_list)} job run(s)' + (f' for job {job_id}' if job_id else ''), } except Exception as e: print(f'❌ Error listing job runs: {str(e)}') return {'success': False, 'error': f'Error: {str(e)}', 'runs': [], 'count': 0} @mcp_server.tool() def get_job_run(run_id: str) -> dict: """Get detailed information about a specific job run. Args: run_id: The ID of the job run to retrieve Returns: Dictionary with detailed job run information or error message """ try: # Initialize Databricks SDK w = WorkspaceClient( host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN') ) # Get job run details run = w.jobs.get_run(run_id) return { 'success': True, 'run': { 'run_id': run.run_id, 'job_id': run.job_id, 'run_name': run.run_name, 'state': run.state, 'start_time': run.start_time, 'end_time': run.end_time, 'creator_user_name': run.creator_user_name, 'run_page_url': run.run_page_url, 'tasks': run.tasks, }, 'message': f'Job run {run_id} details retrieved successfully', } except Exception as e: print(f'❌ Error getting job run details: {str(e)}') return {'success': False, 'error': f'Error: {str(e)}'} @mcp_server.tool() def submit_job_run(job_id: str, parameters: dict = None) -> dict: """Submit a new job run. 
Args: job_id: The ID of the job to run parameters: Optional parameters to pass to the job run Returns: Dictionary with operation result or error message """ try: # Initialize Databricks SDK w = WorkspaceClient( host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN') ) # Submit job run run = w.jobs.submit_run( job_id=job_id, parameters=parameters or {}, ) return { 'success': True, 'job_id': job_id, 'run_id': run.run_id, 'message': f'Job run submitted successfully with run ID {run.run_id}', } except Exception as e: print(f'❌ Error submitting job run: {str(e)}') return {'success': False, 'error': f'Error: {str(e)}'} @mcp_server.tool() def cancel_job_run(run_id: str) -> dict: """Cancel a running job run. Args: run_id: The ID of the job run to cancel Returns: Dictionary with operation result or error message """ try: # Initialize Databricks SDK w = WorkspaceClient( host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN') ) # Cancel job run w.jobs.cancel_run(run_id) return { 'success': True, 'run_id': run_id, 'message': f'Job run {run_id} cancelled successfully', } except Exception as e: print(f'❌ Error cancelling job run: {str(e)}') return {'success': False, 'error': f'Error: {str(e)}'} @mcp_server.tool() def get_job_run_logs(run_id: str) -> dict: """Get logs from a job run. 
Args: run_id: The ID of the job run to get logs for Returns: Dictionary with job run logs or error message """ try: # Initialize Databricks SDK w = WorkspaceClient( host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN') ) # Get job run logs logs = w.jobs.get_run_output(run_id) return { 'success': True, 'run_id': run_id, 'logs': { 'notebook_output': logs.notebook_output, 'sql_output': logs.sql_output, 'dbt_output': logs.dbt_output, 'run_output': logs.run_output, }, 'message': f'Job run logs retrieved successfully for {run_id}', } except Exception as e: print(f'❌ Error getting job run logs: {str(e)}') return {'success': False, 'error': f'Error: {str(e)}'} @mcp_server.tool() def list_pipelines() -> dict: """List all DLT pipelines in the workspace. Returns: Dictionary containing list of pipelines with their details """ try: # Initialize Databricks SDK w = WorkspaceClient( host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN') ) # List all pipelines pipelines = w.pipelines.list_pipelines() pipeline_list = [] for pipeline in pipelines: pipeline_list.append( { 'pipeline_id': pipeline.pipeline_id, 'name': pipeline.name, 'state': pipeline.state, 'creator_user_name': pipeline.creator_user_name, 'created_time': pipeline.created_time, 'updated_time': pipeline.updated_time, } ) return { 'success': True, 'pipelines': pipeline_list, 'count': len(pipeline_list), 'message': f'Found {len(pipeline_list)} pipeline(s)', } except Exception as e: print(f'❌ Error listing pipelines: {str(e)}') return {'success': False, 'error': f'Error: {str(e)}', 'pipelines': [], 'count': 0} @mcp_server.tool() def get_pipeline(pipeline_id: str) -> dict: """Get details of a specific DLT pipeline. 
Args: pipeline_id: The ID of the pipeline to get details for Returns: Dictionary with pipeline details or error message """ try: # Initialize Databricks SDK w = WorkspaceClient( host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN') ) # Get pipeline details pipeline = w.pipelines.get(pipeline_id) return { 'success': True, 'pipeline': { 'pipeline_id': pipeline.pipeline_id, 'name': pipeline.name, 'state': pipeline.state, 'creator_user_name': pipeline.creator_user_name, 'created_time': pipeline.created_time, 'updated_time': pipeline.updated_time, 'specification': pipeline.specification, }, 'message': f'Pipeline {pipeline_id} details retrieved successfully', } except Exception as e: print(f'❌ Error getting pipeline details: {str(e)}') return {'success': False, 'error': f'Error: {str(e)}'} @mcp_server.tool() def create_pipeline(pipeline_config: dict) -> dict: """Create a new DLT pipeline. Args: pipeline_config: Dictionary containing pipeline configuration Returns: Dictionary with operation result or error message """ try: # Initialize Databricks SDK w = WorkspaceClient( host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN') ) # Create pipeline pipeline = w.pipelines.create( name=pipeline_config.get('name'), specification=pipeline_config.get('specification'), ) return { 'success': True, 'pipeline_id': pipeline.pipeline_id, 'pipeline_name': pipeline.name, 'message': f'Pipeline {pipeline.name} created successfully with ID {pipeline.pipeline_id}', } except Exception as e: print(f'❌ Error creating pipeline: {str(e)}') return {'success': False, 'error': f'Error: {str(e)}'} @mcp_server.tool() def update_pipeline(pipeline_id: str, updates: dict) -> dict: """Update an existing DLT pipeline. 
Args: pipeline_id: The ID of the pipeline to update updates: Dictionary containing updates to apply Returns: Dictionary with operation result or error message """ try: # Initialize Databricks SDK w = WorkspaceClient( host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN') ) # Update pipeline w.pipelines.edit( pipeline_id=pipeline_id, **updates, ) return { 'success': True, 'pipeline_id': pipeline_id, 'message': f'Pipeline {pipeline_id} updated successfully', } except Exception as e: print(f'❌ Error updating pipeline: {str(e)}') return {'success': False, 'error': f'Error: {str(e)}'} @mcp_server.tool() def delete_pipeline(pipeline_id: str) -> dict: """Delete a DLT pipeline. Args: pipeline_id: The ID of the pipeline to delete Returns: Dictionary with operation result or error message """ try: # Initialize Databricks SDK w = WorkspaceClient( host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN') ) # Delete pipeline w.pipelines.delete(pipeline_id) return { 'success': True, 'pipeline_id': pipeline_id, 'message': f'Pipeline {pipeline_id} deleted successfully', } except Exception as e: print(f'❌ Error deleting pipeline: {str(e)}') return {'success': False, 'error': f'Error: {str(e)}'} @mcp_server.tool() def list_pipeline_runs(pipeline_id: str = None) -> dict: """List DLT pipeline runs. 
Args: pipeline_id: Optional pipeline ID to filter runs (default: None for all) Returns: Dictionary containing list of pipeline runs with their details """ try: # Initialize Databricks SDK w = WorkspaceClient( host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN') ) # List pipeline runs if pipeline_id: runs = w.pipelines.list_pipeline_runs(pipeline_id=pipeline_id) else: runs = w.pipelines.list_pipeline_runs() run_list = [] for run in runs: run_list.append( { 'run_id': run.run_id, 'pipeline_id': run.pipeline_id, 'state': run.state, 'start_time': run.start_time, 'end_time': run.end_time, 'creator_user_name': run.creator_user_name, } ) return { 'success': True, 'runs': run_list, 'count': len(run_list), 'pipeline_id': pipeline_id, 'message': f'Found {len(run_list)} pipeline run(s)' + (f' for pipeline {pipeline_id}' if pipeline_id else ''), } except Exception as e: print(f'❌ Error listing pipeline runs: {str(e)}') return {'success': False, 'error': f'Error: {str(e)}', 'runs': [], 'count': 0} @mcp_server.tool() def get_pipeline_run(run_id: str) -> dict: """Get details of a specific DLT pipeline run. 
Args: run_id: The ID of the pipeline run to get details for Returns: Dictionary with pipeline run details or error message """ try: # Initialize Databricks SDK w = WorkspaceClient( host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN') ) # Get pipeline run details run = w.pipelines.get_pipeline_run(run_id) return { 'success': True, 'run': { 'run_id': run.run_id, 'pipeline_id': run.pipeline_id, 'state': run.state, 'start_time': run.start_time, 'end_time': run.end_time, 'creator_user_name': run.creator_user_name, 'run_page_url': run.run_page_url, }, 'message': f'Pipeline run {run_id} details retrieved successfully', } except Exception as e: print(f'❌ Error getting pipeline run details: {str(e)}') return {'success': False, 'error': f'Error: {str(e)}'} @mcp_server.tool() def start_pipeline_update(pipeline_id: str, parameters: dict = None) -> dict: """Start a DLT pipeline update. Args: pipeline_id: The ID of the pipeline to start parameters: Optional parameters for the pipeline update Returns: Dictionary with operation result or error message """ try: # Initialize Databricks SDK w = WorkspaceClient( host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN') ) # Start pipeline update run = w.pipelines.start_update( pipeline_id=pipeline_id, parameters=parameters or {}, ) return { 'success': True, 'pipeline_id': pipeline_id, 'run_id': run.run_id, 'message': f'Pipeline update started successfully with run ID {run.run_id}', } except Exception as e: print(f'❌ Error starting pipeline update: {str(e)}') return {'success': False, 'error': f'Error: {str(e)}'} @mcp_server.tool() def stop_pipeline_update(pipeline_id: str) -> dict: """Stop a running DLT pipeline update. 
Args: pipeline_id: The ID of the pipeline to stop Returns: Dictionary with operation result or error message """ try: # Initialize Databricks SDK w = WorkspaceClient( host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN') ) # Stop pipeline update w.pipelines.stop_update(pipeline_id) return { 'success': True, 'pipeline_id': pipeline_id, 'message': f'Pipeline update stopped successfully for {pipeline_id}', } except Exception as e: print(f'❌ Error stopping pipeline update: {str(e)}') return {'success': False, 'error': f'Error: {str(e)}'} pass # All tools are commented out

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/PulkitXChadha/awesome-databricks-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server