Skip to main content
Glama
PulkitXChadha

Databricks MCP Server

governance.py14.2 kB
"""Governance and lineage MCP tools for Databricks.""" import os from databricks.sdk import WorkspaceClient def load_governance_tools(mcp_server): """Register governance and lineage MCP tools with the server. Args: mcp_server: The FastMCP server instance to register tools with """ @mcp_server.tool() def list_audit_logs(start_time: str = None, end_time: str = None, user_id: str = None) -> dict: """List audit logs for the workspace. Args: start_time: Start time for audit logs (optional) end_time: End time for audit logs (optional) user_id: User ID to filter logs (optional) Returns: Dictionary with audit log listings or error message """ try: # Initialize Databricks SDK WorkspaceClient( host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN') ) # Note: Audit logs require specific permissions # This is a placeholder for the concept return { 'success': True, 'start_time': start_time, 'end_time': end_time, 'user_id': user_id, 'message': 'Audit log listing initiated', 'note': ( 'Audit logs require specific permissions and may not be directly accessible via SDK' ), 'logs': [], 'count': 0, } except Exception as e: print(f'❌ Error listing audit logs: {str(e)}') return {'success': False, 'error': f'Error: {str(e)}', 'logs': [], 'count': 0} @mcp_server.tool() def get_audit_log(event_id: str) -> dict: """Get details of a specific audit log event. Args: event_id: The ID of the audit log event Returns: Dictionary with audit log details or error message """ try: # Initialize Databricks SDK WorkspaceClient( host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN') ) # Note: Audit log details require specific permissions # This is a placeholder for the concept return { 'success': True, 'event_id': event_id, 'message': f'Audit log {event_id} details retrieval initiated', 'note': ( 'Audit log details require specific permissions and may not be ' 'directly accessible via SDK' ), 'event': {}, } except Exception as e: print(f'❌ Error getting audit log details: {str(e)}') return {'success': False, 'error': f'Error: {str(e)}'} @mcp_server.tool() def export_audit_logs(start_time: str, end_time: str, format: str = 'json') -> dict: """Export audit logs for a specific time range. Args: start_time: Start time for audit logs end_time: End time for audit logs format: Export format (json, csv, etc.) Returns: Dictionary with operation result or error message """ try: # Initialize Databricks SDK WorkspaceClient( host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN') ) # Note: Audit log export requires specific permissions # This is a placeholder for the concept return { 'success': True, 'start_time': start_time, 'end_time': end_time, 'format': format, 'message': f'Audit log export initiated for {start_time} to {end_time}', 'note': ( 'Audit log export requires specific permissions and may not be ' 'directly accessible via SDK' ), } except Exception as e: print(f'❌ Error exporting audit logs: {str(e)}') return {'success': False, 'error': f'Error: {str(e)}'} @mcp_server.tool() def list_governance_rules() -> dict: """List governance rules configured in the workspace. Returns: Dictionary with governance rule listings or error message """ try: # Initialize Databricks SDK WorkspaceClient( host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN') ) # Note: Governance rules require specific permissions # This is a placeholder for the concept return { 'success': True, 'message': 'Governance rule listing initiated', 'note': ( 'Governance rules require specific permissions and may not be directly accessible via SDK' ), 'rules': [], 'count': 0, } except Exception as e: print(f'❌ Error listing governance rules: {str(e)}') return {'success': False, 'error': f'Error: {str(e)}', 'rules': [], 'count': 0} @mcp_server.tool() def get_governance_rule(rule_id: str) -> dict: """Get details of a specific governance rule. Args: rule_id: The ID of the governance rule Returns: Dictionary with governance rule details or error message """ try: # Initialize Databricks SDK WorkspaceClient( host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN') ) # Note: Governance rule details require specific permissions # This is a placeholder for the concept return { 'success': True, 'rule_id': rule_id, 'message': f'Governance rule {rule_id} details retrieval initiated', 'note': ( 'Governance rule details require specific permissions and may not be ' 'directly accessible via SDK' ), 'rule': {}, } except Exception as e: print(f'❌ Error getting governance rule details: {str(e)}') return {'success': False, 'error': f'Error: {str(e)}'} @mcp_server.tool() def create_governance_rule(rule_config: dict) -> dict: """Create a new governance rule. Args: rule_config: Dictionary containing rule configuration Returns: Dictionary with operation result or error message """ try: # Initialize Databricks SDK WorkspaceClient( host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN') ) # Note: Governance rule creation requires specific permissions # This is a placeholder for the concept return { 'success': True, 'rule_config': rule_config, 'message': 'Governance rule creation initiated', 'note': ( 'Governance rule creation requires specific permissions and may not be ' 'directly accessible via SDK' ), } except Exception as e: print(f'❌ Error creating governance rule: {str(e)}') return {'success': False, 'error': f'Error: {str(e)}'} @mcp_server.tool() def update_governance_rule(rule_id: str, updates: dict) -> dict: """Update an existing governance rule. Args: rule_id: The ID of the governance rule to update updates: Dictionary containing updates to apply Returns: Dictionary with operation result or error message """ try: # Initialize Databricks SDK WorkspaceClient( host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN') ) # Note: Governance rule updates require specific permissions # This is a placeholder for the concept return { 'success': True, 'rule_id': rule_id, 'updates': updates, 'message': f'Governance rule {rule_id} update initiated', 'note': ( 'Governance rule updates require specific permissions and may not be ' 'directly accessible via SDK' ), } except Exception as e: print(f'❌ Error updating governance rule: {str(e)}') return {'success': False, 'error': f'Error: {str(e)}'} @mcp_server.tool() def delete_governance_rule(rule_id: str) -> dict: """Delete a governance rule. Args: rule_id: The ID of the governance rule to delete Returns: Dictionary with operation result or error message """ try: # Initialize Databricks SDK WorkspaceClient( host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN') ) # Note: Governance rule deletion requires specific permissions # This is a placeholder for the concept return { 'success': True, 'rule_id': rule_id, 'message': f'Governance rule {rule_id} deletion initiated', 'note': ( 'Governance rule deletion requires specific permissions and may not be ' 'directly accessible via SDK' ), } except Exception as e: print(f'❌ Error deleting governance rule: {str(e)}') return {'success': False, 'error': f'Error: {str(e)}'} @mcp_server.tool() def get_table_lineage(table_name: str, depth: int = 1) -> dict: """Get data lineage information for a table. Args: table_name: Full table name in catalog.schema.table format depth: Depth of lineage to retrieve (default: 1) Returns: Dictionary with table lineage information or error message """ try: # Initialize Databricks SDK WorkspaceClient( host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN') ) # Note: Table lineage requires specific permissions # This is a placeholder for the concept return { 'success': True, 'table_name': table_name, 'depth': depth, 'message': f'Table lineage retrieval initiated for {table_name}', 'note': ( 'Table lineage requires specific permissions and may not be directly accessible via SDK' ), 'lineage': {}, } except Exception as e: print(f'❌ Error getting table lineage: {str(e)}') return {'success': False, 'error': f'Error: {str(e)}'} @mcp_server.tool() def get_column_lineage(table_name: str, column_name: str) -> dict: """Get column-level lineage information. Args: table_name: Full table name in catalog.schema.table format column_name: Name of the column Returns: Dictionary with column lineage information or error message """ try: # Initialize Databricks SDK WorkspaceClient( host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN') ) # Note: Column lineage requires specific permissions # This is a placeholder for the concept return { 'success': True, 'table_name': table_name, 'column_name': column_name, 'message': f'Column lineage retrieval initiated for {table_name}.{column_name}', 'note': ( 'Column lineage requires specific permissions and may not be directly accessible via SDK' ), 'lineage': {}, } except Exception as e: print(f'❌ Error getting column lineage: {str(e)}') return {'success': False, 'error': f'Error: {str(e)}'} @mcp_server.tool() def search_lineage(query: str, object_type: str = None) -> dict: """Search for lineage information. Args: query: Search query string object_type: Type of object to search (optional) Returns: Dictionary with lineage search results or error message """ try: # Initialize Databricks SDK WorkspaceClient( host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN') ) # Note: Lineage search requires specific permissions # This is a placeholder for the concept return { 'success': True, 'query': query, 'object_type': object_type, 'message': 'Lineage search initiated', 'note': ( 'Lineage search requires specific permissions and may not be directly accessible via SDK' ), 'results': [], 'count': 0, } except Exception as e: print(f'❌ Error searching lineage: {str(e)}') return {'success': False, 'error': f'Error: {str(e)}'} @mcp_server.tool() def search_catalog(query: str, object_type: str = None) -> dict: """Search for catalog objects. Args: query: Search query string object_type: Type of object to search (optional) Returns: Dictionary with catalog search results or error message """ try: # Initialize Databricks SDK WorkspaceClient( host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN') ) # Note: Catalog search requires Unity Catalog and specific permissions # This is a placeholder for the concept return { 'success': True, 'query': query, 'object_type': object_type, 'message': 'Catalog search initiated', 'note': ( 'Catalog search requires Unity Catalog and specific permissions, ' 'may not be directly accessible via SDK' ), 'results': [], 'count': 0, } except Exception as e: print(f'❌ Error searching catalog: {str(e)}') return {'success': False, 'error': f'Error: {str(e)}'} @mcp_server.tool() def get_object_usage_stats(object_name: str, time_range: str = '30d') -> dict: """Get usage statistics. Args: object_name: Full object name (catalog.schema.object) time_range: Time range for statistics (default: "30d") Returns: Dictionary with usage statistics or error message """ try: # Initialize Databricks SDK WorkspaceClient( host=os.environ.get('DATABRICKS_HOST'), token=os.environ.get('DATABRICKS_TOKEN') ) # Note: Usage statistics require specific permissions and may not be directly accessible # This is a placeholder for the concept return { 'success': True, 'object_name': object_name, 'time_range': time_range, 'message': f'Usage statistics retrieval initiated for {object_name}', 'note': ( 'Usage statistics require specific permissions and may not be directly accessible via SDK' ), 'statistics': {}, } except Exception as e: print(f'❌ Error getting usage statistics: {str(e)}') return {'success': False, 'error': f'Error: {str(e)}'}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/PulkitXChadha/awesome-databricks-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server