Skip to main content
Glama
ajdoyl2

dbt MCP Server

by ajdoyl2
dbt_server.py25.3 kB
#!/usr/bin/env python3
"""
dbt MCP Server for Claude Code Integration

A Model Context Protocol (MCP) server that provides Claude Code with
intelligent dbt project assistance, including model operations, testing,
and discovery.

Protocol Compliance:
- Full MCP 1.0 specification compliance
- Async/await pattern for all operations
- Proper error handling and logging
- Resource and tool management
- Type safety with Pydantic models

Backend Integration Features:
- DuckDB database integration
- Local dbt project management
- Model compilation and execution
- Data quality testing
- Discovery and metadata operations
"""

import asyncio
import json
import logging
import os
import re
import subprocess
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Union

from mcp.server import Server
from mcp.server.models import InitializationOptions, ServerCapabilities
from mcp.server.stdio import stdio_server
from mcp.types import (
    Resource,
    Tool,
    TextContent,
    ImageContent,
    EmbeddedResource,
    CallToolResult,
    ListResourcesResult,
    ListToolsResult,
    ReadResourceResult,
)
from dotenv import load_dotenv

# Load environment configuration (project paths, profile, tool toggles).
load_dotenv(".env.dbt-mcp")

# Configure logging. logging's StreamHandler writes to stderr by default,
# which keeps log output off stdout -- stdout carries the MCP stdio protocol.
logging.basicConfig(
    level=logging.DEBUG if os.getenv("MCP_DEBUG", "false").lower() == "true" else logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("dbt-mcp-server")

# Matches Jinja refs like {{ ref('model_name') }} or {{ ref("model_name") }}.
# Compiled once at module load instead of per call inside the extractor.
_REF_PATTERN = re.compile(r"\{\{\s*ref\(['\"]([^'\"]+)['\"]\)\s*\}\}")


class DbtMCPServer:
    """
    dbt MCP Server implementation for Claude Code integration.

    Provides protocol-compliant MCP server with dbt-specific tools and
    resources for intelligent data transformation assistance.
    """

    def __init__(self):
        """Initialize server state from environment variables (with defaults)."""
        self.server = Server("dbt-data-stack")
        self.dbt_project_dir = Path(os.getenv("DBT_PROJECT_DIR", "./transform"))
        self.dbt_profiles_dir = Path(os.getenv("DBT_PROFILES_DIR", "./transform/profiles/duckdb"))
        self.dbt_profile = os.getenv("DBT_PROFILE", "data_stack")
        self.dbt_target = os.getenv("DBT_TARGET", "dev")
        self.duckdb_path = Path(os.getenv("DUCKDB_PATH", "./data/warehouse/data_stack.duckdb"))

        # Tool group configuration: each group can be switched off via env.
        self.enable_cli_tools = os.getenv("DBT_MCP_ENABLE_CLI_TOOLS", "true").lower() == "true"
        self.enable_discovery_tools = os.getenv("DBT_MCP_ENABLE_DISCOVERY_TOOLS", "true").lower() == "true"
        self.enable_semantic_layer_tools = os.getenv("DBT_MCP_ENABLE_SEMANTIC_LAYER_TOOLS", "false").lower() == "true"
        self.enable_remote_tools = os.getenv("DBT_MCP_ENABLE_REMOTE_TOOLS", "true").lower() == "true"

        self._register_handlers()

    def _register_handlers(self):
        """Register MCP protocol handlers (tools, resources) on the server."""

        @self.server.list_tools()
        async def handle_list_tools() -> ListToolsResult:
            """List available dbt tools based on configuration."""
            tools = []
            if self.enable_cli_tools:
                tools.extend(self._get_cli_tools())
            if self.enable_discovery_tools:
                tools.extend(self._get_discovery_tools())
            if self.enable_remote_tools:
                tools.extend(self._get_remote_tools())
            return ListToolsResult(tools=tools)

        @self.server.call_tool()
        async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> CallToolResult:
            """Handle tool execution requests, dispatching on the name prefix."""
            try:
                if name.startswith("dbt_"):
                    return await self._handle_dbt_tool(name, arguments)
                elif name.startswith("discovery_"):
                    return await self._handle_discovery_tool(name, arguments)
                elif name.startswith("remote_"):
                    return await self._handle_remote_tool(name, arguments)
                else:
                    raise ValueError(f"Unknown tool: {name}")
            except Exception as e:
                logger.error(f"Error executing tool {name}: {str(e)}")
                return CallToolResult(
                    content=[TextContent(type="text", text=f"Error: {str(e)}")],
                    isError=True
                )

        @self.server.list_resources()
        async def handle_list_resources() -> ListResourcesResult:
            """List available dbt project resources."""
            resources = []

            # dbt project configuration
            resources.append(Resource(
                uri="dbt://project/config",  # fixed URI; was a placeholder-free f-string
                name="dbt Project Configuration",
                description="Current dbt project configuration and settings",
                mimeType="application/json"
            ))

            # Model resources: one resource per .sql file under models/.
            if self.dbt_project_dir.exists():
                models_dir = self.dbt_project_dir / "models"
                if models_dir.exists():
                    for model_file in models_dir.rglob("*.sql"):
                        rel_path = model_file.relative_to(models_dir)
                        resources.append(Resource(
                            uri=f"dbt://model/{rel_path.stem}",
                            name=f"Model: {rel_path.stem}",
                            description=f"dbt model at {rel_path}",
                            mimeType="text/sql"
                        ))

            return ListResourcesResult(resources=resources)

        @self.server.read_resource()
        async def handle_read_resource(uri: str) -> ReadResourceResult:
            """Read dbt project resources by URI (project config or model SQL)."""
            try:
                if uri.startswith("dbt://project/config"):
                    config = await self._get_project_config()
                    return ReadResourceResult(
                        contents=[TextContent(
                            type="text",
                            text=json.dumps(config, indent=2)
                        )]
                    )
                elif uri.startswith("dbt://model/"):
                    model_name = uri.split("/")[-1]
                    content = await self._get_model_content(model_name)
                    return ReadResourceResult(
                        contents=[TextContent(type="text", text=content)]
                    )
                else:
                    raise ValueError(f"Unknown resource URI: {uri}")
            except Exception as e:
                logger.error(f"Error reading resource {uri}: {str(e)}")
                return ReadResourceResult(
                    contents=[TextContent(
                        type="text",
                        text=f"Error reading resource: {str(e)}"
                    )]
                )

    def _get_cli_tools(self) -> List[Tool]:
        """Get dbt CLI tools (run/test/compile/build)."""
        return [
            Tool(
                name="dbt_run",
                description="Execute dbt models to build data transformations",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "models": {
                            "type": "string",
                            "description": "Specific models to run (optional, space-separated)"
                        },
                        "full_refresh": {
                            "type": "boolean",
                            "description": "Perform full refresh of incremental models",
                            "default": False
                        }
                    }
                }
            ),
            Tool(
                name="dbt_test",
                description="Run data quality tests on dbt models and sources",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "models": {
                            "type": "string",
                            "description": "Specific models to test (optional, space-separated)"
                        }
                    }
                }
            ),
            Tool(
                name="dbt_compile",
                description="Compile dbt models to executable SQL without running",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "models": {
                            "type": "string",
                            "description": "Specific models to compile (optional, space-separated)"
                        }
                    }
                }
            ),
            Tool(
                name="dbt_build",
                description="Run models, tests, snapshots and seeds in DAG order",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "models": {
                            "type": "string",
                            "description": "Specific models to build (optional, space-separated)"
                        }
                    }
                }
            )
        ]

    def _get_discovery_tools(self) -> List[Tool]:
        """Get dbt discovery and metadata tools."""
        return [
            Tool(
                name="discovery_list_models",
                description="List all dbt models in the project with metadata",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "filter": {
                            "type": "string",
                            "description": "Filter models by name pattern (optional)"
                        }
                    }
                }
            ),
            Tool(
                name="discovery_model_details",
                description="Get detailed information about a specific dbt model",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "model_name": {
                            "type": "string",
                            "description": "Name of the model to analyze"
                        }
                    },
                    "required": ["model_name"]
                }
            ),
            Tool(
                name="discovery_lineage",
                description="Get data lineage for models and dependencies",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "model_name": {
                            "type": "string",
                            "description": "Model name for lineage analysis (optional for full lineage)"
                        }
                    }
                }
            )
        ]

    def _get_remote_tools(self) -> List[Tool]:
        """Get remote execution and query tools."""
        return [
            Tool(
                name="remote_query_database",
                description="Execute SQL queries against the DuckDB database",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "sql": {
                            "type": "string",
                            "description": "SQL query to execute"
                        },
                        "limit": {
                            "type": "integer",
                            "description": "Maximum number of rows to return",
                            "default": 100
                        }
                    },
                    "required": ["sql"]
                }
            ),
            Tool(
                name="remote_describe_table",
                description="Get schema and column information for database tables",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "table_name": {
                            "type": "string",
                            "description": "Name of the table to describe"
                        },
                        "schema": {
                            "type": "string",
                            "description": "Schema name (default: main)",
                            "default": "main"
                        }
                    },
                    "required": ["table_name"]
                }
            )
        ]

    async def _handle_dbt_tool(self, name: str, arguments: Dict[str, Any]) -> CallToolResult:
        """Handle dbt CLI tool execution.

        Builds a `dbt <command>` invocation from the tool name and arguments
        and runs it in a worker thread so the blocking subprocess (up to the
        5-minute timeout) does not stall the asyncio event loop.
        """
        try:
            dbt_command = name.replace("dbt_", "")

            # Use the venv dbt if available, otherwise fall back to system dbt
            venv_dbt = self.dbt_project_dir / "venv" / "bin" / "dbt"
            dbt_executable = str(venv_dbt) if venv_dbt.exists() else "dbt"

            cmd = [
                dbt_executable, dbt_command,
                "--profiles-dir", str(self.dbt_profiles_dir),
                "--profile", self.dbt_profile,
                "--target", self.dbt_target,
                "--project-dir", str(self.dbt_project_dir)
            ]

            # Add model selection if specified
            if "models" in arguments and arguments["models"]:
                cmd.extend(["--models", arguments["models"]])

            # Add full-refresh flag if specified
            if arguments.get("full_refresh", False):
                cmd.append("--full-refresh")

            # Set environment variables - override any problematic global settings
            env = os.environ.copy()
            # NOTE(review): dbt's documented DBT_WARN_ERROR_OPTIONS format is a
            # JSON-ish dict (e.g. '{"include": "all"}'); plain "all" may be
            # ignored or rejected -- confirm against the dbt version in use.
            env["DBT_WARN_ERROR_OPTIONS"] = "all"
            env["DBT_PROJECT_DIR"] = str(self.dbt_project_dir)
            env["DBT_PROFILES_DIR"] = str(self.dbt_profiles_dir)
            env["DBT_PROFILE"] = self.dbt_profile
            env["DBT_TARGET"] = self.dbt_target

            logger.info(f"Executing dbt command: {' '.join(cmd)}")

            # Execute dbt in a worker thread: subprocess.run is blocking, and
            # calling it directly here would freeze all other MCP requests.
            result = await asyncio.to_thread(
                subprocess.run,
                cmd,
                capture_output=True,
                text=True,
                cwd=str(self.dbt_project_dir.parent),
                env=env,
                timeout=300  # 5 minute timeout
            )

            output = f"Exit code: {result.returncode}\n\n"
            output += f"STDOUT:\n{result.stdout}\n\n"
            if result.stderr:
                output += f"STDERR:\n{result.stderr}"

            return CallToolResult(
                content=[TextContent(type="text", text=output)],
                isError=result.returncode != 0
            )

        except subprocess.TimeoutExpired:
            return CallToolResult(
                content=[TextContent(type="text", text="Command timed out after 5 minutes")],
                isError=True
            )
        except Exception as e:
            logger.error(f"Error executing dbt tool {name}: {str(e)}")
            return CallToolResult(
                content=[TextContent(type="text", text=f"Error: {str(e)}")],
                isError=True
            )

    async def _handle_discovery_tool(self, name: str, arguments: Dict[str, Any]) -> CallToolResult:
        """Handle dbt discovery tool execution (list models, details, lineage)."""
        try:
            if name == "discovery_list_models":
                models = await self._discover_models(arguments.get("filter"))
                return CallToolResult(
                    content=[TextContent(type="text", text=json.dumps(models, indent=2))]
                )
            elif name == "discovery_model_details":
                details = await self._get_model_details(arguments["model_name"])
                return CallToolResult(
                    content=[TextContent(type="text", text=json.dumps(details, indent=2))]
                )
            elif name == "discovery_lineage":
                lineage = await self._get_model_lineage(arguments.get("model_name"))
                return CallToolResult(
                    content=[TextContent(type="text", text=json.dumps(lineage, indent=2))]
                )
            else:
                raise ValueError(f"Unknown discovery tool: {name}")
        except Exception as e:
            logger.error(f"Error executing discovery tool {name}: {str(e)}")
            return CallToolResult(
                content=[TextContent(type="text", text=f"Error: {str(e)}")],
                isError=True
            )

    async def _handle_remote_tool(self, name: str, arguments: Dict[str, Any]) -> CallToolResult:
        """Handle remote database tool execution (ad-hoc SQL, table describe)."""
        try:
            if name == "remote_query_database":
                result = await self._execute_sql_query(
                    arguments["sql"],
                    arguments.get("limit", 100)
                )
                return CallToolResult(
                    content=[TextContent(type="text", text=result)]
                )
            elif name == "remote_describe_table":
                result = await self._describe_table(
                    arguments["table_name"],
                    arguments.get("schema", "main")
                )
                return CallToolResult(
                    content=[TextContent(type="text", text=result)]
                )
            else:
                raise ValueError(f"Unknown remote tool: {name}")
        except Exception as e:
            logger.error(f"Error executing remote tool {name}: {str(e)}")
            return CallToolResult(
                content=[TextContent(type="text", text=f"Error: {str(e)}")],
                isError=True
            )

    async def _get_project_config(self) -> Dict[str, Any]:
        """Get dbt project configuration as a JSON-serializable dict."""
        config = {
            "project_name": "data_stack",
            "dbt_version": "1.6.0+",
            "profile": self.dbt_profile,
            "target": self.dbt_target,
            "project_dir": str(self.dbt_project_dir),
            "profiles_dir": str(self.dbt_profiles_dir),
            "database_path": str(self.duckdb_path),
            "tool_groups": {
                "cli_tools": self.enable_cli_tools,
                "discovery_tools": self.enable_discovery_tools,
                "semantic_layer_tools": self.enable_semantic_layer_tools,
                "remote_tools": self.enable_remote_tools
            }
        }
        return config

    async def _get_model_content(self, model_name: str) -> str:
        """Get content of a specific dbt model.

        Returns the first matching `<model_name>.sql` anywhere under models/.
        Raises FileNotFoundError if no file matches.
        """
        models_dir = self.dbt_project_dir / "models"

        # Search for model file
        for model_file in models_dir.rglob(f"{model_name}.sql"):
            return model_file.read_text()

        raise FileNotFoundError(f"Model {model_name} not found")

    async def _discover_models(self, filter_pattern: Optional[str] = None) -> List[Dict[str, Any]]:
        """Discover dbt models in the project.

        Walks models/ recursively; `filter_pattern` is a plain substring match
        against the model name. Returns metadata dicts sorted by name.
        """
        models = []
        models_dir = self.dbt_project_dir / "models"

        if not models_dir.exists():
            return models

        for model_file in models_dir.rglob("*.sql"):
            rel_path = model_file.relative_to(models_dir)
            model_name = rel_path.stem

            if filter_pattern and filter_pattern not in model_name:
                continue

            models.append({
                "name": model_name,
                "path": str(rel_path),
                # First path component is treated as the layer (e.g. staging/marts).
                "layer": rel_path.parts[0] if rel_path.parts else "unknown",
                "size_bytes": model_file.stat().st_size,
                "modified": model_file.stat().st_mtime
            })

        return sorted(models, key=lambda x: x["name"])

    async def _get_model_details(self, model_name: str) -> Dict[str, Any]:
        """Get detailed information about a specific model (content, refs, columns)."""
        content = await self._get_model_content(model_name)

        details = {
            "name": model_name,
            "content": content,
            "line_count": len(content.splitlines()),
            "references": self._extract_model_references(content),
            "columns": self._extract_model_columns(content)
        }

        return details

    async def _get_model_lineage(self, model_name: Optional[str] = None) -> Dict[str, Any]:
        """Get data lineage information.

        Simplified lineage implementation. In production, this would parse
        manifest.json or use dbt's metadata.
        """
        lineage = {
            "models": await self._discover_models(),
            "note": "Full lineage parsing requires dbt manifest.json analysis"
        }

        if model_name:
            lineage["focus_model"] = model_name

        return lineage

    async def _execute_sql_query(self, sql: str, limit: int = 100) -> str:
        """Execute SQL query against DuckDB and format the result as text.

        The connection is always closed, even when the query raises; errors
        are returned as a "SQL Error: ..." string rather than raised.
        """
        try:
            import duckdb

            conn = duckdb.connect(str(self.duckdb_path))
            try:
                # Add LIMIT if not present and limit specified.
                # NOTE(review): this is a naive substring check -- any query
                # containing the word "limit" (even in an identifier) skips it.
                if limit and "limit" not in sql.lower():
                    sql = f"{sql.rstrip(';')} LIMIT {limit}"

                result = conn.execute(sql).fetchall()
                columns = [desc[0] for desc in conn.description] if conn.description else []
            finally:
                conn.close()

            # Format results as a simple pipe-separated table
            output = f"Query: {sql}\n\n"
            if columns:
                output += " | ".join(columns) + "\n"
                output += "-" * (len(" | ".join(columns))) + "\n"

            for row in result:
                output += " | ".join(str(cell) for cell in row) + "\n"

            output += f"\nRows returned: {len(result)}"

            return output

        except Exception as e:
            return f"SQL Error: {str(e)}"

    async def _describe_table(self, table_name: str, schema: str = "main") -> str:
        """Describe table schema and structure via DuckDB's DESCRIBE."""
        try:
            sql = f"DESCRIBE {schema}.{table_name}"
            return await self._execute_sql_query(sql)
        except Exception as e:
            return f"Error describing table: {str(e)}"

    def _extract_model_references(self, content: str) -> List[str]:
        """Extract {{ ref('...') }} model references from SQL content."""
        # Uses the module-level precompiled pattern; duplicates removed via set.
        matches = _REF_PATTERN.findall(content)
        return list(set(matches))

    def _extract_model_columns(self, content: str) -> List[str]:
        """Extract column names from SQL content (simplified).

        Very basic heuristic scan of the first SELECT...FROM span; a real
        implementation would use a SQL parser. Returns at most 10 names.
        """
        lines = content.lower().split('\n')
        columns = []
        in_select = False

        for line in lines:
            line = line.strip()
            if line.startswith('select'):
                in_select = True
                continue
            elif line.startswith('from') and in_select:
                break
            elif in_select and line and not line.startswith('--'):
                # Extract column name (very simplified)
                if ',' in line:
                    col = line.split(',')[0].strip()
                else:
                    col = line.strip()

                # Prefer the alias after "as"; otherwise drop any table prefix.
                if ' as ' in col:
                    col = col.split(' as ')[-1].strip()
                elif '.' in col:
                    col = col.split('.')[-1].strip()

                if col and col not in ['*', '{{', '}}']:
                    columns.append(col)

        return columns[:10]  # Limit to first 10 columns


async def main():
    """Main entry point for dbt MCP server."""
    # Validate environment
    dbt_project_dir = Path(os.getenv("DBT_PROJECT_DIR", "./transform"))
    if not dbt_project_dir.exists():
        logger.error(f"dbt project directory not found: {dbt_project_dir}")
        sys.exit(1)

    # Initialize and run server
    server_instance = DbtMCPServer()

    logger.info("Starting dbt MCP Server for Claude Code integration")
    logger.info(f"dbt project: {dbt_project_dir}")
    logger.info(f"Profile: {server_instance.dbt_profile}")
    logger.info(f"Target: {server_instance.dbt_target}")

    async with stdio_server() as (read_stream, write_stream):
        await server_instance.server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="dbt-data-stack",
                server_version="1.0.0",
                capabilities=ServerCapabilities(
                    tools={},
                    resources={}
                )
            )
        )


if __name__ == "__main__":
    asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ajdoyl2/claude-data-stack-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.