dbt_show
Preview a model's transformation results without materializing data, so you can inspect, debug, or demonstrate processed data while leaving the target database unchanged.
Instructions
Preview the results of a model. An AI agent should use this tool when it needs to preview data from a specific model without materializing it. This helps inspect transformation results, debug issues, or demonstrate how data looks after processing without modifying the target database.
Returns:
Output from the dbt show command, defaulting to JSON format if not specified
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| models | Yes | Specific model to show. For model references, use standard dbt syntax like 'model_name'. For inline SQL, use the format 'select * from {{ ref("model_name") }}' to reference other models. | |
| project_dir | No | ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project' not '.') | . |
| profiles_dir | No | Directory containing the profiles.yml file (defaults to project_dir if not specified) | |
| limit | No | Limit the number of rows returned | |
| output | No | Output format (json, table, etc.) | json |
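As an illustration, a minimal sketch of how an MCP client might call this tool; the `session` object, model name, and project path are hypothetical placeholders, while the tool and parameter names match the schema above:

```python
# Hypothetical MCP client call using the official Python SDK's ClientSession;
# "customers" and the project path are placeholder values.
result = await session.call_tool(
    "dbt_show",
    arguments={
        "models": "customers",  # or inline SQL: 'select * from {{ ref("customers") }}'
        "project_dir": "/Users/username/projects/dbt_project",
        "limit": 5,
        "output": "json",
    },
)
```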
Implementation Reference
- src/tools.py:351-475 (handler): Core MCP tool handler for `dbt_show`. Executes the dbt show command, validates inline SQL for security risks, checks that referenced models exist, and formats output using `show_formatter`.

  ```python
  @mcp.tool()
  async def dbt_show(
      models: str = Field(
          description="Specific model to show. For model references, use standard dbt syntax like 'model_name'. For inline SQL, use the format 'select * from {{ ref(\"model_name\") }}' to reference other models."
      ),
      project_dir: str = Field(
          default=".",
          description="ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project' not '.')"
      ),
      profiles_dir: Optional[str] = Field(
          default=None,
          description="Directory containing the profiles.yml file (defaults to project_dir if not specified)"
      ),
      limit: Optional[int] = Field(
          default=None,
          description="Limit the number of rows returned"
      ),
      output: Optional[str] = Field(
          default="json",
          description="Output format (json, table, etc.)"
      )
  ) -> str:
      """Preview the results of a model.

      An AI agent should use this tool when it needs to preview data from a
      specific model without materializing it. This helps inspect transformation
      results, debug issues, or demonstrate how data looks after processing
      without modifying the target database.

      Returns:
          Output from the dbt show command, defaulting to JSON format if not specified
      """
      # Use enhanced SQL detection
      is_inline_sql, sql_type = is_inline_sql_query(models)

      # If it's SQL, check for security risks
      if is_inline_sql:
          has_risk, risk_reason = contains_mutation_risk(models)
          if has_risk:
              logger.warning(f"Security risk detected in SQL: {risk_reason}")
              error_result = {
                  "success": False,
                  "output": f"Security validation failed: {risk_reason}. For security reasons, mutation operations are not allowed.",
                  "error": "SecurityValidationError",
                  "returncode": 1
              }
              return await process_command_result(
                  error_result,
                  command_name="show",
                  include_debug_info=True
              )

      logger.info(f"dbt_show called with models={models}, is_inline_sql={is_inline_sql}")

      # If it's inline SQL, strip out any LIMIT clause as we'll handle it with the --limit parameter
      if is_inline_sql:
          # Use regex to remove LIMIT clause from the SQL
          original_models = models
          models = re.sub(r'\bLIMIT\s+\d+\b', '', models, flags=re.IGNORECASE)
          logger.info(f"Stripped LIMIT clause: {original_models} -> {models}")

          # For inline SQL, use the --inline flag with the SQL as its value
          command = ["show", f"--inline={models}", "--output", output or "json"]

          # Only add --limit if the inline type is WITH or SELECT (select_inline vs meta_inline)
          if limit and sql_type in ["WITH", "SELECT"]:
              command.extend(["--limit", str(limit)])

          logger.info(f"Executing dbt command: {' '.join(command)}")

          # Don't use --quiet for inline SQL to ensure we get error messages
          result = await execute_dbt_command(command, project_dir, profiles_dir)

          logger.info(f"Command result: success={result['success']}, returncode={result.get('returncode')}")
          if isinstance(result["output"], str):
              logger.info(f"Output (first 100 chars): {result['output'][:100]}")
          elif isinstance(result["output"], (dict, list)):
              logger.info(f"Output structure: {json.dumps(result['output'])[:100]}")

          # Check for specific error patterns in the output
          if not result["success"] or (
              isinstance(result["output"], str) and
              any(err in result["output"].lower() for err in ["error", "failed", "syntax", "exception"])
          ):
              logger.warning(f"Error detected in output: {result['output'][:200]}")
              error_result = {
                  "success": False,
                  "output": f"Error executing inline SQL\n{result['output']}",
                  "error": result["error"],
                  "returncode": result["returncode"]
              }
              return await process_command_result(
                  error_result,
                  command_name="show",
                  include_debug_info=True
              )
      else:
          # For regular model references, check if the model exists first
          check_command = ["ls", "-s", models]
          check_result = await execute_dbt_command(check_command, project_dir, profiles_dir)

          # If the model doesn't exist, return the error message
          if not check_result["success"] or "does not match any enabled nodes" in str(check_result["output"]):
              error_result = {
                  "success": False,
                  "output": f"Model does not exist or is not enabled\n{check_result['output']}",
                  "error": check_result["error"],
                  "returncode": check_result["returncode"]
              }
              return await process_command_result(
                  error_result,
                  command_name="show",
                  include_debug_info=True
              )

          # If the model exists, run the show command with --quiet and --output json
          command = ["show", "-s", models, "--quiet", "--output", output or "json"]

          if limit:
              command.extend(["--limit", str(limit)])

          result = await execute_dbt_command(command, project_dir, profiles_dir)

      # Use the centralized result processor
      return await process_command_result(
          result,
          command_name="show",
          output_formatter=show_formatter,
          include_debug_info=True
      )
  ```
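  For reference, a sketch of the two argument-list shapes the handler builds for `execute_dbt_command`, based on the code above; the model name and limit are placeholder values:

  ```python
  # Inline SQL path: no --quiet, so error messages surface in the output.
  inline_command = ["show", '--inline=select * from {{ ref("customers") }}',
                    "--output", "json", "--limit", "5"]
  # Model reference path: run only after a `dbt ls -s customers` existence check.
  model_command = ["show", "-s", "customers", "--quiet",
                   "--output", "json", "--limit", "5"]
  ```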
- src/server.py:88-90 (registration): Registers all dbt MCP tools by calling `register_tools(mcp)`, which defines and decorates the dbt_show handler.

  ```python
  # Register tools
  register_tools(mcp)
  ```
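  For context, a hedged sketch of what the surrounding bootstrap might look like, assuming a FastMCP server (suggested by the `@mcp.tool()` decorator in the handler); only the `register_tools(mcp)` call appears in the source, and the server name is a placeholder:

  ```python
  # Hypothetical server bootstrap; only register_tools(mcp) is confirmed above.
  from mcp.server.fastmcp import FastMCP

  from src.tools import register_tools

  mcp = FastMCP("dbt-mcp")  # server name is a placeholder

  # Register tools
  register_tools(mcp)
  ```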
- src/tools.py:353-372 (schema): Input schema defined via Pydantic `Field` annotations, providing parameter descriptions, defaults, and types for the dbt_show tool.

  ```python
  models: str = Field(
      description="Specific model to show. For model references, use standard dbt syntax like 'model_name'. For inline SQL, use the format 'select * from {{ ref(\"model_name\") }}' to reference other models."
  ),
  project_dir: str = Field(
      default=".",
      description="ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project' not '.')"
  ),
  profiles_dir: Optional[str] = Field(
      default=None,
      description="Directory containing the profiles.yml file (defaults to project_dir if not specified)"
  ),
  limit: Optional[int] = Field(
      default=None,
      description="Limit the number of rows returned"
  ),
  output: Optional[str] = Field(
      default="json",
      description="Output format (json, table, etc.)"
  )
  ) -> str:
  ```
- src/tools.py:533-649 (helper): Helper used by dbt_show to detect whether the `models` input is inline SQL or a model reference, using multiple heuristics.

  ```python
  def is_inline_sql_query(query: str) -> tuple[bool, Optional[str]]:
      """
      Determine if the given string is an inline SQL query or a model reference.

      This function uses multiple heuristics to determine if a string is
      likely an SQL query rather than a model name:
      1. Checks for common SQL keywords at the beginning
      2. Looks for SQL syntax patterns
      3. Considers length and complexity
      4. Handles SQL with comments (both single-line and multi-line)
      5. Recognizes dbt templating syntax

      Args:
          query: The string to check

      Returns:
          A tuple of (is_sql, sql_type) where:
          - is_sql: True if the input is SQL, False otherwise
          - sql_type: The type of SQL statement if is_sql is True, None otherwise
            (e.g., "SELECT", "WITH", "SHOW", etc.)
      """
      # Normalize the query by trimming whitespace
      normalized_query = query.strip()

      # Skip empty queries
      if not normalized_query:
          return False, None

      # Check if the query contains SQL comments
      has_single_line_comment = '--' in normalized_query
      has_multi_line_comment = '/*' in normalized_query and '*/' in normalized_query

      # If the query only contains comments, it's still SQL
      if has_single_line_comment or has_multi_line_comment:
          # Check if it's only comments by removing them and seeing if anything remains
          # Remove /* */ style comments
          sql_no_comments = re.sub(r'/\*.*?\*/', ' ', normalized_query, flags=re.DOTALL)
          # Remove -- style comments
          sql_no_comments = re.sub(r'--.*?$', ' ', sql_no_comments, flags=re.MULTILINE)
          # Normalize whitespace
          sql_no_comments = ' '.join(sql_no_comments.split()).strip()

          if not sql_no_comments:
              # If nothing remains after removing comments, it's only comments
              return True, "COMMENT"

      # Convert to lowercase for case-insensitive matching
      normalized_query_lower = normalized_query.lower()

      # Check for SQL comments at the beginning and skip them for detection
      # This handles both single-line (--) and multi-line (/* */) comments
      comment_pattern = r'^(\s*(--[^\n]*\n|\s*/\*.*?\*/\s*)*\s*)'
      match = re.match(comment_pattern, normalized_query_lower, re.DOTALL)
      if match:
          # Skip past the comments for keyword detection
          start_pos = match.end()
          if start_pos >= len(normalized_query_lower):
              # If the query is only comments, it's still SQL
              return True, "COMMENT"
          normalized_query_lower = normalized_query_lower[start_pos:]

      # Common SQL statement starting keywords
      sql_starters = {
          'select': 'SELECT',
          'with': 'WITH',
          'show': 'SHOW',
          'describe': 'DESCRIBE',
          'explain': 'EXPLAIN',
          'analyze': 'ANALYZE',
          'use': 'USE',
          'set': 'SET'
      }

      # Check if the query starts with a common SQL keyword
      for keyword, sql_type in sql_starters.items():
          if normalized_query_lower.startswith(keyword + ' '):
              return True, sql_type

      # Check for more complex patterns like CTEs
      # WITH clause followed by identifier and AS
      cte_pattern = r'^\s*with\s+[a-z0-9_]+\s+as\s*\('
      if re.search(cte_pattern, normalized_query_lower, re.IGNORECASE):
          return True, "WITH"

      # Check for Jinja templating with SQL inside
      jinja_sql_pattern = r'{{\s*sql\s*}}'
      if re.search(jinja_sql_pattern, normalized_query_lower):
          return True, "JINJA"

      # Check for dbt ref or source macros which indicate SQL
      dbt_macro_pattern = r'{{\s*(ref|source)\s*\(\s*[\'"]'
      if re.search(dbt_macro_pattern, normalized_query_lower):
          return True, "DBT_MACRO"

      # If the query contains certain SQL syntax elements, it's likely SQL
      sql_syntax_elements = [
          r'\bfrom\s+[a-z0-9_]+',   # FROM clause
          r'\bjoin\s+[a-z0-9_]+',   # JOIN clause
          r'\bwhere\s+',            # WHERE clause
          r'\bgroup\s+by\s+',       # GROUP BY clause
          r'\border\s+by\s+',       # ORDER BY clause
          r'\bhaving\s+',           # HAVING clause
          r'\bunion\s+',            # UNION operator
          r'\bcase\s+when\s+'       # CASE expression
      ]
      for pattern in sql_syntax_elements:
          if re.search(pattern, normalized_query_lower, re.IGNORECASE):
              return True, "SQL_SYNTAX"

      # If the query is long and contains spaces, it's more likely to be SQL than a model name
      if len(normalized_query_lower) > 30 and ' ' in normalized_query_lower:
          return True, "COMPLEX"

      # If none of the above conditions are met, it's likely a model name
      return False, None
  ```
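  A few illustrative inputs and the classifications these heuristics would produce (model and CTE names are made up):

  ```python
  # Expected classifications per the heuristics above (illustrative only):
  assert is_inline_sql_query("customers") == (False, None)              # plain model name
  assert is_inline_sql_query("select * from {{ ref('customers') }}")[1] == "SELECT"
  assert is_inline_sql_query("with cte as (select 1) select * from cte")[1] == "WITH"
  assert is_inline_sql_query("-- just a comment") == (True, "COMMENT")  # comments alone count as SQL
  ```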
- src/formatters.py:93-162 (helper): Output formatter specifically for dbt_show results, converting various output formats to clean JSON.

  ```python
  def show_formatter(output: Any) -> str:
      """
      Formatter for dbt show command output.

      Args:
          output: The command output

      Returns:
          Formatted output string
      """
      # Log the type and content of the output for debugging
      logger.info(f"show_formatter received output of type: {type(output)}")
      if isinstance(output, str):
          logger.info(f"Output string (first 100 chars): {output[:100]}")
      elif isinstance(output, (dict, list)):
          logger.info(f"Output structure: {json.dumps(output)[:100]}")

      # If output is already a dict or list, just return it as JSON
      if isinstance(output, (dict, list)):
          return json.dumps(output)

      # For string output, try to extract the JSON part
      if isinstance(output, str):
          try:
              # Look for JSON object in the output
              json_start = output.find('{')
              if json_start >= 0:
                  # Extract everything from the first { to the end
                  json_str = output[json_start:]
                  logger.info(f"Extracted potential JSON: {json_str[:100]}...")

                  # Try to parse it as JSON
                  parsed_json = json.loads(json_str)
                  logger.info("Successfully parsed JSON from output")

                  # Return the parsed JSON
                  return json.dumps(parsed_json)
          except json.JSONDecodeError as e:
              logger.warning(f"Failed to parse JSON from output: {e}")

      # Try to convert tabular output to JSON if possible
      try:
          # Simple conversion of tabular data to JSON
          lines = str(output).strip().split("\n")
          logger.info(f"Number of lines in output: {len(lines)}")

          if len(lines) > 2:  # Need at least header and one data row
              # Extract header row (assuming it's the first row)
              header = lines[0].strip().split("|")
              header = [h.strip() for h in header if h.strip()]
              logger.info(f"Extracted header: {header}")

              # Extract data rows (skip header and separator row)
              data_rows = []
              for line in lines[2:]:
                  if line.strip() and "|" in line:
                      values = line.strip().split("|")
                      values = [v.strip() for v in values if v.strip()]
                      if len(values) == len(header):
                          row_dict = dict(zip(header, values))
                          data_rows.append(row_dict)

              logger.info(f"Extracted {len(data_rows)} data rows")
              return json.dumps(data_rows)
      except Exception as e:
          logger.warning(f"Failed to convert tabular output to JSON: {e}")
          import traceback
          logger.warning(f"Traceback: {traceback.format_exc()}")

      # Default to string output if conversion fails
      return str(output)
  ```
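  A sketch of the tabular fallback path, assuming `show_formatter` is called with a pipe-delimited table like dbt's text output (column names and values are made up):

  ```python
  # Illustrative input exercising the tabular-to-JSON fallback above.
  table = "id | name\n---+------\n1 | alpha\n2 | beta"
  print(show_formatter(table))
  # -> [{"id": "1", "name": "alpha"}, {"id": "2", "name": "beta"}]
  ```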