from mcp.server.fastmcp import FastMCP
from file_manager import create_folder, create_py_file, edit_file_content
from databricks_manager import submit_code, create_job, run_job, create_dlt_pipeline, get_job_error, get_manager
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
mcp = FastMCP("Databricks MCP Server")
@mcp.tool()
def mcp_create_folder(path: str) -> dict:
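    """Create a local folder at the given path."""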
    try:
        logger.info(f"Creating folder: {path}")
        create_folder(path)
        return {"status": "success", "message": f"Folder created: {path}"}
    except Exception as e:
        logger.error(f"Error creating folder: {str(e)}")
        return {"status": "error", "detail": str(e)}

@mcp.tool()
def mcp_create_py_file(path: str, content: str = "", upload_to_databricks: bool = False, username: str = "stephen.hsu@databricks.com") -> dict:
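    """
    Create a Python file locally (in the system temp directory when no directory is given)
    and optionally upload it to the Databricks workspace under claude_generated_folder.
    """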
    try:
        logger.info(f"Creating Python file: {path}")
        # Create the file in a writable directory (temp directory)
        import tempfile
        import os
        # If the path has no directory component, create the file in the temp directory
        if '/' not in path and '\\' not in path:
            temp_dir = tempfile.gettempdir()
            local_path = os.path.join(temp_dir, path)
        else:
            local_path = path
        create_py_file(local_path, content)
        result = {"status": "success", "message": f"Python file created: {local_path}"}
        # Upload to the Databricks workspace if requested
        if upload_to_databricks:
            try:
                manager = get_manager()
                # Use claude_generated_folder for organization
                folder_name = "claude_generated_folder"
                workspace_folder = f"/Users/{username}/{folder_name}"
                workspace_path = f"{workspace_folder}/{path}"
                # Ensure the folder exists, creating it if needed
                try:
                    import requests
                    headers = {
                        "Authorization": f"Bearer {manager.token}",
                        "Content-Type": "application/json"
                    }
                    folder_payload = {
                        "path": workspace_folder,
                        "object_type": "DIRECTORY"
                    }
                    create_url = f"{manager.host.rstrip('/')}/api/2.0/workspace/mkdirs"
                    response = requests.post(create_url, headers=headers, json=folder_payload)
                    if response.status_code == 200:
                        logger.info(f"✓ Created new folder: {workspace_folder}")
                    elif response.status_code == 400 and "RESOURCE_ALREADY_EXISTS" in response.text:
                        logger.info(f"✓ Using existing folder: {workspace_folder}")
                    else:
                        logger.warning(f"Could not create folder: {response.text}")
                except Exception as folder_error:
                    logger.warning(f"Could not create folder: {str(folder_error)}")
                # Upload using the same method as other_databricks_executor
                manager.upload_notebook(content, workspace_path)
                result["databricks_upload"] = {
                    "status": "success",
                    "workspace_path": workspace_path,
                    "message": "File uploaded to Databricks workspace"
                }
                logger.info(f"File uploaded to Databricks: {workspace_path}")
            except Exception as upload_error:
                logger.error(f"Error uploading to Databricks: {str(upload_error)}")
                result["databricks_upload"] = {
                    "status": "error",
                    "detail": str(upload_error)
                }
        return result
    except Exception as e:
        logger.error(f"Error creating Python file: {str(e)}")
        return {"status": "error", "detail": str(e)}

@mcp.tool()
def mcp_edit_file(path: str, content: str) -> dict:
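    """Edit the content of a local file."""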
    try:
        logger.info(f"Editing file: {path}")
        edit_file_content(path, content)
        return {"status": "success", "message": f"File edited: {path}"}
    except Exception as e:
        logger.error(f"Error editing file: {str(e)}")
        return {"status": "error", "detail": str(e)}

@mcp.tool()
def mcp_submit_code(code: str, cluster_id: str) -> dict:
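    """Submit a block of code for execution on the specified Databricks cluster."""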
    try:
        logger.info(f"Submitting code to cluster: {cluster_id}")
        result = submit_code(code, cluster_id)
        return {"status": "success", "result": result}
    except Exception as e:
        logger.error(f"Error submitting code: {str(e)}")
        return {"status": "error", "detail": str(e)}

@mcp.tool()
def mcp_create_job(job_config: dict) -> dict:
"""
Create a Databricks job. If job_config doesn't specify a cluster, serverless compute will be used.
"""
try:
logger.info(f"Creating job with config: {job_config.get('name', 'unnamed')}")
# Ensure serverless compute if no cluster is specified
if 'tasks' in job_config:
for task in job_config['tasks']:
if 'new_cluster' not in task:
logger.info("No cluster specified - using serverless compute")
else:
logger.info("Cluster specified in task - using provided cluster config")
job = create_job(job_config)
return {"status": "success", "job": job}
except Exception as e:
logger.error(f"Error creating job: {str(e)}")
return {"status": "error", "detail": str(e)}
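# Illustrative job_config shape for mcp_create_job, mirroring the serverless config built in
# mcp_create_and_run_job_for_notebook below (placeholder values; omitting "new_cluster" from a
# task makes it run on serverless compute):
#
# example_job_config = {
#     "name": "Claude Python Job",
#     "tasks": [
#         {
#             "task_key": "main_task",
#             "notebook_task": {"notebook_path": "/Users/<user>/claude_generated_folder/notebook", "source": "WORKSPACE"},
#             # No "new_cluster" field = serverless compute
#         }
#     ],
#     "timeout_seconds": 3600,
#     "max_concurrent_runs": 1,
# }
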
@mcp.tool()
def mcp_run_job(job_id: str) -> dict:
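    """Trigger a run of an existing Databricks job by job ID."""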
    try:
        logger.info(f"Running job: {job_id}")
        run = run_job(job_id)
        return {"status": "success", "run": run}
    except Exception as e:
        logger.error(f"Error running job: {str(e)}")
        return {"status": "error", "detail": str(e)}

@mcp.tool()
def mcp_create_dlt_pipeline(pipeline_config: dict) -> dict:
"""
Create a Delta Live Tables pipeline. If pipeline_config includes serverless=True, serverless compute will be used.
"""
try:
logger.info(f"Creating DLT pipeline with config: {pipeline_config.get('name', 'unnamed')}")
# Check if serverless is enabled
if pipeline_config.get('serverless', False):
logger.info("Creating serverless DLT pipeline")
else:
logger.info("Creating DLT pipeline with cluster configuration")
pipeline = create_dlt_pipeline(pipeline_config)
return {"status": "success", "pipeline": pipeline}
except Exception as e:
logger.error(f"Error creating DLT pipeline: {str(e)}")
return {"status": "error", "detail": str(e)}
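# Illustrative pipeline_config shape for mcp_create_dlt_pipeline, mirroring the serverless config
# built in mcp_create_and_run_serverless_dlt_pipeline below (placeholder values):
#
# example_pipeline_config = {
#     "name": "Claude DLT Pipeline",
#     "catalog": "users",
#     "target": "example_schema",
#     "serverless": True,  # enables serverless compute
#     "libraries": [{"notebook": {"path": "/Users/<user>/claude_generated_folder/dlt_notebook"}}],
#     "continuous": False,
#     "development": True,
#     "channel": "CURRENT",
#     "edition": "ADVANCED",
# }
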
@mcp.tool()
def mcp_get_job_error(run_id: str) -> dict:
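    """Retrieve the error details for a Databricks job run."""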
    try:
        logger.info(f"Getting job error for run: {run_id}")
        error = get_job_error(run_id)
        return {"status": "success", "error": error}
    except Exception as e:
        logger.error(f"Error getting job error: {str(e)}")
        return {"status": "error", "detail": str(e)}

@mcp.tool()
def mcp_check_job_status(job_id: str, run_id: str) -> dict:
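    """Check the status of a Databricks job run."""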
    try:
        logger.info(f"Checking job status for job: {job_id}, run: {run_id}")
        manager = get_manager()
        status = manager.check_job_status(job_id, run_id)
        return {"status": "success", "job_status": status}
    except Exception as e:
        logger.error(f"Error checking job status: {str(e)}")
        return {"status": "error", "detail": str(e)}

@mcp.tool()
def mcp_upload_notebook(code: str, filename: str, username: str = "stephen.hsu@databricks.com") -> dict:
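    """Upload code as a notebook to the user's Databricks workspace under claude_generated_folder."""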
    try:
        logger.info(f"Uploading notebook to Databricks workspace: {filename}")
        manager = get_manager()
        # Use claude_generated_folder for organization
        folder_name = "claude_generated_folder"
        workspace_folder = f"/Users/{username}/{folder_name}"
        workspace_path = f"{workspace_folder}/{filename}"
        # Ensure the folder exists, creating it if needed
        try:
            import requests
            headers = {
                "Authorization": f"Bearer {manager.token}",
                "Content-Type": "application/json"
            }
            folder_payload = {
                "path": workspace_folder,
                "object_type": "DIRECTORY"
            }
            create_url = f"{manager.host.rstrip('/')}/api/2.0/workspace/mkdirs"
            response = requests.post(create_url, headers=headers, json=folder_payload)
            if response.status_code == 200:
                logger.info(f"✓ Created new folder: {workspace_folder}")
            elif response.status_code == 400 and "RESOURCE_ALREADY_EXISTS" in response.text:
                logger.info(f"✓ Using existing folder: {workspace_folder}")
            else:
                logger.warning(f"Could not create folder: {response.text}")
        except Exception as folder_error:
            logger.warning(f"Could not create folder: {str(folder_error)}")
        uploaded_path = manager.upload_notebook(code, workspace_path)
        return {
            "status": "success",
            "message": "Notebook uploaded successfully",
            "workspace_path": uploaded_path
        }
    except Exception as e:
        logger.error(f"Error uploading notebook: {str(e)}")
        return {"status": "error", "detail": str(e)}

@mcp.tool()
def mcp_upload_sample_py_to_databricks() -> dict:
"""
Upload a hardcoded Python string as a .py file to Databricks workspace at /Users/stephen.hsu@databricks.com/claude_generated_folder/sample_claude_upload.py
"""
try:
from databricks_manager import get_manager
manager = get_manager()
# Create a proper folder structure using claude_generated_folder
folder_name = "claude_generated_folder"
workspace_folder = f"/Users/stephen.hsu@databricks.com/{folder_name}"
workspace_path = f"{workspace_folder}/sample_claude_upload.py"
# Check if folder exists and create it if needed
try:
import requests
headers = {
"Authorization": f"Bearer {manager.token}",
"Content-Type": "application/json"
}
folder_payload = {
"path": workspace_folder,
"object_type": "DIRECTORY"
}
create_url = f"{manager.host.rstrip('/')}/api/2.0/workspace/mkdirs"
response = requests.post(create_url, headers=headers, json=folder_payload)
if response.status_code == 200:
# API returns 200 whether folder is created or already exists
if response.text.strip() == "{}":
logger.info(f"✓ Folder ensured: {workspace_folder}")
else:
logger.info(f"✓ Created new folder: {workspace_folder}")
elif response.status_code == 400 and "RESOURCE_ALREADY_EXISTS" in response.text:
logger.info(f"✓ Using existing folder: {workspace_folder}")
else:
logger.warning(f"Could not create folder: {response.text}")
except Exception as folder_error:
logger.warning(f"Could not create folder: {str(folder_error)}")
# Upload the Python file
code = """
def hello():
print('Hello from Claude!')
if __name__ == '__main__':
hello()
"""
uploaded_path = manager.upload_notebook(code, workspace_path)
return {"status": "success", "message": "Sample Python file uploaded successfully", "workspace_path": uploaded_path}
except Exception as e:
import logging
logging.error(f"Error uploading sample Python file: {str(e)}")
return {"status": "error", "detail": str(e)}
@mcp.tool()
def mcp_create_databricks_user_folder(username: str = "stephen.hsu@databricks.com") -> dict:
"""
Create a user folder in Databricks workspace for file uploads.
"""
try:
from databricks_manager import get_manager
manager = get_manager()
# Extract username from email
username_clean = username.split('@')[0]
workspace_path = f"/Users/{username_clean}"
# Use the Databricks REST API to create the folder
import requests
import base64
headers = {
"Authorization": f"Bearer {manager.token}",
"Content-Type": "application/json"
}
# Create folder using workspace API
folder_payload = {
"path": workspace_path,
"object_type": "DIRECTORY"
}
create_url = f"{manager.host.rstrip('/')}/api/2.0/workspace/mkdirs"
logger.info(f"Creating user folder: {workspace_path}")
response = requests.post(create_url, headers=headers, json=folder_payload)
if response.status_code == 200:
logger.info(f"✓ User folder created successfully: {workspace_path}")
return {
"status": "success",
"message": f"User folder created: {workspace_path}",
"workspace_path": workspace_path
}
else:
# If folder already exists, that's also fine
if "RESOURCE_ALREADY_EXISTS" in response.text:
logger.info(f"✓ User folder already exists: {workspace_path}")
return {
"status": "success",
"message": f"User folder already exists: {workspace_path}",
"workspace_path": workspace_path
}
else:
raise Exception(f"Failed to create folder: {response.text}")
except Exception as e:
logger.error(f"Error creating user folder: {str(e)}")
return {"status": "error", "detail": str(e)}
@mcp.tool()
def mcp_create_databricks_subfolder(folder_name: str, username: str = "stephen.hsu@databricks.com") -> dict:
"""
Create a subfolder within the user's Databricks workspace directory.
"""
try:
from databricks_manager import get_manager
manager = get_manager()
# Use the full email path since we know it exists
workspace_path = f"/Users/{username}/{folder_name}"
import requests
headers = {
"Authorization": f"Bearer {manager.token}",
"Content-Type": "application/json"
}
# Create folder using workspace API
folder_payload = {
"path": workspace_path,
"object_type": "DIRECTORY"
}
create_url = f"{manager.host.rstrip('/')}/api/2.0/workspace/mkdirs"
logger.info(f"Creating subfolder: {workspace_path}")
response = requests.post(create_url, headers=headers, json=folder_payload)
if response.status_code == 200:
logger.info(f"✓ Subfolder created successfully: {workspace_path}")
return {
"status": "success",
"message": f"Subfolder created: {workspace_path}",
"workspace_path": workspace_path
}
else:
# If folder already exists, that's also fine
if "RESOURCE_ALREADY_EXISTS" in response.text:
logger.info(f"✓ Subfolder already exists: {workspace_path}")
return {
"status": "success",
"message": f"Subfolder already exists: {workspace_path}",
"workspace_path": workspace_path
}
else:
raise Exception(f"Failed to create subfolder: {response.text}")
except Exception as e:
logger.error(f"Error creating subfolder: {str(e)}")
return {"status": "error", "detail": str(e)}
@mcp.tool()
def mcp_create_project_folder_and_upload_assets(project_name: str, assets: dict, username: str = "stephen.hsu@databricks.com") -> dict:
"""
Create a project folder in Databricks workspace within claude_generated_folder and upload multiple assets to it.
Args:
project_name: Name of the project folder to create
assets: Dictionary of assets to upload, e.g., {"main.py": "code content", "config.json": "json content"}
username: Databricks username
"""
try:
from databricks_manager import get_manager
manager = get_manager()
# Create project folder structure within claude_generated_folder
base_folder = "claude_generated_folder"
workspace_base_folder = f"/Users/{username}/{base_folder}"
workspace_folder = f"{workspace_base_folder}/{project_name}"
# First ensure the base claude_generated_folder exists
try:
import requests
headers = {
"Authorization": f"Bearer {manager.token}",
"Content-Type": "application/json"
}
# Create base folder
base_folder_payload = {
"path": workspace_base_folder,
"object_type": "DIRECTORY"
}
create_url = f"{manager.host.rstrip('/')}/api/2.0/workspace/mkdirs"
response = requests.post(create_url, headers=headers, json=base_folder_payload)
if response.status_code == 200:
logger.info(f"✓ Created new base folder: {workspace_base_folder}")
elif "RESOURCE_ALREADY_EXISTS" in response.text:
logger.info(f"✓ Using existing base folder: {workspace_base_folder}")
else:
logger.warning(f"Could not create base folder: {response.text}")
# Create project subfolder
project_folder_payload = {
"path": workspace_folder,
"object_type": "DIRECTORY"
}
response = requests.post(create_url, headers=headers, json=project_folder_payload)
if response.status_code == 200:
logger.info(f"✓ Created new project folder: {workspace_folder}")
elif "RESOURCE_ALREADY_EXISTS" in response.text:
logger.info(f"✓ Using existing project folder: {workspace_folder}")
else:
raise Exception(f"Could not create project folder: {response.text}")
except Exception as folder_error:
logger.error(f"Error creating project folder: {str(folder_error)}")
return {"status": "error", "detail": f"Failed to create project folder: {str(folder_error)}"}
# Upload each asset
uploaded_files = []
for filename, content in assets.items():
try:
file_path = f"{workspace_folder}/{filename}"
uploaded_path = manager.upload_notebook(content, file_path)
uploaded_files.append({
"filename": filename,
"path": uploaded_path,
"status": "success"
})
logger.info(f"✓ Uploaded: {filename}")
except Exception as upload_error:
logger.error(f"Error uploading {filename}: {str(upload_error)}")
uploaded_files.append({
"filename": filename,
"status": "error",
"detail": str(upload_error)
})
return {
"status": "success",
"message": f"Project folder created and assets uploaded",
"project_folder": workspace_folder,
"uploaded_files": uploaded_files
}
except Exception as e:
logger.error(f"Error in project folder creation and upload: {str(e)}")
return {"status": "error", "detail": str(e)}
@mcp.tool()
def mcp_create_and_run_job_for_notebook(notebook_path: str, job_name: str = "Claude Python Job", username: str = "stephen.hsu@databricks.com") -> dict:
"""
Create and run a Databricks job for a given notebook path using serverless compute. Returns job/run IDs and Databricks URLs for monitoring.
"""
try:
from databricks_manager import get_manager
manager = get_manager()
import os
host = manager.host.rstrip('/')
# Serverless job config - no cluster specification = serverless compute
job_config = {
"name": job_name,
"tasks": [
{
"task_key": "main_task",
"notebook_task": {
"notebook_path": notebook_path,
"source": "WORKSPACE"
}
# No new_cluster field = serverless compute
}
],
"timeout_seconds": 3600, # 1 hour timeout
"max_concurrent_runs": 1
}
# Create job
job = manager.create_job(job_config)
job_id = getattr(job, 'job_id', None) or job.get('job_id')
if not job_id:
raise Exception(f"Failed to create job: {job}")
# Start job
run = manager.run_job(job_id)
run_id = getattr(run, 'run_id', None) or run.get('run_id')
if not run_id:
raise Exception(f"Failed to start job: {run}")
# Return job/run info and URLs
return {
"status": "success",
"job_id": job_id,
"run_id": run_id,
"job_url": f"{host}/jobs/{job_id}",
"run_url": f"{host}/jobs/{job_id}/runs/{run_id}",
"notebook_path": notebook_path,
"message": "Job created and started using serverless compute. Monitor in Databricks UI."
}
except Exception as e:
import logging
logging.error(f"Error creating/running job: {str(e)}")
return {"status": "error", "detail": str(e)}
@mcp.tool()
def mcp_create_and_run_serverless_dlt_pipeline(notebook_path: str, pipeline_name: str = "Claude DLT Pipeline", username: str = "stephen.hsu@databricks.com") -> dict:
"""
Create and run a serverless Delta Live Tables pipeline for a given notebook path. Returns pipeline/update IDs and Databricks URLs for monitoring.
"""
try:
from databricks_manager import get_manager
manager = get_manager()
import os
import requests
import uuid
host = manager.host.rstrip('/')
# Generate unique execution ID
execution_id = str(uuid.uuid4())[:8]
unique_pipeline_name = f"{pipeline_name}_{execution_id}"
# Serverless DLT pipeline config using REST API (following other_databricks_executor pattern)
pipeline_config = {
"name": unique_pipeline_name,
# Use Unity Catalog approach (no storage field to avoid conflicts)
"catalog": "users",
"target": username.split('@')[0].replace('.', '_'), # Convert email to valid schema name
"serverless": True, # THE KEY FIELD - this enables serverless compute
"libraries": [
{
"notebook": {
"path": notebook_path
}
}
],
"continuous": False,
"development": True, # Use development mode for testing
"channel": "CURRENT",
"edition": "ADVANCED"
}
logger.info(f"Creating serverless DLT pipeline: {unique_pipeline_name}")
logger.info(f"Pipeline config: {pipeline_config}")
# Create pipeline using REST API (more reliable than SDK for serverless)
headers = {
"Authorization": f"Bearer {manager.token}",
"Content-Type": "application/json"
}
create_url = f"{host}/api/2.0/pipelines"
response = requests.post(create_url, headers=headers, json=pipeline_config)
if response.status_code != 200:
raise Exception(f"Failed to create DLT pipeline: {response.text}")
pipeline_id = response.json().get("pipeline_id")
if not pipeline_id:
raise Exception("No pipeline_id in response")
logger.info(f"✓ DLT Pipeline created successfully: {pipeline_id}")
# Start pipeline and return immediately (no monitoring)
start_url = f"{host}/api/2.0/pipelines/{pipeline_id}/updates"
start_response = requests.post(start_url, headers=headers, json={})
if start_response.status_code != 200:
raise Exception(f"Failed to start DLT pipeline: {start_response.text}")
update_id = start_response.json().get("update_id")
if not update_id:
raise Exception("No update_id in response")
logger.info(f"✅ DLT Pipeline started successfully: Pipeline ID {pipeline_id}, Update ID {update_id}")
# Return pipeline details immediately with links
return {
"status": "success",
"execution_id": execution_id,
"pipeline_id": pipeline_id,
"update_id": update_id,
"pipeline_name": unique_pipeline_name,
"notebook_path": notebook_path,
"pipeline_url": f"{host}/pipelines/{pipeline_id}",
"update_url": f"{host}/pipelines/{pipeline_id}/updates/{update_id}",
"execution_time": 0, # Pipeline just started
"logs": "DLT Pipeline started successfully - check Databricks for progress",
"validation": {
'pipeline_status': 'STARTED',
'tables_to_create': ['bronze', 'silver', 'gold'],
'status': 'Pipeline running - check Databricks workspace for progress'
},
"message": "Serverless DLT pipeline created and started. Monitor in Databricks UI."
}
except Exception as e:
import logging
logging.error(f"Error creating/running DLT pipeline: {str(e)}")
return {"status": "error", "detail": str(e)}
@mcp.tool()
def mcp_check_dlt_pipeline_status(pipeline_id: str, update_id: str) -> dict:
"""
Check the status of a Delta Live Tables pipeline update.
"""
try:
from databricks_manager import get_manager
manager = get_manager()
import requests
import time
host = manager.host.rstrip('/')
headers = {
"Authorization": f"Bearer {manager.token}",
"Content-Type": "application/json"
}
status_url = f"{host}/api/2.0/pipelines/{pipeline_id}/updates/{update_id}"
response = requests.get(status_url, headers=headers)
if response.status_code != 200:
raise Exception(f"Failed to get pipeline status: {response.text}")
status_data = response.json()
state = status_data.get("state", {})
life_cycle_state = state.get("life_cycle_state", "UNKNOWN")
result_state = state.get("result_state", "UNKNOWN")
return {
"status": "success",
"pipeline_id": pipeline_id,
"update_id": update_id,
"life_cycle_state": life_cycle_state,
"result_state": result_state,
"state_details": state,
"pipeline_url": f"{host}/pipelines/{pipeline_id}",
"update_url": f"{host}/pipelines/{pipeline_id}/updates/{update_id}",
"message": f"Pipeline {life_cycle_state}, Result: {result_state}"
}
except Exception as e:
import logging
logging.error(f"Error checking DLT pipeline status: {str(e)}")
return {"status": "error", "detail": str(e)}
if __name__ == "__main__":
    mcp.run()