# Databricks Integration Guidelines
## Databricks SDK Architecture
### Core Components
- **WorkspaceClient**: Main client for Databricks workspace operations
- **Service APIs**: Specialized clients for different Databricks services
- **Authentication**: OAuth flow through Databricks Apps
- **Simple Error Handling**: Basic error responses for all operations
### Project Structure
- [server/tools/](mdc:server/tools/) - MCP tools using Databricks SDK
- [server/services/](mdc:server/services/) - Business logic and service integrations
- [dba_mcp_proxy/](mdc:dba_mcp_proxy/) - OAuth authentication proxy
### Direct Client Initialization
#### Simple Client Pattern
```python
from typing import Any, Dict

from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import DatabricksError


def get_workspace_client() -> WorkspaceClient:
    """
    Get an authenticated Databricks workspace client.

    Returns:
        Authenticated WorkspaceClient instance

    Raises:
        Exception: If authentication fails
    """
    try:
        return WorkspaceClient()
    except DatabricksError as e:
        raise Exception(f"Failed to initialize Databricks client: {e}") from e


def simple_databricks_operation(operation_name: str, operation_func) -> Dict[str, Any]:
    """
    Execute a Databricks operation with simple error handling.

    Args:
        operation_name: Name of the operation for error reporting
        operation_func: Function that performs the operation

    Returns:
        Dictionary with operation result or error
    """
    try:
        client = get_workspace_client()
        result = operation_func(client)
        return {
            "success": True,
            "data": result,
            "message": f"{operation_name} completed successfully"
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "message": f"{operation_name} failed"
        }
```
### Direct Service Operations
#### SQL Operations
```python
def execute_sql_query(query: str, warehouse_id: str) -> Dict[str, Any]:
    """Execute a SQL query on a Databricks SQL warehouse."""
    def _execute(client):
        # The SDK exposes SQL execution through the Statement Execution API.
        response = client.statement_execution.execute_statement(
            statement=query,
            warehouse_id=warehouse_id
        )
        return response.result.data_array if response.result else None
    return simple_databricks_operation("SQL query execution", _execute)


def list_sql_warehouses() -> Dict[str, Any]:
    """List all SQL warehouses in the workspace."""
    def _list(client):
        warehouses = client.warehouses.list()
        return [{
            "id": w.id,
            "name": w.name,
            "state": w.state.value if w.state else None,
            "cluster_size": w.cluster_size
        } for w in warehouses]
    return simple_databricks_operation("SQL warehouse listing", _list)
```
#### Unity Catalog Operations
```python
from datetime import datetime, timezone


def _ms_to_iso(timestamp_ms):
    """Convert an epoch-milliseconds timestamp (as returned by the SDK) to ISO 8601."""
    if not timestamp_ms:
        return None
    return datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc).isoformat()


def list_catalogs() -> Dict[str, Any]:
    """List all Unity Catalog catalogs."""
    def _list(client):
        catalogs = client.catalogs.list()
        return [{
            "name": c.name,
            "comment": c.comment,
            "owner": c.owner,
            "created_at": _ms_to_iso(c.created_at)
        } for c in catalogs]
    return simple_databricks_operation("Catalog listing", _list)


def describe_table(table_name: str) -> Dict[str, Any]:
    """Get detailed information about a Unity Catalog table."""
    def _describe(client):
        table = client.tables.get(full_name=table_name)
        return {
            "name": table.name,
            "catalog_name": table.catalog_name,
            "schema_name": table.schema_name,
            "table_type": table.table_type.value if table.table_type else None,
            "columns": [{
                "name": col.name,
                "type": col.type_text,
                "comment": col.comment
            } for col in table.columns] if table.columns else []
        }
    return simple_databricks_operation("Table description", _describe)
```
#### Jobs and Pipelines
```python
def list_jobs() -> Dict[str, Any]:
    """List all Databricks jobs."""
    def _list(client):
        jobs = client.jobs.list()
        return [{
            "job_id": j.job_id,
            "name": j.settings.name if j.settings else None,
            # created_time is an epoch-milliseconds integer in the SDK
            "created_time": j.created_time,
            "creator_user_name": j.creator_user_name
        } for j in jobs]
    return simple_databricks_operation("Job listing", _list)


def get_job_runs(job_id: int, limit: int = 10) -> Dict[str, Any]:
    """Get recent runs for a specific job."""
    def _get_runs(client):
        runs = client.jobs.list_runs(job_id=job_id, limit=limit)
        return [{
            "run_id": r.run_id,
            "state": r.state.life_cycle_state.value if r.state and r.state.life_cycle_state else None,
            # start_time/end_time are epoch-milliseconds integers
            "start_time": r.start_time,
            "end_time": r.end_time
        } for r in runs]
    return simple_databricks_operation("Job runs retrieval", _get_runs)
```
### Data Management Operations
#### DBFS Operations
```python
def list_dbfs_path(path: str = "/") -> Dict[str, Any]:
    """List contents of a DBFS path."""
    def _list(client):
        files = client.dbfs.list(path)
        return [{
            "path": f.path,
            "is_dir": f.is_dir,
            "file_size": f.file_size
        } for f in files]
    return simple_databricks_operation("DBFS listing", _list)


def upload_file_to_dbfs(local_path: str, dbfs_path: str) -> Dict[str, Any]:
    """Upload a local file to DBFS."""
    def _upload(client):
        # The SDK's DBFS mixin accepts a binary file object directly.
        with open(local_path, "rb") as f:
            client.dbfs.upload(dbfs_path, f, overwrite=True)
        return {"uploaded_path": dbfs_path}
    return simple_databricks_operation("File upload", _upload)
```
#### Volume Operations
```python
def list_volumes(catalog_name: str, schema_name: str) -> Dict[str, Any]:
    """List volumes in a Unity Catalog schema."""
    def _list(client):
        volumes = client.volumes.list(
            catalog_name=catalog_name,
            schema_name=schema_name
        )
        return [{
            "name": v.name,
            "full_name": v.full_name,
            "owner": v.owner,
            # created_at is an epoch-milliseconds integer in the SDK
            "created_at": v.created_at
        } for v in volumes]
    return simple_databricks_operation("Volume listing", _list)
```
### Simple Error Handling Patterns
#### Basic Error Handling
```python
from databricks.sdk.errors import (
    DatabricksError,
    InvalidParameterValue,
    NotFound,
    PermissionDenied,
)


def handle_databricks_error(operation: str, error: Exception) -> Dict[str, Any]:
    """Handle Databricks-specific errors with simple responses."""
    if isinstance(error, NotFound):
        return {
            "success": False,
            "error": f"Resource not found: {error}",
            "message": f"{operation} failed - resource not found"
        }
    elif isinstance(error, PermissionDenied):
        return {
            "success": False,
            "error": f"Permission denied: {error}",
            "message": f"{operation} failed - insufficient permissions"
        }
    elif isinstance(error, InvalidParameterValue):
        return {
            "success": False,
            "error": f"Invalid parameter: {error}",
            "message": f"{operation} failed - invalid parameters"
        }
    else:
        return {
            "success": False,
            "error": str(error),
            "message": f"{operation} failed - unexpected error"
        }
```
### Best Practices
#### Performance
- **Direct SDK Calls**: Call Databricks SDK directly, no wrapper layers
- **Simple Operations**: Keep operations straightforward and focused
- **Resource Cleanup**: Properly close connections and clean up resources
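The cleanup bullet boils down to plain `try`/`finally`. A minimal sketch, assuming a hypothetical `with_cleanup` helper (not part of the SDK or this codebase) that always runs a cleanup callback, whether the operation succeeds or raises:

```python
from typing import Any, Callable, Dict


def with_cleanup(operation_func: Callable[[], Any],
                 cleanup_func: Callable[[], None]) -> Dict[str, Any]:
    """Run an operation and always run its cleanup, even on failure."""
    try:
        return {"success": True, "data": operation_func()}
    except Exception as e:
        return {"success": False, "error": str(e)}
    finally:
        # finally runs on both the return and the except paths,
        # so temporary resources are released in every case.
        cleanup_func()
```

The same shape applies when an operation creates a temporary table, a scratch DBFS path, or an open file handle: the cleanup goes in `finally`, not after the happy path.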
#### Security
- **Input Validation**: Validate all input parameters
- **Error Sanitization**: Don't expose sensitive information
- **Authentication**: Always verify authentication before operations
- **Authorization**: Check permissions before performing operations
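The input-validation bullet can stay equally simple: check required parameters up front and return the same error envelope the operation wrappers use. `validate_required_params` is a hypothetical helper name, shown here as one possible shape:

```python
from typing import Any, Dict, Optional


def validate_required_params(operation: str,
                             params: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return an error response for any missing/empty parameter, or None if all are present."""
    missing = [name for name, value in params.items()
               if value is None or value == ""]
    if missing:
        return {
            "success": False,
            "error": f"Missing required parameters: {', '.join(missing)}",
            "message": f"{operation} failed - invalid parameters"
        }
    return None
```

Call it at the top of a tool function and return early on a non-`None` result, before any SDK call is made.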
#### Monitoring
- **Logging**: Log all Databricks operations for debugging
- **Simple Metrics**: Track operation success rates
- **Error Tracking**: Monitor and alert on specific error types
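A minimal way to cover the logging bullet without adding infrastructure is a pass-through function over the standard result envelope; `log_operation_result` and the `databricks_ops` logger name are hypothetical, and success-rate metrics or alerting would hang off the same hook:

```python
import logging
from typing import Any, Dict

logger = logging.getLogger("databricks_ops")


def log_operation_result(operation_name: str, result: Dict[str, Any]) -> Dict[str, Any]:
    """Log a standard result envelope and pass it through unchanged."""
    if result.get("success"):
        logger.info("%s succeeded", operation_name)
    else:
        logger.warning("%s failed: %s", operation_name, result.get("error"))
    return result
```

Because it returns the envelope unchanged, it can wrap any existing call site: `return log_operation_result("Job listing", list_jobs())`.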
### Forbidden Databricks Patterns (DO NOT ADD THESE)
❌ **Complex client wrappers** or abstraction layers around Databricks SDK
❌ **Custom authentication systems** - use Databricks OAuth only
❌ **Complex error handling systems** - keep error handling simple
❌ **Connection pooling** or caching layers
❌ **Custom retry logic** - use Databricks SDK retry mechanisms
❌ **Complex data transformation layers** - keep transformations simple
### Required Databricks Patterns (ALWAYS USE THESE)
✅ **Direct SDK calls** - call Databricks SDK directly
✅ **Simple error handling** - try/except with basic error responses
✅ **Clear operation names** - descriptive function names
✅ **Basic input validation** - validate required parameters
✅ **Simple return structures** - consistent dictionary format
✅ **Direct client initialization** - use WorkspaceClient() directly
### Code Review Questions
Before adding any Databricks integration, ask yourself:
- "Is this the simplest way to call this Databricks API?"
- "Would a new developer understand this immediately?"
- "Am I adding abstraction for a real need or hypothetical flexibility?"
- "Can I solve this with direct Databricks SDK calls?"
- "Does this follow the existing Databricks patterns in the codebase?"
### Examples of Good vs Bad Databricks Integration
**❌ BAD (Over-engineered):**
```python
class AbstractDatabricksClient(ABC):
    @abstractmethod
    def execute_operation(self, params: Dict[str, Any]) -> Dict[str, Any]: ...

class SQLClient(AbstractDatabricksClient):
    def __init__(self, client_factory: ClientFactory): ...
    def execute_operation(self, params: Dict[str, Any]) -> Dict[str, Any]: ...
```
**✅ GOOD (Simple):**
```python
def execute_sql_query(query: str, warehouse_id: str) -> Dict[str, Any]:
    """Execute a SQL query on Databricks."""
    try:
        client = WorkspaceClient()
        result = client.statement_execution.execute_statement(
            statement=query, warehouse_id=warehouse_id
        )
        return {"success": True, "data": result}
    except Exception as e:
        return {"success": False, "error": str(e)}
```
## Summary: Databricks Integration Principles
✅ **Readable**: Any developer can understand the integration immediately
✅ **Maintainable**: Simple patterns that are easy to modify
✅ **Focused**: Each integration has a single, clear purpose
✅ **Direct**: No unnecessary abstractions or indirection
✅ **Practical**: Integrates with Databricks without over-engineering
When in doubt, choose the **simpler** integration. Your future self (and your teammates) will thank you.