DBT Core MCP Server

query_database

Execute SQL queries against a dbt project's database to retrieve, analyze, and export data using proper dbt references and efficient query practices.

Instructions

Execute a SQL query against the dbt project's database.

BEST PRACTICES:

  1. Before querying: Inspect schema using get_resource_info() with include_database_schema=True

  2. Always use {{ ref('model_name') }} for dbt models (never hard-code table paths)

  3. Always use {{ source('source_name', 'table_name') }} for source tables

  4. For non-dbt tables: Verify schema with user before querying

  5. After results: Report "Query Result: X rows retrieved" and summarize key findings
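The reason to prefer {{ ref('model_name') }} over hard-coded table paths is that dbt resolves each reference to the model's actual schema and alias at compile time. The substitution can be sketched as below (a toy resolver, not dbt's real Jinja compiler; the model-to-relation mapping is hypothetical):

```python
import re

# Hypothetical mapping from model name to its materialized relation;
# a real dbt project derives this from the manifest at compile time.
RELATIONS = {"orders": "analytics.prod.orders"}

def render_refs(sql: str) -> str:
    """Replace {{ ref('name') }} with the resolved relation name."""
    return re.sub(
        r"\{\{\s*ref\('([^']+)'\)\s*\}\}",
        lambda m: RELATIONS[m.group(1)],
        sql,
    )

print(render_refs("SELECT COUNT(*) FROM {{ ref('orders') }}"))
# SELECT COUNT(*) FROM analytics.prod.orders
```

Because the mapping lives in one place, renaming or re-pointing a model never requires editing query text.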

QUERY EFFICIENCY:

  • Use aggregations (COUNT, SUM, AVG, etc.) instead of pulling raw data

  • Apply WHERE filters early to narrow scope before aggregation

  • Use LIMIT for exploratory queries to get representative samples

  • Calculate totals, ratios, and trends in SQL rather than returning all rows

  • Use GROUP BY for categorization within the query

  • Always ask: "Can SQL answer this question directly?" before returning data
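The "aggregate in SQL" advice above can be shown with a self-contained example (sqlite3 stands in for the warehouse here; the table and column names are invented):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (region TEXT, amount REAL)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [("eu", 10.0), ("eu", 20.0), ("us", 5.0)],
)

# Push the aggregation into SQL: one row per region comes back,
# instead of every underlying order row.
rows = conn.execute(
    "SELECT region, COUNT(*) AS n, SUM(amount) AS total "
    "FROM orders GROUP BY region ORDER BY region"
).fetchall()
print(rows)  # [('eu', 2, 30.0), ('us', 1, 5.0)]
```

Three raw rows collapse to two summary rows here; against a real table the reduction is what keeps results inside the context window.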

LARGE RESULT HANDLING:

  • For queries returning many rows (>100), use output_file parameter to save results to disk

  • This prevents context window overflow and improves performance

  • The tool returns metadata + preview instead of full results when output_file is used

  • Example: query_database(sql="SELECT * FROM large_table", output_file="temp_auto/results.json")
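The metadata-plus-preview response shape described above can be sketched like this (a simplified stand-in for the tool's file-saving path; the row data is illustrative):

```python
import json
from pathlib import Path

def save_with_preview(rows: list, output_file: str) -> dict:
    """Write all rows to disk; return only metadata plus a short preview."""
    path = Path(output_file)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(rows, indent=2))
    return {
        "status": "success",
        "row_count": len(rows),
        "saved_to": str(path),
        "preview": rows[:3],  # keep the response small
    }

result = save_with_preview(
    [{"id": i} for i in range(500)], "temp_auto/results.json"
)
print(result["row_count"], len(result["preview"]))  # 500 3
```

The full 500-row payload lands on disk; only three rows travel back through the model's context.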

OUTPUT FORMATS:

  • json (default): Returns data as JSON array of objects

  • csv: Returns comma-separated values with header row

  • tsv: Returns tab-separated values with header row

  • CSV/TSV formats use proper quoting (only when necessary) and are Excel-compatible
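"Quoting only when necessary" matches the csv module's default QUOTE_MINIMAL behavior, which this small demonstration shows (the field names are invented):

```python
import csv
import io

buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=["name", "note"])
writer.writeheader()
writer.writerow({"name": "plain", "note": "no quoting needed"})
writer.writerow({"name": "tricky", "note": "contains, a comma"})
# Only the field containing the delimiter gets quoted:
#   tricky,"contains, a comma"
print(buf.getvalue())
```

Plain fields stay unquoted, so the output diffs cleanly and opens in Excel without surprises.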

Args:

  sql: SQL query with Jinja templating: {{ ref('model') }}, {{ source('src', 'table') }}. For exploratory queries, include LIMIT; for aggregations/counts, omit it.

  output_file: Optional file path to save results. Recommended for large result sets (>100 rows). If provided, only metadata is returned (no preview for CSV/TSV). If omitted, all data is returned inline (may consume large context).

  output_format: Output format: "json" (default), "csv", or "tsv"

Returns:

  JSON inline: {"status": "success", "row_count": N, "rows": [...]}

  JSON file: {"status": "success", "row_count": N, "saved_to": "path", "preview": [...]}

  CSV/TSV inline: {"status": "success", "row_count": N, "format": "csv", "csv": "..."}

  CSV/TSV file: {"status": "success", "row_count": N, "format": "csv", "saved_to": "path"}

Input Schema

  Name            Required   Default
  sql             Yes        -
  output_file     No         -
  output_format   No         json

Implementation Reference

  • Main handler function that executes the SQL query via the dbt runner's invoke_query method, handles errors, parses JSON results from dbt show, supports JSON/CSV/TSV output formats, and saves to file if specified.
    async def toolImpl_query_database(self, sql: str, output_file: str | None = None, output_format: str = "json") -> dict[str, Any]:
        """Implementation for query_database tool."""
        # Execute query using dbt show --inline
        result = await self.runner.invoke_query(sql)  # type: ignore
    
        if not result.success:
            error_msg = str(result.exception) if result.exception else "Unknown error"
            response = {
                "status": "failed",
                "error": error_msg,
            }
            # Include dbt output for debugging
            if result.stdout:
                response["dbt_output"] = result.stdout
            if result.stderr:
                response["stderr"] = result.stderr
            return response
    
        # Parse JSON output from dbt show
        import json
        import re
    
        output = result.stdout if hasattr(result, "stdout") else ""
    
        try:
            # dbt show --output json returns: {"show": [...rows...]}
            # Find the JSON object (look for {"show": pattern)
            json_match = re.search(r'\{\s*"show"\s*:\s*\[', output)
            if not json_match:
                return {
                    "status": "failed",
                    "error": "No JSON output found in dbt show response",
                }
    
            # Use JSONDecoder to parse just the first complete JSON object
            # This handles extra data after the JSON (like log lines)
            decoder = json.JSONDecoder()
            data, _ = decoder.raw_decode(output, json_match.start())
    
            if "show" in data:
                rows = data["show"]
                row_count = len(rows)
    
                # Handle different output formats
                if output_format in ("csv", "tsv"):
                    # Convert to CSV/TSV format
                    import csv
                    import io
    
                    delimiter = "\t" if output_format == "tsv" else ","
                    csv_buffer = io.StringIO()
    
                    if rows:
                        writer = csv.DictWriter(csv_buffer, fieldnames=rows[0].keys(), delimiter=delimiter)
                        writer.writeheader()
                        writer.writerows(rows)
                        csv_string = csv_buffer.getvalue()
                    else:
                        csv_string = ""
    
                    if output_file:
                        # Save to file
                        from pathlib import Path
    
                        output_path = Path(output_file)
                        output_path.parent.mkdir(parents=True, exist_ok=True)
    
                        with open(output_path, "w", newline="") as f:
                            f.write(csv_string)
    
                        # Get file size
                        file_size_bytes = output_path.stat().st_size
                        file_size_kb = file_size_bytes / 1024
    
                        return {
                            "status": "success",
                            "row_count": row_count,
                            "format": output_format,
                            "saved_to": str(output_path),
                            "file_size_kb": round(file_size_kb, 2),
                        }
                    else:
                        # Return CSV/TSV inline
                        return {
                            "status": "success",
                            "row_count": row_count,
                            "format": output_format,
                            output_format: csv_string,
                        }
                else:
                    # JSON format (default)
                    if output_file:
                        # Ensure directory exists
                        from pathlib import Path
    
                        output_path = Path(output_file)
                        output_path.parent.mkdir(parents=True, exist_ok=True)
    
                        # Write rows to file
                        with open(output_path, "w") as f:
                            json.dump(rows, f, indent=2)
    
                        # Get file size
                        file_size_bytes = output_path.stat().st_size
                        file_size_kb = file_size_bytes / 1024
    
                        # Return metadata with preview
                        return {
                            "status": "success",
                            "row_count": row_count,
                            "saved_to": str(output_path),
                            "file_size_kb": round(file_size_kb, 2),
                            "columns": list(rows[0].keys()) if rows else [],
                            "preview": rows[:3],  # First 3 rows as preview
                        }
                    else:
                        # Return all rows inline
                        return {
                            "status": "success",
                            "row_count": row_count,
                            "rows": rows,
                        }
            else:
                return {
                    "status": "failed",
                    "error": "Unexpected JSON format from dbt show",
                    "data": data,
                }
    
        except json.JSONDecodeError as e:
            return {
                "status": "failed",
                "error": f"Failed to parse query results: {e}",
                "raw_output": output[:500],
            }
    
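The parsing trick in the handler above relies on json.JSONDecoder.raw_decode, which parses one complete JSON object starting at a given offset and tolerates whatever follows; that is how trailing dbt log lines are skipped. A standalone illustration with fabricated dbt output:

```python
import json
import re

# Simulated dbt show output: the JSON payload surrounded by stray log lines.
output = '12:00:01 Running query\n{"show": [{"id": 1}, {"id": 2}]}\n12:00:02 Done'

match = re.search(r'\{\s*"show"\s*:\s*\[', output)
decoder = json.JSONDecoder()
# raw_decode returns (object, end_index) and ignores extra data after the object,
# unlike json.loads, which would raise on the trailing log line.
data, end = decoder.raw_decode(output, match.start())
print(data["show"])  # [{'id': 1}, {'id': 2}]
```

A plain json.loads on the same string would fail with "Extra data", which is exactly the case the handler guards against.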
  • Tool registration using the @app.tool() decorator. Defines the input schema via its parameters (sql: str, output_file: Optional[str], output_format: str = 'json') and a comprehensive docstring with usage guidelines. Delegates to toolImpl_query_database.
    async def query_database(ctx: Context, sql: str, output_file: str | None = None, output_format: str = "json") -> dict[str, Any]:
        """Execute a SQL query against the dbt project's database.
    
        BEST PRACTICES:
        1. Before querying: Inspect schema using get_resource_info() with include_database_schema=True
        2. Always use {{ ref('model_name') }} for dbt models (never hard-code table paths)
        3. Always use {{ source('source_name', 'table_name') }} for source tables
        4. For non-dbt tables: Verify schema with user before querying
        5. After results: Report "Query Result: X rows retrieved" and summarize key findings
    
        QUERY EFFICIENCY:
        - Use aggregations (COUNT, SUM, AVG, etc.) instead of pulling raw data
        - Apply WHERE filters early to narrow scope before aggregation
        - Use LIMIT for exploratory queries to get representative samples
        - Calculate totals, ratios, and trends in SQL rather than returning all rows
        - Use GROUP BY for categorization within the query
        - Always ask: "Can SQL answer this question directly?" before returning data
    
        LARGE RESULT HANDLING:
        - For queries returning many rows (>100), use output_file parameter to save results to disk
        - This prevents context window overflow and improves performance
        - The tool returns metadata + preview instead of full results when output_file is used
        - Example: query_database(sql="SELECT * FROM large_table", output_file="temp_auto/results.json")
    
        OUTPUT FORMATS:
        - json (default): Returns data as JSON array of objects
        - csv: Returns comma-separated values with header row
        - tsv: Returns tab-separated values with header row
        - CSV/TSV formats use proper quoting (only when necessary) and are Excel-compatible
    
        Args:
            sql: SQL query with Jinja templating: {{ ref('model') }}, {{ source('src', 'table') }}
                 For exploratory queries, include LIMIT. For aggregations/counts, omit it.
            output_file: Optional file path to save results. Recommended for large result sets (>100 rows).
                        If provided, only metadata is returned (no preview for CSV/TSV).
                        If omitted, all data is returned inline (may consume large context).
            output_format: Output format - "json" (default), "csv", or "tsv"
    
        Returns:
            JSON inline: {"status": "success", "row_count": N, "rows": [...]}
            JSON file: {"status": "success", "row_count": N, "saved_to": "path", "preview": [...]}
            CSV/TSV inline: {"status": "success", "row_count": N, "format": "csv", "csv": "..."}
            CSV/TSV file: {"status": "success", "row_count": N, "format": "csv", "saved_to": "path"}
        """
        await self._ensure_initialized_with_context(ctx)
        return await self.toolImpl_query_database(sql, output_file, output_format)
