Skip to main content
Glama
NiclasOlofsson

DBT Core MCP Server

analyze_impact

Analyze downstream dependencies affected by changes to dbt resources like models, sources, or seeds, providing actionable recommendations for running impacted components.

Instructions

Analyze the impact of changing any dbt resource with auto-detection.

This unified tool works across all resource types (models, sources, seeds, snapshots, etc.) showing all downstream dependencies that would be affected by changes. Provides actionable recommendations for running affected resources.

Args: name: Resource name. For sources, use "source_name.table_name" or just "table_name" Examples: "stg_customers", "jaffle_shop.orders", "raw_customers" resource_type: Optional filter to narrow search: - "model": Data transformation models - "source": External data sources - "seed": CSV reference data files - "snapshot": SCD Type 2 historical tables - "test": Data quality tests - "analysis": Ad-hoc analysis queries - None: Auto-detect (searches all types)

Returns: Impact analysis with: - List of affected models by distance - Count of affected tests and other resources - Total impact statistics - Resources grouped by distance from changed resource - Recommended dbt command to run affected resources - Human-readable impact assessment message If multiple matches found, returns all matches for LLM to process.

Raises: ValueError: If resource not found

Examples: analyze_impact("stg_customers") -> auto-detect and show impact analyze_impact("jaffle_shop.orders", "source") -> impact of source change analyze_impact("raw_customers", "seed") -> impact of seed data change

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
nameYes
resource_typeNo

