
dbt_run

Execute dbt models to transform data and build analytical tables in a data warehouse. Use this to refresh data or implement new data transformations in a dbt project, returning command output as text.

Instructions

Run dbt models. An AI agent should use this tool when it needs to execute dbt models to transform data and build analytical tables in the data warehouse. This is essential for refreshing data or implementing new data transformations in a project.

Returns: Output from the dbt run command as text (this command does not support JSON output format)

Input Schema

Name          Required  Default  Description
------------  --------  -------  -----------
exclude       No        None     Models to exclude
full_refresh  No        False    Whether to perform a full refresh
models        No        None     Specific models to run, using the dbt selection syntax (e.g., "model_name+")
profiles_dir  No        None     Directory containing the profiles.yml file (defaults to project_dir if not specified)
project_dir   No        "."      ABSOLUTE PATH to the directory containing the dbt project (e.g., '/Users/username/projects/dbt_project', not '.')
selector      No        None     Named selector to use
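
For example, an agent that wants to rebuild a staging model and everything downstream of it, with a full refresh, might call the tool with arguments like these (the model name and project path are illustrative):

    # Illustrative tool-call arguments (hypothetical model name and project path)
    arguments = {
        "models": "stg_orders+",
        "full_refresh": True,
        "project_dir": "/Users/username/projects/dbt_project",
    }
    # Per the handler below, this executes: dbt run -s stg_orders+ --full-refresh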

Implementation Reference

  • The main handler function for the 'dbt_run' MCP tool. It constructs the dbt 'run' command based on input parameters like models, selector, exclude, etc., executes it using execute_dbt_command, and processes the result.
    async def dbt_run(
        models: Optional[str] = Field(
            default=None,
            description="Specific models to run, using the dbt selection syntax (e.g., \"model_name+\")"
        ),
        selector: Optional[str] = Field(
            default=None,
            description="Named selector to use"
        ),
        exclude: Optional[str] = Field(
            default=None,
            description="Models to exclude"
        ),
        project_dir: str = Field(
            default=".",
            description="ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project' not '.')"
        ),
        profiles_dir: Optional[str] = Field(
            default=None,
            description="Directory containing the profiles.yml file (defaults to project_dir if not specified)"
        ),
        full_refresh: bool = Field(
            default=False,
            description="Whether to perform a full refresh"
        )
    ) -> str:
        """Run dbt models. An AI agent should use this tool when it needs to execute dbt models
        to transform data and build analytical tables in the data warehouse. This is essential
        for refreshing data or implementing new data transformations in a project.

        Returns:
            Output from the dbt run command as text (this command does not support JSON output format)
        """
        command = ["run"]

        if models:
            command.extend(["-s", models])

        if selector:
            command.extend(["--selector", selector])

        if exclude:
            command.extend(["--exclude", exclude])

        if full_refresh:
            command.append("--full-refresh")

        # The --no-print flag is not supported by dbt Cloud CLI
        # We'll rely on proper parsing to handle any print macros
        result = await execute_dbt_command(command, project_dir, profiles_dir)

        # Use the centralized result processor
        return await process_command_result(result, command_name="run")
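  • Because the Field(...) defaults are resolved by the framework rather than by plain Python, a direct call outside FastMCP should pass every argument explicitly. A minimal invocation sketch (the project path is a placeholder):
    import asyncio

    # Direct invocation sketch; in production FastMCP supplies these arguments
    # from the MCP request. Every parameter is passed explicitly because the
    # Field(...) defaults are interpreted by the framework, not by plain Python.
    output = asyncio.run(dbt_run(
        models="stg_orders+",
        selector=None,
        exclude=None,
        project_dir="/Users/username/projects/dbt_project",  # placeholder path
        profiles_dir=None,
        full_refresh=False,
    ))
    print(output)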
  • Pydantic input schema for the dbt_run tool: the Field(...) definitions in the handler signature above double as the input schema, supplying the parameter descriptions and defaults that FastMCP uses for MCP tool validation.
  • src/server.py:89 (registration)
    Invocation of register_tools(mcp), which defines and registers the dbt_run tool (along with the server's other tools) with the FastMCP server instance.
    register_tools(mcp)
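  • register_tools itself is not shown on this page; the following is a minimal sketch of what such a registration typically looks like with FastMCP's tool decorator (the module layout and the set of sibling tools are assumptions):
    from mcp.server.fastmcp import FastMCP

    def register_tools(mcp: FastMCP) -> None:
        # Register dbt_run with the server; the real function also registers
        # the server's other dbt tools.
        mcp.tool()(dbt_run)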
  • Key helper function that asynchronously executes dbt CLI commands via subprocess, handles environment setup including profiles_dir and .env loading, and returns structured results.
    async def execute_dbt_command(
        command: List[str],
        project_dir: str = ".",
        profiles_dir: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Execute a dbt command and return the result.

        Args:
            command: List of command arguments (without the dbt executable)
            project_dir: Directory containing the dbt project
            profiles_dir: Directory containing the profiles.yml file (defaults to project_dir if not specified)

        Returns:
            Dictionary containing command result:
            {
                "success": bool,
                "output": str or dict,
                "error": str or None,
                "returncode": int
            }
        """
        # Get dbt path from config
        dbt_path = get_config("dbt_path", "dbt")
        full_command = [dbt_path] + command

        # Load environment variables
        env_vars = load_environment(project_dir)

        # Explicitly set HOME environment variable in os.environ
        os.environ["HOME"] = str(Path.home())
        logger.debug(f"Explicitly setting HOME environment variable in os.environ to {os.environ['HOME']}")

        # Set DBT_PROFILES_DIR based on profiles_dir or project_dir
        if profiles_dir is not None:
            # Use the explicitly provided profiles_dir
            abs_profiles_dir = str(Path(profiles_dir).resolve())
            os.environ["DBT_PROFILES_DIR"] = abs_profiles_dir
            logger.debug(f"Setting DBT_PROFILES_DIR in os.environ to {abs_profiles_dir} (from profiles_dir)")
        else:
            # Check if there's a value from the .env file
            if "DBT_PROFILES_DIR" in env_vars:
                os.environ["DBT_PROFILES_DIR"] = env_vars["DBT_PROFILES_DIR"]
                logger.debug(f"Setting DBT_PROFILES_DIR from env_vars to {env_vars['DBT_PROFILES_DIR']}")
            else:
                # Default to project_dir
                abs_project_dir = str(Path(project_dir).resolve())
                os.environ["DBT_PROFILES_DIR"] = abs_project_dir
                logger.debug(f"Setting DBT_PROFILES_DIR in os.environ to {abs_project_dir} (from project_dir)")

        # Update env_vars with the current os.environ
        env_vars.update(os.environ)

        logger.debug(f"Executing command: {' '.join(full_command)} in {project_dir}")

        try:
            # Execute the command
            process = await asyncio.create_subprocess_exec(
                *full_command,
                cwd=project_dir,
                env=env_vars,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )

            # Communicate with the process
            stdout_bytes, stderr_bytes = await process.communicate()
            stdout = stdout_bytes.decode('utf-8') if stdout_bytes else ""
            stderr = stderr_bytes.decode('utf-8') if stderr_bytes else ""

            success = process.returncode == 0

            # Special case for 'show' command: detect "does not match any enabled nodes" as an error
            # Only check if --quiet is not in the command, as --quiet suppresses this output
            if success and command[0] == "show" and "--quiet" not in command and "does not match any enabled nodes" in stdout:
                success = False

            # For commands that failed, combine stdout and stderr for comprehensive output
            if not success and stderr:
                # If there's output from both stdout and stderr, combine them
                if stdout:
                    output = f"{stdout}\n\nSTDERR:\n{stderr}"
                else:
                    output = stderr
            else:
                # For successful commands, use stdout
                output = stdout

            # Check if this is dbt Cloud CLI output format with embedded JSON in log lines
            if stdout.strip().startswith('[') and '"name":' in stdout:
                try:
                    # Parse the entire output as JSON array
                    json_array = json.loads(stdout)
                    # If it's an array of log objects with name field (dbt Cloud CLI format)
                    if isinstance(json_array, list) and all(isinstance(item, dict) and "name" in item for item in json_array):
                        logger.debug(f"Detected dbt Cloud CLI output format with {len(json_array)} items")
                        output = json_array
                except json.JSONDecodeError:
                    # Not valid JSON array, keep as string
                    logger.debug("Failed to parse stdout as JSON array, keeping as string")
                    pass
            else:
                # Try standard JSON parsing
                try:
                    output = json.loads(stdout)
                except json.JSONDecodeError:
                    # Not JSON, keep as string
                    logger.debug("Failed to parse stdout as standard JSON, keeping as string")
                    pass

            result = {
                "success": success,
                "output": output,
                "error": stderr if not success else None,
                "returncode": process.returncode
            }

            if not success:
                logger.warning(f"Command failed with exit code {process.returncode}: {stderr}")
                # Log full environment for debugging
                logger.debug(f"Full environment variables: {env_vars}")
                logger.debug(f"Current directory: {project_dir}")
                logger.debug(f"Full command: {' '.join(full_command)}")

            return result
        except Exception as e:
            import traceback
            stack_trace = traceback.format_exc()
            logger.error(f"Error executing command: {e}\nStack trace: {stack_trace}")
            return {
                "success": False,
                "output": None,
                "error": f"{str(e)}\nStack trace: {stack_trace}",
                "returncode": -1
            }
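  • Called on its own, the helper resolves the dbt executable, sets DBT_PROFILES_DIR, and returns the structured dict described in its docstring. A usage sketch (the project path is a placeholder):
    import asyncio

    async def main() -> None:
        # Equivalent to running "dbt run -s orders+" in the project directory
        result = await execute_dbt_command(
            ["run", "-s", "orders+"],
            project_dir="/Users/username/projects/dbt_project",  # placeholder
        )
        if result["success"]:
            print(result["output"])
        else:
            print(f"dbt exited with {result['returncode']}: {result['error']}")

    asyncio.run(main())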
  • Helper function that processes the raw result from execute_dbt_command, applies formatting if provided, and returns user-friendly string output or error messages.
    async def process_command_result(
        result: Dict[str, Any],
        command_name: str,
        output_formatter: Optional[Callable] = None,
        include_debug_info: bool = False
    ) -> str:
        """
        Process the result of a dbt command execution.

        Args:
            result: The result dictionary from execute_dbt_command
            command_name: The name of the dbt command (e.g. "run", "test")
            output_formatter: Optional function to format successful output
            include_debug_info: Whether to include additional debug info in error messages

        Returns:
            Formatted output or error message
        """
        logger.info(f"Processing command result for {command_name}")
        logger.info(f"Result success: {result['success']}, returncode: {result.get('returncode')}")

        # Log the output type and a sample
        if "output" in result:
            if isinstance(result["output"], str):
                logger.info(f"Output type: str, first 100 chars: {result['output'][:100]}")
            elif isinstance(result["output"], (dict, list)):
                logger.info(f"Output type: {type(result['output'])}, sample: {json.dumps(result['output'])[:100]}")
            else:
                logger.info(f"Output type: {type(result['output'])}")

        # For errors, simply return the raw command output if available
        if not result["success"]:
            logger.warning(f"Command {command_name} failed with returncode {result.get('returncode')}")

            # If we have command output, return it directly
            if "output" in result and result["output"]:
                logger.info(f"Returning error output: {str(result['output'])[:100]}...")
                return str(result["output"])

            # If no command output, return the error message
            if result["error"]:
                logger.info(f"Returning error message: {str(result['error'])[:100]}...")
                return str(result["error"])

            # If neither output nor error is available, return a generic message
            logger.info("No output or error available, returning generic message")
            return f"Command failed with exit code {result.get('returncode', 'unknown')}"

        # Format successful output
        if output_formatter:
            logger.info(f"Using custom formatter for {command_name}")
            formatted_result = output_formatter(result["output"])
            logger.info(f"Formatted result type: {type(formatted_result)}, first 100 chars: {str(formatted_result)[:100]}")
            return formatted_result

        # Default output formatting
        logger.info(f"Using default formatting for {command_name}")
        if isinstance(result["output"], (dict, list)):
            json_result = json.dumps(result["output"])
            logger.info(f"JSON result length: {len(json_result)}, first 100 chars: {json_result[:100]}")
            return json_result
        else:
            str_result = str(result["output"])
            logger.info(f"String result length: {len(str_result)}, first 100 chars: {str_result[:100]}")
            return str_result
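  • The two helpers compose exactly as in the dbt_run handler above; passing a custom output_formatter reshapes successful output before it is returned. A sketch with a hypothetical formatter (the "OK created" marker is an illustrative heuristic, not a stable dbt contract):
    # Hypothetical formatter: keep only per-model status lines from dbt's output.
    def only_status_lines(output) -> str:
        return "\n".join(
            line for line in str(output).splitlines() if "OK created" in line
        )

    async def run_and_summarize(project_dir: str) -> str:
        result = await execute_dbt_command(["run"], project_dir)
        return await process_command_result(
            result, command_name="run", output_formatter=only_status_lines
        )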


MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MammothGrowth/dbt-cli-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.