run_models
Execute dbt models by compiling SQL and running against databases. Choose modified models, downstream dependencies, or custom selections to update data transformations efficiently.
Instructions
Run dbt models (compile SQL and execute against database).
State-based selection modes (uses dbt state:modified selector):
select_state_modified: Run only models modified since last successful run (state:modified)
select_state_modified_plus_downstream: Run modified + downstream dependencies (state:modified+) Note: Requires select_state_modified=True
Manual selection (alternative to state-based):
select: dbt selector syntax (e.g., "customers", "tag:mart", "stg_*")
exclude: Exclude specific models
Args:
- select: Manual selector (e.g., "customers", "tag:mart", "path:marts/*")
- exclude: Exclude selector (e.g., "tag:deprecated")
- select_state_modified: Use state:modified selector (changed models only)
- select_state_modified_plus_downstream: Extend to state:modified+ (changed + downstream)
- full_refresh: Force full refresh of incremental models
- fail_fast: Stop execution on first failure
- check_schema_changes: Detect schema changes and recommend downstream runs
Returns: Execution results with status, models run, timing info, and optional schema_changes
Examples:
- run_models(select="customers") — Run a specific model
- run_models(select_state_modified=True) — Run only what changed
- run_models(select_state_modified=True, select_state_modified_plus_downstream=True) — Run changed + downstream
- run_models(select="tag:mart", full_refresh=True) — Full refresh of mart models
- run_models(select_state_modified=True, check_schema_changes=True) — Detect schema changes
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| select | No | Manual dbt selector (e.g., "customers", "tag:mart", "path:marts/*") | None |
| exclude | No | Exclude selector (e.g., "tag:deprecated") | None |
| select_state_modified | No | Use state:modified selector (changed models only) | False |
| select_state_modified_plus_downstream | No | Extend to state:modified+ (changed + downstream); requires select_state_modified=True | False |
| full_refresh | No | Force full refresh of incremental models | False |
| fail_fast | No | Stop execution on first failure | False |
| check_schema_changes | No | Detect schema changes and recommend downstream runs | False |
Implementation Reference
- src/dbt_core_mcp/server.py:786-952 (handler)The core handler function that implements the run_models tool logic. It handles state-based selection (modified models), builds dbt run commands, executes via BridgeRunner, parses results, detects schema changes, saves state for future runs, and supports progress reporting.async def toolImpl_run_models( self, ctx: Context | None, select: str | None = None, exclude: str | None = None, select_state_modified: bool = False, select_state_modified_plus_downstream: bool = False, full_refresh: bool = False, fail_fast: bool = False, check_schema_changes: bool = False, ) -> dict[str, Any]: """Implementation for run_models tool.""" # Prepare state-based selection (validates and returns selector) selector = await self._prepare_state_based_selection(select_state_modified, select_state_modified_plus_downstream, select) # Early return if state-based requested but no state exists if select_state_modified and not selector: return { "status": "success", "message": "No previous state found - cannot determine modifications", "results": [], "elapsed_time": 0, } # Build command args args = ["run"] # Add selector if we have one (state-based or manual) if selector: args.extend(["-s", selector, "--state", "target/state_last_run"]) elif select: args.extend(["-s", select]) if exclude: args.extend(["--exclude", exclude]) if full_refresh: args.append("--full-refresh") if fail_fast: args.append("--fail-fast") # Capture pre-run table columns for schema change detection # Also get expected count of models for progress reporting pre_run_columns: dict[str, list[str]] = {} expected_total: int | None = None if check_schema_changes or True: # Always get count for progress # Use dbt list to get models that will be run (without actually running them) list_args = ["list", "--resource-type", "model", "--output", "name"] if select_state_modified: selector = "state:modified+" if select_state_modified_plus_downstream else "state:modified" list_args.extend(["-s", 
selector, "--state", "target/state_last_run"]) elif select: list_args.extend(["-s", select]) if exclude: list_args.extend(["--exclude", exclude]) # Get list of models logger.info(f"Getting model list: {list_args}") list_result = await self.runner.invoke(list_args) # type: ignore if list_result.success and list_result.stdout: model_count = 0 # Parse model names from output (one per line with --output name) for line in list_result.stdout.strip().split("\n"): line = line.strip() # Skip log lines, timestamps, empty lines, and JSON output if ( not line or line.startswith("{") or ":" in line[:10] # Timestamp like "07:39:44" or "Running with dbt=" in line or "Registered adapter:" in line ): continue model_count += 1 # For schema change detection, query pre-run columns if check_schema_changes: model_name = line logger.info(f"Querying pre-run columns for {model_name}") cols = await self._get_table_columns_from_db(model_name) if cols: pre_run_columns[model_name] = cols else: # Table doesn't exist yet - mark as new pre_run_columns[model_name] = [] # Set expected total from model count if model_count > 0: expected_total = model_count logger.info(f"Expected total models to run: {expected_total}") # Execute with progress reporting logger.info(f"Running dbt models with args: {args}") logger.info(f"Expected total for progress: {expected_total}") # Define progress callback if context available async def progress_callback(current: int, total: int, message: str) -> None: if ctx: await ctx.report_progress(progress=current, total=total, message=message) result = await self.runner.invoke(args, progress_callback=progress_callback if ctx else None, expected_total=expected_total) # type: ignore if not result.success: error_msg = str(result.exception) if result.exception else "Run failed" response = { "status": "error", "message": error_msg, "command": " ".join(args), } # Include dbt output for debugging if result.stdout: response["dbt_output"] = result.stdout if result.stderr: 
response["stderr"] = result.stderr return response # Parse run_results.json for details run_results = self._parse_run_results() # Check for schema changes if requested schema_changes: dict[str, dict[str, list[str]]] = {} if check_schema_changes and pre_run_columns: logger.info("Detecting schema changes by comparing pre/post-run database columns") for model_name, old_columns in pre_run_columns.items(): # Query post-run columns from database new_columns = await self._get_table_columns_from_db(model_name) if not new_columns: # Model failed to build or was skipped continue # Compare columns added = [c for c in new_columns if c not in old_columns] removed = [c for c in old_columns if c not in new_columns] if old_columns else [] if added or removed: schema_changes[model_name] = {} if added: schema_changes[model_name]["added"] = added if removed: schema_changes[model_name]["removed"] = removed # Save state on success for next modified run if result.success and self.project_dir: state_dir = self.project_dir / "target" / "state_last_run" state_dir.mkdir(parents=True, exist_ok=True) manifest_path = self.runner.get_manifest_path() # type: ignore shutil.copy(manifest_path, state_dir / "manifest.json") response: dict[str, Any] = { "status": "success", "command": " ".join(args), "results": run_results.get("results", []), "elapsed_time": run_results.get("elapsed_time"), } if schema_changes: response["schema_changes"] = schema_changes response["recommendation"] = "Schema changes detected. Consider running downstream models with modified_downstream=True to propagate changes." return response
# src/dbt_core_mcp/server.py:1527-1569 (registration)
# Registered with FastMCP via @app.tool(); thin wrapper that ensures the dbt
# project is initialized, then delegates to toolImpl_run_models. The docstring
# below is surfaced to MCP clients as the tool description.
async def run_models(
    ctx: Context,
    select: str | None = None,
    exclude: str | None = None,
    select_state_modified: bool = False,
    select_state_modified_plus_downstream: bool = False,
    full_refresh: bool = False,
    fail_fast: bool = False,
    check_schema_changes: bool = False,
) -> dict[str, Any]:
    """Run dbt models (compile SQL and execute against database).

    State-based selection modes (uses dbt state:modified selector):
    - select_state_modified: Run only models modified since last successful run (state:modified)
    - select_state_modified_plus_downstream: Run modified + downstream dependencies (state:modified+)
      Note: Requires select_state_modified=True

    Manual selection (alternative to state-based):
    - select: dbt selector syntax (e.g., "customers", "tag:mart", "stg_*")
    - exclude: Exclude specific models

    Args:
        select: Manual selector (e.g., "customers", "tag:mart", "path:marts/*")
        exclude: Exclude selector (e.g., "tag:deprecated")
        select_state_modified: Use state:modified selector (changed models only)
        select_state_modified_plus_downstream: Extend to state:modified+ (changed + downstream)
        full_refresh: Force full refresh of incremental models
        fail_fast: Stop execution on first failure
        check_schema_changes: Detect schema changes and recommend downstream runs

    Returns:
        Execution results with status, models run, timing info, and optional schema_changes

    Examples:
    - run_models(select="customers") - Run specific model
    - run_models(select_state_modified=True) - Run only what changed
    - run_models(select_state_modified=True, select_state_modified_plus_downstream=True) - Run changed + downstream
    - run_models(select="tag:mart", full_refresh=True) - Full refresh marts
    - run_models(select_state_modified=True, check_schema_changes=True) - Detect schema changes
    """
    # Lazy initialization: make sure the dbt project is loaded before running.
    await self._ensure_initialized_with_context(ctx)
    # Forward every flag by keyword so positional drift cannot misbind them.
    return await self.toolImpl_run_models(
        ctx,
        select=select,
        exclude=exclude,
        select_state_modified=select_state_modified,
        select_state_modified_plus_downstream=select_state_modified_plus_downstream,
        full_refresh=full_refresh,
        fail_fast=fail_fast,
        check_schema_changes=check_schema_changes,
    )
