query_database

Execute SQL queries against a dbt project's database to retrieve, analyze, and export data using proper dbt references and efficient query practices.

Instructions

Execute a SQL query against the dbt project's database.

BEST PRACTICES:

  1. Before querying: Inspect schema using get_resource_info() with include_database_schema=True

  2. Always use {{ ref('model_name') }} for dbt models (never hard-code table paths)

  3. Always use {{ source('source_name', 'table_name') }} for source tables

  4. For non-dbt tables: Verify schema with user before querying

  5. After results: Report "Query Result: X rows retrieved" and summarize key findings
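
For instance, a query that follows these practices might look like the call below (the model and source names are illustrative, not from any particular project):

    query_database(sql="""
        SELECT o.status, COUNT(*) AS order_count
        FROM {{ ref('stg_orders') }} AS o
        JOIN {{ source('jaffle_shop', 'customers') }} AS c
          ON o.customer_id = c.id
        GROUP BY o.status
    """)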

QUERY EFFICIENCY:

  • Use aggregations (COUNT, SUM, AVG, etc.) instead of pulling raw data

  • Apply WHERE filters early to narrow scope before aggregation

  • Use LIMIT for exploratory queries to get representative samples

  • Calculate totals, ratios, and trends in SQL rather than returning all rows

  • Use GROUP BY for categorization within the query

  • Always ask: "Can SQL answer this question directly?" before returning data
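
A sketch of the difference (table and column names are hypothetical):

    # Inefficient: pulls every row just to count and sum client-side
    query_database(sql="SELECT * FROM {{ ref('fct_orders') }}")

    # Efficient: filter early, aggregate in SQL, return one small row per group
    query_database(sql="""
        SELECT order_month, COUNT(*) AS orders, SUM(amount) AS revenue
        FROM {{ ref('fct_orders') }}
        WHERE order_date >= '2024-01-01'
        GROUP BY order_month
    """)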

LARGE RESULT HANDLING:

  • For queries returning many rows (>100), use the output_file parameter to save results to disk

  • This prevents context window overflow and improves performance

  • When output_file is used, the tool returns metadata plus a short preview (JSON format only) instead of the full result set

  • Example: query_database(sql="SELECT * FROM large_table", output_file="temp_auto/results.json")
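
Once results are saved, they can be read back from disk rather than echoed into the conversation. A minimal sketch, assuming the file was written with the default output_format="json" (a JSON array of row objects):

    import json

    with open("temp_auto/results.json") as f:
        rows = json.load(f)  # one dict per result row

    print(f"Loaded {len(rows)} rows; columns: {list(rows[0].keys()) if rows else []}")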

OUTPUT FORMATS:

  • json (default): Returns data as JSON array of objects

  • csv: Returns comma-separated values with header row

  • tsv: Returns tab-separated values with header row

  • CSV/TSV output quotes fields only when necessary and is Excel-compatible
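
For example, the same data can be exported straight to a spreadsheet-friendly file (the model name and path below are illustrative):

    query_database(
        sql="SELECT * FROM {{ ref('dim_customers') }}",
        output_format="csv",
        output_file="temp_auto/customers.csv",
    )
    # -> {"status": "success", "row_count": N, "format": "csv", "saved_to": "temp_auto/customers.csv"}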

Args:

  sql: SQL query with Jinja templating: {{ ref('model') }}, {{ source('src', 'table') }}.
    For exploratory queries, include LIMIT. For aggregations/counts, omit it.

  output_file: Optional file path to save results. Recommended for large result sets
    (>100 rows). If provided, only metadata is returned (no preview for CSV/TSV).
    If omitted, all data is returned inline (may consume large context).

  output_format: Output format - "json" (default), "csv", or "tsv".

Returns:

  JSON inline:    {"status": "success", "row_count": N, "rows": [...]}
  JSON file:      {"status": "success", "row_count": N, "saved_to": "path", "preview": [...]}
  CSV/TSV inline: {"status": "success", "row_count": N, "format": "csv", "csv": "..."}
  CSV/TSV file:   {"status": "success", "row_count": N, "format": "csv", "saved_to": "path"}
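
A caller can branch on these shapes. A minimal sketch (the summarize helper is hypothetical, not part of the tool):

    def summarize(result: dict) -> str:
        # Hypothetical helper illustrating the documented return shapes
        if result["status"] != "success":
            return f"Query failed: {result.get('error') or result.get('message', 'unknown error')}"
        if "saved_to" in result:
            return f"Query Result: {result['row_count']} rows saved to {result['saved_to']}"
        if "rows" in result:
            return f"Query Result: {result['row_count']} rows retrieved"
        return f"Query Result: {result['row_count']} rows returned inline as {result['format']}"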

Input Schema

Name           Required  Description                                 Default
sql            Yes       SQL query with Jinja templating             —
output_file    No        Optional path for saving results to disk    —
output_format  No        "json", "csv", or "tsv"                     json

Implementation Reference

  • Main handler function that executes the SQL query via the dbt runner's invoke_query method, handles errors, parses JSON results from dbt show, supports JSON/CSV/TSV output formats, and saves to file if specified.
    async def toolImpl_query_database(self, sql: str, output_file: str | None = None, output_format: str = "json") -> dict[str, Any]:
        """Implementation for query_database tool."""
        # Execute query using dbt show --inline
        result = await self.runner.invoke_query(sql)  # type: ignore

        if not result.success:
            error_msg = str(result.exception) if result.exception else "Unknown error"
            response = {
                "status": "failed",
                "error": error_msg,
            }
            # Include dbt output for debugging
            if result.stdout:
                response["dbt_output"] = result.stdout
            if result.stderr:
                response["stderr"] = result.stderr
            return response

        # Parse JSON output from dbt show
        import json
        import re

        output = result.stdout if hasattr(result, "stdout") else ""

        try:
            # dbt show --output json returns: {"show": [...rows...]}
            # Find the JSON object (look for {"show": pattern)
            json_match = re.search(r'\{\s*"show"\s*:\s*\[', output)
            if not json_match:
                return {
                    "status": "failed",
                    "error": "No JSON output found in dbt show response",
                }

            # Use JSONDecoder to parse just the first complete JSON object
            # This handles extra data after the JSON (like log lines)
            decoder = json.JSONDecoder()
            data, _ = decoder.raw_decode(output, json_match.start())

            if "show" in data:
                rows = data["show"]
                row_count = len(rows)

                # Handle different output formats
                if output_format in ("csv", "tsv"):
                    # Convert to CSV/TSV format
                    import csv
                    import io

                    delimiter = "\t" if output_format == "tsv" else ","
                    csv_buffer = io.StringIO()
                    if rows:
                        writer = csv.DictWriter(csv_buffer, fieldnames=rows[0].keys(), delimiter=delimiter)
                        writer.writeheader()
                        writer.writerows(rows)
                        csv_string = csv_buffer.getvalue()
                    else:
                        csv_string = ""

                    if output_file:
                        # Save to file
                        from pathlib import Path
                        output_path = Path(output_file)
                        output_path.parent.mkdir(parents=True, exist_ok=True)
                        with open(output_path, "w", newline="") as f:
                            f.write(csv_string)

                        # Get file size
                        file_size_bytes = output_path.stat().st_size
                        file_size_kb = file_size_bytes / 1024

                        return {
                            "status": "success",
                            "row_count": row_count,
                            "format": output_format,
                            "saved_to": str(output_path),
                            "file_size_kb": round(file_size_kb, 2),
                        }
                    else:
                        # Return CSV/TSV inline
                        return {
                            "status": "success",
                            "row_count": row_count,
                            "format": output_format,
                            output_format: csv_string,
                        }
                else:
                    # JSON format (default)
                    if output_file:
                        # Ensure directory exists
                        from pathlib import Path
                        output_path = Path(output_file)
                        output_path.parent.mkdir(parents=True, exist_ok=True)

                        # Write rows to file
                        with open(output_path, "w") as f:
                            json.dump(rows, f, indent=2)

                        # Get file size
                        file_size_bytes = output_path.stat().st_size
                        file_size_kb = file_size_bytes / 1024

                        # Return metadata with preview
                        return {
                            "status": "success",
                            "row_count": row_count,
                            "saved_to": str(output_path),
                            "file_size_kb": round(file_size_kb, 2),
                            "columns": list(rows[0].keys()) if rows else [],
                            "preview": rows[:3],  # First 3 rows as preview
                        }
                    else:
                        # Return all rows inline
                        return {
                            "status": "success",
                            "row_count": row_count,
                            "rows": rows,
                        }
            else:
                return {
                    "status": "failed",
                    "error": "Unexpected JSON format from dbt show",
                    "data": data,
                }
        except json.JSONDecodeError as e:
            return {
                "status": "error",
                "message": f"Failed to parse query results: {e}",
                "raw_output": output[:500],
            }
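
    The raw_decode call above is what lets the parser tolerate log lines trailing the JSON payload: json.JSONDecoder.raw_decode parses one complete JSON value starting at a given index and returns both the value and the index where it stopped. A standalone sketch of the technique (the sample output string is fabricated for illustration):

        import json
        import re

        # Simulated dbt show output: a JSON payload followed by stray log text
        output = '{"show": [{"id": 1}, {"id": 2}]}\n12:00:01  Done. PASS=1'

        match = re.search(r'\{\s*"show"\s*:\s*\[', output)
        data, end = json.JSONDecoder().raw_decode(output, match.start())
        print(data["show"])   # [{'id': 1}, {'id': 2}]
        print(output[end:])   # trailing text the decoder ignored
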
  • Tool registration using @app.tool() decorator. Defines the input schema via parameters (sql: str, output_file: Optional[str], output_format: str='json') and comprehensive docstring with usage guidelines. Delegates to toolImpl_query_database.
    async def query_database(ctx: Context, sql: str, output_file: str | None = None, output_format: str = "json") -> dict[str, Any]:
        """Execute a SQL query against the dbt project's database.

        (The full docstring repeats the Instructions section above verbatim:
        best practices, query efficiency, large result handling, output
        formats, Args, and Returns.)
        """
        await self._ensure_initialized_with_context(ctx)
        return await self.toolImpl_query_database(sql, output_file, output_format)
