# test_complete_serverless_workflow.py • 3.56 kB
#!/usr/bin/env python3
"""
Complete test of the serverless Databricks MCP workflow:
1. Create folder
2. Upload Python file
3. Create and run serverless job
"""
import os
import sys
import tempfile
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
def test_complete_serverless_workflow():
    """Run the serverless Databricks MCP workflow end to end.

    Three steps run in order: create a folder, upload a small Python file,
    then create and run a job for the uploaded file. The result dict of
    each step is printed, and the workflow stops at the first step whose
    ``status`` is not ``'success'``.

    Outcomes are reported on stdout only. Any exception raised by one of
    the steps is caught and printed together with its traceback, so a
    failed step does not raise to the caller. Returns ``None``.
    """
    print("🧪 Testing Complete Serverless Databricks MCP Workflow")
    print("=" * 60)

    # Imported inside the function, so `main` is only loaded when the
    # workflow actually runs. An ImportError here is not caught below.
    from main import (
        mcp_create_folder,
        mcp_upload_py_file_to_databricks,
        mcp_create_and_run_job_for_notebook,
    )

    # Test parameters
    username = "stephen.hsu@databricks.com"
    folder_path = f"/Users/{username}/claude_generated_folder"

    # Source of the Python file that gets uploaded and executed by the job.
    test_code = '''
# Test Python file for serverless execution
import datetime
print("🚀 Starting serverless job execution...")
print(f"Timestamp: {datetime.datetime.now()}")
# Simulate some work
import time
time.sleep(2)
print("✅ Serverless job completed successfully!")
print(f"Final timestamp: {datetime.datetime.now()}")
'''

    try:
        # Step 1: Create folder
        print("\n📁 Step 1: Creating folder...")
        folder_result = mcp_create_folder(folder_path)
        print(f"Folder creation result: {folder_result}")
        if folder_result.get('status') != 'success':
            print("❌ Folder creation failed")
            return

        # Step 2: Upload Python file
        print("\n📤 Step 2: Uploading Python file...")
        filename = "serverless_test.py"
        upload_result = mcp_upload_py_file_to_databricks(
            filename=filename,
            content=test_code,
            username=username,
        )
        print(f"Upload result: {upload_result}")
        if upload_result.get('status') != 'success':
            print("❌ File upload failed")
            return

        # The job in step 3 is pointed at the path reported by the upload.
        notebook_path = upload_result.get('notebook_path')
        print(f"✅ File uploaded to: {notebook_path}")

        # Step 3: Create and run serverless job
        print("\n⚡ Step 3: Creating and running serverless job...")
        job_result = mcp_create_and_run_job_for_notebook(
            notebook_path=notebook_path,
            job_name="Complete Serverless Test Job",
            username=username,
        )
        print(f"Job result: {job_result}")

        if job_result.get('status') == 'success':
            print("\n🎉 SUCCESS: Complete serverless workflow executed!")
            print(f"Job ID: {job_result.get('job_id')}")
            print(f"Run ID: {job_result.get('run_id')}")
            print(f"Job URL: {job_result.get('job_url')}")
            print(f"Run URL: {job_result.get('run_url')}")
            print(f"Message: {job_result.get('message')}")
            print("\n📋 Summary:")
            print("✅ Folder created successfully")
            print("✅ Python file uploaded to Databricks")
            print("✅ Serverless job created and started")
            print("✅ No cluster permission errors encountered")
            print("\n🔗 Monitor your job in Databricks UI using the provided URLs")
        else:
            print(f"❌ Job creation failed: {job_result.get('detail')}")
    except Exception as e:
        # Top-level boundary of the script: report the error with its
        # traceback instead of letting it propagate.
        print(f"❌ Exception occurred: {str(e)}")
        import traceback
        traceback.print_exc()
# Run the workflow only when this file is executed directly as a script,
# not when it is imported.
if __name__ == "__main__":
    test_complete_serverless_workflow()