#!/usr/bin/env python3
"""
dbt MCP Server for Claude Code Integration
A Model Context Protocol (MCP) server that provides Claude Code with intelligent
dbt project assistance, including model operations, testing, and discovery.
Protocol Compliance:
- Full MCP 1.0 specification compliance
- Async/await pattern for all operations
- Proper error handling and logging
- Resource and tool management
- Type safety with Pydantic models
Backend Integration Features:
- DuckDB database integration
- Local dbt project management
- Model compilation and execution
- Data quality testing
- Discovery and metadata operations
"""
import asyncio
import logging
import os
import subprocess
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional
from mcp.server import Server
from mcp.server.models import InitializationOptions, ServerCapabilities
from mcp.server.stdio import stdio_server
from mcp.types import (
Resource,
Tool,
TextContent,
ImageContent,
EmbeddedResource,
CallToolResult,
ListResourcesResult,
ListToolsResult,
ReadResourceResult,
)
from dotenv import load_dotenv
import json
# Load environment configuration
load_dotenv(".env.dbt-mcp")
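# Illustrative .env.dbt-mcp contents; the values shown match the defaults the
# code below falls back to when a variable is unset:
#
#   DBT_PROJECT_DIR=./transform
#   DBT_PROFILES_DIR=./transform/profiles/duckdb
#   DBT_PROFILE=data_stack
#   DBT_TARGET=dev
#   DUCKDB_PATH=./data/warehouse/data_stack.duckdb
#   MCP_DEBUG=false
#   DBT_MCP_ENABLE_CLI_TOOLS=true
#   DBT_MCP_ENABLE_DISCOVERY_TOOLS=true
#   DBT_MCP_ENABLE_SEMANTIC_LAYER_TOOLS=false
#   DBT_MCP_ENABLE_REMOTE_TOOLS=true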
# Configure logging; basicConfig writes to stderr by default, which keeps stdout
# free for the stdio MCP transport
logging.basicConfig(
    level=logging.DEBUG if os.getenv("MCP_DEBUG", "false").lower() == "true" else logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("dbt-mcp-server")
class DbtMCPServer:
"""
dbt MCP Server implementation for Claude Code integration.
Provides protocol-compliant MCP server with dbt-specific tools and resources
for intelligent data transformation assistance.
"""
def __init__(self):
self.server = Server("dbt-data-stack")
self.dbt_project_dir = Path(os.getenv("DBT_PROJECT_DIR", "./transform"))
self.dbt_profiles_dir = Path(os.getenv("DBT_PROFILES_DIR", "./transform/profiles/duckdb"))
self.dbt_profile = os.getenv("DBT_PROFILE", "data_stack")
self.dbt_target = os.getenv("DBT_TARGET", "dev")
self.duckdb_path = Path(os.getenv("DUCKDB_PATH", "./data/warehouse/data_stack.duckdb"))
# Tool group configuration
self.enable_cli_tools = os.getenv("DBT_MCP_ENABLE_CLI_TOOLS", "true").lower() == "true"
self.enable_discovery_tools = os.getenv("DBT_MCP_ENABLE_DISCOVERY_TOOLS", "true").lower() == "true"
self.enable_semantic_layer_tools = os.getenv("DBT_MCP_ENABLE_SEMANTIC_LAYER_TOOLS", "false").lower() == "true"
self.enable_remote_tools = os.getenv("DBT_MCP_ENABLE_REMOTE_TOOLS", "true").lower() == "true"
self._register_handlers()
def _register_handlers(self):
"""Register MCP protocol handlers"""
@self.server.list_tools()
async def handle_list_tools() -> ListToolsResult:
"""List available dbt tools based on configuration"""
tools = []
if self.enable_cli_tools:
tools.extend(self._get_cli_tools())
if self.enable_discovery_tools:
tools.extend(self._get_discovery_tools())
if self.enable_remote_tools:
tools.extend(self._get_remote_tools())
return ListToolsResult(tools=tools)
@self.server.call_tool()
async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> CallToolResult:
"""Handle tool execution requests"""
try:
if name.startswith("dbt_"):
return await self._handle_dbt_tool(name, arguments)
elif name.startswith("discovery_"):
return await self._handle_discovery_tool(name, arguments)
elif name.startswith("remote_"):
return await self._handle_remote_tool(name, arguments)
else:
raise ValueError(f"Unknown tool: {name}")
except Exception as e:
logger.error(f"Error executing tool {name}: {str(e)}")
return CallToolResult(
content=[TextContent(type="text", text=f"Error: {str(e)}")],
isError=True
)
@self.server.list_resources()
async def handle_list_resources() -> ListResourcesResult:
"""List available dbt project resources"""
resources = []
# dbt project configuration
resources.append(Resource(
uri=f"dbt://project/config",
name="dbt Project Configuration",
description="Current dbt project configuration and settings",
mimeType="application/json"
))
# Model resources
if self.dbt_project_dir.exists():
models_dir = self.dbt_project_dir / "models"
if models_dir.exists():
for model_file in models_dir.rglob("*.sql"):
rel_path = model_file.relative_to(models_dir)
resources.append(Resource(
uri=f"dbt://model/{rel_path.stem}",
name=f"Model: {rel_path.stem}",
description=f"dbt model at {rel_path}",
mimeType="text/sql"
))
return ListResourcesResult(resources=resources)
@self.server.read_resource()
async def handle_read_resource(uri: str) -> ReadResourceResult:
"""Read dbt project resources"""
try:
if uri.startswith("dbt://project/config"):
config = await self._get_project_config()
return ReadResourceResult(
contents=[TextContent(
type="text",
text=json.dumps(config, indent=2)
)]
)
elif uri.startswith("dbt://model/"):
model_name = uri.split("/")[-1]
content = await self._get_model_content(model_name)
return ReadResourceResult(
contents=[TextContent(type="text", text=content)]
)
else:
raise ValueError(f"Unknown resource URI: {uri}")
except Exception as e:
logger.error(f"Error reading resource {uri}: {str(e)}")
return ReadResourceResult(
contents=[TextContent(
type="text",
text=f"Error reading resource: {str(e)}"
)]
)
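    # Example resource URIs produced by the handlers above (model names mirror the
    # .sql files under models/; stg_orders is purely illustrative):
    #   dbt://project/config     -> project configuration rendered as JSON
    #   dbt://model/stg_orders   -> raw SQL of the matching model file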
def _get_cli_tools(self) -> List[Tool]:
"""Get dbt CLI tools"""
return [
Tool(
name="dbt_run",
description="Execute dbt models to build data transformations",
inputSchema={
"type": "object",
"properties": {
"models": {
"type": "string",
"description": "Specific models to run (optional, space-separated)"
},
"full_refresh": {
"type": "boolean",
"description": "Perform full refresh of incremental models",
"default": False
}
}
}
),
Tool(
name="dbt_test",
description="Run data quality tests on dbt models and sources",
inputSchema={
"type": "object",
"properties": {
"models": {
"type": "string",
"description": "Specific models to test (optional, space-separated)"
}
}
}
),
Tool(
name="dbt_compile",
description="Compile dbt models to executable SQL without running",
inputSchema={
"type": "object",
"properties": {
"models": {
"type": "string",
"description": "Specific models to compile (optional, space-separated)"
}
}
}
),
Tool(
name="dbt_build",
description="Run models, tests, snapshots and seeds in DAG order",
inputSchema={
"type": "object",
"properties": {
"models": {
"type": "string",
"description": "Specific models to build (optional, space-separated)"
}
}
}
)
]
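    # Example arguments for a dbt_run tool call, matching the schema above
    # (the "staging" selector is purely illustrative):
    #   {"models": "staging", "full_refresh": false}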
def _get_discovery_tools(self) -> List[Tool]:
"""Get dbt discovery and metadata tools"""
return [
Tool(
name="discovery_list_models",
description="List all dbt models in the project with metadata",
inputSchema={
"type": "object",
"properties": {
"filter": {
"type": "string",
"description": "Filter models by name pattern (optional)"
}
}
}
),
Tool(
name="discovery_model_details",
description="Get detailed information about a specific dbt model",
inputSchema={
"type": "object",
"properties": {
"model_name": {
"type": "string",
"description": "Name of the model to analyze"
}
},
"required": ["model_name"]
}
),
Tool(
name="discovery_lineage",
description="Get data lineage for models and dependencies",
inputSchema={
"type": "object",
"properties": {
"model_name": {
"type": "string",
"description": "Model name for lineage analysis (optional for full lineage)"
}
}
}
)
]
def _get_remote_tools(self) -> List[Tool]:
"""Get remote execution and query tools"""
return [
Tool(
name="remote_query_database",
description="Execute SQL queries against the DuckDB database",
inputSchema={
"type": "object",
"properties": {
"sql": {
"type": "string",
"description": "SQL query to execute"
},
"limit": {
"type": "integer",
"description": "Maximum number of rows to return",
"default": 100
}
},
"required": ["sql"]
}
),
Tool(
name="remote_describe_table",
description="Get schema and column information for database tables",
inputSchema={
"type": "object",
"properties": {
"table_name": {
"type": "string",
"description": "Name of the table to describe"
},
"schema": {
"type": "string",
"description": "Schema name (default: main)",
"default": "main"
}
},
"required": ["table_name"]
}
)
]
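    # Example arguments for a remote_query_database call (table name illustrative):
    #   {"sql": "SELECT * FROM main.stg_orders", "limit": 10}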
async def _handle_dbt_tool(self, name: str, arguments: Dict[str, Any]) -> CallToolResult:
"""Handle dbt CLI tool execution"""
try:
dbt_command = name.replace("dbt_", "")
# Use the venv dbt if available, otherwise fall back to system dbt
venv_dbt = self.dbt_project_dir / "venv" / "bin" / "dbt"
dbt_executable = str(venv_dbt) if venv_dbt.exists() else "dbt"
cmd = [
dbt_executable, dbt_command,
"--profiles-dir", str(self.dbt_profiles_dir),
"--profile", self.dbt_profile,
"--target", self.dbt_target,
"--project-dir", str(self.dbt_project_dir)
]
# Add model selection if specified
if "models" in arguments and arguments["models"]:
cmd.extend(["--models", arguments["models"]])
# Add full-refresh flag if specified
if arguments.get("full_refresh", False):
cmd.append("--full-refresh")
# Set environment variables - override any problematic global settings
env = os.environ.copy()
env["DBT_WARN_ERROR_OPTIONS"] = "all"
env["DBT_PROJECT_DIR"] = str(self.dbt_project_dir)
env["DBT_PROFILES_DIR"] = str(self.dbt_profiles_dir)
env["DBT_PROFILE"] = self.dbt_profile
env["DBT_TARGET"] = self.dbt_target
logger.info(f"Executing dbt command: {' '.join(cmd)}")
            # Execute the dbt command in a worker thread so the blocking
            # subprocess.run call does not stall the event loop
            result = await asyncio.to_thread(
                subprocess.run,
                cmd,
                capture_output=True,
                text=True,
                cwd=str(self.dbt_project_dir.parent),
                env=env,
                timeout=300,  # 5 minute timeout
            )
output = f"Exit code: {result.returncode}\n\n"
output += f"STDOUT:\n{result.stdout}\n\n"
if result.stderr:
output += f"STDERR:\n{result.stderr}"
return CallToolResult(
content=[TextContent(type="text", text=output)],
isError=result.returncode != 0
)
except subprocess.TimeoutExpired:
return CallToolResult(
content=[TextContent(type="text", text="Command timed out after 5 minutes")],
isError=True
)
except Exception as e:
logger.error(f"Error executing dbt tool {name}: {str(e)}")
return CallToolResult(
content=[TextContent(type="text", text=f"Error: {str(e)}")],
isError=True
)
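    # With the defaults above, a dbt_run call with arguments {"models": "stg_orders"}
    # resolves to roughly the following command (stg_orders is an illustrative name,
    # and plain "dbt" is used when no venv interpreter is found):
    #   transform/venv/bin/dbt run --profiles-dir transform/profiles/duckdb \
    #     --profile data_stack --target dev --project-dir transform --models stg_orders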
async def _handle_discovery_tool(self, name: str, arguments: Dict[str, Any]) -> CallToolResult:
"""Handle dbt discovery tool execution"""
try:
if name == "discovery_list_models":
models = await self._discover_models(arguments.get("filter"))
return CallToolResult(
content=[TextContent(type="text", text=json.dumps(models, indent=2))]
)
elif name == "discovery_model_details":
details = await self._get_model_details(arguments["model_name"])
return CallToolResult(
content=[TextContent(type="text", text=json.dumps(details, indent=2))]
)
elif name == "discovery_lineage":
lineage = await self._get_model_lineage(arguments.get("model_name"))
return CallToolResult(
content=[TextContent(type="text", text=json.dumps(lineage, indent=2))]
)
else:
raise ValueError(f"Unknown discovery tool: {name}")
except Exception as e:
logger.error(f"Error executing discovery tool {name}: {str(e)}")
return CallToolResult(
content=[TextContent(type="text", text=f"Error: {str(e)}")],
isError=True
)
async def _handle_remote_tool(self, name: str, arguments: Dict[str, Any]) -> CallToolResult:
"""Handle remote database tool execution"""
try:
if name == "remote_query_database":
result = await self._execute_sql_query(
arguments["sql"],
arguments.get("limit", 100)
)
return CallToolResult(
content=[TextContent(type="text", text=result)]
)
elif name == "remote_describe_table":
result = await self._describe_table(
arguments["table_name"],
arguments.get("schema", "main")
)
return CallToolResult(
content=[TextContent(type="text", text=result)]
)
else:
raise ValueError(f"Unknown remote tool: {name}")
except Exception as e:
logger.error(f"Error executing remote tool {name}: {str(e)}")
return CallToolResult(
content=[TextContent(type="text", text=f"Error: {str(e)}")],
isError=True
)
async def _get_project_config(self) -> Dict[str, Any]:
"""Get dbt project configuration"""
config = {
"project_name": "data_stack",
"dbt_version": "1.6.0+",
"profile": self.dbt_profile,
"target": self.dbt_target,
"project_dir": str(self.dbt_project_dir),
"profiles_dir": str(self.dbt_profiles_dir),
"database_path": str(self.duckdb_path),
"tool_groups": {
"cli_tools": self.enable_cli_tools,
"discovery_tools": self.enable_discovery_tools,
"semantic_layer_tools": self.enable_semantic_layer_tools,
"remote_tools": self.enable_remote_tools
}
}
return config
async def _get_model_content(self, model_name: str) -> str:
"""Get content of a specific dbt model"""
models_dir = self.dbt_project_dir / "models"
# Search for model file
for model_file in models_dir.rglob(f"{model_name}.sql"):
return model_file.read_text()
raise FileNotFoundError(f"Model {model_name} not found")
async def _discover_models(self, filter_pattern: Optional[str] = None) -> List[Dict[str, Any]]:
"""Discover dbt models in the project"""
models = []
models_dir = self.dbt_project_dir / "models"
if not models_dir.exists():
return models
for model_file in models_dir.rglob("*.sql"):
rel_path = model_file.relative_to(models_dir)
model_name = rel_path.stem
if filter_pattern and filter_pattern not in model_name:
continue
models.append({
"name": model_name,
"path": str(rel_path),
"layer": rel_path.parts[0] if rel_path.parts else "unknown",
"size_bytes": model_file.stat().st_size,
"modified": model_file.stat().st_mtime
})
return sorted(models, key=lambda x: x["name"])
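    # Example entry for a hypothetical models/staging/stg_orders.sql (sizes and
    # timestamps are illustrative):
    #   {"name": "stg_orders", "path": "staging/stg_orders.sql", "layer": "staging",
    #    "size_bytes": 1234, "modified": 1700000000.0}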
async def _get_model_details(self, model_name: str) -> Dict[str, Any]:
"""Get detailed information about a specific model"""
content = await self._get_model_content(model_name)
details = {
"name": model_name,
"content": content,
"line_count": len(content.splitlines()),
"references": self._extract_model_references(content),
"columns": self._extract_model_columns(content)
}
return details
async def _get_model_lineage(self, model_name: Optional[str] = None) -> Dict[str, Any]:
"""Get data lineage information"""
# Simplified lineage implementation
# In production, this would parse manifest.json or use dbt's metadata
lineage = {
"models": await self._discover_models(),
"note": "Full lineage parsing requires dbt manifest.json analysis"
}
if model_name:
lineage["focus_model"] = model_name
return lineage
async def _execute_sql_query(self, sql: str, limit: int = 100) -> str:
"""Execute SQL query against DuckDB"""
        try:
            import duckdb

            # Append a LIMIT clause when one is requested and the query does not
            # already contain the keyword (a simple substring check, not a SQL parser)
            if limit and "limit" not in sql.lower():
                sql = f"{sql.rstrip(';')} LIMIT {limit}"
            conn = duckdb.connect(str(self.duckdb_path))
            try:
                result = conn.execute(sql).fetchall()
                columns = [desc[0] for desc in conn.description] if conn.description else []
            finally:
                conn.close()  # release the database handle even if the query fails
            # Format results as a pipe-separated text table
            output = f"Query: {sql}\n\n"
            if columns:
                output += " | ".join(columns) + "\n"
                output += "-" * len(" | ".join(columns)) + "\n"
            for row in result:
                output += " | ".join(str(cell) for cell in row) + "\n"
            output += f"\nRows returned: {len(result)}"
            return output
except Exception as e:
return f"SQL Error: {str(e)}"
async def _describe_table(self, table_name: str, schema: str = "main") -> str:
"""Describe table schema and structure"""
        try:
            # DuckDB's DESCRIBE already returns one row per column; pass limit=0 so
            # _execute_sql_query does not append a LIMIT clause to the statement
            sql = f"DESCRIBE {schema}.{table_name}"
            return await self._execute_sql_query(sql, limit=0)
except Exception as e:
return f"Error describing table: {str(e)}"
def _extract_model_references(self, content: str) -> List[str]:
"""Extract model references from SQL content"""
import re
# Look for {{ ref('model_name') }} patterns
ref_pattern = r"\{\{\s*ref\(['\"]([^'\"]+)['\"]\)\s*\}\}"
matches = re.findall(ref_pattern, content)
return list(set(matches))
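    # Example: a body containing "select * from {{ ref('stg_orders') }}" yields
    # ["stg_orders"] (name illustrative); set() collapses repeated references.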
def _extract_model_columns(self, content: str) -> List[str]:
"""Extract column names from SQL content (simplified)"""
import re
# Very basic column extraction - in production would use SQL parser
lines = content.lower().split('\n')
columns = []
in_select = False
for line in lines:
line = line.strip()
if line.startswith('select'):
in_select = True
continue
elif line.startswith('from') and in_select:
break
elif in_select and line and not line.startswith('--'):
# Extract column name (very simplified)
if ',' in line:
col = line.split(',')[0].strip()
else:
col = line.strip()
if ' as ' in col:
col = col.split(' as ')[-1].strip()
elif '.' in col:
col = col.split('.')[-1].strip()
if col and col not in ['*', '{{', '}}']:
columns.append(col)
return columns[:10] # Limit to first 10 columns
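    # Example: for a model body of
    #   select
    #       order_id,
    #       customer_id,
    #       amount as total
    #   from {{ ref('stg_orders') }}
    # the heuristic returns ["order_id", "customer_id", "total"] (names illustrative).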
async def main():
"""Main entry point for dbt MCP server"""
# Validate environment
dbt_project_dir = Path(os.getenv("DBT_PROJECT_DIR", "./transform"))
if not dbt_project_dir.exists():
logger.error(f"dbt project directory not found: {dbt_project_dir}")
sys.exit(1)
# Initialize and run server
server_instance = DbtMCPServer()
logger.info("Starting dbt MCP Server for Claude Code integration")
logger.info(f"dbt project: {dbt_project_dir}")
logger.info(f"Profile: {server_instance.dbt_profile}")
logger.info(f"Target: {server_instance.dbt_target}")
async with stdio_server() as (read_stream, write_stream):
await server_instance.server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="dbt-data-stack",
server_version="1.0.0",
capabilities=ServerCapabilities(
tools={},
resources={}
)
)
)
if __name__ == "__main__":
asyncio.run(main())