"""
Unity Catalog manager for advanced catalog operations and metadata management.
"""
import logging
from typing import Any, Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime
from .databricks_client import DatabricksClient, TableMetadata
logger = logging.getLogger(__name__)
@dataclass
class CatalogSummary:
"""Summary statistics for a catalog."""
name: str
schema_count: int
table_count: int
view_count: int
volume_count: int
total_size_bytes: Optional[int] = None
last_updated: Optional[datetime] = None
@dataclass
class SchemaSummary:
"""Summary statistics for a schema."""
name: str
catalog_name: str
table_count: int
view_count: int
volume_count: int
tables: List[str]
views: List[str]
@dataclass
class DataDiscoveryResult:
"""Result of data discovery operations."""
tables: List[TableMetadata]
schemas: List[str]
catalogs: List[str]
total_tables: int
search_term: str
filters_applied: Dict[str, Any]
class UnityCatalogManager:
"""Advanced Unity Catalog operations and metadata management."""
def __init__(self, databricks_client: DatabricksClient):
self.client = databricks_client
self._catalog_cache: Dict[str, CatalogSummary] = {}
self._schema_cache: Dict[str, SchemaSummary] = {}
async def get_catalog_summary(self, catalog_name: str, use_cache: bool = True) -> CatalogSummary:
"""Get comprehensive summary statistics for a catalog."""
cache_key = catalog_name
if use_cache and cache_key in self._catalog_cache:
return self._catalog_cache[cache_key]
try:
# Get all schemas in the catalog
schemas = await self.client.list_schemas(catalog_name)
schema_count = len(schemas)
total_tables = 0
total_views = 0
total_volumes = 0
# Count tables and views in each schema
for schema in schemas:
try:
tables = await self.client.list_tables(catalog_name, schema.name)
for table in tables:
if table.table_type and 'VIEW' in table.table_type.value:
total_views += 1
else:
total_tables += 1
except Exception as e:
logger.warning(f"Error counting tables in {catalog_name}.{schema.name}: {e}")
continue
summary = CatalogSummary(
name=catalog_name,
schema_count=schema_count,
table_count=total_tables,
view_count=total_views,
volume_count=total_volumes, # Volume counting would require additional API calls
last_updated=datetime.now()
)
self._catalog_cache[cache_key] = summary
return summary
except Exception as e:
logger.error(f"Error getting catalog summary for {catalog_name}: {e}")
raise
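    # Usage sketch (hypothetical manager/catalog names): summaries are cached per
    # catalog, so a repeated call with use_cache=True is served from memory.
    #
    #     summary = await manager.get_catalog_summary("main")
    #     cached = await manager.get_catalog_summary("main")   # cache hit, no API calls
    #     fresh = await manager.get_catalog_summary("main", use_cache=False)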
async def get_schema_summary(self, catalog_name: str, schema_name: str, use_cache: bool = True) -> SchemaSummary:
"""Get comprehensive summary statistics for a schema."""
cache_key = f"{catalog_name}.{schema_name}"
if use_cache and cache_key in self._schema_cache:
return self._schema_cache[cache_key]
try:
tables = await self.client.list_tables(catalog_name, schema_name)
table_names = []
view_names = []
for table in tables:
if table.table_type and 'VIEW' in table.table_type.value:
view_names.append(table.name)
else:
table_names.append(table.name)
summary = SchemaSummary(
name=schema_name,
catalog_name=catalog_name,
table_count=len(table_names),
view_count=len(view_names),
volume_count=0, # Would require additional API calls
tables=table_names,
views=view_names
)
self._schema_cache[cache_key] = summary
return summary
except Exception as e:
logger.error(f"Error getting schema summary for {catalog_name}.{schema_name}: {e}")
raise
async def discover_data(self,
search_patterns: List[str],
catalogs: Optional[List[str]] = None,
schemas: Optional[List[str]] = None,
table_types: Optional[List[str]] = None,
include_views: bool = True,
include_metadata: bool = True) -> DataDiscoveryResult:
"""
Advanced data discovery with multiple search patterns and filters.
Args:
search_patterns: List of patterns to search for in table names, comments, etc.
catalogs: Optional list of catalogs to limit search to
schemas: Optional list of schemas to limit search to
table_types: Optional list of table types to include
include_views: Whether to include views in results
include_metadata: Whether to fetch detailed metadata for matching tables
"""
try:
discovered_tables = []
discovered_schemas = set()
discovered_catalogs = set()
# Get catalogs to search
if catalogs:
catalogs_to_search = catalogs
else:
catalog_infos = await self.client.list_catalogs()
catalogs_to_search = [cat.name for cat in catalog_infos]
# Search each catalog
for catalog_name in catalogs_to_search:
try:
# Get schemas to search
                    if schemas:
                        # Accept bare schema names and "catalog.schema" names scoped
                        # to this catalog; strip the catalog prefix so list_tables
                        # receives a bare schema name.
                        schemas_to_search = [
                            s.split('.', 1)[1] if '.' in s else s
                            for s in schemas
                            if '.' not in s or s.startswith(f"{catalog_name}.")
                        ]
else:
schema_infos = await self.client.list_schemas(catalog_name)
schemas_to_search = [schema.name for schema in schema_infos]
# Search each schema
for schema_name in schemas_to_search:
try:
tables = await self.client.list_tables(catalog_name, schema_name)
# Filter and search tables
for table in tables:
# Apply table type filter
if table_types and table.table_type:
if table.table_type.value not in table_types:
continue
# Apply view filter
if not include_views and table.table_type and 'VIEW' in table.table_type.value:
continue
# Check if table matches any search pattern
matches = False
for pattern in search_patterns:
pattern_lower = pattern.lower()
if (pattern_lower in table.name.lower() or
(table.comment and pattern_lower in table.comment.lower())):
matches = True
break
                                if matches:
                                    table_metadata = None
                                    if include_metadata:
                                        try:
                                            # Fetch detailed metadata; fall back to
                                            # the listing info below if describe_table
                                            # fails, so one bad table doesn't abort
                                            # the rest of the schema.
                                            table_metadata = await self.client.describe_table(
                                                catalog_name, schema_name, table.name
                                            )
                                        except Exception as e:
                                            logger.warning(
                                                f"Error describing {catalog_name}.{schema_name}.{table.name}: {e}"
                                            )
                                    if table_metadata is None:
                                        # Create basic metadata from the table listing info
                                        table_metadata = TableMetadata(
                                            name=table.name,
                                            catalog_name=catalog_name,
                                            schema_name=schema_name,
                                            table_type=table.table_type.value if table.table_type else "UNKNOWN",
                                            comment=table.comment,
                                            owner=table.owner,
                                            storage_location=table.storage_location
                                        )
                                    discovered_tables.append(table_metadata)
                                    discovered_schemas.add(f"{catalog_name}.{schema_name}")
                                    discovered_catalogs.add(catalog_name)
except Exception as e:
logger.warning(f"Error searching schema {catalog_name}.{schema_name}: {e}")
continue
except Exception as e:
logger.warning(f"Error searching catalog {catalog_name}: {e}")
continue
return DataDiscoveryResult(
tables=discovered_tables,
schemas=list(discovered_schemas),
catalogs=list(discovered_catalogs),
total_tables=len(discovered_tables),
search_term=" OR ".join(search_patterns),
filters_applied={
"catalogs": catalogs,
"schemas": schemas,
"table_types": table_types,
"include_views": include_views,
"include_metadata": include_metadata
}
)
except Exception as e:
logger.error(f"Error during data discovery: {e}")
raise
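    # Usage sketch (hypothetical catalog name): find tables whose name or comment
    # mentions "customer" or "order", restricted to one catalog, skipping the
    # per-table describe_table round-trips.
    #
    #     result = await manager.discover_data(
    #         search_patterns=["customer", "order"],
    #         catalogs=["main"],
    #         include_metadata=False,
    #     )
    #     for table in result.tables:
    #         print(f"{table.catalog_name}.{table.schema_name}.{table.name}")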
async def analyze_table_relationships(self, catalog_name: str, schema_name: str) -> Dict[str, Any]:
"""
Analyze relationships between tables in a schema based on naming patterns and column similarities.
"""
try:
tables = await self.client.list_tables(catalog_name, schema_name)
relationships = {
"schema": f"{catalog_name}.{schema_name}",
"table_count": len(tables),
"potential_relationships": [],
"naming_patterns": {},
"column_patterns": {}
}
# Analyze naming patterns
table_names = [table.name for table in tables]
prefixes = {}
suffixes = {}
            for name in table_names:
                if '_' in name:
                    # Count the first and last underscore-separated segments
                    # as candidate prefixes and suffixes.
                    parts = name.split('_')
                    prefixes[parts[0]] = prefixes.get(parts[0], 0) + 1
                    suffixes[parts[-1]] = suffixes.get(parts[-1], 0) + 1
relationships["naming_patterns"] = {
"common_prefixes": {k: v for k, v in prefixes.items() if v > 1},
"common_suffixes": {k: v for k, v in suffixes.items() if v > 1}
}
# For detailed column analysis, we'd need to fetch table schemas
# This is a simplified version
relationships["analysis_note"] = "Detailed column relationship analysis requires individual table schema inspection"
return relationships
except Exception as e:
logger.error(f"Error analyzing table relationships: {e}")
raise
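    # Result-shape sketch (illustrative names): for tables "dim_customer",
    # "dim_product", and "fact_sales", naming_patterns would contain
    # {"common_prefixes": {"dim": 2}, "common_suffixes": {}}.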
async def get_data_quality_insights(self, catalog_name: str, schema_name: str, table_name: str) -> Dict[str, Any]:
"""
Get basic data quality insights for a table.
"""
try:
# Get table metadata
table_metadata = await self.client.describe_table(catalog_name, schema_name, table_name)
# Sample some data for analysis
sample_result = await self.client.sample_table(catalog_name, schema_name, table_name, limit=100)
insights = {
"table": f"{catalog_name}.{schema_name}.{table_name}",
"column_count": len(table_metadata.columns),
"sample_row_count": sample_result.row_count,
"columns": [],
"potential_issues": []
}
# Analyze columns
for col in table_metadata.columns:
col_info = {
"name": col["name"],
"type": col["type"],
"nullable": col.get("nullable", True),
"has_comment": bool(col.get("comment"))
}
# Check for potential naming issues
if not col.get("comment"):
insights["potential_issues"].append(f"Column '{col['name']}' lacks documentation")
if col["name"].lower() in ["id", "key"] and col.get("nullable", True):
insights["potential_issues"].append(f"Potential key column '{col['name']}' is nullable")
insights["columns"].append(col_info)
# Analyze sample data if available
if sample_result.status == "SUCCESS" and sample_result.data:
for col_name in sample_result.columns:
# Check for null values in sample
null_count = sum(1 for row in sample_result.data if row.get(col_name) is None)
if null_count > 0:
null_percentage = (null_count / len(sample_result.data)) * 100
if null_percentage > 50:
insights["potential_issues"].append(
f"Column '{col_name}' has {null_percentage:.1f}% null values in sample"
)
return insights
except Exception as e:
logger.error(f"Error getting data quality insights: {e}")
raise
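    # Usage sketch (hypothetical names): surface undocumented or null-heavy
    # columns before relying on a table.
    #
    #     insights = await manager.get_data_quality_insights("main", "sales", "orders")
    #     for issue in insights["potential_issues"]:
    #         print(f"WARNING: {issue}")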
def clear_cache(self) -> None:
"""Clear the internal caches."""
self._catalog_cache.clear()
self._schema_cache.clear()
logger.info("Unity Catalog manager caches cleared")