get_table_lineage
Retrieve table lineage to identify upstream and downstream dependencies, including related notebooks and jobs in Databricks environments.
Instructions
Get table lineage (upstream/downstream tables and related notebooks/jobs)
Args: catalog — Catalog name; schema — Schema name; table — Table name; include_notebooks — Include notebook/job associations (slower); limit — Max rows to return (default 50)
Returns: Dict with upstream, downstream tables and optionally notebook/job info
Input Schema
JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| catalog | Yes | ||
| schema | Yes | ||
| table | Yes | ||
| include_notebooks | No | Include notebook/job associations (slower) | False |
| limit | No | Max rows to return | 50 |
Implementation Reference
- tools/delta.py:90-196 (handler) — The 'get_table_lineage' function handles retrieving and classifying table lineage (upstream/downstream tables and associated notebook/job information) from the system.access.table_lineage table. It is registered as an MCP tool using the @mcp.tool decorator.
def get_table_lineage(
    ctx: Context,
    catalog: str,
    schema: str,
    table: str,
    include_notebooks: bool = False,
    limit: int = 50
) -> Dict[str, Any]:
    """
    Get table lineage (upstream/downstream tables and related notebooks/jobs).

    Args:
        ctx: MCP request context (used for SQL execution and progress logging)
        catalog: Catalog name
        schema: Schema name
        table: Table name
        include_notebooks: Include notebook/job associations (slower)
        limit: Max rows to return (default 50)

    Returns:
        Dict with upstream, downstream tables and optionally notebook/job info

    Raises:
        ValueError: If limit is not a positive integer (or convertible to one).
    """
    # Identifiers are sanitized by safe_identifier before interpolation.
    cat = safe_identifier(catalog, "catalog")
    sch = safe_identifier(schema, "schema")
    tbl = safe_identifier(table, "table")
    full_name = f"{cat}.{sch}.{tbl}"

    # FIX: type hints are not enforced at runtime, so `limit` could arrive as an
    # arbitrary string and be interpolated verbatim into the SQL below — a SQL
    # injection vector even though the identifiers are sanitized. Coerce to a
    # positive int before use.
    try:
        limit = int(limit)
    except (TypeError, ValueError):
        raise ValueError(f"limit must be a positive integer, got {limit!r}")
    if limit <= 0:
        raise ValueError(f"limit must be a positive integer, got {limit!r}")

    # Query system.access.table_lineage: any row where this table appears as
    # either endpoint of a lineage edge.
    sql_query = f"""
        SELECT DISTINCT
            source_table_full_name,
            target_table_full_name,
            entity_type,
            entity_metadata
        FROM system.access.table_lineage
        WHERE source_table_full_name = '{full_name}'
           OR target_table_full_name = '{full_name}'
        ORDER BY source_table_full_name, target_table_full_name
        LIMIT {limit}
    """
    rows = execute_sql(ctx, sql_query)

    # Classify upstream/downstream
    upstream: Set[str] = set()
    downstream: Set[str] = set()
    notebook_reads: Dict[str, Dict] = {}
    notebook_writes: Dict[str, Dict] = {}

    # Collect job/notebook pairs for resolution
    job_notebook_pairs: List[Dict] = []

    for row in rows:
        source = row.get("source_table_full_name")
        target = row.get("target_table_full_name")
        metadata_str = row.get("entity_metadata")

        # Classify tables: if this table is the source, the other endpoint is
        # downstream of it; if it is the target, the other endpoint is upstream.
        # Self-edges are ignored.
        if source == full_name and target and target != full_name:
            downstream.add(target)
        elif target == full_name and source and source != full_name:
            upstream.add(source)

        # Parse notebook/job info if requested. entity_metadata may already be
        # a dict or a JSON-encoded string depending on the SQL client.
        if include_notebooks and metadata_str:
            try:
                metadata = json.loads(metadata_str) if isinstance(metadata_str, str) else metadata_str
                notebook_id = metadata.get("notebook_id")
                job_info = metadata.get("job_info", {})
                job_id = job_info.get("job_id") if job_info else None

                if notebook_id and job_id:
                    job_notebook_pairs.append({
                        "notebook_id": notebook_id,
                        "job_id": job_id,
                        "source": source,
                        "target": target
                    })
            except (json.JSONDecodeError, TypeError):
                # Best-effort: malformed metadata simply contributes no
                # notebook/job association.
                pass

    result = {
        "table": full_name,
        "upstream": sorted(upstream),
        "downstream": sorted(downstream),
        "upstream_count": len(upstream),
        "downstream_count": len(downstream)
    }

    # Resolve notebook/job details if requested
    if include_notebooks and job_notebook_pairs:
        ctx.info(f"Resolving {len(job_notebook_pairs)} notebook/job associations...")
        w = get_workspace_client()

        # FIX: multiple lineage rows can repeat the same (job_id, notebook_id)
        # pair; resolving each duplicate just overwrote the same dict key with
        # an identical workspace API call. Resolve each unique key once.
        resolved: Dict[str, Dict] = {}
        for pair in job_notebook_pairs:
            key = f"{pair['job_id']}:{pair['notebook_id']}"
            if key not in resolved:
                resolved[key] = _resolve_job_info(w, pair["job_id"], pair["notebook_id"])
            info = resolved[key]
            if pair["source"] == full_name:
                notebook_reads[key] = info
            elif pair["target"] == full_name:
                notebook_writes[key] = info

        result["notebooks_reading"] = list(notebook_reads.values())
        result["notebooks_writing"] = list(notebook_writes.values())

    return result