MCP Databricks Server
from typing import Optional
from mcp.server.fastmcp import FastMCP
from dbapi import execute_statement
from databricks_formatter import format_query_results
from dotenv import load_dotenv
import os
# Load environment variables from .env file
load_dotenv()
# Configuration constants
DATABRICKS_HOST = os.environ.get("DATABRICKS_HOST", "")
DATABRICKS_TOKEN = os.environ.get("DATABRICKS_TOKEN", "")
DATABRICKS_SQL_WAREHOUSE_ID = os.environ.get("DATABRICKS_SQL_WAREHOUSE_ID", "")
mcp = FastMCP("databricks")
@mcp.tool()
async def execute_sql_query(sql: str, warehouse_id: Optional[str] = None) -> str:
"""Execute a SQL query on Databricks and return the results.
Args:
sql: The SQL query to execute
warehouse_id: Optional warehouse ID to use (defaults to DATABRICKS_SQL_WAREHOUSE_ID environment variable)
"""
try:
result = await execute_statement(sql, warehouse_id)
return format_query_results(result)
except Exception as e:
return f"Error executing SQL query: WAREHOUSE USED IS : {warehouse_id}{str(e)}"
@mcp.tool()
async def list_schemas(catalog: str, warehouse_id: Optional[str] = None) -> str:
"""List all available schemas in a Databricks catalog.
Args:
catalog: The catalog name to list schemas from
warehouse_id: Optional warehouse ID to use (defaults to DATABRICKS_SQL_WAREHOUSE_ID environment variable)
"""
sql = f"SHOW SCHEMAS IN {catalog}"
try:
result = await execute_statement(sql, warehouse_id)
return format_query_results(result)
except Exception as e:
return f"Error listing schemas: {str(e)}"
@mcp.tool()
async def list_tables(schema: str, warehouse_id: Optional[str] = None) -> str:
"""List all tables in a specific schema.
Args:
schema: The schema name to list tables from
warehouse_id: Optional warehouse ID to use (defaults to DATABRICKS_SQL_WAREHOUSE_ID environment variable)
"""
sql = f"SHOW TABLES IN {schema}"
try:
result = await execute_statement(sql, warehouse_id)
return format_query_results(result)
except Exception as e:
return f"Error listing tables: {str(e)}"
@mcp.tool()
async def describe_table(table_name: str, warehouse_id: Optional[str] = None) -> str:
"""Describe a table's schema.
Args:
table_name: The fully qualified table name (e.g., schema.table_name)
warehouse_id: Optional warehouse ID to use (defaults to DATABRICKS_SQL_WAREHOUSE_ID environment variable)
"""
sql = f"DESCRIBE TABLE {table_name}"
try:
result = await execute_statement(sql, warehouse_id)
return format_query_results(result)
except Exception as e:
return f"Error describing table: {str(e)}"
if __name__ == "__main__":
# Initialize and run the server
mcp.run(transport='stdio')