
MySQL-Performance-Tuner-Mcp

tools_replication.py (29.2 kB)
""" Replication status tool handlers for MySQL. Includes tools for analyzing replication: - Master/Source status - Slave/Replica status - Replication lag analysis - Galera cluster status (MariaDB) - Group replication status (MySQL) Based on MySQLTuner's replication analysis patterns. """ from __future__ import annotations from collections.abc import Sequence from typing import Any from mcp.types import TextContent, Tool from ..services import SqlDriver from .toolhandler import ToolHandler class ReplicationStatusToolHandler(ToolHandler): """Tool handler for replication status analysis.""" name = "get_replication_status" title = "Replication Status" read_only_hint = True destructive_hint = False idempotent_hint = True open_world_hint = False description = """Get MySQL replication status and health. Analyzes: - Master/Source status and binary log position - Slave/Replica status and lag - Replication errors and warnings - Binary log configuration - Semi-sync replication status Works with both MySQL and MariaDB terminology.""" def __init__(self, sql_driver: SqlDriver): self.sql_driver = sql_driver def get_tool_definition(self) -> Tool: return Tool( name=self.name, description=self.description, inputSchema={ "type": "object", "properties": { "check_all_channels": { "type": "boolean", "description": "Check all replication channels (multi-source)", "default": True } }, "required": [] }, annotations=self.get_annotations() ) async def run_tool(self, arguments: dict[str, Any]) -> Sequence[TextContent]: try: check_all = arguments.get("check_all_channels", True) output = { "is_master": False, "is_slave": False, "master_status": {}, "slave_status": [], "binary_logs": {}, "replication_variables": {}, "issues": [], "recommendations": [] } variables = await self.sql_driver.get_server_variables() status = await self.sql_driver.get_server_status() # Check if binary logging is enabled (potential master) log_bin = variables.get("log_bin", "OFF") output["binary_logs"]["log_bin"] = log_bin if log_bin.upper() == "ON": await self._get_master_status(output) # Check slave status await self._get_slave_status(output, check_all) # Get replication-related variables await self._get_replication_variables(variables, output) # Check for semi-sync replication await self._check_semisync(variables, status, output) # Generate recommendations self._generate_recommendations(output) return self.format_json_result(output) except Exception as e: return self.format_error(e) async def _get_master_status(self, output: dict) -> None: """Get master/source status.""" try: # Try MySQL 8.0+ syntax first try: result = await self.sql_driver.execute_query( "SHOW BINARY LOG STATUS" ) except Exception: # Fall back to older syntax result = await self.sql_driver.execute_query("SHOW MASTER STATUS") if result: row = result[0] output["is_master"] = True output["master_status"] = { "file": row.get("File"), "position": row.get("Position"), "binlog_do_db": row.get("Binlog_Do_DB"), "binlog_ignore_db": row.get("Binlog_Ignore_DB"), "executed_gtid_set": row.get("Executed_Gtid_Set") } # Get binary log list try: logs = await self.sql_driver.execute_query("SHOW BINARY LOGS") total_size = sum(log.get("File_size", 0) for log in logs) output["binary_logs"]["files_count"] = len(logs) output["binary_logs"]["total_size_mb"] = round( total_size / 1024 / 1024, 2 ) output["binary_logs"]["latest_files"] = [ { "name": log.get("Log_name"), "size_mb": round( (log.get("File_size") or 0) / 1024 / 1024, 2 ) } for log in logs[-5:] # Last 5 files ] except Exception: pass # Get 
connected slaves try: # Try MySQL 8.0.22+ syntax try: slaves = await self.sql_driver.execute_query( "SHOW REPLICAS" ) except Exception: slaves = await self.sql_driver.execute_query( "SHOW SLAVE HOSTS" ) output["master_status"]["connected_replicas"] = len(slaves) output["master_status"]["replicas"] = [ { "server_id": s.get("Server_id"), "host": s.get("Host"), "port": s.get("Port"), "uuid": s.get("Replica_UUID") or s.get("Slave_UUID") } for s in slaves ] except Exception: pass except Exception: # Not a master or no privileges pass async def _get_slave_status(self, output: dict, check_all: bool) -> None: """Get slave/replica status.""" try: # Try MySQL 8.0.22+ syntax first try: if check_all: results = await self.sql_driver.execute_query( "SHOW REPLICA STATUS" ) else: results = await self.sql_driver.execute_query( "SHOW REPLICA STATUS FOR CHANNEL ''" ) except Exception: # Fall back to older syntax results = await self.sql_driver.execute_query("SHOW SLAVE STATUS") if results: output["is_slave"] = True for row in results: channel_name = row.get("Channel_Name", "") io_running = row.get("Slave_IO_Running") or row.get( "Replica_IO_Running" ) sql_running = row.get("Slave_SQL_Running") or row.get( "Replica_SQL_Running" ) slave_info = { "channel_name": channel_name, "master_host": row.get("Master_Host") or row.get( "Source_Host" ), "master_port": row.get("Master_Port") or row.get( "Source_Port" ), "master_user": row.get("Master_User") or row.get( "Source_User" ), "io_running": io_running, "sql_running": sql_running, "seconds_behind_master": row.get( "Seconds_Behind_Master" ) or row.get("Seconds_Behind_Source"), "last_io_error": row.get("Last_IO_Error"), "last_sql_error": row.get("Last_SQL_Error"), "relay_log_file": row.get("Relay_Log_File"), "relay_log_pos": row.get("Relay_Log_Pos"), "master_log_file": row.get("Master_Log_File") or row.get( "Source_Log_File" ), "read_master_log_pos": row.get( "Read_Master_Log_Pos" ) or row.get("Read_Source_Log_Pos"), "exec_master_log_pos": row.get( "Exec_Master_Log_Pos" ) or row.get("Exec_Source_Log_Pos"), "gtid_executed": row.get("Executed_Gtid_Set"), "auto_position": row.get("Auto_Position") } # Check for issues if io_running != "Yes": output["issues"].append( f"Channel '{channel_name}': Replica IO thread not running" ) if row.get("Last_IO_Error"): output["issues"].append( f"IO Error: {row.get('Last_IO_Error')}" ) if sql_running != "Yes": output["issues"].append( f"Channel '{channel_name}': Replica SQL thread not running" ) if row.get("Last_SQL_Error"): output["issues"].append( f"SQL Error: {row.get('Last_SQL_Error')}" ) # Check lag lag = slave_info["seconds_behind_master"] if lag is not None and lag > 60: output["issues"].append( f"Channel '{channel_name}': Replication lag is " f"{lag} seconds" ) output["slave_status"].append(slave_info) except Exception: # Not a slave or no privileges pass async def _get_replication_variables( self, variables: dict, output: dict ) -> None: """Get replication-related variables.""" repl_vars = {} # Key replication variables var_names = [ "server_id", "server_uuid", "log_bin", "binlog_format", "binlog_row_image", "gtid_mode", "enforce_gtid_consistency", "binlog_expire_logs_seconds", "expire_logs_days", "max_binlog_size", "sync_binlog", "relay_log_purge", "relay_log_recovery", "slave_parallel_workers", "slave_parallel_type", "replica_parallel_workers", "replica_parallel_type", "read_only", "super_read_only", "log_slave_updates", "log_replica_updates" ] for var in var_names: if var in variables: repl_vars[var] = variables[var] 
output["replication_variables"] = repl_vars # Check for common issues if repl_vars.get("server_id") == "0": output["issues"].append("server_id is 0 - replication requires unique ID") output["recommendations"].append( "Set server_id to a unique non-zero value" ) if repl_vars.get("gtid_mode") == "OFF" and output["is_slave"]: output["recommendations"].append( "Consider enabling GTID mode for easier replication management" ) sync_binlog = repl_vars.get("sync_binlog", "1") if sync_binlog == "0": output["issues"].append("sync_binlog=0 may cause data loss on crash") output["recommendations"].append( "Set sync_binlog=1 for crash-safe replication" ) async def _check_semisync( self, variables: dict, status: dict, output: dict ) -> None: """Check semi-synchronous replication status.""" semisync = {} # Check master semi-sync master_enabled = variables.get("rpl_semi_sync_master_enabled") or \ variables.get("rpl_semi_sync_source_enabled") if master_enabled: semisync["master_enabled"] = master_enabled == "ON" semisync["master_timeout"] = variables.get( "rpl_semi_sync_master_timeout" ) or variables.get("rpl_semi_sync_source_timeout") # Semi-sync status semisync["master_clients"] = status.get( "Rpl_semi_sync_master_clients" ) or status.get("Rpl_semi_sync_source_clients") semisync["master_status"] = status.get( "Rpl_semi_sync_master_status" ) or status.get("Rpl_semi_sync_source_status") # Check slave semi-sync slave_enabled = variables.get("rpl_semi_sync_slave_enabled") or \ variables.get("rpl_semi_sync_replica_enabled") if slave_enabled: semisync["slave_enabled"] = slave_enabled == "ON" semisync["slave_status"] = status.get( "Rpl_semi_sync_slave_status" ) or status.get("Rpl_semi_sync_replica_status") if semisync: output["semisync_replication"] = semisync def _generate_recommendations(self, output: dict) -> None: """Generate replication-related recommendations.""" if output["is_master"]: # Master recommendations binlog_format = output["replication_variables"].get("binlog_format") if binlog_format == "STATEMENT": output["recommendations"].append( "Consider using binlog_format=ROW for safer replication" ) expire_days = output["replication_variables"].get("expire_logs_days") expire_seconds = output["replication_variables"].get( "binlog_expire_logs_seconds" ) if not expire_days and not expire_seconds: output["recommendations"].append( "Binary log expiration not set - logs may accumulate. " "Set binlog_expire_logs_seconds or expire_logs_days." ) if output["is_slave"]: # Slave recommendations for slave in output["slave_status"]: if slave.get("auto_position") != 1: output["recommendations"].append( f"Channel '{slave.get('channel_name', '')}': " "Consider enabling MASTER_AUTO_POSITION=1 for GTID-based " "replication for easier failover" ) parallel_workers = output["replication_variables"].get( "slave_parallel_workers" ) or output["replication_variables"].get("replica_parallel_workers") if parallel_workers and int(parallel_workers) == 0: output["recommendations"].append( "Consider enabling parallel replication with " "replica_parallel_workers > 0 for faster apply" ) class GaleraClusterToolHandler(ToolHandler): """Tool handler for Galera cluster status.""" name = "get_galera_status" title = "Galera Cluster Status" read_only_hint = True destructive_hint = False idempotent_hint = True open_world_hint = False description = """Get Galera cluster status for MariaDB/Percona XtraDB Cluster. 
Analyzes: - Cluster membership and state - Node status (Primary, Donor, Joiner) - Flow control status - Replication health metrics - Certification and write-set conflicts Only applicable to Galera-enabled MySQL variants.""" def __init__(self, sql_driver: SqlDriver): self.sql_driver = sql_driver def get_tool_definition(self) -> Tool: return Tool( name=self.name, description=self.description, inputSchema={ "type": "object", "properties": {}, "required": [] }, annotations=self.get_annotations() ) async def run_tool(self, arguments: dict[str, Any]) -> Sequence[TextContent]: try: output = { "is_galera": False, "cluster_status": {}, "node_status": {}, "replication_health": {}, "flow_control": {}, "issues": [], "recommendations": [] } status = await self.sql_driver.get_server_status("wsrep%") # Check if Galera is enabled wsrep_on = status.get("wsrep_on") or status.get("wsrep_ready") if not wsrep_on or wsrep_on == "OFF": output["message"] = "Galera replication is not enabled" return self.format_json_result(output) output["is_galera"] = True # Cluster status output["cluster_status"] = { "cluster_name": status.get("wsrep_cluster_name"), "cluster_size": int(status.get("wsrep_cluster_size", 0)), "cluster_state_uuid": status.get("wsrep_cluster_state_uuid"), "cluster_status": status.get("wsrep_cluster_status"), "cluster_conf_id": status.get("wsrep_cluster_conf_id") } # Check cluster status cluster_status = status.get("wsrep_cluster_status") if cluster_status != "Primary": output["issues"].append( f"Cluster status is '{cluster_status}' (expected 'Primary')" ) cluster_size = int(status.get("wsrep_cluster_size", 0)) if cluster_size < 3: output["issues"].append( f"Cluster size is {cluster_size} (recommended minimum: 3)" ) # Node status output["node_status"] = { "node_name": status.get("wsrep_node_name"), "node_address": status.get("wsrep_node_address"), "local_state": int(status.get("wsrep_local_state", 0)), "local_state_comment": status.get("wsrep_local_state_comment"), "ready": status.get("wsrep_ready"), "connected": status.get("wsrep_connected"), "provider_version": status.get("wsrep_provider_version") } # Check node state (4 = Synced) local_state = int(status.get("wsrep_local_state", 0)) if local_state != 4: output["issues"].append( f"Node state is '{status.get('wsrep_local_state_comment')}' " "(expected 'Synced')" ) if status.get("wsrep_ready") != "ON": output["issues"].append("Node is not ready for queries") if status.get("wsrep_connected") != "ON": output["issues"].append("Node is not connected to cluster") # Replication health recv_queue = int(status.get("wsrep_local_recv_queue", 0)) send_queue = int(status.get("wsrep_local_send_queue", 0)) cert_failures = int(status.get("wsrep_local_cert_failures", 0)) bf_aborts = int(status.get("wsrep_local_bf_aborts", 0)) output["replication_health"] = { "local_recv_queue": recv_queue, "local_recv_queue_avg": float( status.get("wsrep_local_recv_queue_avg", 0) ), "local_send_queue": send_queue, "local_send_queue_avg": float( status.get("wsrep_local_send_queue_avg", 0) ), "cert_deps_distance": float( status.get("wsrep_cert_deps_distance", 0) ), "apply_window": float( status.get("wsrep_apply_window", 0) ), "commit_window": float( status.get("wsrep_commit_window", 0) ), "local_cert_failures": cert_failures, "local_bf_aborts": bf_aborts, "replicated_bytes": int(status.get("wsrep_replicated_bytes", 0)), "received_bytes": int(status.get("wsrep_received_bytes", 0)) } if recv_queue > 10: output["issues"].append( f"High receive queue ({recv_queue}) - node may be slow to 
apply" ) if cert_failures > 0: output["issues"].append( f"{cert_failures} certification failures detected - " "check for conflicting writes" ) # Flow control output["flow_control"] = { "paused": float(status.get("wsrep_flow_control_paused", 0)), "paused_ns": int(status.get("wsrep_flow_control_paused_ns", 0)), "sent": int(status.get("wsrep_flow_control_sent", 0)), "recv": int(status.get("wsrep_flow_control_recv", 0)) } paused_pct = float(status.get("wsrep_flow_control_paused", 0)) if paused_pct > 0.1: # More than 10% paused output["issues"].append( f"Flow control paused {paused_pct * 100:.1f}% of time - " "indicates slow node in cluster" ) output["recommendations"].append( "Investigate slow node or increase wsrep_slave_threads" ) # Recommendations if not output["issues"]: output["recommendations"].append( "Galera cluster appears healthy" ) return self.format_json_result(output) except Exception as e: return self.format_error(e) class GroupReplicationToolHandler(ToolHandler): """Tool handler for MySQL Group Replication status.""" name = "get_group_replication_status" title = "Group Replication Status" read_only_hint = True destructive_hint = False idempotent_hint = True open_world_hint = False description = """Get MySQL Group Replication status. Analyzes: - Group membership and state - Member roles (PRIMARY/SECONDARY) - Replication channels - Transaction certification - Flow control Only applicable to MySQL with Group Replication enabled.""" def __init__(self, sql_driver: SqlDriver): self.sql_driver = sql_driver def get_tool_definition(self) -> Tool: return Tool( name=self.name, description=self.description, inputSchema={ "type": "object", "properties": {}, "required": [] }, annotations=self.get_annotations() ) async def run_tool(self, arguments: dict[str, Any]) -> Sequence[TextContent]: try: output = { "is_group_replication": False, "group_status": {}, "members": [], "local_member": {}, "issues": [], "recommendations": [] } variables = await self.sql_driver.get_server_variables( "group_replication%" ) # Check if Group Replication is enabled gr_enabled = variables.get("group_replication_group_name") if not gr_enabled: output["message"] = "Group Replication is not configured" return self.format_json_result(output) # Get member status try: members_query = """ SELECT CHANNEL_NAME, MEMBER_ID, MEMBER_HOST, MEMBER_PORT, MEMBER_STATE, MEMBER_ROLE, MEMBER_VERSION FROM performance_schema.replication_group_members """ members = await self.sql_driver.execute_query(members_query) if not members: output["message"] = "Group Replication not active" return self.format_json_result(output) output["is_group_replication"] = True for member in members: member_info = { "member_id": member.get("MEMBER_ID"), "host": member.get("MEMBER_HOST"), "port": member.get("MEMBER_PORT"), "state": member.get("MEMBER_STATE"), "role": member.get("MEMBER_ROLE"), "version": member.get("MEMBER_VERSION") } output["members"].append(member_info) if member.get("MEMBER_STATE") != "ONLINE": output["issues"].append( f"Member {member.get('MEMBER_HOST')}:{member.get('MEMBER_PORT')} " f"is {member.get('MEMBER_STATE')}" ) except Exception: output["message"] = "Group Replication tables not available" return self.format_json_result(output) # Group status output["group_status"] = { "group_name": variables.get("group_replication_group_name"), "single_primary_mode": variables.get( "group_replication_single_primary_mode" ), "auto_rejoin_tries": variables.get( "group_replication_autorejoin_tries" ), "member_count": len(members) } # Check quorum 
online_members = len([ m for m in output["members"] if m["state"] == "ONLINE" ]) if online_members < (len(output["members"]) // 2 + 1): output["issues"].append( f"Potential quorum loss: only {online_members} of " f"{len(output['members'])} members online" ) # Get local member info try: local_query = """ SELECT MEMBER_ID, COUNT_TRANSACTIONS_IN_QUEUE, COUNT_TRANSACTIONS_CHECKED, COUNT_CONFLICTS_DETECTED, COUNT_TRANSACTIONS_ROWS_VALIDATING, TRANSACTIONS_COMMITTED_ALL_MEMBERS, LAST_CONFLICT_FREE_TRANSACTION FROM performance_schema.replication_group_member_stats WHERE MEMBER_ID = @@server_uuid """ local_stats = await self.sql_driver.execute_query(local_query) if local_stats: stat = local_stats[0] output["local_member"] = { "transactions_in_queue": stat.get( "COUNT_TRANSACTIONS_IN_QUEUE" ), "transactions_checked": stat.get( "COUNT_TRANSACTIONS_CHECKED" ), "conflicts_detected": stat.get("COUNT_CONFLICTS_DETECTED"), "rows_validating": stat.get( "COUNT_TRANSACTIONS_ROWS_VALIDATING" ) } conflicts = stat.get("COUNT_CONFLICTS_DETECTED") or 0 if conflicts > 0: output["issues"].append( f"{conflicts} certification conflicts detected" ) queue_size = stat.get("COUNT_TRANSACTIONS_IN_QUEUE") or 0 if queue_size > 100: output["issues"].append( f"High transaction queue ({queue_size})" ) except Exception: pass # Recommendations if len(output["members"]) < 3: output["recommendations"].append( "Consider adding more members for better fault tolerance " "(minimum 3 recommended)" ) if not output["issues"]: output["recommendations"].append( "Group Replication appears healthy" ) return self.format_json_result(output) except Exception as e: return self.format_error(e)
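For orientation, below is a minimal, hypothetical sketch of how one of these handlers might be exercised in isolation, for example in a unit test. It is not part of tools_replication.py: the stub driver, the import path, and the fabricated query results are assumptions. Only the async methods it fakes (get_server_variables, get_server_status, execute_query) and the run_tool signature come from the file above, and reading .text assumes run_tool returns mcp.types.TextContent items via the ToolHandler base class.

# Hypothetical usage sketch - not part of tools_replication.py.
from __future__ import annotations

import asyncio

# Assumed package layout; adjust to the project's actual module path.
from mysql_tuner_mcp.tools.tools_replication import ReplicationStatusToolHandler


class StubSqlDriver:
    """Fakes the async SqlDriver methods the handlers call."""

    async def get_server_variables(self, pattern: str | None = None) -> dict:
        # Pretend binary logging is on so the master-side checks run.
        return {
            "log_bin": "ON",
            "server_id": "1",
            "binlog_format": "ROW",
            "gtid_mode": "OFF",
            "sync_binlog": "1",
        }

    async def get_server_status(self, pattern: str | None = None) -> dict:
        return {}

    async def execute_query(self, query: str) -> list[dict]:
        if "BINARY LOG STATUS" in query or "MASTER STATUS" in query:
            return [{"File": "binlog.000001", "Position": 157}]
        return []  # no binary log list, no replicas, no slave status


async def main() -> None:
    handler = ReplicationStatusToolHandler(StubSqlDriver())  # type: ignore[arg-type]
    result = await handler.run_tool({"check_all_channels": True})
    for content in result:
        print(content.text)  # JSON report with issues and recommendations


if __name__ == "__main__":
    asyncio.run(main())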
