dbt_show

Preview a model's transformation results without materializing data, so you can inspect, debug, or demonstrate processed data without modifying the target database.

Instructions

Preview the results of a model. An AI agent should use this tool when it needs to preview data from a specific model without materializing it. This helps inspect transformation results, debug issues, or demonstrate how data looks after processing without modifying the target database.

Returns: Output from the dbt show command, defaulting to JSON format if not specified

Input Schema

  • models (required): Specific model to show. For model references, use standard dbt syntax like 'model_name'. For inline SQL, use the format 'select * from {{ ref("model_name") }}' to reference other models.
  • project_dir (optional): ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project', not '.').
  • profiles_dir (optional): Directory containing the profiles.yml file (defaults to project_dir if not specified).
  • limit (optional): Limit the number of rows returned.
  • output (optional, default: json): Output format (json, table, etc.).
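
For example, an MCP client might call dbt_show with arguments like the following (model name and project path are illustrative, not taken from a real project):

    # Preview a named model:
    arguments = {
        "models": "customer_orders",
        "project_dir": "/Users/username/projects/dbt_project",
        "limit": 5,
    }

    # Preview inline SQL that references a model:
    arguments = {
        "models": 'select * from {{ ref("customer_orders") }} where amount > 100',
        "project_dir": "/Users/username/projects/dbt_project",
        "limit": 5,
        "output": "json",
    }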

Implementation Reference

  • Core MCP tool handler for 'dbt_show'. It executes the dbt show command, handles inline SQL with security validation, checks that referenced models exist, and formats output using show_formatter.
    @mcp.tool()
    async def dbt_show(
        models: str = Field(
            description="Specific model to show. For model references, use standard dbt syntax like 'model_name'. For inline SQL, use the format 'select * from {{ ref(\"model_name\") }}' to reference other models."
        ),
        project_dir: str = Field(
            default=".",
            description="ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project' not '.')"
        ),
        profiles_dir: Optional[str] = Field(
            default=None,
            description="Directory containing the profiles.yml file (defaults to project_dir if not specified)"
        ),
        limit: Optional[int] = Field(
            default=None,
            description="Limit the number of rows returned"
        ),
        output: Optional[str] = Field(
            default="json",
            description="Output format (json, table, etc.)"
        )
    ) -> str:
        """Preview the results of a model.

        An AI agent should use this tool when it needs to preview data from a specific model
        without materializing it. This helps inspect transformation results, debug issues, or
        demonstrate how data looks after processing without modifying the target database.

        Returns:
            Output from the dbt show command, defaulting to JSON format if not specified
        """
        # Use enhanced SQL detection
        is_inline_sql, sql_type = is_inline_sql_query(models)

        # If it's SQL, check for security risks
        if is_inline_sql:
            has_risk, risk_reason = contains_mutation_risk(models)
            if has_risk:
                logger.warning(f"Security risk detected in SQL: {risk_reason}")
                error_result = {
                    "success": False,
                    "output": f"Security validation failed: {risk_reason}. For security reasons, mutation operations are not allowed.",
                    "error": "SecurityValidationError",
                    "returncode": 1
                }
                return await process_command_result(
                    error_result,
                    command_name="show",
                    include_debug_info=True
                )

        logger.info(f"dbt_show called with models={models}, is_inline_sql={is_inline_sql}")

        # If it's inline SQL, strip out any LIMIT clause as we'll handle it with the --limit parameter
        if is_inline_sql:
            # Use regex to remove LIMIT clause from the SQL
            original_models = models
            models = re.sub(r'\bLIMIT\s+\d+\b', '', models, flags=re.IGNORECASE)
            logger.info(f"Stripped LIMIT clause: {original_models} -> {models}")

            # For inline SQL, use the --inline flag with the SQL as its value
            command = ["show", f"--inline={models}", "--output", output or "json"]

            # Only add --limit if the inline type is WITH or SELECT (select_inline vs meta_inline)
            if limit and sql_type in ["WITH", "SELECT"]:
                command.extend(["--limit", str(limit)])

            logger.info(f"Executing dbt command: {' '.join(command)}")

            # Don't use --quiet for inline SQL to ensure we get error messages
            result = await execute_dbt_command(command, project_dir, profiles_dir)

            logger.info(f"Command result: success={result['success']}, returncode={result.get('returncode')}")
            if isinstance(result["output"], str):
                logger.info(f"Output (first 100 chars): {result['output'][:100]}")
            elif isinstance(result["output"], (dict, list)):
                logger.info(f"Output structure: {json.dumps(result['output'])[:100]}")

            # Check for specific error patterns in the output
            if not result["success"] or (
                isinstance(result["output"], str) and
                any(err in result["output"].lower() for err in ["error", "failed", "syntax", "exception"])
            ):
                logger.warning(f"Error detected in output: {result['output'][:200]}")
                error_result = {
                    "success": False,
                    "output": f"Error executing inline SQL\n{result['output']}",
                    "error": result["error"],
                    "returncode": result["returncode"]
                }
                return await process_command_result(
                    error_result,
                    command_name="show",
                    include_debug_info=True
                )
        else:
            # For regular model references, check if the model exists first
            check_command = ["ls", "-s", models]
            check_result = await execute_dbt_command(check_command, project_dir, profiles_dir)

            # If the model doesn't exist, return the error message
            if not check_result["success"] or "does not match any enabled nodes" in str(check_result["output"]):
                error_result = {
                    "success": False,
                    "output": f"Model does not exist or is not enabled\n{check_result['output']}",
                    "error": check_result["error"],
                    "returncode": check_result["returncode"]
                }
                return await process_command_result(
                    error_result,
                    command_name="show",
                    include_debug_info=True
                )

            # If the model exists, run the show command with --quiet and --output json
            command = ["show", "-s", models, "--quiet", "--output", output or "json"]
            if limit:
                command.extend(["--limit", str(limit)])

            result = await execute_dbt_command(command, project_dir, profiles_dir)

        # Use the centralized result processor
        return await process_command_result(
            result,
            command_name="show",
            output_formatter=show_formatter,
            include_debug_info=True
        )
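    The argument lists that end up being passed to execute_dbt_command therefore look roughly like the following (model name is illustrative; prepending the dbt executable is assumed to happen inside execute_dbt_command):

    # Model reference path (after the existence check):
    command = ["show", "-s", "customer_orders", "--quiet", "--output", "json", "--limit", "5"]

    # Inline SQL path (--limit only added for SELECT/WITH statements):
    command = ["show", '--inline=select * from {{ ref("customer_orders") }}', "--output", "json", "--limit", "5"]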
  • src/server.py:88-90 (registration)
    Registers all dbt MCP tools by calling register_tools(mcp), which defines and decorates the dbt_show handler.
    # Register tools
    register_tools(mcp)
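    A minimal sketch of the surrounding server setup; the FastMCP import is from the MCP Python SDK, but the register_tools module path and server name are assumptions, not taken from this repository:

    from mcp.server.fastmcp import FastMCP
    from tools import register_tools  # module path is an assumption

    mcp = FastMCP("dbt-cli")

    # Register tools (defines and decorates dbt_show, among others)
    register_tools(mcp)

    if __name__ == "__main__":
        mcp.run()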
  • Input schema defined via Pydantic Field annotations, providing parameter descriptions, defaults, and types for the dbt_show tool.
    models: str = Field(
        description="Specific model to show. For model references, use standard dbt syntax like 'model_name'. For inline SQL, use the format 'select * from {{ ref(\"model_name\") }}' to reference other models."
    ),
    project_dir: str = Field(
        default=".",
        description="ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project' not '.')"
    ),
    profiles_dir: Optional[str] = Field(
        default=None,
        description="Directory containing the profiles.yml file (defaults to project_dir if not specified)"
    ),
    limit: Optional[int] = Field(
        default=None,
        description="Limit the number of rows returned"
    ),
    output: Optional[str] = Field(
        default="json",
        description="Output format (json, table, etc.)"
    )
    ) -> str:
  • Helper function used by dbt_show to detect whether the 'models' input is inline SQL or a model reference, using multiple heuristics.
    def is_inline_sql_query(query: str) -> tuple[bool, Optional[str]]:
        """
        Determine if the given string is an inline SQL query or a model reference.

        This function uses multiple heuristics to determine if a string is likely
        an SQL query rather than a model name:
        1. Checks for common SQL keywords at the beginning
        2. Looks for SQL syntax patterns
        3. Considers length and complexity
        4. Handles SQL with comments (both single-line and multi-line)
        5. Recognizes dbt templating syntax

        Args:
            query: The string to check

        Returns:
            A tuple of (is_sql, sql_type) where:
            - is_sql: True if the input is SQL, False otherwise
            - sql_type: The type of SQL statement if is_sql is True, None otherwise
              (e.g., "SELECT", "WITH", "SHOW", etc.)
        """
        # Normalize the query by trimming whitespace
        normalized_query = query.strip()

        # Skip empty queries
        if not normalized_query:
            return False, None

        # Check if the query contains SQL comments
        has_single_line_comment = '--' in normalized_query
        has_multi_line_comment = '/*' in normalized_query and '*/' in normalized_query

        # If the query only contains comments, it's still SQL
        if has_single_line_comment or has_multi_line_comment:
            # Check if it's only comments by removing them and seeing if anything remains
            # Remove /* */ style comments
            sql_no_comments = re.sub(r'/\*.*?\*/', ' ', normalized_query, flags=re.DOTALL)
            # Remove -- style comments
            sql_no_comments = re.sub(r'--.*?$', ' ', sql_no_comments, flags=re.MULTILINE)
            # Normalize whitespace
            sql_no_comments = ' '.join(sql_no_comments.split()).strip()

            if not sql_no_comments:
                # If nothing remains after removing comments, it's only comments
                return True, "COMMENT"

        # Convert to lowercase for case-insensitive matching
        normalized_query_lower = normalized_query.lower()

        # Check for SQL comments at the beginning and skip them for detection
        # This handles both single-line (--) and multi-line (/* */) comments
        comment_pattern = r'^(\s*(--[^\n]*\n|\s*/\*.*?\*/\s*)*\s*)'
        match = re.match(comment_pattern, normalized_query_lower, re.DOTALL)
        if match:
            # Skip past the comments for keyword detection
            start_pos = match.end()
            if start_pos >= len(normalized_query_lower):
                # If the query is only comments, it's still SQL
                return True, "COMMENT"
            normalized_query_lower = normalized_query_lower[start_pos:]

        # Common SQL statement starting keywords
        sql_starters = {
            'select': 'SELECT',
            'with': 'WITH',
            'show': 'SHOW',
            'describe': 'DESCRIBE',
            'explain': 'EXPLAIN',
            'analyze': 'ANALYZE',
            'use': 'USE',
            'set': 'SET'
        }

        # Check if the query starts with a common SQL keyword
        for keyword, sql_type in sql_starters.items():
            if normalized_query_lower.startswith(keyword + ' '):
                return True, sql_type

        # Check for more complex patterns like CTEs
        # WITH clause followed by identifier and AS
        cte_pattern = r'^\s*with\s+[a-z0-9_]+\s+as\s*\('
        if re.search(cte_pattern, normalized_query_lower, re.IGNORECASE):
            return True, "WITH"

        # Check for Jinja templating with SQL inside
        jinja_sql_pattern = r'{{\s*sql\s*}}'
        if re.search(jinja_sql_pattern, normalized_query_lower):
            return True, "JINJA"

        # Check for dbt ref or source macros which indicate SQL
        dbt_macro_pattern = r'{{\s*(ref|source)\s*\(\s*[\'"]'
        if re.search(dbt_macro_pattern, normalized_query_lower):
            return True, "DBT_MACRO"

        # If the query contains certain SQL syntax elements, it's likely SQL
        sql_syntax_elements = [
            r'\bfrom\s+[a-z0-9_]+',   # FROM clause
            r'\bjoin\s+[a-z0-9_]+',   # JOIN clause
            r'\bwhere\s+',            # WHERE clause
            r'\bgroup\s+by\s+',       # GROUP BY clause
            r'\border\s+by\s+',       # ORDER BY clause
            r'\bhaving\s+',           # HAVING clause
            r'\bunion\s+',            # UNION operator
            r'\bcase\s+when\s+'       # CASE expression
        ]
        for pattern in sql_syntax_elements:
            if re.search(pattern, normalized_query_lower, re.IGNORECASE):
                return True, "SQL_SYNTAX"

        # If the query is long and contains spaces, it's more likely to be SQL than a model name
        if len(normalized_query_lower) > 30 and ' ' in normalized_query_lower:
            return True, "COMPLEX"

        # If none of the above conditions are met, it's likely a model name
        return False, None
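    A few illustrative calls and the results the heuristics above should produce (model names are hypothetical):

    is_inline_sql_query("customer_orders")
    # -> (False, None)        plain model reference
    is_inline_sql_query('select * from {{ ref("customer_orders") }}')
    # -> (True, "SELECT")     starts with a SQL keyword
    is_inline_sql_query("with recent as (select 1) select * from recent")
    # -> (True, "WITH")       WITH/CTE statement
    is_inline_sql_query('{{ ref("customer_orders") }}')
    # -> (True, "DBT_MACRO")  dbt templating implies SQL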
  • Output formatter specifically for dbt_show results, converting various output formats to clean JSON.
    def show_formatter(output: Any) -> str:
        """
        Formatter for dbt show command output.

        Args:
            output: The command output

        Returns:
            Formatted output string
        """
        # Log the type and content of the output for debugging
        logger.info(f"show_formatter received output of type: {type(output)}")
        if isinstance(output, str):
            logger.info(f"Output string (first 100 chars): {output[:100]}")
        elif isinstance(output, (dict, list)):
            logger.info(f"Output structure: {json.dumps(output)[:100]}")

        # If output is already a dict or list, just return it as JSON
        if isinstance(output, (dict, list)):
            return json.dumps(output)

        # For string output, try to extract the JSON part
        if isinstance(output, str):
            try:
                # Look for JSON object in the output
                json_start = output.find('{')
                if json_start >= 0:
                    # Extract everything from the first { to the end
                    json_str = output[json_start:]
                    logger.info(f"Extracted potential JSON: {json_str[:100]}...")

                    # Try to parse it as JSON
                    parsed_json = json.loads(json_str)
                    logger.info(f"Successfully parsed JSON from output")

                    # Return the parsed JSON
                    return json.dumps(parsed_json)
            except json.JSONDecodeError as e:
                logger.warning(f"Failed to parse JSON from output: {e}")

        # Try to convert tabular output to JSON if possible
        try:
            # Simple conversion of tabular data to JSON
            lines = str(output).strip().split("\n")
            logger.info(f"Number of lines in output: {len(lines)}")

            if len(lines) > 2:  # Need at least header and one data row
                # Extract header row (assuming it's the first row)
                header = lines[0].strip().split("|")
                header = [h.strip() for h in header if h.strip()]
                logger.info(f"Extracted header: {header}")

                # Extract data rows (skip header and separator row)
                data_rows = []
                for line in lines[2:]:
                    if line.strip() and "|" in line:
                        values = line.strip().split("|")
                        values = [v.strip() for v in values if v.strip()]
                        if len(values) == len(header):
                            row_dict = dict(zip(header, values))
                            data_rows.append(row_dict)

                logger.info(f"Extracted {len(data_rows)} data rows")
                return json.dumps(data_rows)
        except Exception as e:
            logger.warning(f"Failed to convert tabular output to JSON: {e}")
            import traceback
            logger.warning(f"Traceback: {traceback.format_exc()}")

        # Default to string output if conversion fails
        return str(output)
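    For illustration, tabular output like the following (sample data is hypothetical) would be converted to JSON rows by the fallback path above:

    table_output = (
        "| id | name  |\n"
        "|----|-------|\n"
        "| 1  | alice |\n"
        "| 2  | bob   |"
    )
    show_formatter(table_output)
    # -> '[{"id": "1", "name": "alice"}, {"id": "2", "name": "bob"}]'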
