Skip to main content
Glama
kami2k1

MCP MySQL Server

by kami2k1
security_tool.py3.23 kB
from typing import Dict, Any from .base_tool import BaseTool from src.database.query_executor import query_executor import logging logger = logging.getLogger(__name__) class SecurityStatusTool(BaseTool): def __init__(self): super().__init__( name="check_security_status", description="Check database security status and user privileges" ) def execute(self, **kwargs) -> Dict[str, Any]: try: # Get current user info user_info_query = "SELECT USER() as current_user, DATABASE() as current_database" user_info = query_executor.db.execute_query(user_info_query)[0] # Get user privileges privileges_query = "SHOW GRANTS FOR CURRENT_USER()" try: privileges = query_executor.db.execute_query(privileges_query) grant_statements = [list(grant.values())[0] for grant in privileges] except: grant_statements = ["Unable to retrieve grant information"] # Check if user has full access to current database has_full_access = any( 'ALL PRIVILEGES' in grant or f'ON `{user_info["current_database"]}`.*' in grant or 'ON *.*' in grant for grant in grant_statements ) # Get tables user can access (basic check) accessible_tables = [] try: tables = query_executor.get_tables() for table in tables: try: # Try to select from table to check access query_executor.db.execute_query(f"SELECT 1 FROM `{table['table_name']}` LIMIT 1") accessible_tables.append({ 'table_name': table['table_name'], 'access_level': 'READ_ACCESS' }) except: pass except Exception as e: logger.warning(f"Could not check table access: {str(e)}") result = { 'current_user': user_info['current_user'], 'current_database': user_info['current_database'], 'has_full_database_access': has_full_access, 'grant_statements': grant_statements, 'accessible_tables_count': len(accessible_tables), 'accessible_tables': accessible_tables[:10], # Show first 10 'security_warnings': [] } # Add security warnings if has_full_access: result['security_warnings'].append("User has full database access") if 'ON *.*' in str(grant_statements): result['security_warnings'].append("User has global privileges") logger.info("Security status check completed") return self.format_response(True, result) except Exception as e: logger.error(f"Security status tool failed: {str(e)}") return self.format_response(False, error=str(e))

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kami2k1/MCP-MYSQL'

If you have feedback or need assistance with the MCP directory API, please join our Discord server