NiclasOlofsson

DBT Core MCP Server

run_models

Execute dbt models by compiling SQL and running it against the database. Select modified models, their downstream dependencies, or a custom selection to update data transformations efficiently.

Instructions

Run dbt models (compile SQL and execute against database).

State-based selection modes (uses dbt state:modified selector):

  • select_state_modified: Run only models modified since last successful run (state:modified)

  • select_state_modified_plus_downstream: Run modified + downstream dependencies (state:modified+). Note: requires select_state_modified=True

Manual selection (alternative to state-based):

  • select: dbt selector syntax (e.g., "customers", "tag:mart", "stg_*")

  • exclude: Exclude specific models

Args:

  • select: Manual selector (e.g., "customers", "tag:mart", "path:marts/*")

  • exclude: Exclude selector (e.g., "tag:deprecated")

  • select_state_modified: Use state:modified selector (changed models only)

  • select_state_modified_plus_downstream: Extend to state:modified+ (changed + downstream)

  • full_refresh: Force full refresh of incremental models

  • fail_fast: Stop execution on first failure

  • check_schema_changes: Detect schema changes and recommend downstream runs

Returns: Execution results with status, models run, timing info, and optional schema_changes

Examples:

  • run_models(select="customers") - Run a specific model

  • run_models(select_state_modified=True) - Run only what changed

  • run_models(select_state_modified=True, select_state_modified_plus_downstream=True) - Run changed + downstream

  • run_models(select="tag:mart", full_refresh=True) - Full refresh of mart models

  • run_models(select_state_modified=True, check_schema_changes=True) - Detect schema changes

Input Schema

Name                                    Required  Description                                          Default
select                                  No        Manual dbt selector (e.g., "customers", "tag:mart")  null
exclude                                 No        Exclude selector (e.g., "tag:deprecated")            null
select_state_modified                   No        Use state:modified (changed models only)             false
select_state_modified_plus_downstream   No        Extend to state:modified+ (changed + downstream)     false
full_refresh                            No        Force full refresh of incremental models             false
fail_fast                               No        Stop execution on first failure                      false
check_schema_changes                    No        Detect schema changes, recommend downstream runs     false

Implementation Reference

  • The core handler function that implements the run_models tool logic. It handles state-based selection (modified models), builds dbt run commands, executes via BridgeRunner, parses results, detects schema changes, saves state for future runs, and supports progress reporting.
    async def toolImpl_run_models(
        self,
        ctx: Context | None,
        select: str | None = None,
        exclude: str | None = None,
        select_state_modified: bool = False,
        select_state_modified_plus_downstream: bool = False,
        full_refresh: bool = False,
        fail_fast: bool = False,
        check_schema_changes: bool = False,
    ) -> dict[str, Any]:
        """Implementation for run_models tool."""
        # Prepare state-based selection (validates and returns selector)
        selector = await self._prepare_state_based_selection(select_state_modified, select_state_modified_plus_downstream, select)
    
        # Early return if state-based requested but no state exists
        if select_state_modified and not selector:
            return {
                "status": "success",
                "message": "No previous state found - cannot determine modifications",
                "results": [],
                "elapsed_time": 0,
            }
    
        # Build command args
        args = ["run"]
    
        # Add selector if we have one (state-based or manual)
        if selector:
            args.extend(["-s", selector, "--state", "target/state_last_run"])
        elif select:
            args.extend(["-s", select])
    
        if exclude:
            args.extend(["--exclude", exclude])
    
        if full_refresh:
            args.append("--full-refresh")
    
        if fail_fast:
            args.append("--fail-fast")
    
        # Capture pre-run table columns for schema change detection
        # Also get expected count of models for progress reporting
        pre_run_columns: dict[str, list[str]] = {}
        expected_total: int | None = None
    
        if check_schema_changes or True:  # 'or True' keeps this unconditional: the model count is always needed for progress reporting
            # Use dbt list to get models that will be run (without actually running them)
            list_args = ["list", "--resource-type", "model", "--output", "name"]
    
            if select_state_modified:
                selector = "state:modified+" if select_state_modified_plus_downstream else "state:modified"
                list_args.extend(["-s", selector, "--state", "target/state_last_run"])
            elif select:
                list_args.extend(["-s", select])
    
            if exclude:
                list_args.extend(["--exclude", exclude])
    
            # Get list of models
            logger.info(f"Getting model list: {list_args}")
            list_result = await self.runner.invoke(list_args)  # type: ignore
    
            if list_result.success and list_result.stdout:
                model_count = 0
                # Parse model names from output (one per line with --output name)
                for line in list_result.stdout.strip().split("\n"):
                    line = line.strip()
                    # Skip log lines, timestamps, empty lines, and JSON output
                    if (
                        not line
                        or line.startswith("{")
                        or ":" in line[:10]  # Timestamp like "07:39:44"
                        or "Running with dbt=" in line
                        or "Registered adapter:" in line
                    ):
                        continue
                    model_count += 1
    
                    # For schema change detection, query pre-run columns
                    if check_schema_changes:
                        model_name = line
                        logger.info(f"Querying pre-run columns for {model_name}")
                        cols = await self._get_table_columns_from_db(model_name)
                        if cols:
                            pre_run_columns[model_name] = cols
                        else:
                            # Table doesn't exist yet - mark as new
                            pre_run_columns[model_name] = []
    
                # Set expected total from model count
                if model_count > 0:
                    expected_total = model_count
                    logger.info(f"Expected total models to run: {expected_total}")
    
        # Execute with progress reporting
        logger.info(f"Running dbt models with args: {args}")
        logger.info(f"Expected total for progress: {expected_total}")
    
        # Define progress callback if context available
        async def progress_callback(current: int, total: int, message: str) -> None:
            if ctx:
                await ctx.report_progress(progress=current, total=total, message=message)
    
        result = await self.runner.invoke(args, progress_callback=progress_callback if ctx else None, expected_total=expected_total)  # type: ignore
    
        if not result.success:
            error_msg = str(result.exception) if result.exception else "Run failed"
            response = {
                "status": "error",
                "message": error_msg,
                "command": " ".join(args),
            }
            # Include dbt output for debugging
            if result.stdout:
                response["dbt_output"] = result.stdout
            if result.stderr:
                response["stderr"] = result.stderr
            return response
    
        # Parse run_results.json for details
        run_results = self._parse_run_results()
    
        # Check for schema changes if requested
        schema_changes: dict[str, dict[str, list[str]]] = {}
        if check_schema_changes and pre_run_columns:
            logger.info("Detecting schema changes by comparing pre/post-run database columns")
    
            for model_name, old_columns in pre_run_columns.items():
                # Query post-run columns from database
                new_columns = await self._get_table_columns_from_db(model_name)
    
                if not new_columns:
                    # Model failed to build or was skipped
                    continue
    
                # Compare columns
                added = [c for c in new_columns if c not in old_columns]
                removed = [c for c in old_columns if c not in new_columns] if old_columns else []
    
                if added or removed:
                    schema_changes[model_name] = {}
                    if added:
                        schema_changes[model_name]["added"] = added
                    if removed:
                        schema_changes[model_name]["removed"] = removed
    
        # Save state on success for next modified run
        if result.success and self.project_dir:
            state_dir = self.project_dir / "target" / "state_last_run"
            state_dir.mkdir(parents=True, exist_ok=True)
            manifest_path = self.runner.get_manifest_path()  # type: ignore
            shutil.copy(manifest_path, state_dir / "manifest.json")
    
        response: dict[str, Any] = {
            "status": "success",
            "command": " ".join(args),
            "results": run_results.get("results", []),
            "elapsed_time": run_results.get("elapsed_time"),
        }
    
        if schema_changes:
            response["schema_changes"] = schema_changes
            response["recommendation"] = "Schema changes detected. Consider running downstream models with select_state_modified=True and select_state_modified_plus_downstream=True to propagate changes."
    
        return response
  • Registers the 'run_models' tool with FastMCP using @app.tool() decorator. Defines the tool's input parameters, comprehensive docstring describing usage, and delegates to toolImpl_run_models.
    async def run_models(
        ctx: Context,
        select: str | None = None,
        exclude: str | None = None,
        select_state_modified: bool = False,
        select_state_modified_plus_downstream: bool = False,
        full_refresh: bool = False,
        fail_fast: bool = False,
        check_schema_changes: bool = False,
    ) -> dict[str, Any]:
        """Run dbt models (compile SQL and execute against database).
    
        State-based selection modes (uses dbt state:modified selector):
        - select_state_modified: Run only models modified since last successful run (state:modified)
        - select_state_modified_plus_downstream: Run modified + downstream dependencies (state:modified+)
          Note: Requires select_state_modified=True
    
        Manual selection (alternative to state-based):
        - select: dbt selector syntax (e.g., "customers", "tag:mart", "stg_*")
        - exclude: Exclude specific models
    
        Args:
            select: Manual selector (e.g., "customers", "tag:mart", "path:marts/*")
            exclude: Exclude selector (e.g., "tag:deprecated")
            select_state_modified: Use state:modified selector (changed models only)
            select_state_modified_plus_downstream: Extend to state:modified+ (changed + downstream)
            full_refresh: Force full refresh of incremental models
            fail_fast: Stop execution on first failure
            check_schema_changes: Detect schema changes and recommend downstream runs
    
        Returns:
            Execution results with status, models run, timing info, and optional schema_changes
    
        Examples:
            - run_models(select="customers") - Run specific model
            - run_models(select_state_modified=True) - Run only what changed
            - run_models(select_state_modified=True, select_state_modified_plus_downstream=True) - Run changed + downstream
            - run_models(select="tag:mart", full_refresh=True) - Full refresh marts
            - run_models(select_state_modified=True, check_schema_changes=True) - Detect schema changes
        """
        await self._ensure_initialized_with_context(ctx)
        return await self.toolImpl_run_models(ctx, select, exclude, select_state_modified, select_state_modified_plus_downstream, full_refresh, fail_fast, check_schema_changes)
  • Helper function that validates parameters and prepares dbt state-based selectors (state:modified or state:modified+) for efficient incremental runs.
    async def _prepare_state_based_selection(
        self,
        select_state_modified: bool,
        select_state_modified_plus_downstream: bool,
        select: str | None,
    ) -> str | None:
        """Validate and prepare state-based selection.
    
        Args:
            select_state_modified: Use state:modified selector
            select_state_modified_plus_downstream: Extend to state:modified+
            select: Manual selector (conflicts with state-based)
    
        Returns:
            The dbt selector string to use ("state:modified" or "state:modified+"), or None if:
            - Not using state-based selection
            - No previous state exists (cannot determine modifications)
    
        Raises:
            ValueError: If validation fails
        """
        # Validate: hierarchical requirement
        if select_state_modified_plus_downstream and not select_state_modified:
            raise ValueError("select_state_modified_plus_downstream requires select_state_modified=True")
    
        # Validate: can't use both state-based and manual selection
        if select_state_modified and select:
            raise ValueError("Cannot use both select_state_modified* flags and select parameter")
    
        # If not using state-based selection, return None
        if not select_state_modified:
            return None
    
        # Check if state exists
        state_dir = self.project_dir / "target" / "state_last_run"  # type: ignore
        if not state_dir.exists():
            # No state - cannot determine modifications
            return None
    
        # Return selector (state exists)
        return "state:modified+" if select_state_modified_plus_downstream else "state:modified"
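The difference between the two selectors this helper returns is dbt's trailing "+" graph operator: "state:modified" selects only the changed nodes, while "state:modified+" also selects everything downstream of them. A toy sketch of that expansion (the DAG and function are invented for illustration):

```python
from collections import deque

def with_downstream(selected: set[str], children: dict[str, list[str]]) -> set[str]:
    """Breadth-first expansion of a selection to all downstream nodes,
    mimicking dbt's trailing '+' graph operator."""
    seen = set(selected)
    queue = deque(selected)
    while queue:
        node = queue.popleft()
        for child in children.get(node, []):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return seen

# Made-up DAG: staging models feed an orders model, which feeds a fact model.
dag = {"stg_orders": ["orders"], "stg_payments": ["orders"], "orders": ["fct_revenue"]}
```

Selecting a modified `stg_orders` with the "+" operator would therefore also rebuild `orders` and `fct_revenue`.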
  • Helper that parses dbt's run_results.json into a simplified dictionary format for tool responses.
    def _parse_run_results(self) -> dict[str, Any]:
        """Parse target/run_results.json after dbt run/test/build.
    
        Returns:
            Dictionary with results array and metadata
        """
        if not self.project_dir:
            return {"results": [], "elapsed_time": 0}
    
        run_results_path = self.project_dir / "target" / "run_results.json"
        if not run_results_path.exists():
            return {"results": [], "elapsed_time": 0}
    
        try:
            with open(run_results_path) as f:
                data = json.load(f)
    
            # Simplify results for output
            simplified_results = []
            for result in data.get("results", []):
                simplified_results.append(
                    {
                        "unique_id": result.get("unique_id"),
                        "status": result.get("status"),
                        "message": result.get("message"),
                        "execution_time": result.get("execution_time"),
                        "failures": result.get("failures"),
                    }
                )
    
            return {
                "results": simplified_results,
                "elapsed_time": data.get("elapsed_time", 0),
            }
        except Exception as e:
            logger.warning(f"Failed to parse run_results.json: {e}")
            return {"results": [], "elapsed_time": 0}
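The simplification step above can be shown on a sample payload. This sketch uses an invented, trimmed-down run_results.json fragment (real files carry many more fields) to show that only the five fields the parser keeps survive into tool responses:

```python
# Invented sample payload; only the structure matters here.
sample = {
    "elapsed_time": 2.5,
    "results": [
        {
            "unique_id": "model.demo.customers",
            "status": "success",
            "message": "SUCCESS 1",
            "execution_time": 1.2,
            "failures": 0,
            "adapter_response": {"rows_affected": 42},  # dropped by the parser
        }
    ],
}

def simplify(data: dict) -> dict:
    """Keep only the fields _parse_run_results surfaces in tool responses."""
    keep = ("unique_id", "status", "message", "execution_time", "failures")
    return {
        "results": [{k: r.get(k) for k in keep} for r in data.get("results", [])],
        "elapsed_time": data.get("elapsed_time", 0),
    }
```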
