Databricks MCP Server

build_ldp_pipeline.md
---
name: build_ldp_pipeline
description: Build Lakeflow Declarative Pipeline with medallion architecture
arguments:
  - name: catalog
    description: Unity Catalog name (will create if specified and does not exist)
    required: true
    schema:
      type: string
      pattern: "^[a-zA-Z][a-zA-Z0-9_]*$"
  - name: schema
    description: Schema for Lakeflow tables (will create if specified and does not exist)
    required: true
    schema:
      type: string
      pattern: "^[a-zA-Z][a-zA-Z0-9_]*$"
  - name: source_tables
    description: List of source tables to process - mutually exclusive with schema
    required: true
    schema:
      type: array
      items:
        type: string
        pattern: "^[a-zA-Z][a-zA-Z0-9_]*\\.[a-zA-Z][a-zA-Z0-9_]*\\.[a-zA-Z][a-zA-Z0-9_]*$"
      minItems: 1
  - name: sql_warehouse
    description: SQL warehouse ID or name for queries
    required: true
    schema:
      type: string
  - name: workspace_url
    description: Databricks workspace URL
    required: true
    schema:
      type: string
      pattern: "^https://(adb-[0-9]{16}\\.[0-9]+\\.(azure|aws|gcp)?databricks\\.(net|com)|[a-zA-Z0-9][a-zA-Z0-9-]*\\.cloud\\.databricks\\.com)/?$"
  - name: pipeline_name
    description: Name for your pipeline
    required: true
    schema:
      type: string
      pattern: "^[a-zA-Z][a-zA-Z0-9_]*$"
mutually_exclusive:
  - [schema, source_tables]
---

Build a Lakeflow Declarative Pipeline (LDP) using medallion architecture for scalable data processing with built-in quality controls.

## Context

**Configuration Provided:**

- Pipeline Name: {pipeline_name}
- Workspace URL: {workspace_url}
- Catalog: {catalog}
- Schema: {schema}
- Source Tables: {source_tables}
- SQL Warehouse: {sql_warehouse}

## Objective

Create a production-ready Lakeflow Declarative Pipeline (formerly Delta Live Tables) that:

1. Implements medallion architecture (Bronze → Silver → Gold)
2. Enforces data quality through expectations
3. Optimizes for analytics with star schema design
4. Deploys via Databricks Asset Bundles with serverless compute

## Workflow

### 1. Validation & Discovery (REQUIRED FIRST)

- **STOP**: Verify workspace context and required parameters
- Validate source table accessibility
- Identify primary keys and business entities
- Create target catalog and schema if needed
- Define data quality requirements
- Plan star schema design for gold layer

### 2. Pipeline Architecture

- Analyze source table structures
- Design medallion layer schemas
- Plan data quality expectations

**Bronze Layer (Raw Ingestion):**

- Ingest data with minimal transformation
- Add audit columns (ingestion_time, source_file)
- Enable schema evolution for flexibility
- Support multiple formats (Delta, Parquet, CSV, JSON)

**Silver Layer (Cleansed & Validated):**

- Apply data quality expectations
- Standardize data types and formats
- Handle nulls and duplicates
- Create conformed dimensions

**Gold Layer (Business-Ready):**

- Implement star schema design
- Create fact and dimension tables
- Optimize for query performance
- Add business calculations

### 3. Implementation

**Pipeline Structure:**

```python
# Bronze Layer
@dlt.table(
    name="bronze_table",
    comment="Raw data ingestion"
)
@dlt.expect_or_drop("valid_id", "id IS NOT NULL")
def bronze_table():
    return spark.readStream.table("source_table")

# Silver Layer
@dlt.table(
    name="silver_table",
    comment="Cleansed and validated data"
)
def silver_table():
    return dlt.read("bronze_table").select(...)

# Gold Layer
@dlt.table(
    name="gold_fact_table",
    comment="Business metrics fact table"
)
def gold_fact_table():
    return dlt.read("silver_table").join(...)
```
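As a more concrete bronze-layer illustration, here is a minimal sketch that ingests files with Auto Loader and adds the audit columns described above. The volume path, file format, and column names (`id`, the `/Volumes/main/raw/events/` location) are placeholder assumptions, not part of the template.

```python
import dlt
from pyspark.sql import functions as F

# Hypothetical landing location and format -- adjust to your actual source.
SOURCE_PATH = "/Volumes/main/raw/events/"

@dlt.table(
    name="bronze_events",
    comment="Raw ingestion with audit columns and schema evolution enabled",
)
@dlt.expect_or_drop("valid_id", "id IS NOT NULL")
def bronze_events():
    return (
        spark.readStream.format("cloudFiles")                     # Auto Loader for incremental ingestion
        .option("cloudFiles.format", "json")                      # also supports csv, parquet, etc.
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
        .load(SOURCE_PATH)
        .withColumn("ingestion_time", F.current_timestamp())      # audit column
        .withColumn("source_file", F.col("_metadata.file_path"))  # audit column
    )
```

If the source is an existing Delta table rather than raw files, the `spark.readStream.table("source_table")` pattern from the skeleton above applies instead.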
### 4. Data Quality Expectations

**Common Patterns:**

```python
# Required fields
@dlt.expect_or_drop("not_null", "column IS NOT NULL")

# Data validation
@dlt.expect_or_fail("valid_range", "value BETWEEN 0 AND 100")

# Referential integrity
@dlt.expect("foreign_key", "id IN (SELECT id FROM dimension)")

# Data freshness
@dlt.expect("recent_data", "date >= current_date() - 7")
```

### 5. Asset Bundle Configuration

**Directory Structure:**

```
project/
├── databricks.yml
├── resources/
│   └── pipeline.yml
└── src/
    └── pipeline.py
```

**databricks.yml:**

```yaml
bundle:
  name: ${pipeline_name}_bundle

targets:
  dev:
    workspace:
      host: ${workspace_url}
    resources:
      pipelines:
        ${pipeline_name}:
          name: ${pipeline_name}
          target: ${schema}
          catalog: ${catalog}
          libraries:
            - file:
                path: ./src/pipeline.py
          configuration:
            pipelines.autoOptimize.managed: true
          serverless: true
```

### 6. Deployment & Monitoring

- Deploy using Asset Bundle commands ONLY
- Execute pipeline
- Validate results

**Deployment Steps:**

```bash
# Validate bundle
databricks bundle validate --target dev

# Deploy to workspace
databricks bundle deploy --target dev

# Start pipeline
databricks bundle run --target dev ${pipeline_name}
```

**Monitoring Checklist:**

- Pipeline state reaches "IDLE"
- Latest update shows "COMPLETED"
- No ERROR events in history
- All tables created and queryable
- Data quality metrics pass thresholds

### IMMEDIATE ACTIONS AFTER FILE CREATION:

1. **VALIDATE**: `databricks bundle validate --target dev`
2. **DEPLOY**: `databricks bundle deploy --target dev`
3. **EXECUTE**: `databricks bundle run --target dev [pipeline_name]`
4. **MONITOR**: Track all flow executions until completion
5. **VERIFY**: Query all tables and demonstrate analytics

## Best Practices

### Performance Optimization

- Use serverless compute for auto-scaling
- Implement incremental processing with Auto Loader
- Partition large tables by date or category
- Z-order clustering for query optimization

### Deployment Requirements

- **Asset Bundle**: Create Databricks Asset Bundle for version control
- **Parameterization**: Use ${workspace.current_user.userName} for all user references
- **Environment**: [specify target environment, e.g., dev/prod]
- **Permissions**: [specify permission levels needed]

### Error Handling

```python
# Safe division
value / total if total > 0 else 0.0

# Null handling
COALESCE(field, default_value)

# Type conversion
CAST(string_col AS DOUBLE)
```

### Star Schema Design

- **Fact Tables**: Transactions, events, measurements
- **Dimension Tables**: Customers, products, time, geography
- **Slowly Changing Dimensions**: Type 2 with effective dates
- **Conformed Dimensions**: Shared across fact tables
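To ground the star-schema guidance above, here is a hedged sketch of one gold dimension and one fact table. The silver table names (`silver_customers`, `silver_orders`) and column names (`customer_id`, `order_ts`, `product_id`, `amount`, `quantity`) are illustrative assumptions; adapt them to your source schema.

```python
import dlt
from pyspark.sql import functions as F

@dlt.table(
    name="gold_dim_customer",
    comment="Customer dimension for the star schema",
)
def gold_dim_customer():
    return (
        dlt.read("silver_customers")                       # hypothetical silver input
        .select(
            F.col("customer_id").alias("customer_key"),    # key used by fact tables
            "name",
            "segment",
            "geography",
        )
        .dropDuplicates(["customer_key"])
    )

@dlt.table(
    name="gold_fact_sales",
    comment="Sales fact table keyed to conformed dimensions",
)
def gold_fact_sales():
    orders = dlt.read("silver_orders")                     # hypothetical silver input
    customers = dlt.read("gold_dim_customer")
    return (
        orders.join(customers, orders.customer_id == customers.customer_key, "left")
        .select(
            F.to_date("order_ts").alias("date_key"),       # simple date key; swap for a dim_date join
            "product_id",
            "customer_key",
            F.col("amount").cast("double").alias("amount"),
            F.col("quantity").cast("int").alias("quantity"),
        )
    )
```

For the Type 2 slowly changing dimensions mentioned above, the DLT `apply_changes` API (with `stored_as_scd_type=2`) is typically the route to take; the sketch keeps a simple overwrite-style dimension for brevity.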
## Available Tools

**Pipeline Management:**

- `list_pipelines`, `get_pipeline`, `create_pipeline`
- `start_pipeline_update`, `stop_pipeline_update`
- `get_pipeline_run`, `list_pipeline_runs`

**Unity Catalog:**

- `create_catalog`, `create_schema`
- `describe_uc_table`, `list_uc_tables`

**Monitoring:**

- `execute_dbsql` - Query validation
- `get_job_run_logs` - Pipeline execution logs

## Success Criteria

- ✓ Pipeline deploys without errors
- ✓ All medallion layers created successfully
- ✓ Data quality expectations pass
- ✓ Star schema tables queryable
- ✓ Asset Bundle version controlled
- ✓ **Pipeline Status**: Latest update must show "COMPLETED"
- ✓ **Table Creation**: All 10 expected tables created (2 Bronze + 4 Silver + 4 Gold)
- ✓ **Record Counts**: Verify non-zero record counts in fact tables
- ✓ **Complete Pipeline Deployment**: ALL layers deployed without errors
- ✓ **Full Medallion Architecture**: Bronze + Silver + Gold ALL operational
- ✓ **Comprehensive Data Quality**: ALL expectations implemented and passing
- ✓ **Complete Star Schema**: ALL fact and dimension tables created and queryable
- ✓ **Full Asset Bundle**: Complete production-ready deployment
- ✓ **Comprehensive Analytics**: ALL business queries functional

## Common Issues & Solutions

| Issue | Solution |
|-------|----------|
| Permission denied | Grant schema privileges before deployment |
| Schema evolution conflicts | Enable `mergeSchema` option |
| Memory errors | Use serverless or increase cluster size |
| Slow queries | Add Z-ordering on filter columns |
| Failed expectations | Review data quality rules; use `expect` instead of `expect_or_fail` |

## Example Star Schema

```yaml
Gold Layer:
  Fact Tables:
    - fact_sales (date_key, product_key, customer_key, amount, quantity)
    - fact_inventory (date_key, product_key, warehouse_key, quantity)
  Dimensions:
    - dim_date (date_key, date, year, quarter, month, day_of_week)
    - dim_product (product_key, name, category, price)
    - dim_customer (customer_key, name, segment, geography)
```
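The record-count and queryability criteria above can be spot-checked with a short script. This is a minimal sketch that assumes the example star schema table names and placeholder catalog/schema values; the equivalent `SELECT COUNT(*)` statements can instead be run through `execute_dbsql` against the SQL warehouse.

```python
# Post-deployment verification sketch (hypothetical catalog/schema names).
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # already available in a Databricks notebook

CATALOG = "main"       # placeholder for {catalog}
SCHEMA = "analytics"   # placeholder for {schema}
gold_tables = ["fact_sales", "fact_inventory", "dim_date", "dim_product", "dim_customer"]

for table in gold_tables:
    fqn = f"{CATALOG}.{SCHEMA}.{table}"
    row_count = spark.table(fqn).count()           # raises if the table was not created
    status = "OK" if row_count > 0 else "EMPTY"    # fact and dimension tables should be non-empty
    print(f"{fqn}: {row_count} rows [{status}]")
```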
## Documentation References

- [Lakeflow/DLT Overview](https://docs.databricks.com/delta-live-tables/)
- [Medallion Architecture](https://docs.databricks.com/lakehouse/medallion)
- [Asset Bundles Guide](https://docs.databricks.com/dev-tools/bundles/)
- [Data Quality Expectations](https://docs.databricks.com/delta-live-tables/expectations)

Ready to build your Lakeflow Declarative Pipeline! Provide any additional requirements or constraints to customize the implementation.