get_lineage
Analyze data lineage and dependencies for dbt resources to understand upstream sources and downstream impacts, with configurable depth and direction.
Instructions
Get lineage (dependency tree) for any dbt resource with auto-detection.
This unified tool works across all resource types (models, sources, seeds, snapshots, etc.) showing upstream and/or downstream dependencies with configurable depth.
Args:
- name: Resource name. For sources, use "source_name.table_name" or just "table_name". Examples: "customers", "jaffle_shop.orders", "raw_customers"
- resource_type: Optional filter to narrow the search:
  - "model": Data transformation models
  - "source": External data sources
  - "seed": CSV reference data files
  - "snapshot": SCD Type 2 historical tables
  - "test": Data quality tests
  - "analysis": Ad-hoc analysis queries
  - None: Auto-detect (searches all types)
- direction: Lineage direction:
  - "upstream": Show where data comes from (parents)
  - "downstream": Show what depends on this resource (children)
  - "both": Show full lineage (default)
- depth: Maximum levels to traverse (None for unlimited):
  - depth=1: Immediate dependencies only
  - depth=2: Dependencies + their dependencies
  - None: Full dependency tree

Returns: Lineage information with upstream/downstream nodes and statistics. If multiple matches are found, returns all matches for the LLM to process.

Raises: ValueError if the resource is not found or the direction is invalid.

Examples:
- get_lineage("customers") -> auto-detect and show full lineage
- get_lineage("customers", "model", "upstream") -> where the customers model gets its data
- get_lineage("jaffle_shop.orders", "source", "downstream", 2) -> 2 levels of dependents
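To make the return shape more concrete, here is a minimal sketch of how a caller might summarize a lineage result. It assumes the result dictionary carries the `resource`, `upstream`, `downstream`, and `stats` keys and the per-node `unique_id`, `name`, `type`, and `distance` fields described in the implementation reference. `group_by_distance`, `summarize`, and the `sample` data are hypothetical and hand-written for illustration, not real tool output.

```python
from collections import defaultdict
from typing import Any


def group_by_distance(nodes: list[dict[str, Any]]) -> dict[int, list[str]]:
    """Group node names by their distance from the target resource."""
    grouped: dict[int, list[str]] = defaultdict(list)
    for node in nodes:
        grouped[node["distance"]].append(node["name"])
    # Sort by distance, and sort names within each distance for stable output.
    return {dist: sorted(names) for dist, names in sorted(grouped.items())}


def summarize(result: dict[str, Any]) -> dict[str, Any]:
    """Summarize a lineage result, handling the ambiguous-name case first."""
    if result.get("multiple_matches"):
        return {"ambiguous": True, "candidates": len(result.get("matches", []))}
    return {
        "ambiguous": False,
        # A key may be absent when only one direction was requested.
        "upstream": group_by_distance(result.get("upstream", [])),
        "downstream": group_by_distance(result.get("downstream", [])),
    }


# Hand-written sample (assumed shape), not output from a real dbt project.
sample: dict[str, Any] = {
    "resource": {
        "name": "customers",
        "unique_id": "model.jaffle_shop.customers",
        "resource_type": "model",
        "package_name": "jaffle_shop",
    },
    "upstream": [
        {"unique_id": "model.jaffle_shop.stg_customers", "name": "stg_customers", "type": "model", "distance": 1},
        {"unique_id": "seed.jaffle_shop.raw_customers", "name": "raw_customers", "type": "seed", "distance": 2},
        {"unique_id": "model.jaffle_shop.stg_orders", "name": "stg_orders", "type": "model", "distance": 1},
    ],
    "downstream": [],
    "stats": {"upstream_count": 3, "downstream_count": 0, "total_dependencies": 3},
}

print(summarize(sample))
```

Checking `multiple_matches` before reading `upstream` or `downstream` matters because an ambiguous name returns the candidate list instead of a lineage tree.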
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
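| name | Yes | Resource name. For sources, use "source_name.table_name" or just "table_name". | |
| resource_type | No | Optional filter: "model", "source", "seed", "snapshot", "test", or "analysis". Omit to auto-detect. | |
| direction | No | Lineage direction: "upstream", "downstream", or "both". | both |
| depth | No | Maximum levels to traverse. Omit for unlimited. | |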
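The schema can be mirrored on the client side before a call is made. The sketch below is one way to do that, assuming the four parameters and defaults listed in this document. `LineageArgs` and `as_arguments` are hypothetical names introduced for illustration and are not part of the server.

```python
from dataclasses import dataclass
from typing import Any, Optional

VALID_DIRECTIONS = {"upstream", "downstream", "both"}


@dataclass(frozen=True)
class LineageArgs:
    """Hypothetical container mirroring the tool's four parameters."""

    name: str
    resource_type: Optional[str] = None  # None means auto-detect
    direction: str = "both"
    depth: Optional[int] = None  # None means unlimited

    def __post_init__(self) -> None:
        # Reject unknown directions early, with a message listing the valid ones.
        if self.direction not in VALID_DIRECTIONS:
            raise ValueError(
                f"Invalid direction '{self.direction}'. "
                f"Must be one of: {', '.join(sorted(VALID_DIRECTIONS))}"
            )

    def as_arguments(self) -> dict[str, Any]:
        """Return `name` and `direction`, plus any optional argument that was set."""
        args: dict[str, Any] = {"name": self.name, "direction": self.direction}
        if self.resource_type is not None:
            args["resource_type"] = self.resource_type
        if self.depth is not None:
            args["depth"] = self.depth
        return args


print(LineageArgs("jaffle_shop.orders", "source", "downstream", 2).as_arguments())
```

Leaving `resource_type` and `depth` out of the payload when they are unset keeps the call equivalent to relying on the tool's own defaults.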
Implementation Reference
- src/dbt_core_mcp/server.py:1386-1432 (registration) FastMCP tool registration for 'get_lineage'. Defines the input parameters that serve as the schema and calls toolImpl_get_lineage after initialization.

      async def get_lineage(
          ctx: Context,
          name: str,
          resource_type: str | None = None,
          direction: str = "both",
          depth: int | None = None,
      ) -> dict[str, Any]:
          """Get lineage (dependency tree) for any dbt resource with auto-detection.

          This unified tool works across all resource types (models, sources, seeds, snapshots, etc.)
          showing upstream and/or downstream dependencies with configurable depth.

          Args:
              name: Resource name. For sources, use "source_name.table_name" or just "table_name"
                  Examples: "customers", "jaffle_shop.orders", "raw_customers"
              resource_type: Optional filter to narrow search:
                  - "model": Data transformation models
                  - "source": External data sources
                  - "seed": CSV reference data files
                  - "snapshot": SCD Type 2 historical tables
                  - "test": Data quality tests
                  - "analysis": Ad-hoc analysis queries
                  - None: Auto-detect (searches all types)
              direction: Lineage direction:
                  - "upstream": Show where data comes from (parents)
                  - "downstream": Show what depends on this resource (children)
                  - "both": Show full lineage (default)
              depth: Maximum levels to traverse (None for unlimited)
                  - depth=1: Immediate dependencies only
                  - depth=2: Dependencies + their dependencies
                  - None: Full dependency tree

          Returns:
              Lineage information with upstream/downstream nodes and statistics.
              If multiple matches found, returns all matches for LLM to process.

          Raises:
              ValueError: If resource not found or invalid direction

          Examples:
              get_lineage("customers") -> auto-detect and show full lineage
              get_lineage("customers", "model", "upstream") -> where customers model gets data
              get_lineage("jaffle_shop.orders", "source", "downstream", 2) -> 2 levels of dependents
          """
          await self._ensure_initialized_with_context(ctx)
          return await self.toolImpl_get_lineage(name, resource_type, direction, depth)
- src/dbt_core_mcp/server.py:598-604 (handler) Primary handler method for the get_lineage tool. Delegates the core logic to ManifestLoader.get_lineage and handles errors.

      async def toolImpl_get_lineage(self, name: str, resource_type: str | None = None, direction: str = "both", depth: int | None = None) -> dict[str, Any]:
          """Implementation for get_lineage tool."""
          try:
              return self.manifest.get_lineage(name, resource_type, direction, depth)  # type: ignore
          except ValueError as e:
              raise ValueError(f"Lineage error: {e}")
- Core lineage computation logic: finds the resource by name/type, traverses upstream/downstream dependencies using dbt's parent_map/child_map with configurable depth, and returns structured lineage with stats.

      def get_lineage(
          self,
          name: str,
          resource_type: str | None = None,
          direction: str = "both",
          depth: int | None = None,
      ) -> dict[str, Any]:
          """
          Get lineage (dependency tree) for any resource type with auto-detection.

          This unified method works across all resource types (models, sources, seeds, etc.)
          and provides upstream, downstream, or bidirectional dependency traversal.

          Args:
              name: Resource name. For sources, use "source_name.table_name" or just "table_name"
              resource_type: Optional filter (model, source, seed, snapshot, test, analysis).
                  If None, auto-detects resource type.
              direction: Lineage direction:
                  - "upstream": Show where data comes from (parents)
                  - "downstream": Show what depends on this resource (children)
                  - "both": Show full lineage (default)
              depth: Maximum levels to traverse (None for unlimited)
                  - depth=1: Immediate dependencies only
                  - depth=2: Dependencies + their dependencies
                  - None: Full dependency tree

          Returns:
              Dictionary with lineage information:
              {
                  "resource": {...},  # The target resource info
                  "upstream": [...],  # List of upstream dependencies (if direction in ["upstream", "both"])
                  "downstream": [...],  # List of downstream dependents (if direction in ["downstream", "both"])
                  "stats": {
                      "upstream_count": int,
                      "downstream_count": int,
                      "total_dependencies": int
                  }
              }

              If multiple matches found, returns:
              {"multiple_matches": True, "matches": [...], "message": "..."}

          Raises:
              RuntimeError: If manifest not loaded
              ValueError: If resource not found or invalid direction

          Examples:
              get_lineage("customers") -> auto-detect and show full lineage
              get_lineage("customers", "model", "upstream") -> show where customers model gets data
              get_lineage("customers", direction="downstream", depth=2) -> 2 levels of dependents
          """
          if not self._manifest:
              raise RuntimeError("Manifest not loaded. Call load() first.")

          # Validate direction
          valid_directions = {"upstream", "downstream", "both"}
          if direction not in valid_directions:
              raise ValueError(f"Invalid direction '{direction}'. Must be one of: {', '.join(sorted(valid_directions))}")

          # Get the resource (auto-detect if resource_type not specified)
          resource = self.get_resource_node(name, resource_type)

          # Handle multiple matches - return for LLM to process
          if resource.get("multiple_matches"):
              return resource

          # Extract unique_id for lineage traversal
          unique_id = resource.get("unique_id")
          if not unique_id:
              raise ValueError(f"Resource '{name}' does not have a unique_id")

          # Build lineage based on direction
          result: dict[str, Any] = {
              "resource": {
                  "name": resource.get("name"),
                  "unique_id": unique_id,
                  "resource_type": resource.get("resource_type"),
                  "package_name": resource.get("package_name"),
              }
          }

          upstream: list[dict[str, Any]] = []
          downstream: list[dict[str, Any]] = []

          if direction in ("upstream", "both"):
              upstream = self.get_upstream_nodes(unique_id, max_depth=depth)
              result["upstream"] = upstream

          if direction in ("downstream", "both"):
              downstream = self.get_downstream_nodes(unique_id, max_depth=depth)
              result["downstream"] = downstream

          # Add statistics
          result["stats"] = {
              "upstream_count": len(upstream),
              "downstream_count": len(downstream),
              "total_dependencies": len(upstream) + len(downstream),
          }

          return result
- Supporting utility for recursive upstream (parents) dependency traversal from a node's unique_id.

      def get_upstream_nodes(self, unique_id: str, max_depth: int | None = None, current_depth: int = 0) -> list[dict[str, Any]]:
          """Get all upstream dependencies of a node recursively.

          Args:
              unique_id: The unique identifier of the node
              max_depth: Maximum depth to traverse (None for unlimited)
              current_depth: Current recursion depth (internal use)

          Returns:
              List of dictionaries with upstream node info:
              {"unique_id": str, "name": str, "type": str, "distance": int}
          """
          if not self._manifest:
              raise RuntimeError("Manifest not loaded. Call load() first.")

          if max_depth is not None and current_depth >= max_depth:
              return []

          parent_map = self._manifest.get("parent_map", {})
          parents = parent_map.get(unique_id, [])

          upstream: list[dict[str, Any]] = []
          seen: set[str] = set()

          for parent_id in parents:
              if parent_id in seen:
                  continue
              seen.add(parent_id)

              node = self.get_node_by_unique_id(parent_id)
              if node:
                  resource_type = node.get("resource_type", "unknown")
                  upstream.append(
                      {
                          "unique_id": parent_id,
                          "name": node.get("name", ""),
                          "type": resource_type,
                          "distance": current_depth + 1,
                      }
                  )

                  # Recurse
                  if max_depth is None or current_depth + 1 < max_depth:
                      grandparents = self.get_upstream_nodes(parent_id, max_depth, current_depth + 1)
                      for gp in grandparents:
                          if gp["unique_id"] not in seen:
                              seen.add(str(gp["unique_id"]))
                              upstream.append(gp)

          return upstream
- Supporting utility for recursive downstream (children) dependency traversal from a node's unique_id.

      def get_downstream_nodes(self, unique_id: str, max_depth: int | None = None, current_depth: int = 0) -> list[dict[str, Any]]:
          """Get all downstream dependents of a node recursively.

          Args:
              unique_id: The unique identifier of the node
              max_depth: Maximum depth to traverse (None for unlimited)
              current_depth: Current recursion depth (internal use)

          Returns:
              List of dictionaries with downstream node info:
              {"unique_id": str, "name": str, "type": str, "distance": int}
          """
          if not self._manifest:
              raise RuntimeError("Manifest not loaded. Call load() first.")

          if max_depth is not None and current_depth >= max_depth:
              return []

          child_map = self._manifest.get("child_map", {})
          children = child_map.get(unique_id, [])

          downstream: list[dict[str, Any]] = []
          seen: set[str] = set()

          for child_id in children:
              if child_id in seen:
                  continue
              seen.add(child_id)

              node = self.get_node_by_unique_id(child_id)
              if node:
                  resource_type = node.get("resource_type", "unknown")
                  downstream.append(
                      {
                          "unique_id": child_id,
                          "name": node.get("name", ""),
                          "type": resource_type,
                          "distance": current_depth + 1,
                      }
                  )

                  # Recurse
                  if max_depth is None or current_depth + 1 < max_depth:
                      grandchildren = self.get_downstream_nodes(child_id, max_depth, current_depth + 1)
                      for gc in grandchildren:
                          if gc["unique_id"] not in seen:
                              seen.add(str(gc["unique_id"]))
                              downstream.append(gc)

          return downstream
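The traversal and result assembly described in the references above can be tried out on a toy graph. The following is a simplified, self-contained sketch, not the project's implementation: it swaps the recursive helpers for a single breadth-first `walk`, and `PARENT_MAP`, `CHILD_MAP`, `walk`, `lineage`, and the `model.demo.*` ids are all hypothetical. The recursive helpers walk depth-first, so a node first reached through a longer path appears to keep that longer distance. The breadth-first variant always records the shortest one.

```python
from collections import deque
from typing import Any, Optional

# Toy stand-ins for the manifest's parent_map / child_map (hypothetical ids).
PARENT_MAP: dict[str, list[str]] = {
    "model.demo.customers": ["model.demo.stg_customers", "model.demo.stg_orders"],
    "model.demo.stg_customers": ["seed.demo.raw_customers"],
    "model.demo.stg_orders": ["seed.demo.raw_orders"],
}
CHILD_MAP: dict[str, list[str]] = {
    "seed.demo.raw_customers": ["model.demo.stg_customers"],
    "seed.demo.raw_orders": ["model.demo.stg_orders"],
    "model.demo.stg_customers": ["model.demo.customers"],
    "model.demo.stg_orders": ["model.demo.customers"],
}


def walk(edges: dict[str, list[str]], start: str, max_depth: Optional[int] = None) -> list[tuple[str, int]]:
    """Breadth-first walk returning (node_id, distance) pairs, nearest first."""
    seen = {start}
    queue = deque([(start, 0)])
    found: list[tuple[str, int]] = []
    while queue:
        node_id, dist = queue.popleft()
        if max_depth is not None and dist >= max_depth:
            continue  # Do not expand nodes already at the depth limit.
        for nxt in edges.get(node_id, []):
            if nxt not in seen:
                seen.add(nxt)
                found.append((nxt, dist + 1))
                queue.append((nxt, dist + 1))
    return found


def lineage(unique_id: str, direction: str = "both", depth: Optional[int] = None) -> dict[str, Any]:
    """Assemble a result shaped like the documented one (node fields trimmed)."""
    if direction not in {"upstream", "downstream", "both"}:
        raise ValueError(f"Invalid direction '{direction}'")
    result: dict[str, Any] = {"resource": {"unique_id": unique_id}}
    upstream: list[dict[str, Any]] = []
    downstream: list[dict[str, Any]] = []
    if direction in ("upstream", "both"):
        upstream = [{"unique_id": n, "distance": d} for n, d in walk(PARENT_MAP, unique_id, depth)]
        result["upstream"] = upstream
    if direction in ("downstream", "both"):
        downstream = [{"unique_id": n, "distance": d} for n, d in walk(CHILD_MAP, unique_id, depth)]
        result["downstream"] = downstream
    # A direction that was not requested contributes 0 to the statistics.
    result["stats"] = {
        "upstream_count": len(upstream),
        "downstream_count": len(downstream),
        "total_dependencies": len(upstream) + len(downstream),
    }
    return result


print(lineage("model.demo.customers", direction="upstream", depth=1))
```

As in the documented method, requesting a single direction leaves the other key out of the result entirely, while its count in `stats` is still reported as 0.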