Skip to main content
Glama

MySQL-Performance-Tuner-Mcp

tools_engines.py (29.5 kB)
""" Storage engine analysis tool handlers for MySQL. Includes tools for analyzing storage engines: - Engine statistics and status - MyISAM-specific checks - InnoDB vs MyISAM comparison - Engine recommendation Based on MySQLTuner's storage engine analysis patterns. """ from __future__ import annotations from collections.abc import Sequence from typing import Any from mcp.types import TextContent, Tool from ..services import SqlDriver from .toolhandler import ToolHandler class StorageEngineAnalysisToolHandler(ToolHandler): """Tool handler for storage engine analysis.""" name = "analyze_storage_engines" title = "Storage Engine Analyzer" read_only_hint = True destructive_hint = False idempotent_hint = True open_world_hint = False description = """Analyze storage engine usage and statistics for user tables. Provides: - List of available engines and their status - Table count and size by engine - Engine-specific metrics (InnoDB, MyISAM, MEMORY, etc.) - Recommendations for engine optimization Note: This tool only analyzes user/custom tables and excludes MySQL system tables (mysql, information_schema, performance_schema, sys) by default. 
Based on MySQLTuner's engine analysis patterns.""" def __init__(self, sql_driver: SqlDriver): self.sql_driver = sql_driver def get_tool_definition(self) -> Tool: return Tool( name=self.name, description=self.description, inputSchema={ "type": "object", "properties": { "include_table_details": { "type": "boolean", "description": "Include per-table engine details", "default": True }, "schema_name": { "type": "string", "description": "Filter by specific schema (optional)" } }, "required": [] }, annotations=self.get_annotations() ) async def run_tool(self, arguments: dict[str, Any]) -> Sequence[TextContent]: try: include_details = arguments.get("include_table_details", True) schema_name = arguments.get("schema_name") output = { "available_engines": [], "engine_usage": {}, "engine_summary": {}, "myisam_analysis": {}, "innodb_analysis": {}, "issues": [], "recommendations": [] } # Get available storage engines engines_query = """ SELECT ENGINE, SUPPORT, COMMENT, TRANSACTIONS, XA, SAVEPOINTS FROM information_schema.ENGINES ORDER BY ENGINE """ engines = await self.sql_driver.execute_query(engines_query) for engine in engines: output["available_engines"].append({ "engine": engine.get("ENGINE"), "support": engine.get("SUPPORT"), "transactions": engine.get("TRANSACTIONS"), "xa": engine.get("XA"), "savepoints": engine.get("SAVEPOINTS"), "comment": engine.get("COMMENT") }) # Get table count and size by engine where_clause = "WHERE TABLE_SCHEMA NOT IN ('mysql', 'information_schema', 'performance_schema', 'sys')" if schema_name: where_clause = f"WHERE TABLE_SCHEMA = '{schema_name}'" engine_stats_query = f""" SELECT ENGINE, COUNT(*) as table_count, SUM(TABLE_ROWS) as total_rows, SUM(DATA_LENGTH) as data_size, SUM(INDEX_LENGTH) as index_size, SUM(DATA_LENGTH + INDEX_LENGTH) as total_size, SUM(DATA_FREE) as data_free FROM information_schema.TABLES {where_clause} AND TABLE_TYPE = 'BASE TABLE' AND ENGINE IS NOT NULL GROUP BY ENGINE ORDER BY total_size DESC """ engine_stats = await 
self.sql_driver.execute_query(engine_stats_query) total_tables = 0 total_data = 0 total_index = 0 for stat in engine_stats: engine_name = stat.get("ENGINE") table_count = stat.get("table_count") or 0 data_size = stat.get("data_size") or 0 index_size = stat.get("index_size") or 0 total_size = stat.get("total_size") or 0 data_free = stat.get("data_free") or 0 output["engine_usage"][engine_name] = { "table_count": table_count, "total_rows": stat.get("total_rows") or 0, "data_size_bytes": data_size, "data_size_mb": round(data_size / 1024 / 1024, 2), "index_size_bytes": index_size, "index_size_mb": round(index_size / 1024 / 1024, 2), "total_size_mb": round(total_size / 1024 / 1024, 2), "data_free_mb": round(data_free / 1024 / 1024, 2) } total_tables += table_count total_data += data_size total_index += index_size output["engine_summary"] = { "total_tables": total_tables, "total_data_size_mb": round(total_data / 1024 / 1024, 2), "total_index_size_mb": round(total_index / 1024 / 1024, 2), "total_size_mb": round((total_data + total_index) / 1024 / 1024, 2), "total_size_gb": round( (total_data + total_index) / 1024 / 1024 / 1024, 2 ) } # Analyze MyISAM if present if "MyISAM" in output["engine_usage"]: await self._analyze_myisam(output) # Analyze InnoDB if "InnoDB" in output["engine_usage"]: await self._analyze_innodb(output) # Analyze MEMORY tables if "MEMORY" in output["engine_usage"]: await self._analyze_memory_engine(output) # Table details by engine if include_details: await self._get_table_details(output, schema_name) # Generate recommendations self._generate_recommendations(output) return self.format_json_result(output) except Exception as e: return self.format_error(e) async def _analyze_myisam(self, output: dict) -> None: """Analyze MyISAM-specific metrics.""" variables = await self.sql_driver.get_server_variables() status = await self.sql_driver.get_server_status() key_buffer_size = int(variables.get("key_buffer_size", 0)) key_read_requests = 
int(status.get("Key_read_requests", 0)) key_reads = int(status.get("Key_reads", 0)) key_write_requests = int(status.get("Key_write_requests", 0)) key_writes = int(status.get("Key_writes", 0)) key_blocks_used = int(status.get("Key_blocks_used", 0)) key_blocks_unused = int(status.get("Key_blocks_unused", 0)) key_block_size = int(variables.get("key_cache_block_size", 1024)) output["myisam_analysis"] = { "key_buffer_size_mb": round(key_buffer_size / 1024 / 1024, 2), "key_read_requests": key_read_requests, "key_reads": key_reads, "key_write_requests": key_write_requests, "key_writes": key_writes, "key_blocks_used": key_blocks_used, "key_blocks_unused": key_blocks_unused } # Key cache hit ratio if key_read_requests > 0: key_cache_hit = ( (key_read_requests - key_reads) / key_read_requests * 100 ) output["myisam_analysis"]["key_cache_hit_ratio"] = round( key_cache_hit, 4 ) if key_cache_hit < 95: output["issues"].append( f"MyISAM key cache hit ratio is low ({key_cache_hit:.2f}%)" ) output["recommendations"].append( "Consider increasing key_buffer_size for MyISAM tables" ) # Key buffer usage if key_buffer_size > 0: key_used = key_blocks_used * key_block_size usage_pct = (key_used / key_buffer_size) * 100 output["myisam_analysis"]["key_buffer_usage_pct"] = round(usage_pct, 2) if usage_pct < 10 and key_buffer_size > 64 * 1024 * 1024: output["recommendations"].append( f"Key buffer usage is only {usage_pct:.1f}%. " "Consider reducing key_buffer_size." 
) # MyISAM tables should generally be migrated to InnoDB myisam_tables = output["engine_usage"]["MyISAM"]["table_count"] if myisam_tables > 0: output["issues"].append( f"{myisam_tables} MyISAM tables found" ) output["recommendations"].append( "Consider migrating MyISAM tables to InnoDB for better " "crash recovery, transactions, and row-level locking" ) async def _analyze_innodb(self, output: dict) -> None: """Analyze InnoDB-specific metrics.""" variables = await self.sql_driver.get_server_variables("innodb%") status = await self.sql_driver.get_server_status("Innodb%") bp_size = int(variables.get("innodb_buffer_pool_size", 0)) bp_instances = int(variables.get("innodb_buffer_pool_instances", 1)) read_requests = int(status.get("Innodb_buffer_pool_read_requests", 0)) reads = int(status.get("Innodb_buffer_pool_reads", 0)) output["innodb_analysis"] = { "buffer_pool_size_gb": round(bp_size / 1024 / 1024 / 1024, 2), "buffer_pool_instances": bp_instances, "file_per_table": variables.get("innodb_file_per_table"), "flush_method": variables.get("innodb_flush_method"), "flush_log_at_trx_commit": variables.get( "innodb_flush_log_at_trx_commit" ), "doublewrite": variables.get("innodb_doublewrite"), "read_io_threads": variables.get("innodb_read_io_threads"), "write_io_threads": variables.get("innodb_write_io_threads") } # Buffer pool hit ratio if read_requests > 0: hit_ratio = (read_requests - reads) / read_requests * 100 output["innodb_analysis"]["buffer_pool_hit_ratio"] = round(hit_ratio, 4) # Compare buffer pool size to data size innodb_usage = output["engine_usage"].get("InnoDB", {}) innodb_data = innodb_usage.get("data_size_bytes", 0) innodb_index = innodb_usage.get("index_size_bytes", 0) total_innodb = innodb_data + innodb_index if total_innodb > 0: bp_coverage = (bp_size / total_innodb) * 100 output["innodb_analysis"]["buffer_pool_data_coverage_pct"] = round( bp_coverage, 2 ) if bp_coverage < 100: output["recommendations"].append( f"InnoDB buffer pool 
({round(bp_size/1024/1024/1024, 2)}GB) " f"covers only {bp_coverage:.1f}% of InnoDB data " f"({round(total_innodb/1024/1024/1024, 2)}GB). " "Consider increasing innodb_buffer_pool_size." ) async def _analyze_memory_engine(self, output: dict) -> None: """Analyze MEMORY engine tables.""" variables = await self.sql_driver.get_server_variables() max_heap_table_size = int(variables.get("max_heap_table_size", 0)) tmp_table_size = int(variables.get("tmp_table_size", 0)) output["memory_engine"] = { "max_heap_table_size_mb": round(max_heap_table_size / 1024 / 1024, 2), "tmp_table_size_mb": round(tmp_table_size / 1024 / 1024, 2) } memory_usage = output["engine_usage"].get("MEMORY", {}) if memory_usage.get("table_count", 0) > 0: output["recommendations"].append( "MEMORY tables lose data on restart. Consider using InnoDB " "with appropriate caching if persistence is needed." ) async def _get_table_details(self, output: dict, schema_name: str = None) -> None: """Get detailed table information by engine.""" where_clause = "WHERE TABLE_SCHEMA NOT IN ('mysql', 'information_schema', 'performance_schema', 'sys')" if schema_name: where_clause = f"WHERE TABLE_SCHEMA = '{schema_name}'" # Get non-InnoDB tables (potential migration candidates) non_innodb_query = f""" SELECT TABLE_SCHEMA, TABLE_NAME, ENGINE, TABLE_ROWS, DATA_LENGTH, INDEX_LENGTH, DATA_FREE FROM information_schema.TABLES {where_clause} AND ENGINE != 'InnoDB' AND TABLE_TYPE = 'BASE TABLE' ORDER BY DATA_LENGTH DESC LIMIT 20 """ non_innodb = await self.sql_driver.execute_query(non_innodb_query) output["non_innodb_tables"] = [ { "schema": row.get("TABLE_SCHEMA"), "table": row.get("TABLE_NAME"), "engine": row.get("ENGINE"), "rows": row.get("TABLE_ROWS"), "data_size_mb": round( (row.get("DATA_LENGTH") or 0) / 1024 / 1024, 2 ), "index_size_mb": round( (row.get("INDEX_LENGTH") or 0) / 1024 / 1024, 2 ) } for row in non_innodb ] # Get fragmented tables (DATA_FREE > 10% of DATA_LENGTH) fragmented_query = f""" SELECT TABLE_SCHEMA, 
TABLE_NAME, ENGINE, TABLE_ROWS, DATA_LENGTH, DATA_FREE, ROUND(DATA_FREE / DATA_LENGTH * 100, 2) as fragmentation_pct FROM information_schema.TABLES {where_clause} AND DATA_LENGTH > 0 AND DATA_FREE > DATA_LENGTH * 0.1 AND TABLE_TYPE = 'BASE TABLE' ORDER BY DATA_FREE DESC LIMIT 20 """ fragmented = await self.sql_driver.execute_query(fragmented_query) output["fragmented_tables"] = [ { "schema": row.get("TABLE_SCHEMA"), "table": row.get("TABLE_NAME"), "engine": row.get("ENGINE"), "rows": row.get("TABLE_ROWS"), "data_size_mb": round( (row.get("DATA_LENGTH") or 0) / 1024 / 1024, 2 ), "data_free_mb": round( (row.get("DATA_FREE") or 0) / 1024 / 1024, 2 ), "fragmentation_pct": row.get("fragmentation_pct") } for row in fragmented ] if output["fragmented_tables"]: output["recommendations"].append( f"{len(output['fragmented_tables'])} fragmented tables found. " "Consider running OPTIMIZE TABLE to reclaim space." ) def _generate_recommendations(self, output: dict) -> None: """Generate engine-related recommendations.""" # Check for engine diversity engine_count = len([ e for e in output["engine_usage"] if output["engine_usage"][e]["table_count"] > 0 ]) if engine_count > 2: output["issues"].append( f"Using {engine_count} different storage engines" ) output["recommendations"].append( "Consider consolidating to fewer storage engines (preferably InnoDB) " "for simpler management and better resource utilization" ) # Check default storage engine try: # Note: This would need to be async, adding as a recommendation output["recommendations"].append( "Ensure default_storage_engine is set to InnoDB for new tables" ) except Exception: pass class FragmentedTablesToolHandler(ToolHandler): """Tool handler for fragmented tables analysis.""" name = "get_fragmented_tables" title = "Fragmented Tables" read_only_hint = True destructive_hint = False idempotent_hint = True open_world_hint = False description = """Find user tables with significant fragmentation. 
Fragmentation occurs when: - Data is deleted from tables - Tables are frequently updated - VARCHAR/TEXT columns are modified Note: This tool only analyzes user/custom tables and excludes MySQL system tables (mysql, information_schema, performance_schema, sys) by default. High fragmentation wastes disk space and can slow queries.""" def __init__(self, sql_driver: SqlDriver): self.sql_driver = sql_driver def get_tool_definition(self) -> Tool: return Tool( name=self.name, description=self.description, inputSchema={ "type": "object", "properties": { "min_fragmentation_pct": { "type": "number", "description": "Minimum fragmentation percentage threshold", "default": 10 }, "min_data_free_mb": { "type": "number", "description": "Minimum wasted space in MB", "default": 10 }, "schema_name": { "type": "string", "description": "Filter by specific schema" }, "limit": { "type": "integer", "description": "Maximum tables to return", "default": 50 } }, "required": [] }, annotations=self.get_annotations() ) async def run_tool(self, arguments: dict[str, Any]) -> Sequence[TextContent]: try: min_frag_pct = arguments.get("min_fragmentation_pct", 10) min_data_free = arguments.get("min_data_free_mb", 10) * 1024 * 1024 schema_name = arguments.get("schema_name") limit = arguments.get("limit", 50) output = { "summary": {}, "fragmented_tables": [], "recommendations": [] } where_clause = """ WHERE TABLE_SCHEMA NOT IN ('mysql', 'information_schema', 'performance_schema', 'sys') """ if schema_name: where_clause = f"WHERE TABLE_SCHEMA = '{schema_name}'" query = f""" SELECT TABLE_SCHEMA, TABLE_NAME, ENGINE, TABLE_ROWS, DATA_LENGTH, INDEX_LENGTH, DATA_FREE, ROUND(DATA_FREE / DATA_LENGTH * 100, 2) as fragmentation_pct FROM information_schema.TABLES {where_clause} AND TABLE_TYPE = 'BASE TABLE' AND DATA_LENGTH > 0 AND DATA_FREE >= {min_data_free} AND (DATA_FREE / DATA_LENGTH * 100) >= {min_frag_pct} ORDER BY DATA_FREE DESC LIMIT {limit} """ results = await self.sql_driver.execute_query(query) 
total_wasted = 0 for row in results: data_free = row.get("DATA_FREE") or 0 total_wasted += data_free output["fragmented_tables"].append({ "schema": row.get("TABLE_SCHEMA"), "table": row.get("TABLE_NAME"), "engine": row.get("ENGINE"), "rows": row.get("TABLE_ROWS"), "data_size_mb": round( (row.get("DATA_LENGTH") or 0) / 1024 / 1024, 2 ), "index_size_mb": round( (row.get("INDEX_LENGTH") or 0) / 1024 / 1024, 2 ), "data_free_mb": round(data_free / 1024 / 1024, 2), "fragmentation_pct": row.get("fragmentation_pct"), "optimize_command": ( f"OPTIMIZE TABLE `{row.get('TABLE_SCHEMA')}`." f"`{row.get('TABLE_NAME')}`" ) }) output["summary"] = { "fragmented_tables_count": len(results), "total_wasted_space_mb": round(total_wasted / 1024 / 1024, 2), "total_wasted_space_gb": round( total_wasted / 1024 / 1024 / 1024, 2 ) } if results: output["recommendations"].append( f"Found {len(results)} fragmented tables wasting " f"{output['summary']['total_wasted_space_mb']:.1f} MB" ) output["recommendations"].append( "Run OPTIMIZE TABLE on fragmented tables during low-traffic " "periods. Note: This locks the table for MyISAM." ) # InnoDB specific recommendation innodb_tables = [ t for t in output["fragmented_tables"] if t["engine"] == "InnoDB" ] if innodb_tables: output["recommendations"].append( "For InnoDB tables, consider ALTER TABLE ... ENGINE=InnoDB " "as an alternative to OPTIMIZE TABLE for online rebuild." ) return self.format_json_result(output) except Exception as e: return self.format_error(e) class AutoIncrementAnalysisToolHandler(ToolHandler): """Tool handler for auto-increment column analysis.""" name = "analyze_auto_increment" title = "Auto-Increment Analyzer" read_only_hint = True destructive_hint = False idempotent_hint = True open_world_hint = False description = """Analyze auto-increment columns for potential overflow. 
Checks: - Current value vs maximum value for column type - Usage percentage - Tables approaching overflow Note: This tool only analyzes user/custom tables and excludes MySQL system tables (mysql, information_schema, performance_schema, sys) by default. Based on MySQLTuner's auto-increment analysis.""" def __init__(self, sql_driver: SqlDriver): self.sql_driver = sql_driver def get_tool_definition(self) -> Tool: return Tool( name=self.name, description=self.description, inputSchema={ "type": "object", "properties": { "warning_threshold_pct": { "type": "number", "description": "Warning threshold percentage", "default": 75 }, "schema_name": { "type": "string", "description": "Filter by specific schema" } }, "required": [] }, annotations=self.get_annotations() ) async def run_tool(self, arguments: dict[str, Any]) -> Sequence[TextContent]: try: warning_pct = arguments.get("warning_threshold_pct", 75) schema_name = arguments.get("schema_name") output = { "summary": {}, "at_risk_tables": [], "all_auto_increment": [], "recommendations": [] } # Max values for different integer types max_values = { "tinyint": {"signed": 127, "unsigned": 255}, "smallint": {"signed": 32767, "unsigned": 65535}, "mediumint": {"signed": 8388607, "unsigned": 16777215}, "int": {"signed": 2147483647, "unsigned": 4294967295}, "bigint": {"signed": 9223372036854775807, "unsigned": 18446744073709551615} } where_clause = """ WHERE TABLE_SCHEMA NOT IN ('mysql', 'information_schema', 'performance_schema', 'sys') """ if schema_name: where_clause = f"WHERE TABLE_SCHEMA = '{schema_name}'" # Get auto_increment columns query = f""" SELECT t.TABLE_SCHEMA, t.TABLE_NAME, t.AUTO_INCREMENT, c.COLUMN_NAME, c.COLUMN_TYPE, c.DATA_TYPE FROM information_schema.TABLES t JOIN information_schema.COLUMNS c ON t.TABLE_SCHEMA = c.TABLE_SCHEMA AND t.TABLE_NAME = c.TABLE_NAME {where_clause} AND t.AUTO_INCREMENT IS NOT NULL AND c.EXTRA LIKE '%auto_increment%' ORDER BY t.AUTO_INCREMENT DESC """ results = await 
self.sql_driver.execute_query(query) at_risk_count = 0 for row in results: current_val = row.get("AUTO_INCREMENT") or 0 data_type = (row.get("DATA_TYPE") or "int").lower() column_type = (row.get("COLUMN_TYPE") or "").lower() # Determine if unsigned is_unsigned = "unsigned" in column_type sign_type = "unsigned" if is_unsigned else "signed" # Get max value for this type type_limits = max_values.get(data_type, max_values["int"]) max_val = type_limits[sign_type] # Calculate usage percentage usage_pct = (current_val / max_val) * 100 table_info = { "schema": row.get("TABLE_SCHEMA"), "table": row.get("TABLE_NAME"), "column": row.get("COLUMN_NAME"), "column_type": row.get("COLUMN_TYPE"), "current_value": current_val, "max_value": max_val, "usage_pct": round(usage_pct, 4) } output["all_auto_increment"].append(table_info) if usage_pct >= warning_pct: at_risk_count += 1 table_info["at_risk"] = True output["at_risk_tables"].append(table_info) output["summary"] = { "total_auto_increment_tables": len(results), "at_risk_tables_count": at_risk_count } # Generate recommendations for at-risk tables for table in output["at_risk_tables"]: data_type = table["column_type"].lower() if "bigint" in data_type: output["recommendations"].append( f"Table `{table['schema']}`.`{table['table']}` is at " f"{table['usage_pct']:.2f}% of BIGINT capacity. " "Consider data archival strategy." ) elif "unsigned" not in data_type: output["recommendations"].append( f"Table `{table['schema']}`.`{table['table']}` " f"({table['column_type']}) is at {table['usage_pct']:.2f}% " "capacity. Consider ALTER to UNSIGNED for 2x capacity or " "upgrade to larger integer type." 
) else: next_type = { "tinyint": "smallint", "smallint": "mediumint", "mediumint": "int", "int": "bigint" } base_type = data_type.split()[0].replace("unsigned", "").strip() suggested = next_type.get(base_type, "bigint") output["recommendations"].append( f"Table `{table['schema']}`.`{table['table']}` " f"({table['column_type']}) is at {table['usage_pct']:.2f}% " f"capacity. Consider upgrading to {suggested.upper()} UNSIGNED." ) return self.format_json_result(output) except Exception as e: return self.format_error(e)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/isdaniel/MySQL-Performance-Tuner-Mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.