# query_database

Execute SQL queries against a dbt project's database to retrieve, analyze, and export data using proper dbt references and efficient query practices.

## Instructions

Execute a SQL query against the dbt project's database.
**BEST PRACTICES** (see the example after this list):

1. Before querying: inspect the schema using `get_resource_info()` with `include_database_schema=True`.
2. Always use `{{ ref('model_name') }}` for dbt models (never hard-code table paths).
3. Always use `{{ source('source_name', 'table_name') }}` for source tables.
4. For non-dbt tables: verify the schema with the user before querying.
5. After results: report "Query Result: X rows retrieved" and summarize key findings.
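A minimal exploratory query that follows these practices might look like the sketch below; the model, source, and column names are purely illustrative:

```python
# Inspect schemas first, e.g. via get_resource_info(..., include_database_schema=True)
# Model/source names below are hypothetical, for illustration only
query_database(sql="""
    SELECT c.customer_id, c.first_name, o.order_date
    FROM {{ ref('stg_customers') }} AS c
    JOIN {{ source('jaffle_shop', 'orders') }} AS o
        ON o.customer_id = c.customer_id
    LIMIT 20
""")
```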
**QUERY EFFICIENCY** (illustrated in the sketch below):

- Use aggregations (COUNT, SUM, AVG, etc.) instead of pulling raw data.
- Apply WHERE filters early to narrow scope before aggregation.
- Use LIMIT for exploratory queries to get representative samples.
- Calculate totals, ratios, and trends in SQL rather than returning all rows.
- Use GROUP BY for categorization within the query.
- Always ask "Can SQL answer this question directly?" before returning data.
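A sketch of the difference, using a hypothetical `fct_payments` model and columns:

```python
# Avoid: pulls every row into context
query_database(sql="SELECT * FROM {{ ref('fct_payments') }}")

# Prefer: filter early, aggregate in SQL, categorize with GROUP BY
query_database(sql="""
    SELECT payment_method, SUM(amount) AS total_amount, COUNT(*) AS n
    FROM {{ ref('fct_payments') }}
    WHERE created_at >= '2024-01-01'
    GROUP BY payment_method
""")
```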
**LARGE RESULT HANDLING:**

- For queries returning many rows (>100), use the `output_file` parameter to save results to disk.
- This prevents context-window overflow and improves performance.
- When `output_file` is used, the tool returns metadata plus a short preview (JSON format only) instead of the full results.
- Example: `query_database(sql="SELECT * FROM large_table", output_file="temp_auto/results.json")`
**OUTPUT FORMATS:**

- `json` (default): returns data as a JSON array of objects.
- `csv`: returns comma-separated values with a header row.
- `tsv`: returns tab-separated values with a header row.
- CSV/TSV output uses proper quoting (only when necessary) and is Excel-compatible. An export example follows this list.
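For example, a spreadsheet-ready export might look like this (the path and model name are hypothetical):

```python
query_database(
    sql="SELECT * FROM {{ ref('dim_customers') }}",
    output_format="csv",
    output_file="temp_auto/customers.csv",
)
```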
**Args:**

- `sql`: SQL query with Jinja templating: `{{ ref('model') }}`, `{{ source('src', 'table') }}`. For exploratory queries, include LIMIT; for aggregations/counts, omit it.
- `output_file`: Optional file path to save results. Recommended for large result sets (>100 rows). If provided, only metadata is returned (no preview for CSV/TSV). If omitted, all data is returned inline, which may consume a large amount of context.
- `output_format`: Output format: `"json"` (default), `"csv"`, or `"tsv"`.
**Returns:**

- JSON inline: `{"status": "success", "row_count": N, "rows": [...]}`
- JSON file: `{"status": "success", "row_count": N, "saved_to": "path", "preview": [...]}`
- CSV/TSV inline: `{"status": "success", "row_count": N, "format": "csv", "csv": "..."}`
- CSV/TSV file: `{"status": "success", "row_count": N, "format": "csv", "saved_to": "path"}`
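As a concrete illustration (all values invented), a JSON query saved to disk comes back as metadata plus a short preview; the `file_size_kb` and `columns` fields come from the handler shown under Implementation Reference:

```json
{
  "status": "success",
  "row_count": 4812,
  "saved_to": "temp_auto/results.json",
  "file_size_kb": 356.21,
  "columns": ["customer_id", "order_count"],
  "preview": [{"customer_id": 1, "order_count": 7}]
}
```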
## Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| sql | Yes | SQL query to execute; supports `{{ ref() }}` and `{{ source() }}` Jinja templating | |
| output_file | No | Optional file path for saving results to disk (recommended for >100 rows) | |
| output_format | No | Output format: `"json"`, `"csv"`, or `"tsv"` | json |
## Implementation Reference
- `src/dbt_core_mcp/server.py:650-786` (handler): the main handler that executes the SQL query via the dbt runner's `invoke_query` method, handles errors, parses JSON results from `dbt show`, supports JSON/CSV/TSV output formats, and saves to a file if one is specified.

```python
async def toolImpl_query_database(
    self, sql: str, output_file: str | None = None, output_format: str = "json"
) -> dict[str, Any]:
    """Implementation for query_database tool."""
    # Execute query using dbt show --inline
    result = await self.runner.invoke_query(sql)  # type: ignore
    if not result.success:
        error_msg = str(result.exception) if result.exception else "Unknown error"
        response = {
            "status": "failed",
            "error": error_msg,
        }
        # Include dbt output for debugging
        if result.stdout:
            response["dbt_output"] = result.stdout
        if result.stderr:
            response["stderr"] = result.stderr
        return response

    # Parse JSON output from dbt show
    import json
    import re

    output = result.stdout if hasattr(result, "stdout") else ""
    try:
        # dbt show --output json returns: {"show": [...rows...]}
        # Find the JSON object (look for the {"show": pattern)
        json_match = re.search(r'\{\s*"show"\s*:\s*\[', output)
        if not json_match:
            return {
                "status": "failed",
                "error": "No JSON output found in dbt show response",
            }

        # Use JSONDecoder to parse just the first complete JSON object.
        # This handles extra data after the JSON (like log lines).
        decoder = json.JSONDecoder()
        data, _ = decoder.raw_decode(output, json_match.start())

        if "show" in data:
            rows = data["show"]
            row_count = len(rows)

            # Handle different output formats
            if output_format in ("csv", "tsv"):
                # Convert to CSV/TSV format
                import csv
                import io

                delimiter = "\t" if output_format == "tsv" else ","
                csv_buffer = io.StringIO()
                if rows:
                    writer = csv.DictWriter(
                        csv_buffer, fieldnames=rows[0].keys(), delimiter=delimiter
                    )
                    writer.writeheader()
                    writer.writerows(rows)
                    csv_string = csv_buffer.getvalue()
                else:
                    csv_string = ""

                if output_file:
                    # Save to file
                    from pathlib import Path

                    output_path = Path(output_file)
                    output_path.parent.mkdir(parents=True, exist_ok=True)
                    with open(output_path, "w", newline="") as f:
                        f.write(csv_string)

                    # Get file size
                    file_size_bytes = output_path.stat().st_size
                    file_size_kb = file_size_bytes / 1024

                    return {
                        "status": "success",
                        "row_count": row_count,
                        "format": output_format,
                        "saved_to": str(output_path),
                        "file_size_kb": round(file_size_kb, 2),
                    }
                else:
                    # Return CSV/TSV inline
                    return {
                        "status": "success",
                        "row_count": row_count,
                        "format": output_format,
                        output_format: csv_string,
                    }
            else:
                # JSON format (default)
                if output_file:
                    # Ensure directory exists
                    from pathlib import Path

                    output_path = Path(output_file)
                    output_path.parent.mkdir(parents=True, exist_ok=True)

                    # Write rows to file
                    with open(output_path, "w") as f:
                        json.dump(rows, f, indent=2)

                    # Get file size
                    file_size_bytes = output_path.stat().st_size
                    file_size_kb = file_size_bytes / 1024

                    # Return metadata with preview
                    return {
                        "status": "success",
                        "row_count": row_count,
                        "saved_to": str(output_path),
                        "file_size_kb": round(file_size_kb, 2),
                        "columns": list(rows[0].keys()) if rows else [],
                        "preview": rows[:3],  # First 3 rows as preview
                    }
                else:
                    # Return all rows inline
                    return {
                        "status": "success",
                        "row_count": row_count,
                        "rows": rows,
                    }
        else:
            return {
                "status": "failed",
                "error": "Unexpected JSON format from dbt show",
                "data": data,
            }
    except json.JSONDecodeError as e:
        return {
            "status": "error",
            "message": f"Failed to parse query results: {e}",
            "raw_output": output[:500],
        }
```
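The `raw_decode` step above is the detail that makes parsing robust: `dbt show` output can carry log lines after the JSON object, and plain `json.loads` would reject that extra data. A self-contained sketch of the same technique, using a fabricated output string:

```python
import json
import re

# Fabricated dbt show output: a JSON payload followed by a stray log line
output = '{"show": [{"id": 1, "amount": 9.5}]}\n12:00:01  Finished running.'

# Locate the start of the {"show": [...]} object, then decode only that object;
# raw_decode returns the parsed value and the index where parsing stopped
match = re.search(r'\{\s*"show"\s*:\s*\[', output)
data, end_index = json.JSONDecoder().raw_decode(output, match.start())
print(data["show"])  # -> [{'id': 1, 'amount': 9.5}]
```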
- `src/dbt_core_mcp/server.py:1479-1525` (registration): tool registration using the `@app.tool()` decorator. Defines the input schema via parameters (`sql: str`, `output_file: Optional[str]`, `output_format: str = "json"`) and a comprehensive docstring with usage guidelines. Delegates to `toolImpl_query_database`.

```python
async def query_database(
    ctx: Context, sql: str, output_file: str | None = None, output_format: str = "json"
) -> dict[str, Any]:
    """Execute a SQL query against the dbt project's database.

    BEST PRACTICES:
    1. Before querying: Inspect schema using get_resource_info() with include_database_schema=True
    2. Always use {{ ref('model_name') }} for dbt models (never hard-code table paths)
    3. Always use {{ source('source_name', 'table_name') }} for source tables
    4. For non-dbt tables: Verify schema with user before querying
    5. After results: Report "Query Result: X rows retrieved" and summarize key findings

    QUERY EFFICIENCY:
    - Use aggregations (COUNT, SUM, AVG, etc.) instead of pulling raw data
    - Apply WHERE filters early to narrow scope before aggregation
    - Use LIMIT for exploratory queries to get representative samples
    - Calculate totals, ratios, and trends in SQL rather than returning all rows
    - Use GROUP BY for categorization within the query
    - Always ask: "Can SQL answer this question directly?" before returning data

    LARGE RESULT HANDLING:
    - For queries returning many rows (>100), use output_file parameter to save results to disk
    - This prevents context window overflow and improves performance
    - The tool returns metadata + preview instead of full results when output_file is used
    - Example: query_database(sql="SELECT * FROM large_table", output_file="temp_auto/results.json")

    OUTPUT FORMATS:
    - json (default): Returns data as JSON array of objects
    - csv: Returns comma-separated values with header row
    - tsv: Returns tab-separated values with header row
    - CSV/TSV formats use proper quoting (only when necessary) and are Excel-compatible

    Args:
        sql: SQL query with Jinja templating: {{ ref('model') }}, {{ source('src', 'table') }}
            For exploratory queries, include LIMIT. For aggregations/counts, omit it.
        output_file: Optional file path to save results. Recommended for large result sets (>100 rows).
            If provided, only metadata is returned (no preview for CSV/TSV).
            If omitted, all data is returned inline (may consume large context).
        output_format: Output format - "json" (default), "csv", or "tsv"

    Returns:
        JSON inline: {"status": "success", "row_count": N, "rows": [...]}
        JSON file: {"status": "success", "row_count": N, "saved_to": "path", "preview": [...]}
        CSV/TSV inline: {"status": "success", "row_count": N, "format": "csv", "csv": "..."}
        CSV/TSV file: {"status": "success", "row_count": N, "format": "csv", "saved_to": "path"}
    """
    await self._ensure_initialized_with_context(ctx)
    return await self.toolImpl_query_database(sql, output_file, output_format)
```