unity_catalog.py
""" Unity Catalog manager for advanced catalog operations and metadata management. """ import logging from typing import Any, Dict, List, Optional, Set from dataclasses import dataclass from datetime import datetime from .databricks_client import DatabricksClient, TableMetadata logger = logging.getLogger(__name__) @dataclass class CatalogSummary: """Summary statistics for a catalog.""" name: str schema_count: int table_count: int view_count: int volume_count: int total_size_bytes: Optional[int] = None last_updated: Optional[datetime] = None @dataclass class SchemaSummary: """Summary statistics for a schema.""" name: str catalog_name: str table_count: int view_count: int volume_count: int tables: List[str] views: List[str] @dataclass class DataDiscoveryResult: """Result of data discovery operations.""" tables: List[TableMetadata] schemas: List[str] catalogs: List[str] total_tables: int search_term: str filters_applied: Dict[str, Any] class UnityCatalogManager: """Advanced Unity Catalog operations and metadata management.""" def __init__(self, databricks_client: DatabricksClient): self.client = databricks_client self._catalog_cache: Dict[str, CatalogSummary] = {} self._schema_cache: Dict[str, SchemaSummary] = {} async def get_catalog_summary(self, catalog_name: str, use_cache: bool = True) -> CatalogSummary: """Get comprehensive summary statistics for a catalog.""" cache_key = catalog_name if use_cache and cache_key in self._catalog_cache: return self._catalog_cache[cache_key] try: # Get all schemas in the catalog schemas = await self.client.list_schemas(catalog_name) schema_count = len(schemas) total_tables = 0 total_views = 0 total_volumes = 0 # Count tables and views in each schema for schema in schemas: try: tables = await self.client.list_tables(catalog_name, schema.name) for table in tables: if table.table_type and 'VIEW' in table.table_type.value: total_views += 1 else: total_tables += 1 except Exception as e: logger.warning(f"Error counting tables in {catalog_name}.{schema.name}: {e}") continue summary = CatalogSummary( name=catalog_name, schema_count=schema_count, table_count=total_tables, view_count=total_views, volume_count=total_volumes, # Volume counting would require additional API calls last_updated=datetime.now() ) self._catalog_cache[cache_key] = summary return summary except Exception as e: logger.error(f"Error getting catalog summary for {catalog_name}: {e}") raise async def get_schema_summary(self, catalog_name: str, schema_name: str, use_cache: bool = True) -> SchemaSummary: """Get comprehensive summary statistics for a schema.""" cache_key = f"{catalog_name}.{schema_name}" if use_cache and cache_key in self._schema_cache: return self._schema_cache[cache_key] try: tables = await self.client.list_tables(catalog_name, schema_name) table_names = [] view_names = [] for table in tables: if table.table_type and 'VIEW' in table.table_type.value: view_names.append(table.name) else: table_names.append(table.name) summary = SchemaSummary( name=schema_name, catalog_name=catalog_name, table_count=len(table_names), view_count=len(view_names), volume_count=0, # Would require additional API calls tables=table_names, views=view_names ) self._schema_cache[cache_key] = summary return summary except Exception as e: logger.error(f"Error getting schema summary for {catalog_name}.{schema_name}: {e}") raise async def discover_data(self, search_patterns: List[str], catalogs: Optional[List[str]] = None, schemas: Optional[List[str]] = None, table_types: Optional[List[str]] = None, include_views: 
bool = True, include_metadata: bool = True) -> DataDiscoveryResult: """ Advanced data discovery with multiple search patterns and filters. Args: search_patterns: List of patterns to search for in table names, comments, etc. catalogs: Optional list of catalogs to limit search to schemas: Optional list of schemas to limit search to table_types: Optional list of table types to include include_views: Whether to include views in results include_metadata: Whether to fetch detailed metadata for matching tables """ try: discovered_tables = [] discovered_schemas = set() discovered_catalogs = set() # Get catalogs to search if catalogs: catalogs_to_search = catalogs else: catalog_infos = await self.client.list_catalogs() catalogs_to_search = [cat.name for cat in catalog_infos] # Search each catalog for catalog_name in catalogs_to_search: try: # Get schemas to search if schemas: schemas_to_search = [s for s in schemas if '.' not in s or s.startswith(f"{catalog_name}.")] else: schema_infos = await self.client.list_schemas(catalog_name) schemas_to_search = [schema.name for schema in schema_infos] # Search each schema for schema_name in schemas_to_search: try: tables = await self.client.list_tables(catalog_name, schema_name) # Filter and search tables for table in tables: # Apply table type filter if table_types and table.table_type: if table.table_type.value not in table_types: continue # Apply view filter if not include_views and table.table_type and 'VIEW' in table.table_type.value: continue # Check if table matches any search pattern matches = False for pattern in search_patterns: pattern_lower = pattern.lower() if (pattern_lower in table.name.lower() or (table.comment and pattern_lower in table.comment.lower())): matches = True break if matches: if include_metadata: # Get detailed metadata table_metadata = await self.client.describe_table( catalog_name, schema_name, table.name ) discovered_tables.append(table_metadata) else: # Create basic metadata from table info basic_metadata = TableMetadata( name=table.name, catalog_name=catalog_name, schema_name=schema_name, table_type=table.table_type.value if table.table_type else "UNKNOWN", comment=table.comment, owner=table.owner, storage_location=table.storage_location ) discovered_tables.append(basic_metadata) discovered_schemas.add(f"{catalog_name}.{schema_name}") discovered_catalogs.add(catalog_name) except Exception as e: logger.warning(f"Error searching schema {catalog_name}.{schema_name}: {e}") continue except Exception as e: logger.warning(f"Error searching catalog {catalog_name}: {e}") continue return DataDiscoveryResult( tables=discovered_tables, schemas=list(discovered_schemas), catalogs=list(discovered_catalogs), total_tables=len(discovered_tables), search_term=" OR ".join(search_patterns), filters_applied={ "catalogs": catalogs, "schemas": schemas, "table_types": table_types, "include_views": include_views, "include_metadata": include_metadata } ) except Exception as e: logger.error(f"Error during data discovery: {e}") raise async def analyze_table_relationships(self, catalog_name: str, schema_name: str) -> Dict[str, Any]: """ Analyze relationships between tables in a schema based on naming patterns and column similarities. 
""" try: tables = await self.client.list_tables(catalog_name, schema_name) relationships = { "schema": f"{catalog_name}.{schema_name}", "table_count": len(tables), "potential_relationships": [], "naming_patterns": {}, "column_patterns": {} } # Analyze naming patterns table_names = [table.name for table in tables] prefixes = {} suffixes = {} for name in table_names: # Extract potential prefixes (first word) if '_' in name: prefix = name.split('_')[0] prefixes[prefix] = prefixes.get(prefix, 0) + 1 # Extract potential suffixes (last word) if '_' in name: suffix = name.split('_')[-1] suffixes[suffix] = suffixes.get(suffix, 0) + 1 relationships["naming_patterns"] = { "common_prefixes": {k: v for k, v in prefixes.items() if v > 1}, "common_suffixes": {k: v for k, v in suffixes.items() if v > 1} } # For detailed column analysis, we'd need to fetch table schemas # This is a simplified version relationships["analysis_note"] = "Detailed column relationship analysis requires individual table schema inspection" return relationships except Exception as e: logger.error(f"Error analyzing table relationships: {e}") raise async def get_data_quality_insights(self, catalog_name: str, schema_name: str, table_name: str) -> Dict[str, Any]: """ Get basic data quality insights for a table. """ try: # Get table metadata table_metadata = await self.client.describe_table(catalog_name, schema_name, table_name) # Sample some data for analysis sample_result = await self.client.sample_table(catalog_name, schema_name, table_name, limit=100) insights = { "table": f"{catalog_name}.{schema_name}.{table_name}", "column_count": len(table_metadata.columns), "sample_row_count": sample_result.row_count, "columns": [], "potential_issues": [] } # Analyze columns for col in table_metadata.columns: col_info = { "name": col["name"], "type": col["type"], "nullable": col.get("nullable", True), "has_comment": bool(col.get("comment")) } # Check for potential naming issues if not col.get("comment"): insights["potential_issues"].append(f"Column '{col['name']}' lacks documentation") if col["name"].lower() in ["id", "key"] and col.get("nullable", True): insights["potential_issues"].append(f"Potential key column '{col['name']}' is nullable") insights["columns"].append(col_info) # Analyze sample data if available if sample_result.status == "SUCCESS" and sample_result.data: for col_name in sample_result.columns: # Check for null values in sample null_count = sum(1 for row in sample_result.data if row.get(col_name) is None) if null_count > 0: null_percentage = (null_count / len(sample_result.data)) * 100 if null_percentage > 50: insights["potential_issues"].append( f"Column '{col_name}' has {null_percentage:.1f}% null values in sample" ) return insights except Exception as e: logger.error(f"Error getting data quality insights: {e}") raise def clear_cache(self) -> None: """Clear the internal caches.""" self._catalog_cache.clear() self._schema_cache.clear() logger.info("Unity Catalog manager caches cleared")
