
Databricks MCP Server

by stephenjhsu
test_complete_serverless_dlt_workflow.py • 6.42 kB
#!/usr/bin/env python3
"""
Complete test of the serverless DLT Databricks MCP workflow:
1. Create folder
2. Upload DLT pipeline notebook
3. Create and run serverless DLT pipeline
4. Check pipeline status
"""

import os
import sys
import tempfile
from dotenv import load_dotenv

# Load environment variables
load_dotenv()


def test_complete_serverless_dlt_workflow():
    """Test the complete serverless DLT workflow"""
    print("🧪 Testing Complete Serverless DLT Databricks MCP Workflow")
    print("=" * 70)

    # Import MCP functions
    from main import (
        mcp_create_folder,
        mcp_upload_notebook,
        mcp_create_and_run_serverless_dlt_pipeline,
        mcp_check_dlt_pipeline_status
    )

    # Test parameters
    username = "stephen.hsu@databricks.com"
    folder_path = f"/Users/{username}/claude_generated_folder"

    # Create a test DLT pipeline notebook
    dlt_code = '''
# Databricks notebook source
import dlt
from pyspark.sql.functions import *

# COMMAND ----------

# Configure Spark for Delta schema auto-merging
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
spark.conf.set("spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite", "true")
spark.conf.set("spark.databricks.delta.properties.defaults.autoOptimize.autoCompact", "true")
print("✓ Configured Spark for Delta schema auto-merging")

# COMMAND ----------

# Sample DLT Pipeline - Bronze Layer
@dlt.table(
    name="bronze_test_data",
    comment="Bronze layer for test data"
)
def bronze_test_data():
    # Create sample data
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
    from datetime import datetime

    schema = StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("value", IntegerType(), True),
        StructField("timestamp", TimestampType(), True)
    ])

    # Use datetime objects so the rows match the TimestampType column
    data = [
        (1, "Test Item A", 100, datetime(2024, 1, 1, 10, 0, 0)),
        (2, "Test Item B", 200, datetime(2024, 1, 1, 11, 0, 0)),
        (3, "Test Item C", 300, datetime(2024, 1, 1, 12, 0, 0))
    ]

    df = spark.createDataFrame(data, schema)
    return df

# COMMAND ----------

# Silver Layer - Clean and validate data
@dlt.table(
    name="silver_test_data",
    comment="Silver layer with cleaned and validated data"
)
def silver_test_data():
    bronze_df = dlt.read("bronze_test_data")

    # Clean and validate data
    silver_df = bronze_df.filter(col("id").isNotNull() & col("name").isNotNull()) \
        .withColumn("processed_timestamp", current_timestamp()) \
        .withColumn("data_quality_score", when(col("value") > 0, "good").otherwise("needs_review"))

    return silver_df

# COMMAND ----------

print("✅ DLT Pipeline execution complete")
print("Created tables:")
print("- bronze_test_data")
print("- silver_test_data")
'''

    try:
        # Step 1: Skip folder creation (Databricks folder already exists)
        print("\n📁 Step 1: Using existing Databricks folder...")
        print(f"Using folder: {folder_path}")
        print("✅ Databricks folder already exists from previous tests")

        # Step 2: Upload DLT notebook
        print("\n📤 Step 2: Uploading DLT pipeline notebook...")
        filename = "complete_dlt_test.py"
        upload_result = mcp_upload_notebook(
            code=dlt_code,
            filename=filename,
            username=username
        )
        print(f"Upload result: {upload_result}")

        if upload_result.get('status') != 'success':
            print("❌ DLT notebook upload failed")
            return

        notebook_path = upload_result.get('workspace_path')
        print(f"✅ DLT notebook uploaded to: {notebook_path}")

        # Step 3: Create and run serverless DLT pipeline
        print("\n⚡ Step 3: Creating and running serverless DLT pipeline...")
        pipeline_result = mcp_create_and_run_serverless_dlt_pipeline(
            notebook_path=notebook_path,
            pipeline_name="Complete Serverless DLT Test",
            username=username
        )
        print(f"Pipeline result: {pipeline_result}")

        if pipeline_result.get('status') == 'success':
            print("\n🎉 SUCCESS: Complete serverless DLT workflow executed!")
            pipeline_id = pipeline_result.get('pipeline_id')
            update_id = pipeline_result.get('update_id')
            print(f"Execution ID: {pipeline_result.get('execution_id')}")
            print(f"Pipeline ID: {pipeline_id}")
            print(f"Update ID: {update_id}")
            print(f"Pipeline Name: {pipeline_result.get('pipeline_name')}")
            print(f"Pipeline URL: {pipeline_result.get('pipeline_url')}")
            print(f"Update URL: {pipeline_result.get('update_url')}")
            print(f"Message: {pipeline_result.get('message')}")

            # Step 4: Check pipeline status
            print("\n📊 Step 4: Checking pipeline status...")
            status_result = mcp_check_dlt_pipeline_status(
                pipeline_id=pipeline_id,
                update_id=update_id
            )
            print(f"Status result: {status_result}")

            if status_result.get('status') == 'success':
                print("✅ Pipeline status retrieved successfully!")
                print(f"Life Cycle State: {status_result.get('life_cycle_state')}")
                print(f"Result State: {status_result.get('result_state')}")
                print(f"Status Message: {status_result.get('message')}")

            print("\n📊 Summary:")
            print("✅ Folder created successfully")
            print("✅ DLT notebook uploaded to Databricks")
            print("✅ Serverless DLT pipeline created and started")
            print("✅ Pipeline status checking working")
            print("✅ No cluster permission errors encountered")
            print("\n🔗 Monitor your DLT pipeline in Databricks UI using the provided URLs")
        else:
            print(f"❌ DLT pipeline creation failed: {pipeline_result.get('detail')}")

    except Exception as e:
        print(f"❌ Exception occurred: {str(e)}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    test_complete_serverless_dlt_workflow()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/stephenjhsu/Databricks-MCP'
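The same endpoint can also be queried from Python; a minimal sketch using the requests library (assuming the API returns JSON) is:

import requests

# Fetch this server's entry from the Glama MCP directory API
response = requests.get("https://glama.ai/api/mcp/v1/servers/stephenjhsu/Databricks-MCP")
response.raise_for_status()
print(response.json())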

If you have feedback or need assistance with the MCP directory API, please join our Discord server.