databricks-mcp

by ChrisChoTW

get_table_lineage

Retrieve table lineage to identify upstream and downstream dependencies, including related notebooks and jobs in Databricks environments.

Instructions

Get table lineage (upstream/downstream tables and related notebooks/jobs)

Args:
- catalog: Catalog name
- schema: Schema name
- table: Table name
- include_notebooks: Include notebook/job associations (slower)
- limit: Max rows to return (default 50)

Returns: Dict with upstream and downstream tables, and optionally notebook/job info
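As a sketch of the return shape (all values here are hypothetical; the notebooks_reading/notebooks_writing keys are present only when include_notebooks=True):

```python
# Illustrative result from get_table_lineage; table names are made up.
result = {
    "table": "main.sales.orders",
    "upstream": ["main.raw.orders_raw"],      # tables this table reads from
    "downstream": ["main.gold.orders_daily"],  # tables derived from this one
    "upstream_count": 1,
    "downstream_count": 1,
}
# The counts mirror the list lengths.
print(result["upstream_count"] == len(result["upstream"]))  # → True
```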

Input Schema

Name               Required  Description                                 Default
catalog            Yes       Catalog name                                —
schema             Yes       Schema name                                 —
table              Yes       Table name                                  —
include_notebooks  No        Include notebook/job associations (slower)  false
limit              No        Max rows to return                          50
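A minimal sketch of a valid arguments payload matching the schema above (the catalog/schema/table values are hypothetical):

```python
# Illustrative tool-call arguments; only catalog, schema, and table are required.
args = {
    "catalog": "main",
    "schema": "sales",
    "table": "orders",
    "include_notebooks": True,  # optional, defaults to False
    "limit": 50,                # optional, defaults to 50
}
required = {"catalog", "schema", "table"}
missing = required - args.keys()
print(sorted(missing))  # → []
```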

Implementation Reference

  • The 'get_table_lineage' function queries the system.access.table_lineage system table, classifies the results into upstream and downstream tables, and optionally resolves associated notebook/job information. It is registered as an MCP tool using the @mcp.tool decorator.
    def get_table_lineage(
        ctx: Context,
        catalog: str,
        schema: str,
        table: str,
        include_notebooks: bool = False,
        limit: int = 50
    ) -> Dict[str, Any]:
        """
        Get table lineage (upstream/downstream tables and related notebooks/jobs)
    
        Args:
            catalog: Catalog name
            schema: Schema name
            table: Table name
            include_notebooks: Include notebook/job associations (slower)
            limit: Max rows to return (default 50)
    
        Returns:
            Dict with upstream, downstream tables and optionally notebook/job info
        """
        cat = safe_identifier(catalog, "catalog")
        sch = safe_identifier(schema, "schema")
        tbl = safe_identifier(table, "table")
        full_name = f"{cat}.{sch}.{tbl}"
    
        # Query system.access.table_lineage
        sql_query = f"""
        SELECT DISTINCT
            source_table_full_name,
            target_table_full_name,
            entity_type,
            entity_metadata
        FROM system.access.table_lineage
        WHERE source_table_full_name = '{full_name}'
           OR target_table_full_name = '{full_name}'
        ORDER BY source_table_full_name, target_table_full_name
        LIMIT {limit}
        """
    
        rows = execute_sql(ctx, sql_query)
    
        # Classify upstream/downstream
        upstream: Set[str] = set()
        downstream: Set[str] = set()
        notebook_reads: Dict[str, Dict] = {}
        notebook_writes: Dict[str, Dict] = {}
    
        # Collect job/notebook pairs for resolution
        job_notebook_pairs: List[Dict] = []
    
        for row in rows:
            source = row.get("source_table_full_name")
            target = row.get("target_table_full_name")
            metadata_str = row.get("entity_metadata")
    
            # Classify tables
            if source == full_name and target and target != full_name:
                downstream.add(target)
            elif target == full_name and source and source != full_name:
                upstream.add(source)
    
            # Parse notebook/job info if requested
            if include_notebooks and metadata_str:
                try:
                    metadata = json.loads(metadata_str) if isinstance(metadata_str, str) else metadata_str
                    notebook_id = metadata.get("notebook_id")
                    job_info = metadata.get("job_info", {})
                    job_id = job_info.get("job_id") if job_info else None
    
                    if notebook_id and job_id:
                        job_notebook_pairs.append({
                            "notebook_id": notebook_id,
                            "job_id": job_id,
                            "source": source,
                            "target": target
                        })
                except (json.JSONDecodeError, TypeError):
                    pass
    
        result = {
            "table": full_name,
            "upstream": sorted(upstream),
            "downstream": sorted(downstream),
            "upstream_count": len(upstream),
            "downstream_count": len(downstream)
        }
    
        # Resolve notebook/job details if requested
        if include_notebooks and job_notebook_pairs:
            ctx.info(f"Resolving {len(job_notebook_pairs)} notebook/job associations...")
            w = get_workspace_client()
    
            for pair in job_notebook_pairs:
                key = f"{pair['job_id']}:{pair['notebook_id']}"
                info = _resolve_job_info(w, pair["job_id"], pair["notebook_id"])
    
                if pair["source"] == full_name:
                    notebook_reads[key] = info
                elif pair["target"] == full_name:
                    notebook_writes[key] = info
    
            result["notebooks_reading"] = list(notebook_reads.values())
            result["notebooks_writing"] = list(notebook_writes.values())
    
        return result
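The upstream/downstream classification loop can be exercised in isolation; a minimal sketch with fabricated lineage rows (all table names are hypothetical):

```python
# A row where our table is the source means the other table is downstream;
# a row where our table is the target means the other table is upstream.
full_name = "main.sales.orders"
rows = [
    {"source_table_full_name": "main.raw.orders_raw", "target_table_full_name": full_name},
    {"source_table_full_name": full_name, "target_table_full_name": "main.gold.orders_daily"},
]

upstream, downstream = set(), set()
for row in rows:
    source = row.get("source_table_full_name")
    target = row.get("target_table_full_name")
    if source == full_name and target and target != full_name:
        downstream.add(target)
    elif target == full_name and source and source != full_name:
        upstream.add(source)

print(sorted(upstream))    # → ['main.raw.orders_raw']
print(sorted(downstream))  # → ['main.gold.orders_daily']
```

The self-reference guards (target != full_name, source != full_name) keep self-referential lineage rows from listing the table as its own dependency.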
