# analyze_impact
Analyze downstream dependencies affected by changes to dbt resources like models, sources, or seeds, providing actionable recommendations for running impacted components.
## Instructions
Analyze the impact of changing any dbt resource with auto-detection.
This unified tool works across all resource types (models, sources, seeds, snapshots, etc.), showing all downstream dependencies that would be affected by changes, and provides actionable recommendations for running the affected resources.
Args:
- `name`: Resource name. For sources, use `"source_name.table_name"` or just `"table_name"`. Examples: `"stg_customers"`, `"jaffle_shop.orders"`, `"raw_customers"`.
- `resource_type`: Optional filter to narrow the search:
  - `"model"`: Data transformation models
  - `"source"`: External data sources
  - `"seed"`: CSV reference data files
  - `"snapshot"`: SCD Type 2 historical tables
  - `"test"`: Data quality tests
  - `"analysis"`: Ad-hoc analysis queries
  - `None`: Auto-detect (searches all types)
Returns: Impact analysis with:
- List of affected models by distance
- Count of affected tests and other resources
- Total impact statistics
- Resources grouped by distance from the changed resource
- Recommended dbt command to run affected resources
- Human-readable impact assessment message

If multiple matches are found, all matches are returned for the LLM to process.
Raises: `ValueError` if the resource is not found.
Examples:
- `analyze_impact("stg_customers")` -> auto-detect and show impact
- `analyze_impact("jaffle_shop.orders", "source")` -> impact of a source change
- `analyze_impact("raw_customers", "seed")` -> impact of a seed data change
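For orientation, here is a hand-built sketch of the documented return shape. The field names come from the Returns section above; the resource names and counts are invented for illustration, not real tool output:

```python
# Illustrative only: an example of the documented analyze_impact return shape.
# All values below are invented for demonstration.
example_result = {
    "resource": {
        "name": "stg_customers",
        "unique_id": "model.jaffle_shop.stg_customers",
        "resource_type": "model",
        "package_name": "jaffle_shop",
    },
    "impact": {
        "models_affected": [
            {"unique_id": "model.jaffle_shop.customers", "name": "customers",
             "type": "model", "distance": 1},
        ],
        "models_affected_count": 1,
        "tests_affected_count": 2,
        "other_affected_count": 0,
        "total_affected": 3,
    },
    "affected_by_distance": {
        "1": [{"name": "customers", "type": "model", "distance": 1}],
    },
    "recommendation": "dbt run -s stg_customers+",
    "message": "Low impact: 1 downstream model(s) affected.",
}
```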
## Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| name | Yes | Resource name; for sources, `source_name.table_name` or just `table_name` | |
| resource_type | No | Optional filter: model, source, seed, snapshot, test, analysis | None (auto-detect) |
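For context, invoking this tool over MCP means sending a JSON-RPC `tools/call` request whose `arguments` mirror the input schema above. A sketch of such a payload (the method and envelope fields follow the MCP specification; the `id` and argument values are arbitrary examples):

```python
import json

# Sketch of an MCP tools/call request invoking analyze_impact.
# The "arguments" keys mirror the input schema above.
request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/call",
    "params": {
        "name": "analyze_impact",
        "arguments": {"name": "jaffle_shop.orders", "resource_type": "source"},
    },
}
payload = json.dumps(request)
```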
## Implementation Reference
- `src/dbt_core_mcp/dbt/manifest.py:573-719` (handler) — Core handler implementing the `analyze_impact` tool logic: locates the target resource, traverses all downstream dependencies using the manifest's `child_map`, categorizes affected models/tests/others by distance, sorts by distance and name, generates type-specific `dbt run` recommendations, and provides an impact assessment message.

```python
def analyze_impact(
    self,
    name: str,
    resource_type: str | None = None,
) -> dict[str, Any]:
    """
    Analyze the impact of changing a resource across all resource types.

    Shows all downstream dependencies that would be affected by changes,
    including models, tests, and other resources. Provides actionable
    recommendations for running affected resources.

    Args:
        name: Resource name. For sources, use "source_name.table_name" or just "table_name"
        resource_type: Optional filter (model, source, seed, snapshot, test, analysis).
            If None, auto-detects resource type.

    Returns:
        Dictionary with impact analysis:
        {
            "resource": {...},  # The target resource info
            "impact": {
                "models_affected": [...],  # Downstream models by distance
                "models_affected_count": int,
                "tests_affected_count": int,
                "other_affected_count": int,
                "total_affected": int
            },
            "affected_by_distance": {
                "1": [...],  # Immediate dependents
                "2": [...],  # Second-level dependents
                ...
            },
            "recommendation": str,  # Suggested dbt command
            "message": str  # Human-readable impact assessment
        }

        If multiple matches found, returns:
        {"multiple_matches": True, "matches": [...], "message": "..."}

    Raises:
        RuntimeError: If manifest not loaded
        ValueError: If resource not found

    Examples:
        analyze_impact("stg_customers") -> impact of changing staging model
        analyze_impact("jaffle_shop.orders", "source") -> impact of source change
        analyze_impact("raw_customers", "seed") -> impact of seed change
    """
    if not self._manifest:
        raise RuntimeError("Manifest not loaded. Call load() first.")

    # Get the resource (auto-detect if resource_type not specified)
    resource = self.get_resource_node(name, resource_type)

    # Handle multiple matches - return for LLM to process
    if resource.get("multiple_matches"):
        return resource

    # Extract unique_id for impact traversal
    unique_id = resource.get("unique_id")
    if not unique_id:
        raise ValueError(f"Resource '{name}' does not have a unique_id")

    # Get all downstream dependencies (no depth limit for impact)
    downstream = self.get_downstream_nodes(unique_id, max_depth=None)

    # Categorize by resource type
    models_affected: list[dict[str, Any]] = []
    tests_affected: list[dict[str, Any]] = []
    other_affected: list[dict[str, Any]] = []
    affected_by_distance: dict[str, list[dict[str, Any]]] = {}

    for dep in downstream:
        dep_type = str(dep["type"])
        distance = str(dep["distance"])

        # Group by distance
        if distance not in affected_by_distance:
            affected_by_distance[distance] = []
        affected_by_distance[distance].append(dep)

        # Categorize by type
        if dep_type == "model":
            models_affected.append(dep)
        elif dep_type == "test":
            tests_affected.append(dep)
        else:
            other_affected.append(dep)

    # Sort models by distance for better readability
    models_affected_sorted = sorted(
        models_affected, key=lambda x: (int(x["distance"]), str(x["name"]))
    )

    # Build recommendation based on resource type
    resource_name = resource.get("name", name)
    current_resource_type = resource.get("resource_type")

    if current_resource_type == "source":
        # For sources, recommend running downstream models
        if len(models_affected) == 0:
            recommendation = f"dbt test -s source:{resource.get('source_name')}.{resource_name}"
        else:
            recommendation = f"dbt run -s {resource_name}+"
    elif current_resource_type == "seed":
        # For seeds, recommend seeding + downstream
        if len(models_affected) == 0:
            recommendation = f"dbt seed -s {resource_name} && dbt test -s {resource_name}"
        else:
            recommendation = f"dbt seed -s {resource_name} && dbt run -s {resource_name}+"
    else:
        # For models, snapshots, etc.
        if len(models_affected) == 0:
            recommendation = f"dbt run -s {resource_name}"
        else:
            recommendation = f"dbt run -s {resource_name}+"

    # Build result
    result: dict[str, Any] = {
        "resource": {
            "name": resource_name,
            "unique_id": unique_id,
            "resource_type": current_resource_type,
            "package_name": resource.get("package_name"),
        },
        "impact": {
            "models_affected": models_affected_sorted,
            "models_affected_count": len(models_affected),
            "tests_affected_count": len(tests_affected),
            "other_affected_count": len(other_affected),
            "total_affected": len(downstream),
        },
        "affected_by_distance": affected_by_distance,
        "recommendation": recommendation,
    }

    # Add helpful message based on impact size
    if len(models_affected) == 0:
        result["message"] = "No downstream models affected. Only this resource needs to be run/tested."
    elif len(models_affected) <= 3:
        result["message"] = f"Low impact: {len(models_affected)} downstream model(s) affected."
    elif len(models_affected) <= 10:
        result["message"] = f"Medium impact: {len(models_affected)} downstream models affected."
    else:
        result["message"] = f"High impact: {len(models_affected)} downstream models affected. Consider incremental changes."

    return result
```
- `src/dbt_core_mcp/server.py:1434-1477` (registration) — MCP tool registration for `analyze_impact` using the FastMCP `@app.tool()` decorator. Defines the input schema (`name: str` required, `resource_type: str` optional), a comprehensive docstring with usage examples, ensures server initialization, and delegates to `toolImpl_analyze_impact`.

```python
async def analyze_impact(
    ctx: Context,
    name: str,
    resource_type: str | None = None,
) -> dict[str, Any]:
    """Analyze the impact of changing any dbt resource with auto-detection.

    This unified tool works across all resource types (models, sources, seeds,
    snapshots, etc.) showing all downstream dependencies that would be affected
    by changes. Provides actionable recommendations for running affected resources.

    Args:
        name: Resource name. For sources, use "source_name.table_name" or just "table_name"
            Examples: "stg_customers", "jaffle_shop.orders", "raw_customers"
        resource_type: Optional filter to narrow search:
            - "model": Data transformation models
            - "source": External data sources
            - "seed": CSV reference data files
            - "snapshot": SCD Type 2 historical tables
            - "test": Data quality tests
            - "analysis": Ad-hoc analysis queries
            - None: Auto-detect (searches all types)

    Returns:
        Impact analysis with:
        - List of affected models by distance
        - Count of affected tests and other resources
        - Total impact statistics
        - Resources grouped by distance from changed resource
        - Recommended dbt command to run affected resources
        - Human-readable impact assessment message

        If multiple matches found, returns all matches for LLM to process.

    Raises:
        ValueError: If resource not found

    Examples:
        analyze_impact("stg_customers") -> auto-detect and show impact
        analyze_impact("jaffle_shop.orders", "source") -> impact of source change
        analyze_impact("raw_customers", "seed") -> impact of seed data change
    """
    await self._ensure_initialized_with_context(ctx)
    return await self.toolImpl_analyze_impact(name, resource_type)
```
- `src/dbt_core_mcp/server.py:605-610` (handler) — Server-side `toolImpl` handler that initializes dbt components if needed and delegates the core logic to `ManifestLoader.analyze_impact`, with error wrapping.

```python
async def toolImpl_analyze_impact(self, name: str, resource_type: str | None = None) -> dict[str, Any]:
    """Implementation for analyze_impact tool."""
    try:
        return self.manifest.analyze_impact(name, resource_type)  # type: ignore
    except ValueError as e:
        raise ValueError(f"Impact analysis error: {e}")
```
- Recursive helper function to compute all downstream dependents from the manifest's `child_map`, tracking distance and avoiding cycles with a `seen` set. Critical for impact analysis.

```python
def get_downstream_nodes(self, unique_id: str, max_depth: int | None = None, current_depth: int = 0) -> list[dict[str, Any]]:
    """Get all downstream dependents of a node recursively.

    Args:
        unique_id: The unique identifier of the node
        max_depth: Maximum depth to traverse (None for unlimited)
        current_depth: Current recursion depth (internal use)

    Returns:
        List of dictionaries with downstream node info:
        {"unique_id": str, "name": str, "type": str, "distance": int}
    """
    if not self._manifest:
        raise RuntimeError("Manifest not loaded. Call load() first.")

    if max_depth is not None and current_depth >= max_depth:
        return []

    child_map = self._manifest.get("child_map", {})
    children = child_map.get(unique_id, [])

    downstream: list[dict[str, Any]] = []
    seen: set[str] = set()

    for child_id in children:
        if child_id in seen:
            continue
        seen.add(child_id)

        node = self.get_node_by_unique_id(child_id)
        if node:
            resource_type = node.get("resource_type", "unknown")
            downstream.append(
                {
                    "unique_id": child_id,
                    "name": node.get("name", ""),
                    "type": resource_type,
                    "distance": current_depth + 1,
                }
            )

            # Recurse
            if max_depth is None or current_depth + 1 < max_depth:
                grandchildren = self.get_downstream_nodes(child_id, max_depth, current_depth + 1)
                for gc in grandchildren:
                    if gc["unique_id"] not in seen:
                        seen.add(str(gc["unique_id"]))
                        downstream.append(gc)

    return downstream
```
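The same traversal can be written iteratively over a bare `child_map` dict (the parent-to-children mapping dbt emits in `manifest.json`). A self-contained sketch with invented node ids, illustrating the distance tracking and cycle guard:

```python
from typing import Any

# Toy child_map in the shape dbt's manifest.json uses:
# parent unique_id -> list of child unique_ids. Ids are invented.
child_map = {
    "model.demo.stg_customers": [
        "model.demo.customers",
        "test.demo.not_null_stg_customers_id",
    ],
    "model.demo.customers": ["model.demo.customer_orders"],
    "model.demo.customer_orders": [],
    "test.demo.not_null_stg_customers_id": [],
}

def downstream(unique_id: str) -> list[dict[str, Any]]:
    """Collect all transitive children with their distance, skipping revisits."""
    out: list[dict[str, Any]] = []
    seen: set[str] = set()
    stack = [(child, 1) for child in child_map.get(unique_id, [])]
    while stack:
        node_id, dist = stack.pop()
        if node_id in seen:  # cycle/diamond guard, like the seen set above
            continue
        seen.add(node_id)
        out.append({"unique_id": node_id, "distance": dist})
        stack.extend((c, dist + 1) for c in child_map.get(node_id, []))
    return out
```

Note that with diamond-shaped graphs a node is recorded at whichever distance it is reached first, which is why the handler groups results by distance rather than assuming uniqueness per level.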
- Helper to locate the target resource by name across all types (models/sources/etc.); supports the `source_name.table_name` format with a `table_name` fallback, and returns a `multiple_matches` dict if ambiguous, for LLM handling.

```python
def get_resource_node(self, name: str, resource_type: str | None = None) -> dict[str, Any]:
    """
    Get a resource node by name with auto-detection across all resource types.

    This method searches for resources across models, sources, seeds, snapshots,
    tests, etc. Designed for LLM consumption - returns all matches when ambiguous
    rather than raising errors.

    Args:
        name: Resource name. For sources, can be "source_name.table_name" or just "table_name"
        resource_type: Optional filter (model, source, seed, snapshot, test, analysis).
            If None, searches all types.

    Returns:
        Single resource dict if exactly one match found, or dict with
        multiple_matches=True containing all matching resources for LLM to process.

    Raises:
        RuntimeError: If manifest not loaded
        ValueError: If resource not found (only case that raises)

    Examples:
        get_resource_node("customers") -> single model dict
        get_resource_node("customers", "source") -> single source dict
        get_resource_node("customers") with multiple matches -> {"multiple_matches": True, ...}
    """
    if not self._manifest:
        raise RuntimeError("Manifest not loaded. Call load() first.")

    # Validate resource_type if provided
    valid_types = {"model", "source", "seed", "snapshot", "test", "analysis"}
    if resource_type is not None and resource_type not in valid_types:
        raise ValueError(f"Invalid resource_type '{resource_type}'. Must be one of: {', '.join(sorted(valid_types))}")

    matches: list[dict[str, Any]] = []

    # For sources, try "source_name.table_name" format first
    if "." in name and (resource_type is None or resource_type == "source"):
        parts = name.split(".", 1)
        if len(parts) == 2:
            # Search sources dict directly
            sources_dict = self._manifest.get("sources", {})
            for _, source in sources_dict.items():
                if isinstance(source, dict) and source.get("source_name") == parts[0] and source.get("name") == parts[1]:
                    matches.append(dict(source))
                    break

    # Search nodes (models, tests, snapshots, seeds, analyses, etc.)
    nodes = self._manifest.get("nodes", {})
    for unique_id, node in nodes.items():
        if not isinstance(node, dict):
            continue
        node_type = node.get("resource_type")
        node_name = node.get("name")

        # Type filter if specified
        if resource_type is not None and node_type != resource_type:
            continue

        if node_name == name:
            matches.append(dict(node))

    # Search sources by table name only (fallback when no dot in name)
    if resource_type is None or resource_type == "source":
        sources = self._manifest.get("sources", {})
        for unique_id, source in sources.items():
            if not isinstance(source, dict):
                continue
            if source.get("name") == name:
                # Avoid duplicates if already matched via source_name.table_name
                if not any(m.get("unique_id") == unique_id for m in matches):
                    matches.append(dict(source))

    # Handle results based on match count
    if len(matches) == 0:
        type_hint = f" of type '{resource_type}'" if resource_type else ""
        raise ValueError(f"Resource '{name}'{type_hint} not found in manifest")
    elif len(matches) == 1:
        # Single match - return the resource directly
        return matches[0]
    else:
        # Multiple matches - return all with metadata for LLM to process
        return {
            "multiple_matches": True,
            "name": name,
            "match_count": len(matches),
            "matches": matches,
            "message": f"Found {len(matches)} resources named '{name}'. Returning all matches for context.",
        }
```
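The ambiguity handling can be exercised with a toy manifest fragment. This is a simplified sketch of the behavior (invented ids and a stripped-down resolver, not the real method):

```python
from typing import Any

def resolve(manifest: dict[str, Any], name: str) -> dict[str, Any]:
    """Sketch: collect all nodes and sources matching a bare name,
    returning a multiple_matches dict when the name is ambiguous."""
    matches = [dict(n) for n in manifest.get("nodes", {}).values()
               if n.get("name") == name]
    matches += [dict(s) for s in manifest.get("sources", {}).values()
                if s.get("name") == name]
    if not matches:
        raise ValueError(f"Resource '{name}' not found in manifest")
    if len(matches) == 1:
        return matches[0]
    return {"multiple_matches": True, "match_count": len(matches), "matches": matches}

# "customers" exists both as a model and as a source table,
# so the resolver returns both matches rather than raising.
manifest = {
    "nodes": {
        "model.demo.customers": {"name": "customers", "resource_type": "model"},
    },
    "sources": {
        "source.demo.crm.customers": {"name": "customers", "resource_type": "source",
                                      "source_name": "crm"},
    },
}
```

Returning all candidates instead of raising lets the calling LLM disambiguate (e.g. by re-invoking with `resource_type="source"`).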