Skip to main content
Glama

MySQL-Performance-Tuner-Mcp

tools_security.py (31.6 kB)
""" Security analysis tool handlers for MySQL. Includes tools for analyzing security: - User account security checks - Password policies - Privilege analysis - SSL/TLS configuration - Anonymous users and weak accounts Based on MySQLTuner's security recommendations patterns. """ from __future__ import annotations from collections.abc import Sequence from typing import Any from mcp.types import TextContent, Tool from ..services import SqlDriver from .toolhandler import ToolHandler class SecurityAnalysisToolHandler(ToolHandler): """Tool handler for MySQL security analysis.""" name = "analyze_security" title = "Security Analyzer" read_only_hint = True destructive_hint = False idempotent_hint = True open_world_hint = False description = """Perform comprehensive MySQL security analysis. Checks: - Anonymous user accounts - Users without passwords - Users with weak password policies - Root account security - Password validation plugin status - SSL/TLS configuration - Host-based access patterns - Dangerous privileges (SUPER, FILE, GRANT) Based on MySQLTuner's security_recommendations() function.""" def __init__(self, sql_driver: SqlDriver): self.sql_driver = sql_driver def get_tool_definition(self) -> Tool: return Tool( name=self.name, description=self.description, inputSchema={ "type": "object", "properties": { "include_user_list": { "type": "boolean", "description": "Include full user list in output", "default": False } }, "required": [] }, annotations=self.get_annotations() ) async def run_tool(self, arguments: dict[str, Any]) -> Sequence[TextContent]: try: include_users = arguments.get("include_user_list", False) output = { "security_score": 0, "checks": {}, "ssl_status": {}, "password_policy": {}, "issues": [], "recommendations": [], "users": [] } total_checks = 0 passed_checks = 0 # Check for anonymous users total_checks += 1 anon_check = await self._check_anonymous_users(output) if anon_check: passed_checks += 1 # Check for users without passwords total_checks += 1 
no_pass_check = await self._check_users_without_password(output) if no_pass_check: passed_checks += 1 # Check root account total_checks += 1 root_check = await self._check_root_account(output) if root_check: passed_checks += 1 # Check password validation total_checks += 1 pw_policy_check = await self._check_password_validation(output) if pw_policy_check: passed_checks += 1 # Check SSL/TLS total_checks += 1 ssl_check = await self._check_ssl_status(output) if ssl_check: passed_checks += 1 # Check dangerous privileges total_checks += 1 priv_check = await self._check_dangerous_privileges(output) if priv_check: passed_checks += 1 # Check wildcard hosts total_checks += 1 host_check = await self._check_wildcard_hosts(output) if host_check: passed_checks += 1 # Check test databases total_checks += 1 test_db_check = await self._check_test_databases(output) if test_db_check: passed_checks += 1 # Calculate security score output["security_score"] = round((passed_checks / total_checks) * 100, 1) output["checks"]["total"] = total_checks output["checks"]["passed"] = passed_checks output["checks"]["failed"] = total_checks - passed_checks # Get user list if requested if include_users: await self._get_user_list(output) return self.format_json_result(output) except Exception as e: return self.format_error(e) async def _check_anonymous_users(self, output: dict) -> bool: """Check for anonymous user accounts.""" try: query = """ SELECT User, Host FROM mysql.user WHERE User = '' """ results = await self.sql_driver.execute_query(query) output["checks"]["anonymous_users"] = { "passed": len(results) == 0, "count": len(results) } if results: output["issues"].append( f"Found {len(results)} anonymous user account(s)" ) output["recommendations"].append( "Remove anonymous users: DROP USER ''@'host'" ) for row in results: output["issues"].append( f"Anonymous user: ''@'{row.get('Host')}'" ) return False return True except Exception: output["checks"]["anonymous_users"] = {"passed": None, "error": 
True} return False async def _check_users_without_password(self, output: dict) -> bool: """Check for users without passwords.""" try: # MySQL 5.7+ uses authentication_string query = """ SELECT User, Host, plugin FROM mysql.user WHERE (authentication_string = '' OR authentication_string IS NULL) AND plugin NOT IN ( 'auth_socket', 'unix_socket', 'mysql_native_password', 'caching_sha2_password' ) AND User != '' """ try: results = await self.sql_driver.execute_query(query) except Exception: # Try alternative for older versions query = """ SELECT User, Host FROM mysql.user WHERE Password = '' AND User != '' """ results = await self.sql_driver.execute_query(query) # Also check for users with empty password via plugin plugin_query = """ SELECT User, Host, plugin FROM mysql.user WHERE User != '' AND (authentication_string = '' OR authentication_string IS NULL) AND plugin IN ('mysql_native_password', 'caching_sha2_password') """ try: plugin_results = await self.sql_driver.execute_query(plugin_query) results.extend(plugin_results) except Exception: pass output["checks"]["users_without_password"] = { "passed": len(results) == 0, "count": len(results) } if results: output["issues"].append( f"Found {len(results)} user(s) without password" ) output["recommendations"].append( "Set passwords for all accounts or remove unused accounts" ) for row in results: output["issues"].append( f"User without password: '{row.get('User')}'@'{row.get('Host')}'" ) return False return True except Exception: output["checks"]["users_without_password"] = {"passed": None, "error": True} return False async def _check_root_account(self, output: dict) -> bool: """Check root account security.""" issues_found = False try: # Check if root can connect from any host query = """ SELECT User, Host FROM mysql.user WHERE User = 'root' AND Host NOT IN ('localhost', '127.0.0.1', '::1') """ results = await self.sql_driver.execute_query(query) output["checks"]["root_remote_access"] = { "passed": len(results) == 0, 
"count": len(results) } if results: issues_found = True output["issues"].append( "Root account allows remote access" ) output["recommendations"].append( "Restrict root to localhost only. Create separate admin accounts " "for remote access if needed." ) # Check if root exists at all (some setups rename it) root_check = """ SELECT COUNT(*) as cnt FROM mysql.user WHERE User = 'root' """ root_count = await self.sql_driver.execute_scalar(root_check) if root_count and root_count > 0: output["checks"]["root_exists"] = {"exists": True} output["recommendations"].append( "Consider renaming the root account for security through obscurity" ) return not issues_found except Exception: output["checks"]["root_account"] = {"passed": None, "error": True} return False async def _check_password_validation(self, output: dict) -> bool: """Check password validation plugin status.""" try: variables = await self.sql_driver.get_server_variables() # Check for validate_password component/plugin validate_policy = variables.get("validate_password.policy") or \ variables.get("validate_password_policy") validate_length = variables.get("validate_password.length") or \ variables.get("validate_password_length") if validate_policy: output["password_policy"] = { "enabled": True, "policy": validate_policy, "min_length": validate_length, "mixed_case": variables.get( "validate_password.mixed_case_count" ) or variables.get("validate_password_mixed_case_count"), "number_count": variables.get( "validate_password.number_count" ) or variables.get("validate_password_number_count"), "special_char": variables.get( "validate_password.special_char_count" ) or variables.get("validate_password_special_char_count") } output["checks"]["password_validation"] = {"passed": True} return True else: output["password_policy"] = {"enabled": False} output["issues"].append( "Password validation plugin is not enabled" ) output["recommendations"].append( "Enable validate_password component: " "INSTALL COMPONENT 
'file://component_validate_password'" ) output["checks"]["password_validation"] = {"passed": False} return False except Exception: output["checks"]["password_validation"] = {"passed": None, "error": True} return False async def _check_ssl_status(self, output: dict) -> bool: """Check SSL/TLS configuration.""" try: variables = await self.sql_driver.get_server_variables() status = await self.sql_driver.get_server_status() have_ssl = variables.get("have_ssl") have_openssl = variables.get("have_openssl") ssl_cipher = variables.get("ssl_cipher") require_secure_transport = variables.get("require_secure_transport") output["ssl_status"] = { "have_ssl": have_ssl, "have_openssl": have_openssl, "ssl_cipher": ssl_cipher, "require_secure_transport": require_secure_transport, "ssl_ca": variables.get("ssl_ca"), "ssl_cert": variables.get("ssl_cert"), "ssl_key": variables.get("ssl_key"), "tls_version": variables.get("tls_version") } passed = True if have_ssl != "YES": passed = False output["issues"].append("SSL is not enabled") output["recommendations"].append( "Enable SSL for encrypted connections" ) else: # Check SSL usage ssl_accepts = int(status.get("Ssl_accepts", 0)) ssl_finished = int(status.get("Ssl_finished_accepts", 0)) output["ssl_status"]["ssl_connections"] = { "accepts": ssl_accepts, "finished": ssl_finished } if require_secure_transport != "ON": output["recommendations"].append( "Consider setting require_secure_transport=ON to enforce SSL" ) # Check for weak TLS versions tls_version = variables.get("tls_version", "") if "TLSv1" in tls_version and "TLSv1.2" not in tls_version: output["issues"].append( f"Weak TLS version enabled: {tls_version}" ) output["recommendations"].append( "Disable TLSv1 and TLSv1.1, use TLSv1.2 or higher" ) output["checks"]["ssl_enabled"] = {"passed": passed} return passed except Exception: output["checks"]["ssl_enabled"] = {"passed": None, "error": True} return False async def _check_dangerous_privileges(self, output: dict) -> bool: """Check for 
users with dangerous privileges.""" try: # Check for SUPER, FILE, PROCESS, SHUTDOWN privileges query = """ SELECT User, Host, Super_priv, File_priv, Process_priv, Shutdown_priv, Grant_priv FROM mysql.user WHERE User != 'root' AND User != 'mysql.sys' AND User != 'mysql.session' AND User != 'mysql.infoschema' AND (Super_priv = 'Y' OR File_priv = 'Y' OR Process_priv = 'Y' OR Shutdown_priv = 'Y') """ results = await self.sql_driver.execute_query(query) output["checks"]["dangerous_privileges"] = { "passed": len(results) == 0, "count": len(results) } if results: output["issues"].append( f"Found {len(results)} non-root user(s) with dangerous privileges" ) for row in results: privs = [] if row.get("Super_priv") == "Y": privs.append("SUPER") if row.get("File_priv") == "Y": privs.append("FILE") if row.get("Process_priv") == "Y": privs.append("PROCESS") if row.get("Shutdown_priv") == "Y": privs.append("SHUTDOWN") output["issues"].append( f"User '{row.get('User')}'@'{row.get('Host')}' has: " f"{', '.join(privs)}" ) output["recommendations"].append( "Review and revoke unnecessary SUPER, FILE, PROCESS, " "SHUTDOWN privileges" ) return False return True except Exception: output["checks"]["dangerous_privileges"] = {"passed": None, "error": True} return False async def _check_wildcard_hosts(self, output: dict) -> bool: """Check for users with wildcard host patterns.""" try: query = """ SELECT User, Host FROM mysql.user WHERE Host = '%' AND User != '' AND User NOT IN ( 'mysql.sys', 'mysql.session', 'mysql.infoschema' ) """ results = await self.sql_driver.execute_query(query) output["checks"]["wildcard_hosts"] = { "passed": len(results) == 0, "count": len(results) } if results: output["issues"].append( f"Found {len(results)} user(s) with wildcard host '%'" ) for row in results: output["issues"].append( f"Wildcard host: '{row.get('User')}'@'%'" ) output["recommendations"].append( "Restrict user accounts to specific hosts/IPs instead of '%'" ) return False return True except Exception: 
output["checks"]["wildcard_hosts"] = {"passed": None, "error": True} return False async def _check_test_databases(self, output: dict) -> bool: """Check for test databases.""" try: query = """ SELECT SCHEMA_NAME FROM information_schema.SCHEMATA WHERE SCHEMA_NAME LIKE 'test%' """ results = await self.sql_driver.execute_query(query) output["checks"]["test_databases"] = { "passed": len(results) == 0, "count": len(results) } if results: dbs = [row.get("SCHEMA_NAME") for row in results] output["issues"].append( f"Found test database(s): {', '.join(dbs)}" ) output["recommendations"].append( "Remove test databases in production: DROP DATABASE test" ) return False return True except Exception: output["checks"]["test_databases"] = {"passed": None, "error": True} return False async def _get_user_list(self, output: dict) -> None: """Get full user list.""" try: query = """ SELECT User, Host, plugin, account_locked, password_expired, password_lifetime, password_last_changed, max_connections, max_user_connections FROM mysql.user ORDER BY User, Host """ results = await self.sql_driver.execute_query(query) output["users"] = [ { "user": row.get("User"), "host": row.get("Host"), "plugin": row.get("plugin"), "account_locked": row.get("account_locked"), "password_expired": row.get("password_expired"), "password_lifetime": row.get("password_lifetime"), "password_last_changed": str(row.get("password_last_changed")), "max_connections": row.get("max_connections"), "max_user_connections": row.get("max_user_connections") } for row in results ] except Exception: output["users"] = [] class UserPrivilegesToolHandler(ToolHandler): """Tool handler for user privileges analysis.""" name = "analyze_user_privileges" title = "User Privileges Analyzer" read_only_hint = True destructive_hint = False idempotent_hint = True open_world_hint = False description = """Analyze privileges for a specific user or all users. 
Shows: - Global privileges - Database-level privileges - Table-level privileges - Column-level privileges - Routine privileges Helps identify excessive or missing privileges.""" def __init__(self, sql_driver: SqlDriver): self.sql_driver = sql_driver def get_tool_definition(self) -> Tool: return Tool( name=self.name, description=self.description, inputSchema={ "type": "object", "properties": { "username": { "type": "string", "description": "Username to analyze (omit for all users)" }, "hostname": { "type": "string", "description": "Host pattern for the user", "default": "%" } }, "required": [] }, annotations=self.get_annotations() ) async def run_tool(self, arguments: dict[str, Any]) -> Sequence[TextContent]: try: username = arguments.get("username") hostname = arguments.get("hostname", "%") output = { "users": [], "summary": {}, "recommendations": [] } if username: # Analyze specific user user_privs = await self._get_user_privileges(username, hostname) output["users"].append(user_privs) else: # Get list of all users and their privilege summary users_query = """ SELECT DISTINCT User, Host FROM mysql.user WHERE User != '' ORDER BY User, Host LIMIT 100 """ users = await self.sql_driver.execute_query(users_query) for user in users: user_privs = await self._get_user_privileges( user.get("User"), user.get("Host") ) output["users"].append(user_privs) # Generate summary total_users = len(output["users"]) super_users = len([ u for u in output["users"] if u.get("global_privileges", {}).get("Super_priv") == "Y" ]) grant_users = len([ u for u in output["users"] if u.get("global_privileges", {}).get("Grant_priv") == "Y" ]) output["summary"] = { "total_users_analyzed": total_users, "users_with_super": super_users, "users_with_grant": grant_users } if super_users > 2: output["recommendations"].append( f"{super_users} users have SUPER privilege - review necessity" ) return self.format_json_result(output) except Exception as e: return self.format_error(e) async def 
_get_user_privileges(self, username: str, hostname: str) -> dict: """Get privileges for a specific user.""" user_privs = { "user": username, "host": hostname, "global_privileges": {}, "database_privileges": [], "table_privileges": [], "column_privileges": [], "routine_privileges": [] } # Global privileges try: global_query = f""" SELECT * FROM mysql.user WHERE User = '{username}' AND Host = '{hostname}' """ global_result = await self.sql_driver.execute_query(global_query) if global_result: row = global_result[0] # Extract privilege columns (those ending in _priv) user_privs["global_privileges"] = { k: v for k, v in row.items() if k.endswith("_priv") and v == "Y" } except Exception: pass # Database privileges try: db_query = f""" SELECT Db, Select_priv, Insert_priv, Update_priv, Delete_priv, Create_priv, Drop_priv, Grant_priv, Index_priv, Alter_priv, Create_tmp_table_priv, Lock_tables_priv, Create_view_priv, Show_view_priv, Create_routine_priv, Alter_routine_priv, Execute_priv, Event_priv, Trigger_priv FROM mysql.db WHERE User = '{username}' AND Host = '{hostname}' """ db_results = await self.sql_driver.execute_query(db_query) for row in db_results: privs = {k: v for k, v in row.items() if v == "Y" and k != "Db"} user_privs["database_privileges"].append({ "database": row.get("Db"), "privileges": list(privs.keys()) }) except Exception: pass # Table privileges try: table_query = f""" SELECT Db, Table_name, Table_priv, Column_priv FROM mysql.tables_priv WHERE User = '{username}' AND Host = '{hostname}' """ table_results = await self.sql_driver.execute_query(table_query) for row in table_results: user_privs["table_privileges"].append({ "database": row.get("Db"), "table": row.get("Table_name"), "table_priv": row.get("Table_priv"), "column_priv": row.get("Column_priv") }) except Exception: pass return user_privs class AuditLogToolHandler(ToolHandler): """Tool handler for MySQL audit log analysis.""" name = "check_audit_log" title = "Audit Log Checker" read_only_hint = 
True destructive_hint = False idempotent_hint = True open_world_hint = False description = """Check MySQL audit log configuration and status. Analyzes: - Audit plugin status - Audit log configuration - Recent audit events (if accessible) - Compliance recommendations Supports MySQL Enterprise Audit, MariaDB Audit Plugin, and Percona Audit Log Plugin.""" def __init__(self, sql_driver: SqlDriver): self.sql_driver = sql_driver def get_tool_definition(self) -> Tool: return Tool( name=self.name, description=self.description, inputSchema={ "type": "object", "properties": {}, "required": [] }, annotations=self.get_annotations() ) async def run_tool(self, arguments: dict[str, Any]) -> Sequence[TextContent]: try: output = { "audit_enabled": False, "audit_plugin": None, "configuration": {}, "recommendations": [] } variables = await self.sql_driver.get_server_variables() # Check for MySQL Enterprise Audit audit_log = variables.get("audit_log_file") if audit_log: output["audit_enabled"] = True output["audit_plugin"] = "MySQL Enterprise Audit" output["configuration"] = { "audit_log_file": audit_log, "audit_log_format": variables.get("audit_log_format"), "audit_log_policy": variables.get("audit_log_policy"), "audit_log_rotate_on_size": variables.get( "audit_log_rotate_on_size" ), "audit_log_buffer_size": variables.get("audit_log_buffer_size") } # Check for MariaDB Audit Plugin server_audit = variables.get("server_audit_logging") if server_audit: output["audit_enabled"] = server_audit == "ON" output["audit_plugin"] = "MariaDB Server Audit" output["configuration"] = { "server_audit_logging": server_audit, "server_audit_file_path": variables.get("server_audit_file_path"), "server_audit_events": variables.get("server_audit_events"), "server_audit_incl_users": variables.get( "server_audit_incl_users" ), "server_audit_excl_users": variables.get( "server_audit_excl_users" ) } # Check for Percona Audit Log audit_log_policy = variables.get("audit_log_policy") if audit_log_policy and not 
audit_log: output["audit_enabled"] = True output["audit_plugin"] = "Percona Audit Log" output["configuration"]["audit_log_policy"] = audit_log_policy # Check for loaded plugins try: plugins_query = """ SELECT PLUGIN_NAME, PLUGIN_STATUS FROM information_schema.PLUGINS WHERE PLUGIN_NAME LIKE '%audit%' """ plugins = await self.sql_driver.execute_query(plugins_query) if plugins: output["installed_plugins"] = [ { "name": p.get("PLUGIN_NAME"), "status": p.get("PLUGIN_STATUS") } for p in plugins ] except Exception: pass # Recommendations if not output["audit_enabled"]: output["recommendations"].append( "No audit logging detected. Consider enabling audit logging " "for compliance and security monitoring." ) output["recommendations"].append( "Options: MySQL Enterprise Audit (commercial), " "MariaDB Audit Plugin (free), or Percona Audit Log (free)" ) else: output["recommendations"].append( f"{output['audit_plugin']} is enabled" ) output["recommendations"].append( "Ensure audit logs are being monitored and regularly reviewed" ) output["recommendations"].append( "Configure log rotation to prevent disk space issues" ) return self.format_json_result(output) except Exception as e: return self.format_error(e)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/isdaniel/MySQL-Performance-Tuner-Mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.