Implementation Reference

  • Core handler implementing the analyze_impact tool logic: locates the target resource, traverses all downstream dependencies using the manifest's child_map, categorizes affected models/tests/others by distance, sorts by distance/name, generates type-specific dbt run recommendations, and provides impact assessment message.
    def analyze_impact(
        self,
        name: str,
        resource_type: str | None = None,
    ) -> dict[str, Any]:
        """
        Analyze the impact of changing a resource across all resource types.
    
        Shows all downstream dependencies that would be affected by changes,
        including models, tests, and other resources. Provides actionable
        recommendations for running affected resources.
    
        Args:
            name: Resource name. For sources, use "source_name.table_name" or just "table_name"
            resource_type: Optional filter (model, source, seed, snapshot, test, analysis).
                          If None, auto-detects resource type.
    
        Returns:
            Dictionary with impact analysis:
            {
                "resource": {...},  # The target resource info
                "impact": {
                    "models_affected": [...],  # Downstream models by distance
                    "models_affected_count": int,
                    "tests_affected_count": int,
                    "other_affected_count": int,
                    "total_affected": int
                },
                "affected_by_distance": {
                    "1": [...],  # Immediate dependents
                    "2": [...],  # Second-level dependents
                    ...
                },
                "recommendation": str,  # Suggested dbt command
                "message": str  # Human-readable impact assessment
            }
    
            If multiple matches found, returns:
            {"multiple_matches": True, "matches": [...], "message": "..."}
    
        Raises:
            RuntimeError: If manifest not loaded
            ValueError: If resource not found
    
        Examples:
            analyze_impact("stg_customers") -> impact of changing staging model
            analyze_impact("jaffle_shop.orders", "source") -> impact of source change
            analyze_impact("raw_customers", "seed") -> impact of seed change
        """
        if not self._manifest:
            raise RuntimeError("Manifest not loaded. Call load() first.")
    
        # Get the resource (auto-detect if resource_type not specified)
        resource = self.get_resource_node(name, resource_type)
    
        # Handle multiple matches - return for LLM to process
        if resource.get("multiple_matches"):
            return resource
    
        # Extract unique_id for impact traversal
        unique_id = resource.get("unique_id")
        if not unique_id:
            raise ValueError(f"Resource '{name}' does not have a unique_id")
    
        # Get all downstream dependencies (no depth limit for impact)
        downstream = self.get_downstream_nodes(unique_id, max_depth=None)
    
        # Categorize by resource type
        models_affected: list[dict[str, Any]] = []
        tests_affected: list[dict[str, Any]] = []
        other_affected: list[dict[str, Any]] = []
        affected_by_distance: dict[str, list[dict[str, Any]]] = {}
    
        for dep in downstream:
            dep_type = str(dep["type"])
            distance = str(dep["distance"])
    
            # Group by distance
            if distance not in affected_by_distance:
                affected_by_distance[distance] = []
            affected_by_distance[distance].append(dep)
    
            # Categorize by type
            if dep_type == "model":
                models_affected.append(dep)
            elif dep_type == "test":
                tests_affected.append(dep)
            else:
                other_affected.append(dep)
    
        # Sort models by distance for better readability
        models_affected_sorted = sorted(models_affected, key=lambda x: (int(x["distance"]), str(x["name"])))
    
        # Build recommendation based on resource type
        resource_name = resource.get("name", name)
        current_resource_type = resource.get("resource_type")
    
        if current_resource_type == "source":
            # For sources, recommend running downstream models
            if len(models_affected) == 0:
                recommendation = f"dbt test -s source:{resource.get('source_name')}.{resource_name}"
            else:
                recommendation = f"dbt run -s {resource_name}+"
        elif current_resource_type == "seed":
            # For seeds, recommend seeding + downstream
            if len(models_affected) == 0:
                recommendation = f"dbt seed -s {resource_name} && dbt test -s {resource_name}"
            else:
                recommendation = f"dbt seed -s {resource_name} && dbt run -s {resource_name}+"
        else:
            # For models, snapshots, etc.
            if len(models_affected) == 0:
                recommendation = f"dbt run -s {resource_name}"
            else:
                recommendation = f"dbt run -s {resource_name}+"
    
        # Build result
        result: dict[str, Any] = {
            "resource": {
                "name": resource_name,
                "unique_id": unique_id,
                "resource_type": current_resource_type,
                "package_name": resource.get("package_name"),
            },
            "impact": {
                "models_affected": models_affected_sorted,
                "models_affected_count": len(models_affected),
                "tests_affected_count": len(tests_affected),
                "other_affected_count": len(other_affected),
                "total_affected": len(downstream),
            },
            "affected_by_distance": affected_by_distance,
            "recommendation": recommendation,
        }
    
        # Add helpful message based on impact size
        if len(models_affected) == 0:
            result["message"] = "No downstream models affected. Only this resource needs to be run/tested."
        elif len(models_affected) <= 3:
            result["message"] = f"Low impact: {len(models_affected)} downstream model(s) affected."
        elif len(models_affected) <= 10:
            result["message"] = f"Medium impact: {len(models_affected)} downstream models affected."
        else:
            result["message"] = f"High impact: {len(models_affected)} downstream models affected. Consider incremental changes."
    
        return result
  • MCP tool registration for 'analyze_impact' using FastMCP @app.tool() decorator. Defines input schema (name: str required, resource_type: str optional), comprehensive docstring with usage examples, ensures server initialization, and delegates to toolImpl_analyze_impact.
    async def analyze_impact(
        ctx: Context,
        name: str,
        resource_type: str | None = None,
    ) -> dict[str, Any]:
        """Analyze the impact of changing any dbt resource with auto-detection.
    
        This unified tool works across all resource types (models, sources, seeds, snapshots, etc.)
        showing all downstream dependencies that would be affected by changes. Provides actionable
        recommendations for running affected resources.
    
        Args:
            name: Resource name. For sources, use "source_name.table_name" or just "table_name"
                Examples: "stg_customers", "jaffle_shop.orders", "raw_customers"
            resource_type: Optional filter to narrow search:
                - "model": Data transformation models
                - "source": External data sources
                - "seed": CSV reference data files
                - "snapshot": SCD Type 2 historical tables
                - "test": Data quality tests
                - "analysis": Ad-hoc analysis queries
                - None: Auto-detect (searches all types)
    
        Returns:
            Impact analysis with:
            - List of affected models by distance
            - Count of affected tests and other resources
            - Total impact statistics
            - Resources grouped by distance from changed resource
            - Recommended dbt command to run affected resources
            - Human-readable impact assessment message
            If multiple matches found, returns all matches for LLM to process.
    
        Raises:
            ValueError: If resource not found
    
        Examples:
            analyze_impact("stg_customers") -> auto-detect and show impact
            analyze_impact("jaffle_shop.orders", "source") -> impact of source change
            analyze_impact("raw_customers", "seed") -> impact of seed data change
        """
        await self._ensure_initialized_with_context(ctx)
        return await self.toolImpl_analyze_impact(name, resource_type)
  • Server-side toolImpl handler that initializes dbt components if needed and delegates core logic to ManifestLoader.analyze_impact, with error wrapping.
    async def toolImpl_analyze_impact(self, name: str, resource_type: str | None = None) -> dict[str, Any]:
        """Implementation for analyze_impact tool."""
        try:
            return self.manifest.analyze_impact(name, resource_type)  # type: ignore
        except ValueError as e:
            raise ValueError(f"Impact analysis error: {e}")
  • Recursive helper function to compute all downstream dependents from the manifest's child_map, tracking distance and avoiding cycles with seen set. Critical for impact analysis.
    def get_downstream_nodes(self, unique_id: str, max_depth: int | None = None, current_depth: int = 0) -> list[dict[str, Any]]:
        """Get all downstream dependents of a node recursively.
    
        Args:
            unique_id: The unique identifier of the node
            max_depth: Maximum depth to traverse (None for unlimited)
            current_depth: Current recursion depth (internal use)
    
        Returns:
            List of dictionaries with downstream node info:
            {"unique_id": str, "name": str, "type": str, "distance": int}
        """
        if not self._manifest:
            raise RuntimeError("Manifest not loaded. Call load() first.")
    
        if max_depth is not None and current_depth >= max_depth:
            return []
    
        child_map = self._manifest.get("child_map", {})
        children = child_map.get(unique_id, [])
    
        downstream: list[dict[str, Any]] = []
        seen: set[str] = set()
    
        for child_id in children:
            if child_id in seen:
                continue
            seen.add(child_id)
    
            node = self.get_node_by_unique_id(child_id)
            if node:
                resource_type = node.get("resource_type", "unknown")
                downstream.append(
                    {
                        "unique_id": child_id,
                        "name": node.get("name", ""),
                        "type": resource_type,
                        "distance": current_depth + 1,
                    }
                )
    
                # Recurse
                if max_depth is None or current_depth + 1 < max_depth:
                    grandchildren = self.get_downstream_nodes(child_id, max_depth, current_depth + 1)
                    for gc in grandchildren:
                        if gc["unique_id"] not in seen:
                            seen.add(str(gc["unique_id"]))
                            downstream.append(gc)
    
        return downstream
  • Helper to locate target resource by name across all types (models/sources/etc.), supports source_name.table format and table_name fallback, returns multiple_matches dict if ambiguous for LLM handling.
    def get_resource_node(self, name: str, resource_type: str | None = None) -> dict[str, Any]:
        """
        Get a resource node by name with auto-detection across all resource types.
    
        This method searches for resources across models, sources, seeds, snapshots, tests, etc.
        Designed for LLM consumption - returns all matches when ambiguous rather than raising errors.
    
        Args:
            name: Resource name. For sources, can be "source_name.table_name" or just "table_name"
            resource_type: Optional filter (model, source, seed, snapshot, test, analysis).
                          If None, searches all types.
    
        Returns:
            Single resource dict if exactly one match found, or dict with multiple_matches=True
            containing all matching resources for LLM to process.
    
        Raises:
            RuntimeError: If manifest not loaded
            ValueError: If resource not found (only case that raises)
    
        Examples:
            get_resource_node("customers") -> single model dict
            get_resource_node("customers", "source") -> single source dict
            get_resource_node("customers") with multiple matches -> {"multiple_matches": True, ...}
        """
        if not self._manifest:
            raise RuntimeError("Manifest not loaded. Call load() first.")
    
        # Validate resource_type if provided
        valid_types = {"model", "source", "seed", "snapshot", "test", "analysis"}
        if resource_type is not None and resource_type not in valid_types:
            raise ValueError(f"Invalid resource_type '{resource_type}'. Must be one of: {', '.join(sorted(valid_types))}")
    
        matches: list[dict[str, Any]] = []
    
        # For sources, try "source_name.table_name" format first
        if "." in name and (resource_type is None or resource_type == "source"):
            parts = name.split(".", 1)
            if len(parts) == 2:
                # Search sources dict directly
                sources_dict = self._manifest.get("sources", {})
                for _, source in sources_dict.items():
                    if isinstance(source, dict) and source.get("source_name") == parts[0] and source.get("name") == parts[1]:
                        matches.append(dict(source))
                        break
    
        # Search nodes (models, tests, snapshots, seeds, analyses, etc.)
        nodes = self._manifest.get("nodes", {})
        for unique_id, node in nodes.items():
            if not isinstance(node, dict):
                continue
    
            node_type = node.get("resource_type")
            node_name = node.get("name")
    
            # Type filter if specified
            if resource_type is not None and node_type != resource_type:
                continue
    
            if node_name == name:
                matches.append(dict(node))
    
        # Search sources by table name only (fallback when no dot in name)
        if resource_type is None or resource_type == "source":
            sources = self._manifest.get("sources", {})
            for unique_id, source in sources.items():
                if not isinstance(source, dict):
                    continue
    
                if source.get("name") == name:
                    # Avoid duplicates if already matched via source_name.table_name
                    if not any(m.get("unique_id") == unique_id for m in matches):
                        matches.append(dict(source))
    
        # Handle results based on match count
        if len(matches) == 0:
            type_hint = f" of type '{resource_type}'" if resource_type else ""
            raise ValueError(f"Resource '{name}'{type_hint} not found in manifest")
        elif len(matches) == 1:
            # Single match - return the resource directly
            return matches[0]
        else:
            # Multiple matches - return all with metadata for LLM to process
            return {
                "multiple_matches": True,
                "name": name,
                "match_count": len(matches),
                "matches": matches,
                "message": f"Found {len(matches)} resources named '{name}'. Returning all matches for context.",
            }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/NiclasOlofsson/dbt-core-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server