NiclasOlofsson

DBT Core MCP Server

seed_data

Load CSV seed files into database tables for reference data like country codes or product categories. Supports selective loading of modified files or manual selection of specific seeds.

Instructions

Load seed data (CSV files) from seeds/ directory into database tables.

Seeds are typically used for reference data like country codes, product categories, etc.

State-based selection modes (detects changed CSV files):

  • select_state_modified: Load only seeds modified since last successful run (state:modified)

  • select_state_modified_plus_downstream: Load modified seeds plus their downstream dependencies (state:modified+). Note: requires select_state_modified=True.

Manual selection (alternative to state-based):

  • select: dbt selector syntax (e.g., "raw_customers", "tag:lookup")

  • exclude: Exclude specific seeds

Important: Change detection for seeds works via file hash comparison:

  • Seeds < 1 MiB: Content hash is compared (recommended)

  • Seeds >= 1 MiB: Only file path changes are detected (content changes ignored)

For large seeds, use manual selection or run all seeds.
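Because content changes in seeds of 1 MiB or more go undetected, it can help to check which seed files cross that threshold before relying on state-based selection. The following is a minimal sketch and not part of the server: classify_seeds is a hypothetical helper, and it assumes seeds are .csv files under a seeds/ directory and that 1 MiB means 1,048,576 bytes.

```python
from __future__ import annotations

from pathlib import Path

# Assumption: 1 MiB = 1024 * 1024 bytes, matching the threshold quoted above.
ONE_MIB = 1024 * 1024


def classify_seeds(seeds_dir: Path) -> dict[str, list[str]]:
    """Group seed CSVs by how their changes would be detected (hypothetical helper)."""
    groups: dict[str, list[str]] = {"content_hash": [], "path_only": []}
    if not seeds_dir.is_dir():
        return groups
    for csv_path in sorted(seeds_dir.rglob("*.csv")):
        # Below the threshold the content hash is compared; at or above it, only the path.
        key = "content_hash" if csv_path.stat().st_size < ONE_MIB else "path_only"
        groups[key].append(csv_path.relative_to(seeds_dir).as_posix())
    return groups


if __name__ == "__main__":
    for name in classify_seeds(Path("seeds"))["path_only"]:
        print(f"{name}: content changes will not be detected, select it manually")
```

Seeds reported under path_only are the ones to load with select="..." or with a plain seed_data() call.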

Args:

  • select: Manual selector for seeds

  • exclude: Exclude selector

  • select_state_modified: Use state:modified selector (changed seeds only)

  • select_state_modified_plus_downstream: Extend to state:modified+ (changed + downstream)

  • full_refresh: Pass --full-refresh to dbt seed to rebuild seed tables (default: False)

  • show: Show preview of loaded data

Returns: Seed results with status and loaded seed info

Examples:

    seed_data()  # Load all seeds
    seed_data(select_state_modified=True)  # Load only changed CSVs
    seed_data(select="raw_customers")  # Load specific seed

Input Schema

Name                                    Required  Description  Default
select                                  No
exclude                                 No
select_state_modified                   No
select_state_modified_plus_downstream   No
full_refresh                            No
show                                    No

Implementation Reference

  • The core handler function that implements the seed_data tool logic. It constructs the dbt seed command with selectors (manual or state-based), executes it via the runner, handles progress reporting, parses results, and saves state for future runs.
    async def toolImpl_seed_data(
        self,
        ctx: Context | None = None,
        select: str | None = None,
        exclude: str | None = None,
        select_state_modified: bool = False,
        select_state_modified_plus_downstream: bool = False,
        full_refresh: bool = False,
        show: bool = False,
    ) -> dict[str, Any]:
        """Implementation of seed_data tool."""
        # Prepare state-based selection (validates and returns selector)
        selector = await self._prepare_state_based_selection(select_state_modified, select_state_modified_plus_downstream, select)
    
        # Early return if state-based requested but no state exists
        if select_state_modified and not selector:
            return {
                "status": "success",
                "message": "No previous state found - cannot determine modifications",
                "results": [],
                "elapsed_time": 0,
            }
    
        # Build command args
        args = ["seed"]
    
        # Add selector if we have one (state-based or manual)
        if selector:
            args.extend(["-s", selector, "--state", "target/state_last_run"])
        elif select:
            args.extend(["-s", select])
    
        if exclude:
            args.extend(["--exclude", exclude])
    
        if full_refresh:
            args.append("--full-refresh")
    
        if show:
            args.append("--show")
    
        # Execute with progress reporting
        logger.info(f"Running DBT seed with args: {args}")
    
        # Define progress callback if context available
        async def progress_callback(current: int, total: int, message: str) -> None:
            if ctx:
                await ctx.report_progress(progress=current, total=total, message=message)
    
        result = await self.runner.invoke(args, progress_callback=progress_callback if ctx else None)  # type: ignore
    
        if not result.success:
            error_msg = str(result.exception) if result.exception else "Seed failed"
            response = {
                "status": "error",
                "message": error_msg,
                "command": " ".join(args),
            }
            # Include dbt output for debugging
            if result.stdout:
                response["dbt_output"] = result.stdout
            if result.stderr:
                response["stderr"] = result.stderr
            return response
    
        # Save state on success for next modified run
        if result.success and self.project_dir:
            state_dir = self.project_dir / "target" / "state_last_run"
            state_dir.mkdir(parents=True, exist_ok=True)
            manifest_path = self.runner.get_manifest_path()  # type: ignore
            shutil.copy(manifest_path, state_dir / "manifest.json")
    
        # Parse run_results.json for details
        run_results = self._parse_run_results()
    
        return {
            "status": "success",
            "command": " ".join(args),
            "results": run_results.get("results", []),
            "elapsed_time": run_results.get("elapsed_time"),
        }
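To see which dbt command a given combination of parameters produces, the argument-building step of the handler can be isolated. This is a standalone sketch under assumptions: build_seed_args is a hypothetical name, and the state path is passed as a parameter here instead of being hard-coded.

```python
from __future__ import annotations


def build_seed_args(
    selector: str | None = None,
    select: str | None = None,
    exclude: str | None = None,
    full_refresh: bool = False,
    show: bool = False,
    state_path: str = "target/state_last_run",
) -> list[str]:
    """Standalone sketch of the handler's argument-building step."""
    args = ["seed"]
    if selector:
        # A state-based selector is paired with --state so dbt has a manifest to compare against.
        args.extend(["-s", selector, "--state", state_path])
    elif select:
        args.extend(["-s", select])
    if exclude:
        args.extend(["--exclude", exclude])
    if full_refresh:
        args.append("--full-refresh")
    if show:
        args.append("--show")
    return args


print(" ".join(build_seed_args(selector="state:modified")))
# seed -s state:modified --state target/state_last_run
print(" ".join(build_seed_args(select="tag:lookup", exclude="raw_customers", full_refresh=True)))
# seed -s tag:lookup --exclude raw_customers --full-refresh
```

A state-based selector takes precedence over a manual one in this ordering, which matches the handler, where the two are mutually exclusive anyway.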
  • The FastMCP tool registration decorator (@self.app.tool()) that defines the 'seed_data' tool, its parameters (serving as input schema), comprehensive docstring, and delegates to the implementation handler.
    @self.app.tool()
    async def seed_data(
        ctx: Context,
        select: str | None = None,
        exclude: str | None = None,
        select_state_modified: bool = False,
        select_state_modified_plus_downstream: bool = False,
        full_refresh: bool = False,
        show: bool = False,
    ) -> dict[str, Any]:
        """Load seed data (CSV files) from seeds/ directory into database tables.
    
        Seeds are typically used for reference data like country codes, product categories, etc.
    
        State-based selection modes (detects changed CSV files):
        - select_state_modified: Load only seeds modified since last successful run (state:modified)
        - select_state_modified_plus_downstream: Load modified + downstream dependencies (state:modified+)
          Note: Requires select_state_modified=True
    
        Manual selection (alternative to state-based):
        - select: dbt selector syntax (e.g., "raw_customers", "tag:lookup")
        - exclude: Exclude specific seeds
    
        Important: Change detection for seeds works via file hash comparison:
        - Seeds < 1 MiB: Content hash is compared (recommended)
        - Seeds >= 1 MiB: Only file path changes are detected (content changes ignored)
        For large seeds, use manual selection or run all seeds.
    
        Args:
            select: Manual selector for seeds
            exclude: Exclude selector
            select_state_modified: Use state:modified selector (changed seeds only)
            select_state_modified_plus_downstream: Extend to state:modified+ (changed + downstream)
            full_refresh: Pass --full-refresh to dbt seed to rebuild seed tables (default: False)
            show: Show preview of loaded data
    
        Returns:
            Seed results with status and loaded seed info
    
        Examples:
            seed_data()  # Load all seeds
            seed_data(select_state_modified=True)  # Load only changed CSVs
            seed_data(select="raw_customers")  # Load specific seed
        """
        await self._ensure_initialized_with_context(ctx)
        return await self.toolImpl_seed_data(ctx, select, exclude, select_state_modified, select_state_modified_plus_downstream, full_refresh, show)
  • Helper method used by seed_data (and similar tools) to validate parameters and construct state-based dbt selectors like 'state:modified' or 'state:modified+'.
    async def _prepare_state_based_selection(
        self,
        select_state_modified: bool,
        select_state_modified_plus_downstream: bool,
        select: str | None,
    ) -> str | None:
        """Validate and prepare state-based selection.
    
        Args:
            select_state_modified: Use state:modified selector
            select_state_modified_plus_downstream: Extend to state:modified+
            select: Manual selector (conflicts with state-based)
    
        Returns:
            The dbt selector string to use ("state:modified" or "state:modified+"), or None if:
            - Not using state-based selection
            - No previous state exists (cannot determine modifications)
    
        Raises:
            ValueError: If validation fails
        """
        # Validate: hierarchical requirement
        if select_state_modified_plus_downstream and not select_state_modified:
            raise ValueError("select_state_modified_plus_downstream requires select_state_modified=True")
    
        # Validate: can't use both state-based and manual selection
        if select_state_modified and select:
            raise ValueError("Cannot use both select_state_modified* flags and select parameter")
    
        # If not using state-based selection, return None
        if not select_state_modified:
            return None
    
        # Check if state exists
        state_dir = self.project_dir / "target" / "state_last_run"  # type: ignore
        if not state_dir.exists():
            # No state - cannot determine modifications
            return None
    
        # Return selector (state exists)
        return "state:modified+" if select_state_modified_plus_downstream else "state:modified"
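The outcomes of this helper are easier to follow when each flag combination is exercised in isolation. The sketch below mirrors the same checks as a free function; prepare_selector is a hypothetical name, and the state directory is passed in explicitly instead of being derived from self.project_dir.

```python
from __future__ import annotations

from pathlib import Path


def prepare_selector(
    state_dir: Path,
    select_state_modified: bool = False,
    select_state_modified_plus_downstream: bool = False,
    select: str | None = None,
) -> str | None:
    """Standalone sketch of the validation and selector choice shown above."""
    if select_state_modified_plus_downstream and not select_state_modified:
        raise ValueError("select_state_modified_plus_downstream requires select_state_modified=True")
    if select_state_modified and select:
        raise ValueError("Cannot use both select_state_modified* flags and select parameter")
    if not select_state_modified:
        return None  # manual selection or no selection at all
    if not state_dir.exists():
        return None  # no saved state, so modifications cannot be determined
    return "state:modified+" if select_state_modified_plus_downstream else "state:modified"


if __name__ == "__main__":
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        state_dir = Path(tmp)  # stands in for target/state_last_run
        print(prepare_selector(state_dir, select_state_modified=True))
        print(prepare_selector(state_dir / "missing", select_state_modified=True))
```

Note that None has two meanings here: state-based selection was not requested, or it was requested but no saved state exists. The caller tells them apart by checking select_state_modified, as the handler's early return does.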
  • Helper method to parse dbt's run_results.json file after seed/run/test/build operations, simplifying results for tool responses.
    def _parse_run_results(self) -> dict[str, Any]:
        """Parse target/run_results.json after dbt run/test/build.
    
        Returns:
            Dictionary with results array and metadata
        """
        if not self.project_dir:
            return {"results": [], "elapsed_time": 0}
    
        run_results_path = self.project_dir / "target" / "run_results.json"
        if not run_results_path.exists():
            return {"results": [], "elapsed_time": 0}
    
        try:
            with open(run_results_path) as f:
                data = json.load(f)
    
            # Simplify results for output
            simplified_results = []
            for result in data.get("results", []):
                simplified_results.append(
                    {
                        "unique_id": result.get("unique_id"),
                        "status": result.get("status"),
                        "message": result.get("message"),
                        "execution_time": result.get("execution_time"),
                        "failures": result.get("failures"),
                    }
                )
    
            return {
                "results": simplified_results,
                "elapsed_time": data.get("elapsed_time", 0),
            }
        except Exception as e:
            logger.warning(f"Failed to parse run_results.json: {e}")
            return {"results": [], "elapsed_time": 0}
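A caller that receives the simplified results may want a quick summary instead of the full list. The following is a minimal sketch: summarize_results is a hypothetical helper, and the sample entries are invented for illustration (only the field names come from the parser above).

```python
from __future__ import annotations

from collections import Counter
from typing import Any


def summarize_results(results: list[dict[str, Any]]) -> dict[str, Any]:
    """Count results per status and find the slowest node (hypothetical helper)."""
    by_status = Counter(r.get("status") for r in results)
    # execution_time may be missing or None, so fall back to 0.0 when comparing.
    slowest = max(results, key=lambda r: r.get("execution_time") or 0.0, default=None)
    return {
        "by_status": dict(by_status),
        "slowest": slowest.get("unique_id") if slowest else None,
    }


# Invented sample data, shaped like the "results" list returned by the tool.
sample_results = [
    {"unique_id": "seed.my_project.raw_customers", "status": "success",
     "message": "INSERT 100", "execution_time": 0.42, "failures": None},
    {"unique_id": "seed.my_project.country_codes", "status": "success",
     "message": "INSERT 249", "execution_time": 0.87, "failures": None},
]

print(summarize_results(sample_results))
# {'by_status': {'success': 2}, 'slowest': 'seed.my_project.country_codes'}
```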

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/NiclasOlofsson/dbt-core-mcp'
