Databricks MCP Server

by stephenjhsu
main.py (28.2 kB)
from mcp.server.fastmcp import FastMCP
from file_manager import create_folder, create_py_file, edit_file_content
from databricks_manager import submit_code, create_job, run_job, create_dlt_pipeline, get_job_error, get_manager
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

mcp = FastMCP("Databricks MCP Server")


@mcp.tool()
def mcp_create_folder(path: str) -> dict:
    try:
        logger.info(f"Creating folder: {path}")
        create_folder(path)
        return {"status": "success", "message": f"Folder created: {path}"}
    except Exception as e:
        logger.error(f"Error creating folder: {str(e)}")
        return {"status": "error", "detail": str(e)}


@mcp.tool()
def mcp_create_py_file(path: str, content: str = "", upload_to_databricks: bool = False,
                       username: str = "stephen.hsu@databricks.com") -> dict:
    try:
        logger.info(f"Creating Python file: {path}")

        # Create file in a writable directory (temp directory)
        import tempfile
        import os

        # If path doesn't have a directory, create it in temp
        if '/' not in path and '\\' not in path:
            temp_dir = tempfile.gettempdir()
            local_path = os.path.join(temp_dir, path)
        else:
            local_path = path

        create_py_file(local_path, content)
        result = {"status": "success", "message": f"Python file created: {local_path}"}

        # Upload to Databricks workspace if requested
        if upload_to_databricks:
            try:
                from databricks_manager import get_manager
                manager = get_manager()

                # Use claude_generated_folder for organization
                folder_name = "claude_generated_folder"
                workspace_folder = f"/Users/{username}/{folder_name}"
                workspace_path = f"{workspace_folder}/{path}"

                # Check if folder exists and create it if needed
                try:
                    import requests
                    headers = {
                        "Authorization": f"Bearer {manager.token}",
                        "Content-Type": "application/json"
                    }
                    folder_payload = {
                        "path": workspace_folder,
                        "object_type": "DIRECTORY"
                    }
                    create_url = f"{manager.host.rstrip('/')}/api/2.0/workspace/mkdirs"
                    response = requests.post(create_url, headers=headers, json=folder_payload)
                    if response.status_code == 200:
                        logger.info(f"✓ Created new folder: {workspace_folder}")
                    elif response.status_code == 400 and "RESOURCE_ALREADY_EXISTS" in response.text:
                        logger.info(f"✓ Using existing folder: {workspace_folder}")
                    else:
                        logger.warning(f"Could not create folder: {response.text}")
                except Exception as folder_error:
                    logger.warning(f"Could not create folder: {str(folder_error)}")

                # Upload using the same method as other_databricks_executor
                upload_result = manager.upload_notebook(content, workspace_path)
                result["databricks_upload"] = {
                    "status": "success",
                    "workspace_path": workspace_path,
                    "message": "File uploaded to Databricks workspace"
                }
                logger.info(f"File uploaded to Databricks: {workspace_path}")
            except Exception as upload_error:
                logger.error(f"Error uploading to Databricks: {str(upload_error)}")
                result["databricks_upload"] = {
                    "status": "error",
                    "detail": str(upload_error)
                }

        return result
    except Exception as e:
        logger.error(f"Error creating Python file: {str(e)}")
        return {"status": "error", "detail": str(e)}


@mcp.tool()
def mcp_edit_file(path: str, content: str) -> dict:
    try:
        logger.info(f"Editing file: {path}")
        edit_file_content(path, content)
        return {"status": "success", "message": f"File edited: {path}"}
    except Exception as e:
        logger.error(f"Error editing file: {str(e)}")
        return {"status": "error", "detail": str(e)}


@mcp.tool()
def mcp_submit_code(code: str, cluster_id: str) -> dict:
    try:
        logger.info(f"Submitting code to cluster: {cluster_id}")
        result = submit_code(code, cluster_id)
        return {"status": "success", "result": result}
    except Exception as e:
        logger.error(f"Error submitting code: {str(e)}")
        return {"status": "error", "detail": str(e)}


@mcp.tool()
def mcp_create_job(job_config: dict) -> dict:
    """
    Create a Databricks job.
    If job_config doesn't specify a cluster, serverless compute will be used.
    """
    try:
        logger.info(f"Creating job with config: {job_config.get('name', 'unnamed')}")

        # Ensure serverless compute if no cluster is specified
        if 'tasks' in job_config:
            for task in job_config['tasks']:
                if 'new_cluster' not in task:
                    logger.info("No cluster specified - using serverless compute")
                else:
                    logger.info("Cluster specified in task - using provided cluster config")

        job = create_job(job_config)
        return {"status": "success", "job": job}
    except Exception as e:
        logger.error(f"Error creating job: {str(e)}")
        return {"status": "error", "detail": str(e)}


@mcp.tool()
def mcp_run_job(job_id: str) -> dict:
    try:
        logger.info(f"Running job: {job_id}")
        run = run_job(job_id)
        return {"status": "success", "run": run}
    except Exception as e:
        logger.error(f"Error running job: {str(e)}")
        return {"status": "error", "detail": str(e)}


@mcp.tool()
def mcp_create_dlt_pipeline(pipeline_config: dict) -> dict:
    """
    Create a Delta Live Tables pipeline.
    If pipeline_config includes serverless=True, serverless compute will be used.
    """
    try:
        logger.info(f"Creating DLT pipeline with config: {pipeline_config.get('name', 'unnamed')}")

        # Check if serverless is enabled
        if pipeline_config.get('serverless', False):
            logger.info("Creating serverless DLT pipeline")
        else:
            logger.info("Creating DLT pipeline with cluster configuration")

        pipeline = create_dlt_pipeline(pipeline_config)
        return {"status": "success", "pipeline": pipeline}
    except Exception as e:
        logger.error(f"Error creating DLT pipeline: {str(e)}")
        return {"status": "error", "detail": str(e)}


@mcp.tool()
def mcp_get_job_error(run_id: str) -> dict:
    try:
        logger.info(f"Getting job error for run: {run_id}")
        error = get_job_error(run_id)
        return {"status": "success", "error": error}
    except Exception as e:
        logger.error(f"Error getting job error: {str(e)}")
        return {"status": "error", "detail": str(e)}


@mcp.tool()
def mcp_check_job_status(job_id: str, run_id: str) -> dict:
    try:
        logger.info(f"Checking job status for job: {job_id}, run: {run_id}")
        manager = get_manager()
        status = manager.check_job_status(job_id, run_id)
        return {"status": "success", "job_status": status}
    except Exception as e:
        logger.error(f"Error checking job status: {str(e)}")
        return {"status": "error", "detail": str(e)}


@mcp.tool()
def mcp_upload_notebook(code: str, filename: str, username: str = "stephen.hsu@databricks.com") -> dict:
    try:
        logger.info(f"Uploading notebook to Databricks workspace: {filename}")
        manager = get_manager()

        # Use claude_generated_folder for organization
        folder_name = "claude_generated_folder"
        workspace_folder = f"/Users/{username}/{folder_name}"
        workspace_path = f"{workspace_folder}/{filename}"

        # Check if folder exists and create it if needed
        import os
        try:
            import requests
            headers = {
                "Authorization": f"Bearer {manager.token}",
                "Content-Type": "application/json"
            }
            folder_payload = {
                "path": workspace_folder,
                "object_type": "DIRECTORY"
            }
            create_url = f"{manager.host.rstrip('/')}/api/2.0/workspace/mkdirs"
            response = requests.post(create_url, headers=headers, json=folder_payload)
            if response.status_code == 200:
                logger.info(f"✓ Created new folder: {workspace_folder}")
            elif response.status_code == 400 and "RESOURCE_ALREADY_EXISTS" in response.text:
                logger.info(f"✓ Using existing folder: {workspace_folder}")
            else:
                logger.warning(f"Could not create folder: {response.text}")
        except Exception as folder_error:
            logger.warning(f"Could not create folder: {str(folder_error)}")

        uploaded_path = manager.upload_notebook(code, workspace_path)
        return {
            "status": "success",
            "message": f"Notebook uploaded successfully",
            "workspace_path": uploaded_path
        }
    except Exception as e:
        logger.error(f"Error uploading notebook: {str(e)}")
        return {"status": "error", "detail": str(e)}


@mcp.tool()
def mcp_upload_sample_py_to_databricks() -> dict:
    """
    Upload a hardcoded Python string as a .py file to Databricks workspace at
    /Users/stephen.hsu@databricks.com/claude_generated_folder/sample_claude_upload.py
    """
    try:
        from databricks_manager import get_manager
        manager = get_manager()

        # Create a proper folder structure using claude_generated_folder
        folder_name = "claude_generated_folder"
        workspace_folder = f"/Users/stephen.hsu@databricks.com/{folder_name}"
        workspace_path = f"{workspace_folder}/sample_claude_upload.py"

        # Check if folder exists and create it if needed
        try:
            import requests
            headers = {
                "Authorization": f"Bearer {manager.token}",
                "Content-Type": "application/json"
            }
            folder_payload = {
                "path": workspace_folder,
                "object_type": "DIRECTORY"
            }
            create_url = f"{manager.host.rstrip('/')}/api/2.0/workspace/mkdirs"
            response = requests.post(create_url, headers=headers, json=folder_payload)
            if response.status_code == 200:
                # API returns 200 whether folder is created or already exists
                if response.text.strip() == "{}":
                    logger.info(f"✓ Folder ensured: {workspace_folder}")
                else:
                    logger.info(f"✓ Created new folder: {workspace_folder}")
            elif response.status_code == 400 and "RESOURCE_ALREADY_EXISTS" in response.text:
                logger.info(f"✓ Using existing folder: {workspace_folder}")
            else:
                logger.warning(f"Could not create folder: {response.text}")
        except Exception as folder_error:
            logger.warning(f"Could not create folder: {str(folder_error)}")

        # Upload the Python file
        code = """
def hello():
    print('Hello from Claude!')

if __name__ == '__main__':
    hello()
"""
        uploaded_path = manager.upload_notebook(code, workspace_path)
        return {"status": "success", "message": "Sample Python file uploaded successfully", "workspace_path": uploaded_path}
    except Exception as e:
        import logging
        logging.error(f"Error uploading sample Python file: {str(e)}")
        return {"status": "error", "detail": str(e)}


@mcp.tool()
def mcp_create_databricks_user_folder(username: str = "stephen.hsu@databricks.com") -> dict:
    """
    Create a user folder in Databricks workspace for file uploads.
    """
    try:
        from databricks_manager import get_manager
        manager = get_manager()

        # Extract username from email
        username_clean = username.split('@')[0]
        workspace_path = f"/Users/{username_clean}"

        # Use the Databricks REST API to create the folder
        import requests
        import base64
        headers = {
            "Authorization": f"Bearer {manager.token}",
            "Content-Type": "application/json"
        }

        # Create folder using workspace API
        folder_payload = {
            "path": workspace_path,
            "object_type": "DIRECTORY"
        }
        create_url = f"{manager.host.rstrip('/')}/api/2.0/workspace/mkdirs"
        logger.info(f"Creating user folder: {workspace_path}")
        response = requests.post(create_url, headers=headers, json=folder_payload)

        if response.status_code == 200:
            logger.info(f"✓ User folder created successfully: {workspace_path}")
            return {
                "status": "success",
                "message": f"User folder created: {workspace_path}",
                "workspace_path": workspace_path
            }
        else:
            # If folder already exists, that's also fine
            if "RESOURCE_ALREADY_EXISTS" in response.text:
                logger.info(f"✓ User folder already exists: {workspace_path}")
                return {
                    "status": "success",
                    "message": f"User folder already exists: {workspace_path}",
                    "workspace_path": workspace_path
                }
            else:
                raise Exception(f"Failed to create folder: {response.text}")
    except Exception as e:
        logger.error(f"Error creating user folder: {str(e)}")
        return {"status": "error", "detail": str(e)}


@mcp.tool()
def mcp_create_databricks_subfolder(folder_name: str, username: str = "stephen.hsu@databricks.com") -> dict:
    """
    Create a subfolder within the user's Databricks workspace directory.
    """
    try:
        from databricks_manager import get_manager
        manager = get_manager()

        # Use the full email path since we know it exists
        workspace_path = f"/Users/{username}/{folder_name}"

        import requests
        headers = {
            "Authorization": f"Bearer {manager.token}",
            "Content-Type": "application/json"
        }

        # Create folder using workspace API
        folder_payload = {
            "path": workspace_path,
            "object_type": "DIRECTORY"
        }
        create_url = f"{manager.host.rstrip('/')}/api/2.0/workspace/mkdirs"
        logger.info(f"Creating subfolder: {workspace_path}")
        response = requests.post(create_url, headers=headers, json=folder_payload)

        if response.status_code == 200:
            logger.info(f"✓ Subfolder created successfully: {workspace_path}")
            return {
                "status": "success",
                "message": f"Subfolder created: {workspace_path}",
                "workspace_path": workspace_path
            }
        else:
            # If folder already exists, that's also fine
            if "RESOURCE_ALREADY_EXISTS" in response.text:
                logger.info(f"✓ Subfolder already exists: {workspace_path}")
                return {
                    "status": "success",
                    "message": f"Subfolder already exists: {workspace_path}",
                    "workspace_path": workspace_path
                }
            else:
                raise Exception(f"Failed to create subfolder: {response.text}")
    except Exception as e:
        logger.error(f"Error creating subfolder: {str(e)}")
        return {"status": "error", "detail": str(e)}


@mcp.tool()
def mcp_create_project_folder_and_upload_assets(project_name: str, assets: dict,
                                                username: str = "stephen.hsu@databricks.com") -> dict:
    """
    Create a project folder in Databricks workspace within claude_generated_folder
    and upload multiple assets to it.

    Args:
        project_name: Name of the project folder to create
        assets: Dictionary of assets to upload, e.g., {"main.py": "code content", "config.json": "json content"}
        username: Databricks username
    """
    try:
        from databricks_manager import get_manager
        manager = get_manager()

        # Create project folder structure within claude_generated_folder
        base_folder = "claude_generated_folder"
        workspace_base_folder = f"/Users/{username}/{base_folder}"
        workspace_folder = f"{workspace_base_folder}/{project_name}"

        # First ensure the base claude_generated_folder exists
        try:
            import requests
            headers = {
                "Authorization": f"Bearer {manager.token}",
                "Content-Type": "application/json"
            }

            # Create base folder
            base_folder_payload = {
                "path": workspace_base_folder,
                "object_type": "DIRECTORY"
            }
            create_url = f"{manager.host.rstrip('/')}/api/2.0/workspace/mkdirs"
            response = requests.post(create_url, headers=headers, json=base_folder_payload)
            if response.status_code == 200:
                logger.info(f"✓ Created new base folder: {workspace_base_folder}")
            elif "RESOURCE_ALREADY_EXISTS" in response.text:
                logger.info(f"✓ Using existing base folder: {workspace_base_folder}")
            else:
                logger.warning(f"Could not create base folder: {response.text}")

            # Create project subfolder
            project_folder_payload = {
                "path": workspace_folder,
                "object_type": "DIRECTORY"
            }
            response = requests.post(create_url, headers=headers, json=project_folder_payload)
            if response.status_code == 200:
                logger.info(f"✓ Created new project folder: {workspace_folder}")
            elif "RESOURCE_ALREADY_EXISTS" in response.text:
                logger.info(f"✓ Using existing project folder: {workspace_folder}")
            else:
                raise Exception(f"Could not create project folder: {response.text}")
        except Exception as folder_error:
            logger.error(f"Error creating project folder: {str(folder_error)}")
            return {"status": "error", "detail": f"Failed to create project folder: {str(folder_error)}"}

        # Upload each asset
        uploaded_files = []
        for filename, content in assets.items():
            try:
                file_path = f"{workspace_folder}/{filename}"
                uploaded_path = manager.upload_notebook(content, file_path)
                uploaded_files.append({
                    "filename": filename,
                    "path": uploaded_path,
                    "status": "success"
                })
                logger.info(f"✓ Uploaded: {filename}")
            except Exception as upload_error:
                logger.error(f"Error uploading {filename}: {str(upload_error)}")
                uploaded_files.append({
                    "filename": filename,
                    "status": "error",
                    "detail": str(upload_error)
                })

        return {
            "status": "success",
            "message": f"Project folder created and assets uploaded",
            "project_folder": workspace_folder,
            "uploaded_files": uploaded_files
        }
    except Exception as e:
        logger.error(f"Error in project folder creation and upload: {str(e)}")
        return {"status": "error", "detail": str(e)}


@mcp.tool()
def mcp_create_and_run_job_for_notebook(notebook_path: str, job_name: str = "Claude Python Job",
                                        username: str = "stephen.hsu@databricks.com") -> dict:
    """
    Create and run a Databricks job for a given notebook path using serverless compute.
    Returns job/run IDs and Databricks URLs for monitoring.
    """
    try:
        from databricks_manager import get_manager
        manager = get_manager()
        import os
        host = manager.host.rstrip('/')

        # Serverless job config - no cluster specification = serverless compute
        job_config = {
            "name": job_name,
            "tasks": [
                {
                    "task_key": "main_task",
                    "notebook_task": {
                        "notebook_path": notebook_path,
                        "source": "WORKSPACE"
                    }
                    # No new_cluster field = serverless compute
                }
            ],
            "timeout_seconds": 3600,  # 1 hour timeout
            "max_concurrent_runs": 1
        }

        # Create job
        job = manager.create_job(job_config)
        job_id = getattr(job, 'job_id', None) or job.get('job_id')
        if not job_id:
            raise Exception(f"Failed to create job: {job}")

        # Start job
        run = manager.run_job(job_id)
        run_id = getattr(run, 'run_id', None) or run.get('run_id')
        if not run_id:
            raise Exception(f"Failed to start job: {run}")

        # Return job/run info and URLs
        return {
            "status": "success",
            "job_id": job_id,
            "run_id": run_id,
            "job_url": f"{host}/jobs/{job_id}",
            "run_url": f"{host}/jobs/{job_id}/runs/{run_id}",
            "notebook_path": notebook_path,
            "message": "Job created and started using serverless compute. Monitor in Databricks UI."
        }
    except Exception as e:
        import logging
        logging.error(f"Error creating/running job: {str(e)}")
        return {"status": "error", "detail": str(e)}


@mcp.tool()
def mcp_create_and_run_serverless_dlt_pipeline(notebook_path: str, pipeline_name: str = "Claude DLT Pipeline",
                                               username: str = "stephen.hsu@databricks.com") -> dict:
    """
    Create and run a serverless Delta Live Tables pipeline for a given notebook path.
    Returns pipeline/update IDs and Databricks URLs for monitoring.
    """
    try:
        from databricks_manager import get_manager
        manager = get_manager()
        import os
        import requests
        import uuid
        host = manager.host.rstrip('/')

        # Generate unique execution ID
        execution_id = str(uuid.uuid4())[:8]
        unique_pipeline_name = f"{pipeline_name}_{execution_id}"

        # Serverless DLT pipeline config using REST API (following other_databricks_executor pattern)
        pipeline_config = {
            "name": unique_pipeline_name,
            # Use Unity Catalog approach (no storage field to avoid conflicts)
            "catalog": "users",
            "target": username.split('@')[0].replace('.', '_'),  # Convert email to valid schema name
            "serverless": True,  # THE KEY FIELD - this enables serverless compute
            "libraries": [
                {
                    "notebook": {
                        "path": notebook_path
                    }
                }
            ],
            "continuous": False,
            "development": True,  # Use development mode for testing
            "channel": "CURRENT",
            "edition": "ADVANCED"
        }

        logger.info(f"Creating serverless DLT pipeline: {unique_pipeline_name}")
        logger.info(f"Pipeline config: {pipeline_config}")

        # Create pipeline using REST API (more reliable than SDK for serverless)
        headers = {
            "Authorization": f"Bearer {manager.token}",
            "Content-Type": "application/json"
        }
        create_url = f"{host}/api/2.0/pipelines"
        response = requests.post(create_url, headers=headers, json=pipeline_config)
        if response.status_code != 200:
            raise Exception(f"Failed to create DLT pipeline: {response.text}")

        pipeline_id = response.json().get("pipeline_id")
        if not pipeline_id:
            raise Exception("No pipeline_id in response")
        logger.info(f"✓ DLT Pipeline created successfully: {pipeline_id}")

        # Start pipeline and return immediately (no monitoring)
        start_url = f"{host}/api/2.0/pipelines/{pipeline_id}/updates"
        start_response = requests.post(start_url, headers=headers, json={})
        if start_response.status_code != 200:
            raise Exception(f"Failed to start DLT pipeline: {start_response.text}")

        update_id = start_response.json().get("update_id")
        if not update_id:
            raise Exception("No update_id in response")
        logger.info(f"✅ DLT Pipeline started successfully: Pipeline ID {pipeline_id}, Update ID {update_id}")

        # Return pipeline details immediately with links
        return {
            "status": "success",
            "execution_id": execution_id,
            "pipeline_id": pipeline_id,
            "update_id": update_id,
            "pipeline_name": unique_pipeline_name,
            "notebook_path": notebook_path,
            "pipeline_url": f"{host}/pipelines/{pipeline_id}",
            "update_url": f"{host}/pipelines/{pipeline_id}/updates/{update_id}",
            "execution_time": 0,  # Pipeline just started
            "logs": "DLT Pipeline started successfully - check Databricks for progress",
            "validation": {
                'pipeline_status': 'STARTED',
                'tables_to_create': ['bronze', 'silver', 'gold'],
                'status': 'Pipeline running - check Databricks workspace for progress'
            },
            "message": "Serverless DLT pipeline created and started. Monitor in Databricks UI."
        }
    except Exception as e:
        import logging
        logging.error(f"Error creating/running DLT pipeline: {str(e)}")
        return {"status": "error", "detail": str(e)}


@mcp.tool()
def mcp_check_dlt_pipeline_status(pipeline_id: str, update_id: str) -> dict:
    """
    Check the status of a Delta Live Tables pipeline update.
    """
    try:
        from databricks_manager import get_manager
        manager = get_manager()
        import requests
        import time
        host = manager.host.rstrip('/')

        headers = {
            "Authorization": f"Bearer {manager.token}",
            "Content-Type": "application/json"
        }
        status_url = f"{host}/api/2.0/pipelines/{pipeline_id}/updates/{update_id}"
        response = requests.get(status_url, headers=headers)
        if response.status_code != 200:
            raise Exception(f"Failed to get pipeline status: {response.text}")

        status_data = response.json()
        state = status_data.get("state", {})
        life_cycle_state = state.get("life_cycle_state", "UNKNOWN")
        result_state = state.get("result_state", "UNKNOWN")

        return {
            "status": "success",
            "pipeline_id": pipeline_id,
            "update_id": update_id,
            "life_cycle_state": life_cycle_state,
            "result_state": result_state,
            "state_details": state,
            "pipeline_url": f"{host}/pipelines/{pipeline_id}",
            "update_url": f"{host}/pipelines/{pipeline_id}/updates/{update_id}",
            "message": f"Pipeline {life_cycle_state}, Result: {result_state}"
        }
    except Exception as e:
        import logging
        logging.error(f"Error checking DLT pipeline status: {str(e)}")
        return {"status": "error", "detail": str(e)}


if __name__ == "__main__":
    mcp.run()
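main.py imports its Databricks plumbing from a companion databricks_manager module that is not shown on this page. As a rough sketch of what that dependency could look like, the snippet below implements only the pieces main.py actually touches (get_manager(), manager.host, manager.token, and manager.upload_notebook()); the DatabricksManager class name, the environment variable names, and the use of the /api/2.0/workspace/import endpoint are assumptions, not the author's actual code.

# Hypothetical sketch of databricks_manager (not the repository's real module).
import base64
import os

import requests


class DatabricksManager:
    def __init__(self, host: str, token: str):
        self.host = host.rstrip("/")   # main.py calls manager.host.rstrip('/')
        self.token = token             # main.py uses manager.token for Bearer auth

    def _headers(self) -> dict:
        return {"Authorization": f"Bearer {self.token}", "Content-Type": "application/json"}

    def upload_notebook(self, code: str, workspace_path: str) -> str:
        """Import source code into the workspace via the 2.0 workspace/import API."""
        payload = {
            "path": workspace_path,
            "format": "SOURCE",
            "language": "PYTHON",
            "overwrite": True,
            "content": base64.b64encode(code.encode("utf-8")).decode("utf-8"),
        }
        response = requests.post(f"{self.host}/api/2.0/workspace/import",
                                 headers=self._headers(), json=payload)
        response.raise_for_status()
        return workspace_path


_manager = None


def get_manager() -> DatabricksManager:
    """Return a singleton manager; the environment variable names are assumed here."""
    global _manager
    if _manager is None:
        _manager = DatabricksManager(os.environ["DATABRICKS_HOST"], os.environ["DATABRICKS_TOKEN"])
    return _manager

The other methods main.py relies on (create_job, run_job, check_job_status, plus the file_manager helpers) would presumably wrap the corresponding Jobs API endpoints or local file I/O in the same style.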
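Because every function above is registered with @mcp.tool(), running python main.py starts a stdio MCP server that any MCP client can call. A minimal smoke test, assuming the official MCP Python SDK (the mcp package) is installed and whatever credentials databricks_manager expects are configured for the server process, might look like this; the tool name and arguments are taken from the definitions above.

# Hypothetical client-side check using the MCP Python SDK's stdio client.
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main() -> None:
    # Launch main.py as a stdio MCP server and call one of its tools.
    params = StdioServerParameters(command="python", args=["main.py"])
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await session.list_tools()
            print([tool.name for tool in tools.tools])
            result = await session.call_tool("mcp_create_folder", {"path": "demo_folder"})
            print(result)


if __name__ == "__main__":
    asyncio.run(main())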

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/stephenjhsu/Databricks-MCP'
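The same lookup can be made from Python if curl isn't handy; this is just the endpoint above wrapped in a requests call.

import requests

response = requests.get("https://glama.ai/api/mcp/v1/servers/stephenjhsu/Databricks-MCP")
response.raise_for_status()
print(response.json())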

If you have feedback or need assistance with the MCP directory API, please join our Discord server.