API Registry MCP Server

db_resources.py
"""Database resources router for listing warehouses, catalogs, and schemas.""" import os from typing import Any, Dict, List from databricks.sdk import WorkspaceClient from databricks.sdk.core import Config from fastapi import APIRouter, HTTPException, Request from pydantic import BaseModel router = APIRouter() def get_workspace_client(request: Request) -> WorkspaceClient: """Get a WorkspaceClient with on-behalf-of user authentication. Falls back to OAuth service principal authentication if: - User token is not available - User has no access to warehouses AND catalogs Args: request: FastAPI Request object to extract user token from Returns: WorkspaceClient configured with appropriate authentication """ host = os.environ.get('DATABRICKS_HOST') # Try to get user token from request headers (on-behalf-of authentication) user_token = request.headers.get('x-forwarded-access-token') if user_token: # Try on-behalf-of authentication with user's token print(f"šŸ” Attempting OBO authentication for user") config = Config(host=host, token=user_token, auth_type='pat') user_client = WorkspaceClient(config=config) # Verify user has access to SQL warehouses has_warehouse_access = False try: warehouses = list(user_client.warehouses.list()) if warehouses: has_warehouse_access = True print(f"āœ… User has access to {len(warehouses)} warehouse(s)") except Exception as e: print(f"āš ļø User cannot list warehouses: {str(e)}") # If user has warehouse access, use OBO; otherwise fallback to service principal if has_warehouse_access: print(f"āœ… Using OBO authentication - user has warehouse access") return user_client else: print(f"āš ļø User has no warehouse access, falling back to service principal") return WorkspaceClient(host=host) else: # No user token - fall back to OAuth service principal authentication print(f"āš ļø No user token found, falling back to service principal") return WorkspaceClient(host=host) class Warehouse(BaseModel): """SQL Warehouse information.""" id: str name: str state: str size: str | None = None type: str | None = None class Catalog(BaseModel): """Catalog information.""" name: str comment: str | None = None class Schema(BaseModel): """Schema information.""" name: str catalog_name: str comment: str | None = None class CatalogSchema(BaseModel): """Combined catalog.schema information.""" catalog_name: str schema_name: str full_name: str comment: str | None = None @router.get('/warehouses') async def list_warehouses(request: Request, search: str = None) -> Dict[str, Any]: """List all SQL warehouses in the Databricks workspace. Args: search: Optional search filter (case-insensitive, matches warehouse name) Returns: Dictionary with list of warehouses and their details """ try: w = get_workspace_client(request) warehouses = [] search_lower = search.lower() if search else None for warehouse in w.warehouses.list(): # Apply search filter if search_lower and search_lower not in warehouse.name.lower(): continue warehouses.append( Warehouse( id=warehouse.id, name=warehouse.name, state=warehouse.state.value if warehouse.state else 'UNKNOWN', size=warehouse.cluster_size, type=warehouse.warehouse_type.value if warehouse.warehouse_type else None, ) ) return {'warehouses': [w.model_dump() for w in warehouses], 'count': len(warehouses)} except Exception as e: raise HTTPException(status_code=500, detail=f'Failed to list warehouses: {str(e)}') @router.get('/catalogs') async def list_catalogs(request: Request) -> Dict[str, Any]: """List all catalogs in the Databricks workspace. 
Returns: Dictionary with list of catalogs """ try: w = get_workspace_client(request) catalogs = [] for catalog in w.catalogs.list(): catalogs.append( Catalog( name=catalog.name, comment=catalog.comment if hasattr(catalog, 'comment') else None, ) ) return {'catalogs': [c.model_dump() for c in catalogs], 'count': len(catalogs)} except Exception as e: raise HTTPException(status_code=500, detail=f'Failed to list catalogs: {str(e)}') @router.get('/schemas/{catalog_name}') async def list_schemas(catalog_name: str, request: Request) -> Dict[str, Any]: """List all schemas in a specific catalog. Args: catalog_name: Name of the catalog Returns: Dictionary with list of schemas in the catalog """ try: w = get_workspace_client(request) schemas = [] for schema in w.schemas.list(catalog_name=catalog_name): schemas.append( Schema( name=schema.name, catalog_name=catalog_name, comment=schema.comment if hasattr(schema, 'comment') else None, ) ) return {'schemas': [s.model_dump() for s in schemas], 'catalog': catalog_name, 'count': len(schemas)} except Exception as e: raise HTTPException(status_code=500, detail=f'Failed to list schemas: {str(e)}') @router.get('/catalog-schemas') async def list_all_catalog_schemas( request: Request, limit: int = 100, search: str = None ) -> Dict[str, Any]: """List catalog.schema combinations available in the workspace. This is useful for populating a dropdown that shows catalog_name.schema_name format. Performance optimized with limit and search filtering. Args: limit: Maximum number of results to return (default: 100, helps with performance) search: Optional search filter (case-insensitive, matches catalog or schema name) Returns: Dictionary with list of catalog.schema combinations Note: Permission filtering removed for performance - individual schema permission checks were too slow (N API calls for N schemas). The API will return permission errors when user tries to use a schema they don't have access to. 
""" try: w = get_workspace_client(request) catalog_schemas = [] search_lower = search.lower() if search else None # Iterate through all catalogs for catalog in w.catalogs.list(): catalog_name = catalog.name # Skip catalog if search doesn't match if search_lower and search_lower not in catalog_name.lower(): # Still check schemas in case they match schemas_match = False else: schemas_match = True # For each catalog, get all schemas try: for schema in w.schemas.list(catalog_name=catalog_name): schema_name = schema.name full_name = f'{catalog_name}.{schema_name}' # Apply search filter if search_lower and search_lower not in full_name.lower(): continue catalog_schemas.append( CatalogSchema( catalog_name=catalog_name, schema_name=schema_name, full_name=full_name, comment=schema.comment if hasattr(schema, 'comment') else None, ) ) # Stop if we hit the limit if len(catalog_schemas) >= limit: break except Exception as e: # Skip catalogs that can't be accessed (permission denied) print(f'Warning: Could not list schemas for catalog {catalog_name}: {str(e)}') continue # Stop if we hit the limit if len(catalog_schemas) >= limit: break total_count = len(catalog_schemas) has_more = total_count == limit return { 'catalog_schemas': [cs.model_dump() for cs in catalog_schemas], 'count': total_count, 'has_more': has_more, 'limit': limit } except Exception as e: raise HTTPException(status_code=500, detail=f'Failed to list catalog schemas: {str(e)}') @router.get('/validate-api-registry-table') async def validate_api_registry_table(catalog: str, schema: str, warehouse_id: str, request: Request) -> Dict[str, Any]: """Validate if api_http_registry table exists in the specified catalog.schema. Args: catalog: Catalog name schema: Schema name warehouse_id: SQL warehouse ID to execute the validation query Returns: Dictionary indicating if the table exists and any error messages """ try: from databricks.sdk.service.sql import StatementState import time w = get_workspace_client(request) # Build table name with proper backtick quoting for special characters table_name = f'`{catalog}`.`{schema}`.`api_http_registry`' # Use DESCRIBE TABLE to check existence - this only reads metadata, not storage # Avoids Azure storage authorization issues with SELECT queries query = f'DESCRIBE TABLE {table_name}' print(f'šŸ” Validating table existence: {table_name}') # Execute the statement statement = w.statement_execution.execute_statement( warehouse_id=warehouse_id, statement=query, wait_timeout='30s' ) # Wait for completion max_wait = 30 start_time = time.time() while statement.status.state in [StatementState.PENDING, StatementState.RUNNING]: if time.time() - start_time > max_wait: return { 'exists': False, 'error': 'Validation query timed out', 'table_name': table_name, 'message': f'Could not validate table {table_name} within {max_wait} seconds', } time.sleep(0.5) statement = w.statement_execution.get_statement(statement.statement_id) # Check final state if statement.status.state == StatementState.SUCCEEDED: return { 'exists': True, 'table_name': table_name, 'catalog': catalog, 'schema': schema, 'message': f'Table {table_name} exists and is accessible', } else: error_message = statement.status.error.message if statement.status.error else 'Unknown error' # Check if it's a table not found error if 'TABLE_OR_VIEW_NOT_FOUND' in error_message or 'does not exist' in error_message.lower(): return { 'exists': False, 'error': 'TABLE_NOT_FOUND', 'table_name': table_name, 'message': f'No api_http_registry table exists in {catalog}.{schema}', 
'suggestion': f'Run setup_api_http_registry_table.sql to create the table in {catalog}.{schema}', } else: return { 'exists': False, 'error': error_message, 'table_name': table_name, 'message': f'Error validating table: {error_message}', } except Exception as e: error_str = str(e) # Check if it's a table not found error if 'TABLE_OR_VIEW_NOT_FOUND' in error_str or 'does not exist' in error_str.lower(): return { 'exists': False, 'error': 'TABLE_NOT_FOUND', 'table_name': f'{catalog}.{schema}.api_http_registry', 'message': f'No api_http_registry table exists in {catalog}.{schema}', 'suggestion': f'Run setup_api_http_registry_table.sql to create the table in {catalog}.{schema}', } return { 'exists': False, 'error': str(e), 'table_name': f'{catalog}.{schema}.api_http_registry', 'message': f'Failed to validate table: {str(e)}', }
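
The file above only defines the router; the application that mounts it is not shown. Below is a minimal sketch of how the router might be wired into a FastAPI app and exercised locally. The import path, URL prefix, workspace host, and token value are assumptions for illustration, not taken from this file.

# Minimal sketch, assuming this module is importable as `db_resources` and that
# the Databricks SDK can authenticate with the configured host/credentials.
import os

from fastapi import FastAPI
from fastapi.testclient import TestClient

from db_resources import router  # assumed import path for the module above

# Hypothetical workspace URL; get_workspace_client reads DATABRICKS_HOST.
os.environ.setdefault('DATABRICKS_HOST', 'https://my-workspace.cloud.databricks.com')

app = FastAPI()
app.include_router(router, prefix='/api/db')  # prefix is an assumption

client = TestClient(app)

# List warehouses, optionally filtered by name (case-insensitive).
resp = client.get('/api/db/warehouses', params={'search': 'serverless'})
print(resp.json())

# List up to 20 catalog.schema combinations matching a search term.
resp = client.get('/api/db/catalog-schemas', params={'limit': 20, 'search': 'main'})
print(resp.json())

# Sending a user access token in this header triggers the on-behalf-of path
# in get_workspace_client; '<user-token>' is a placeholder.
resp = client.get('/api/db/catalogs', headers={'x-forwarded-access-token': '<user-token>'})
print(resp.json())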