# src/dbt_core_mcp/server.py:479-520 (helper)
# Validates parameters and resolves the dbt state-based selector
# ("state:modified" or "state:modified+") for incremental runs.
async def _prepare_state_based_selection(
    self,
    select_state_modified: bool,
    select_state_modified_plus_downstream: bool,
    select: str | None,
) -> str | None:
    """Validate flags and resolve the dbt state-based selector string.

    Args:
        select_state_modified: Use the state:modified selector.
        select_state_modified_plus_downstream: Extend to state:modified+.
        select: Manual selector (mutually exclusive with state-based flags).

    Returns:
        "state:modified" or "state:modified+" when state-based selection is
        active and a previous state snapshot exists; otherwise None (either
        state-based selection was not requested, or no prior state is saved).

    Raises:
        ValueError: If the flag combination is invalid.
    """
    # Guard: the downstream flag only extends the base state-based flag.
    if select_state_modified_plus_downstream and not select_state_modified:
        raise ValueError("select_state_modified_plus_downstream requires select_state_modified=True")

    # Guard: state-based and manual selection are mutually exclusive.
    if select_state_modified and select:
        raise ValueError("Cannot use both select_state_modified* flags and select parameter")

    if not select_state_modified:
        # Caller is using manual (or no) selection.
        return None

    # Without a saved manifest snapshot there is no baseline to diff against.
    saved_state = self.project_dir / "target" / "state_last_run"  # type: ignore
    if not saved_state.exists():
        return None

    return "state:modified+" if select_state_modified_plus_downstream else "state:modified"
# src/dbt_core_mcp/server.py:304-341 (helper)
# Parses dbt's run_results.json into a simplified dictionary for tool responses.
def _parse_run_results(self) -> dict[str, Any]:
    """Parse target/run_results.json after dbt run/test/build.

    Returns:
        Dict with a "results" list (one simplified entry per executed node)
        and the run's total "elapsed_time". Empty results when the project
        dir is unset, the file is missing, or parsing fails.
    """
    empty: dict[str, Any] = {"results": [], "elapsed_time": 0}

    if not self.project_dir:
        return empty

    results_file = self.project_dir / "target" / "run_results.json"
    if not results_file.exists():
        return empty

    try:
        with open(results_file) as fh:
            payload = json.load(fh)
        # Keep only the fields callers care about for each executed node.
        wanted = ("unique_id", "status", "message", "execution_time", "failures")
        simplified = [
            {key: entry.get(key) for key in wanted}
            for entry in payload.get("results", [])
        ]
        return {
            "results": simplified,
            "elapsed_time": payload.get("elapsed_time", 0),
        }
    except Exception as e:
        # Best-effort: a malformed or unreadable file degrades to empty results.
        logger.warning(f"Failed to parse run_results.json: {e}")
        return empty