"""Governance and lineage MCP tools for Databricks."""
import os
from databricks.sdk import WorkspaceClient
def load_governance_tools(mcp_server):
"""Register governance and lineage MCP tools with the server.
Args:
mcp_server: The FastMCP server instance to register tools with
"""
@mcp_server.tool()
def list_audit_logs(start_time: str = None, end_time: str = None, user_id: str = None) -> dict:
"""List audit logs for the workspace.
Args:
start_time: Start time for audit logs (optional)
end_time: End time for audit logs (optional)
user_id: User ID to filter logs (optional)
Returns:
Dictionary with audit log listings or error message
"""
try:
# Initialize Databricks SDK
WorkspaceClient(
host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN')
)
# Note: Audit logs require specific permissions
# This is a placeholder for the concept
return {
'success': True,
'start_time': start_time,
'end_time': end_time,
'user_id': user_id,
'message': 'Audit log listing initiated',
'note': (
'Audit logs require specific permissions and may not be directly accessible via SDK'
),
'logs': [],
'count': 0,
}
except Exception as e:
print(f'❌ Error listing audit logs: {str(e)}')
return {'success': False, 'error': f'Error: {str(e)}', 'logs': [], 'count': 0}
@mcp_server.tool()
def get_audit_log(event_id: str) -> dict:
"""Get details of a specific audit log event.
Args:
event_id: The ID of the audit log event
Returns:
Dictionary with audit log details or error message
"""
try:
# Initialize Databricks SDK
WorkspaceClient(
host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN')
)
# Note: Audit log details require specific permissions
# This is a placeholder for the concept
return {
'success': True,
'event_id': event_id,
'message': f'Audit log {event_id} details retrieval initiated',
'note': (
'Audit log details require specific permissions and may not be '
'directly accessible via SDK'
),
'event': {},
}
except Exception as e:
print(f'❌ Error getting audit log details: {str(e)}')
return {'success': False, 'error': f'Error: {str(e)}'}
@mcp_server.tool()
def export_audit_logs(start_time: str, end_time: str, format: str = 'json') -> dict:
"""Export audit logs for a specific time range.
Args:
start_time: Start time for audit logs
end_time: End time for audit logs
format: Export format (json, csv, etc.)
Returns:
Dictionary with operation result or error message
"""
try:
# Initialize Databricks SDK
WorkspaceClient(
host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN')
)
# Note: Audit log export requires specific permissions
# This is a placeholder for the concept
return {
'success': True,
'start_time': start_time,
'end_time': end_time,
'format': format,
'message': f'Audit log export initiated for {start_time} to {end_time}',
'note': (
'Audit log export requires specific permissions and may not be '
'directly accessible via SDK'
),
}
except Exception as e:
print(f'❌ Error exporting audit logs: {str(e)}')
return {'success': False, 'error': f'Error: {str(e)}'}
@mcp_server.tool()
def list_governance_rules() -> dict:
"""List governance rules configured in the workspace.
Returns:
Dictionary with governance rule listings or error message
"""
try:
# Initialize Databricks SDK
WorkspaceClient(
host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN')
)
# Note: Governance rules require specific permissions
# This is a placeholder for the concept
return {
'success': True,
'message': 'Governance rule listing initiated',
'note': (
'Governance rules require specific permissions and may not be directly accessible via SDK'
),
'rules': [],
'count': 0,
}
except Exception as e:
print(f'❌ Error listing governance rules: {str(e)}')
return {'success': False, 'error': f'Error: {str(e)}', 'rules': [], 'count': 0}
@mcp_server.tool()
def get_governance_rule(rule_id: str) -> dict:
"""Get details of a specific governance rule.
Args:
rule_id: The ID of the governance rule
Returns:
Dictionary with governance rule details or error message
"""
try:
# Initialize Databricks SDK
WorkspaceClient(
host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN')
)
# Note: Governance rule details require specific permissions
# This is a placeholder for the concept
return {
'success': True,
'rule_id': rule_id,
'message': f'Governance rule {rule_id} details retrieval initiated',
'note': (
'Governance rule details require specific permissions and may not be '
'directly accessible via SDK'
),
'rule': {},
}
except Exception as e:
print(f'❌ Error getting governance rule details: {str(e)}')
return {'success': False, 'error': f'Error: {str(e)}'}
@mcp_server.tool()
def create_governance_rule(rule_config: dict) -> dict:
"""Create a new governance rule.
Args:
rule_config: Dictionary containing rule configuration
Returns:
Dictionary with operation result or error message
"""
try:
# Initialize Databricks SDK
WorkspaceClient(
host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN')
)
# Note: Governance rule creation requires specific permissions
# This is a placeholder for the concept
return {
'success': True,
'rule_config': rule_config,
'message': 'Governance rule creation initiated',
'note': (
'Governance rule creation requires specific permissions and may not be '
'directly accessible via SDK'
),
}
except Exception as e:
print(f'❌ Error creating governance rule: {str(e)}')
return {'success': False, 'error': f'Error: {str(e)}'}
@mcp_server.tool()
def update_governance_rule(rule_id: str, updates: dict) -> dict:
"""Update an existing governance rule.
Args:
rule_id: The ID of the governance rule to update
updates: Dictionary containing updates to apply
Returns:
Dictionary with operation result or error message
"""
try:
# Initialize Databricks SDK
WorkspaceClient(
host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN')
)
# Note: Governance rule updates require specific permissions
# This is a placeholder for the concept
return {
'success': True,
'rule_id': rule_id,
'updates': updates,
'message': f'Governance rule {rule_id} update initiated',
'note': (
'Governance rule updates require specific permissions and may not be '
'directly accessible via SDK'
),
}
except Exception as e:
print(f'❌ Error updating governance rule: {str(e)}')
return {'success': False, 'error': f'Error: {str(e)}'}
@mcp_server.tool()
def delete_governance_rule(rule_id: str) -> dict:
"""Delete a governance rule.
Args:
rule_id: The ID of the governance rule to delete
Returns:
Dictionary with operation result or error message
"""
try:
# Initialize Databricks SDK
WorkspaceClient(
host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN')
)
# Note: Governance rule deletion requires specific permissions
# This is a placeholder for the concept
return {
'success': True,
'rule_id': rule_id,
'message': f'Governance rule {rule_id} deletion initiated',
'note': (
'Governance rule deletion requires specific permissions and may not be '
'directly accessible via SDK'
),
}
except Exception as e:
print(f'❌ Error deleting governance rule: {str(e)}')
return {'success': False, 'error': f'Error: {str(e)}'}
@mcp_server.tool()
def get_table_lineage(table_name: str, depth: int = 1) -> dict:
"""Get data lineage information for a table.
Args:
table_name: Full table name in catalog.schema.table format
depth: Depth of lineage to retrieve (default: 1)
Returns:
Dictionary with table lineage information or error message
"""
try:
# Initialize Databricks SDK
WorkspaceClient(
host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN')
)
# Note: Table lineage requires specific permissions
# This is a placeholder for the concept
return {
'success': True,
'table_name': table_name,
'depth': depth,
'message': f'Table lineage retrieval initiated for {table_name}',
'note': (
'Table lineage requires specific permissions and may not be directly accessible via SDK'
),
'lineage': {},
}
except Exception as e:
print(f'❌ Error getting table lineage: {str(e)}')
return {'success': False, 'error': f'Error: {str(e)}'}
@mcp_server.tool()
def get_column_lineage(table_name: str, column_name: str) -> dict:
"""Get column-level lineage information.
Args:
table_name: Full table name in catalog.schema.table format
column_name: Name of the column
Returns:
Dictionary with column lineage information or error message
"""
try:
# Initialize Databricks SDK
WorkspaceClient(
host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN')
)
# Note: Column lineage requires specific permissions
# This is a placeholder for the concept
return {
'success': True,
'table_name': table_name,
'column_name': column_name,
'message': f'Column lineage retrieval initiated for {table_name}.{column_name}',
'note': (
'Column lineage requires specific permissions and may not be directly accessible via SDK'
),
'lineage': {},
}
except Exception as e:
print(f'❌ Error getting column lineage: {str(e)}')
return {'success': False, 'error': f'Error: {str(e)}'}
@mcp_server.tool()
def search_lineage(query: str, object_type: str = None) -> dict:
"""Search for lineage information.
Args:
query: Search query string
object_type: Type of object to search (optional)
Returns:
Dictionary with lineage search results or error message
"""
try:
# Initialize Databricks SDK
WorkspaceClient(
host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN')
)
# Note: Lineage search requires specific permissions
# This is a placeholder for the concept
return {
'success': True,
'query': query,
'object_type': object_type,
'message': 'Lineage search initiated',
'note': (
'Lineage search requires specific permissions and may not be directly accessible via SDK'
),
'results': [],
'count': 0,
}
except Exception as e:
print(f'❌ Error searching lineage: {str(e)}')
return {'success': False, 'error': f'Error: {str(e)}'}
@mcp_server.tool()
def search_catalog(query: str, object_type: str = None) -> dict:
"""Search for catalog objects.
Args:
query: Search query string
object_type: Type of object to search (optional)
Returns:
Dictionary with catalog search results or error message
"""
try:
# Initialize Databricks SDK
WorkspaceClient(
host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN')
)
# Note: Catalog search requires Unity Catalog and specific permissions
# This is a placeholder for the concept
return {
'success': True,
'query': query,
'object_type': object_type,
'message': 'Catalog search initiated',
'note': (
'Catalog search requires Unity Catalog and specific permissions, '
'may not be directly accessible via SDK'
),
'results': [],
'count': 0,
}
except Exception as e:
print(f'❌ Error searching catalog: {str(e)}')
return {'success': False, 'error': f'Error: {str(e)}'}
@mcp_server.tool()
def get_object_usage_stats(object_name: str, time_range: str = '30d') -> dict:
"""Get usage statistics.
Args:
object_name: Full object name (catalog.schema.object)
time_range: Time range for statistics (default: "30d")
Returns:
Dictionary with usage statistics or error message
"""
try:
# Initialize Databricks SDK
WorkspaceClient(
host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN')
)
# Note: Usage statistics require specific permissions and may not be directly accessible
# This is a placeholder for the concept
return {
'success': True,
'object_name': object_name,
'time_range': time_range,
'message': f'Usage statistics retrieval initiated for {object_name}',
'note': (
'Usage statistics require specific permissions and may not be directly accessible via SDK'
),
'statistics': {},
}
except Exception as e:
print(f'❌ Error getting usage statistics: {str(e)}')
return {'success': False, 'error': f'Error: {str(e)}'}