Skip to main content
Glama
delta.py8.15 kB
""" API routes for Delta Lake operations. """ import logging from typing import Any, Dict, List, Optional, cast from fastapi import APIRouter, Depends, Request, status from src.delta_lake import data_store, delta_service from src.service.dependencies import auth, get_spark_session from src.settings import get_settings from src.service.models import ( DatabaseListRequest, DatabaseListResponse, DatabaseStructureRequest, DatabaseStructureResponse, TableCountRequest, TableCountResponse, TableListRequest, TableListResponse, TableQueryRequest, TableQueryResponse, TableSampleRequest, TableSampleResponse, TableSchemaRequest, TableSchemaResponse, TableSelectRequest, TableSelectResponse, ) logger = logging.getLogger(__name__) router = APIRouter(prefix="/delta", tags=["Delta Lake"]) def _extract_token_from_request(request: Request) -> Optional[str]: """Extract the Bearer token from the request Authorization header.""" auth_header = request.headers.get("Authorization", "") if auth_header.startswith("Bearer "): return auth_header[7:] # Remove "Bearer " prefix return None @router.post( "/databases/list", response_model=DatabaseListResponse, status_code=status.HTTP_200_OK, summary="List all databases in the Hive metastore", description="Lists all databases available in the Hive metastore, optionally using PostgreSQL for faster retrieval and filtered by user namespace.", operation_id="list_databases", ) def list_databases( body: DatabaseListRequest, http_request: Request, spark=Depends(get_spark_session), auth=Depends(auth), ) -> DatabaseListResponse: """ Endpoint to list all databases in the Hive metastore. Optionally filters by user/tenant namespace prefixes. """ # Extract auth token for namespace filtering auth_token = None if body.filter_by_namespace: auth_token = _extract_token_from_request(http_request) if not auth_token: raise ValueError("Authorization token required for namespace filtering") databases = cast( list[str], data_store.get_databases( spark=spark, use_hms=body.use_hms, return_json=False, filter_by_namespace=body.filter_by_namespace, auth_token=auth_token, ), ) return DatabaseListResponse(databases=databases) @router.post( "/databases/tables/list", response_model=TableListResponse, status_code=status.HTTP_200_OK, summary="List tables in a database", description="Lists all tables in a specific database, optionally using PostgreSQL for faster retrieval.", operation_id="list_database_tables", ) def list_database_tables( request: TableListRequest, spark=Depends(get_spark_session), auth=Depends(auth), ) -> TableListResponse: """ Endpoint to list tables in a specific database. """ settings = get_settings() tables = cast( list[str], data_store.get_tables( database=request.database, spark=spark, use_hms=request.use_hms, return_json=False, settings=settings, ), ) return TableListResponse(tables=tables) @router.post( "/databases/tables/schema", response_model=TableSchemaResponse, status_code=status.HTTP_200_OK, summary="Get table schema", description="Gets the schema (column names) of a specific table in a database.", operation_id="get_table_schema", ) def get_table_schema( request: TableSchemaRequest, spark=Depends(get_spark_session), auth=Depends(auth), ) -> TableSchemaResponse: """ Endpoint to get the schema of a specific table in a database. """ columns = cast( list[str], data_store.get_table_schema( database=request.database, table=request.table, spark=spark, return_json=False, ), ) return TableSchemaResponse(columns=columns) @router.post( "/databases/structure", response_model=DatabaseStructureResponse, status_code=status.HTTP_200_OK, summary="Get database structure", description="Gets the complete structure of all databases, optionally including table schemas.", operation_id="get_database_structure", ) def get_database_structure( request: DatabaseStructureRequest, spark=Depends(get_spark_session), auth=Depends(auth), ) -> DatabaseStructureResponse: """ Endpoint to get the complete structure of all databases. """ settings = get_settings() structure = cast( dict[str, list[str] | dict[str, list[str]]], data_store.get_db_structure( spark=spark, with_schema=request.with_schema, use_hms=request.use_hms, return_json=False, settings=settings, ), ) return DatabaseStructureResponse(structure=structure) @router.post( "/tables/count", response_model=TableCountResponse, status_code=status.HTTP_200_OK, summary="Count rows in a Delta table", description="Gets the total row count for a specified Delta table.", operation_id="count_delta_table", ) def count_table( request: TableCountRequest, spark=Depends(get_spark_session), auth=Depends(auth), ) -> TableCountResponse: """ Endpoint to count rows in a specific Delta table. """ count = delta_service.count_delta_table( spark=spark, database=request.database, table=request.table ) return TableCountResponse(count=count) @router.post( "/tables/sample", response_model=TableSampleResponse, status_code=status.HTTP_200_OK, summary="Sample data from a Delta table", description="Retrieves a small sample of rows from a specified Delta table.", operation_id="sample_delta_table", ) def sample_table( request: TableSampleRequest, spark=Depends(get_spark_session), auth=Depends(auth), ) -> TableSampleResponse: """ Endpoint to get a sample of data from a specific Delta table. """ sample: List[Dict[str, Any]] = delta_service.sample_delta_table( spark=spark, database=request.database, table=request.table, limit=request.limit, columns=request.columns, where_clause=request.where_clause, ) return TableSampleResponse(sample=sample) @router.post( "/tables/query", response_model=TableQueryResponse, status_code=status.HTTP_200_OK, summary="Query a Delta table", description="Executes a SQL query against a specified Delta table.", operation_id="query_delta_table", ) def query_table( request: TableQueryRequest, spark=Depends(get_spark_session), auth=Depends(auth), ) -> TableQueryResponse: """ Endpoint to execute a query against a specific Delta table. """ result: List[Dict[str, Any]] = delta_service.query_delta_table( spark=spark, query=request.query, ) return TableQueryResponse(result=result) @router.post( "/tables/select", response_model=TableSelectResponse, status_code=status.HTTP_200_OK, summary="Execute a structured SELECT query", description=( "Builds and executes a SELECT query from structured parameters. " "Supports column selection, aggregations (COUNT, SUM, AVG, MIN, MAX), " "JOINs, WHERE filters, GROUP BY, HAVING, ORDER BY, DISTINCT, and pagination. " "The backend builds the query safely, preventing SQL injection." ), operation_id="select_delta_table", ) def select_table( request: TableSelectRequest, spark=Depends(get_spark_session), auth=Depends(auth), ) -> TableSelectResponse: """ Endpoint to execute a structured SELECT query with pagination support. This endpoint allows users to build complex queries without writing raw SQL. The backend constructs the query from the provided parameters, ensuring security and proper escaping of all values. """ return delta_service.select_from_delta_table(spark=spark, request=request)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/BERDataLakehouse/datalake-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server