Databricks MCP Server

by moma1992
databricks_sdk.md (8.03 kB)
# Databricks SDK for Python

**Official Documentation**: https://databricks-sdk-py.readthedocs.io/en/latest/

The Databricks SDK for Python provides programmatic access to the Databricks REST APIs, allowing you to manage workspace resources, execute queries, and interact with Databricks services directly from your Python applications.

## Installation

Already included in this template:

```toml
# pyproject.toml
databricks-sdk==0.59.0
```

## Basic Setup

```python
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import DatabricksError

# Initialize client (automatically uses your configured authentication)
client = WorkspaceClient()
```

## Authentication

The SDK automatically uses your app's configured authentication:

- **Personal Access Token (PAT)**: Uses `DATABRICKS_HOST` and `DATABRICKS_TOKEN`
- **CLI Profile**: Uses `DATABRICKS_CONFIG_PROFILE`
- **Service Principal**: When deployed to Databricks Apps

```python
# Authentication is automatic - no additional setup needed
client = WorkspaceClient()

# Verify authentication
try:
    user = client.current_user.me()
    print(f"Authenticated as: {user.user_name}")
except DatabricksError as e:
    print(f"Authentication failed: {e}")
```

## Core APIs

### Workspace Management

```python
# List files and directories
files = client.workspace.list('/Users/user@company.com')

# Upload a file
with open('script.py', 'rb') as f:
    client.workspace.upload('/Users/user@company.com/script.py', f.read())

# Download a file (download() returns a binary stream, so read it)
with client.workspace.download('/Users/user@company.com/script.py') as f:
    content = f.read()
```

### Cluster Management

```python
# List clusters
clusters = client.clusters.list()

# Get cluster details
cluster = client.clusters.get('cluster-id')

# Start a cluster
client.clusters.start('cluster-id')

# Create a cluster
cluster_spec = {
    'cluster_name': 'my-cluster',
    'spark_version': '13.3.x-scala2.12',
    'node_type_id': 'i3.xlarge',
    'num_workers': 2
}
cluster_id = client.clusters.create(**cluster_spec).cluster_id
```

### SQL Warehouse Operations

```python
# List SQL warehouses
warehouses = client.warehouses.list()

# Execute SQL query
response = client.statement_execution.execute_statement(
    warehouse_id='warehouse-id',
    statement='SELECT * FROM my_table LIMIT 10'
)

# Get query results
if response.result:
    for row in response.result.data_array:
        print(row)
```

### Jobs and Workflows

```python
# List jobs
jobs = client.jobs.list()

# Run a job
run = client.jobs.run_now(job_id=123)

# Get job run status
run_status = client.jobs.get_run(run.run_id)
```

### Secrets Management

```python
# List secret scopes
scopes = client.secrets.list_scopes()

# Get secret value (the response's `value` field is base64-encoded)
secret_value = client.secrets.get_secret('scope-name', 'secret-key').value

# Create secret (the value must be passed as a keyword argument)
client.secrets.put_secret('scope-name', 'secret-key', string_value='secret-value')
```

## Error Handling

```python
from databricks.sdk.errors import (
    NotFound,
    PermissionDenied,
    BadRequest,
    DatabricksError
)

try:
    result = client.workspace.get_status('/path/to/file')
except NotFound:
    print("File not found")
except PermissionDenied:
    print("Permission denied")
except BadRequest as e:
    print(f"Bad request: {e}")
except DatabricksError as e:
    # Catch-all for any other API error; keep this clause last
    print(f"API error: {e}")
```

## Common Patterns

### Pagination

```python
# List methods return iterators that fetch further pages on demand
for cluster in client.clusters.list():
    print(f"Cluster: {cluster.cluster_name}")

# Explicit page size (clusters.list() takes page_size, not limit/offset)
clusters = client.clusters.list(page_size=10)
```

### Async Operations

```python
# Wait for cluster to be ready
cluster_id = client.clusters.create(...).cluster_id
client.clusters.wait_get_cluster_running(cluster_id)

# Wait for job completion
run = client.jobs.run_now(job_id=123)
final_state = client.jobs.wait_get_run_job_terminated_or_skipped(run.run_id)
```

### Batch Operations

```python
# Upload multiple files
import os

def upload_directory(local_dir, remote_dir):
    for root, dirs, files in os.walk(local_dir):
        for file in files:
            local_path = os.path.join(root, file)
            relative_path = os.path.relpath(local_path, local_dir)
            # Normalize Windows path separators for the workspace path
            remote_path = f"{remote_dir}/{relative_path}".replace('\\', '/')

            with open(local_path, 'rb') as f:
                client.workspace.upload(remote_path, f.read(), overwrite=True)
            print(f"Uploaded: {relative_path}")
```

## FastAPI Integration

### Create API Endpoints

```python
from fastapi import APIRouter, HTTPException
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import DatabricksError

router = APIRouter()
client = WorkspaceClient()

@router.get("/workspace/files")
async def list_workspace_files(path: str = "/"):
    try:
        files = client.workspace.list(path)
        return [{"name": f.path, "type": f.object_type.value} for f in files]
    except DatabricksError as e:
        raise HTTPException(status_code=400, detail=str(e))

@router.post("/clusters/{cluster_id}/start")
async def start_cluster(cluster_id: str):
    try:
        client.clusters.start(cluster_id)
        return {"message": "Cluster start initiated"}
    except DatabricksError as e:
        raise HTTPException(status_code=400, detail=str(e))

@router.post("/sql/execute")
async def execute_sql(warehouse_id: str, query: str):
    try:
        response = client.statement_execution.execute_statement(
            warehouse_id=warehouse_id,
            statement=query
        )
        return {"statement_id": response.statement_id, "status": response.status.state}
    except DatabricksError as e:
        raise HTTPException(status_code=400, detail=str(e))
```

## Advanced Features

### Unity Catalog Integration

```python
# List catalogs
catalogs = client.catalogs.list()

# List schemas
schemas = client.schemas.list('catalog_name')

# List tables (catalog and schema are separate arguments)
tables = client.tables.list('catalog_name', 'schema_name')

# Get table info (takes the full three-level name)
table = client.tables.get('catalog_name.schema_name.table_name')
```

### Delta Sharing

```python
# List shares
shares = client.shares.list()

# Get share details
share = client.shares.get('share_name')

# List recipients
recipients = client.recipients.list()
```

### Model Serving

```python
from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedEntityInput

# List serving endpoints
endpoints = client.serving_endpoints.list()

# Create serving endpoint (config is a typed object, not a plain dict)
client.serving_endpoints.create(
    name='my-endpoint',
    config=EndpointCoreConfigInput(
        served_entities=[
            ServedEntityInput(
                entity_name='my_model',
                entity_version='1',
                workload_size='Small'
            )
        ]
    )
)
```

## Configuration

### Environment Variables

```bash
# Set in your .env.local file
DATABRICKS_HOST=https://your-workspace.cloud.databricks.com
DATABRICKS_TOKEN=your-personal-access-token

# Or use profile
DATABRICKS_CONFIG_PROFILE=your-profile-name
```

### Custom Configuration

```python
from databricks.sdk import WorkspaceClient
from databricks.sdk.config import Config

# Custom configuration
config = Config(
    host='https://your-workspace.cloud.databricks.com',
    token='your-token',
    retry_timeout_seconds=300
)

client = WorkspaceClient(config=config)
```

## Best Practices

1. **Reuse client instances**: Create one `WorkspaceClient` per application
2. **Handle errors gracefully**: Wrap API calls in `try`/`except` blocks and catch the specific SDK error classes
3. **Use pagination**: Don't assume all results fit in one response
4. **Cache responses**: Cache frequently accessed data to reduce API calls
5. **Monitor rate limits**: Be aware of API rate limits and implement backoff
6. **Use the wait helpers**: Use the `wait_get_*` methods to block until long-running operations complete
7. **Secure credentials**: Never hardcode tokens in your code

## Resource Links

- **Official Documentation**: https://databricks-sdk-py.readthedocs.io/en/latest/
- **GitHub Repository**: https://github.com/databricks/databricks-sdk-py
- **API Reference**: https://docs.databricks.com/api/workspace/introduction
- **SDK Examples**: https://github.com/databricks/databricks-sdk-py/tree/main/examples
- **Authentication Guide**: https://databricks-sdk-py.readthedocs.io/en/latest/authentication.html
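
The rate-limit advice under Best Practices ("implement backoff") can be sketched as a small retry wrapper. This is a minimal illustration, not part of the SDK: `retry_with_backoff` and all of its parameters are hypothetical names, and the choice of exponential delay with full jitter is an assumption about what a reasonable backoff looks like here.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    fn: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(), retrying on the given exceptions (hypothetical helper)."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # The delay ceiling doubles on each attempt, capped at max_delay.
            ceiling = min(max_delay, base_delay * 2 ** (attempt - 1))
            # Full jitter: sleep a random time up to the ceiling so that
            # concurrent callers do not retry in lockstep.
            sleep(random.uniform(0, ceiling))
    raise AssertionError("unreachable")
```

With the SDK, a call might look like `retry_with_backoff(lambda: list(client.clusters.list()), retry_on=(TooManyRequests,))`, assuming `TooManyRequests` is imported from `databricks.sdk.errors` and is the error your workspace raises when a rate limit is hit. The `sleep` parameter exists only so the delay can be replaced in tests.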
