Skip to main content
Glama

MySQL-Performance-Tuner-Mcp

tools_memory.py (31.8 kB)
"""
Memory calculation tool handlers for MySQL.

Includes tools for analyzing MySQL memory usage:
- Per-thread buffer calculations
- Server buffer calculations
- Maximum memory estimation
- Memory usage recommendations

Based on MySQLTuner's calculations() function for memory analysis.
"""

from __future__ import annotations

from collections.abc import Sequence
from typing import Any

from mcp.types import TextContent, Tool

from ..services import SqlDriver
from .toolhandler import ToolHandler

_KIB = 1024
_MIB = 1024 * _KIB
_GIB = 1024 * _MIB

# Per-connection buffers that are summed into the worst-case per-thread total.
_PER_THREAD_BUFFERS = (
    "read_buffer_size",
    "read_rnd_buffer_size",
    "sort_buffer_size",
    "join_buffer_size",
    "thread_stack",
    "binlog_stmt_cache_size",
    "net_buffer_length",
)

# group_by value -> (raw sys view, label column, bytes column, count column,
# WHERE clause).  The x$ views expose raw numbers; the plain sys views return
# pre-formatted strings that can be neither summed nor sorted numerically.
_MEMORY_SOURCES = {
    "host": (
        "sys.x$memory_by_host_by_current_bytes",
        "host",
        "current_allocated",
        "current_count_used",
        "WHERE host IS NOT NULL",
    ),
    "user": (
        "sys.x$memory_by_user_by_current_bytes",
        "user",
        "current_allocated",
        "current_count_used",
        "WHERE user IS NOT NULL",
    ),
    "event_name": (
        "sys.x$memory_global_by_current_bytes",
        "event_name",
        "current_alloc",
        "current_count",
        "",
    ),
}

# Schemas excluded from the per-table buffer pool breakdown.
_SYSTEM_SCHEMAS = "('mysql', 'information_schema', 'performance_schema', 'sys')"


def _to_int(value: Any, default: int = 0) -> int:
    """Convert a variable/status/row value to int.

    Accepts ints, numeric strings, floats and Decimals.  Returns *default*
    for None or anything that cannot be interpreted as a number, so one odd
    value does not abort the whole analysis.
    """
    if value is None:
        return default
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        pass
    try:
        # Handles strings such as "1024.0".
        return int(float(value))
    except (TypeError, ValueError, OverflowError):
        return default


def _positive_int(value: Any, default: int) -> int:
    """Return *value* as an int >= 1, falling back to *default* otherwise.

    Used for values that end up in a SQL LIMIT clause, which guarantees that
    only a plain positive integer is ever interpolated into the query text.
    """
    number = _to_int(value, default)
    return number if number > 0 else default


def _human_bytes(bytes_val: int) -> str:
    """Format a byte count using binary (1024-based) B/KB/MB/GB units."""
    if bytes_val < _KIB:
        return f"{bytes_val} B"
    if bytes_val < _MIB:
        return f"{bytes_val / 1024:.2f} KB"
    if bytes_val < _GIB:
        return f"{bytes_val / 1024 / 1024:.2f} MB"
    return f"{bytes_val / 1024 / 1024 / 1024:.2f} GB"


class MemoryCalculationsToolHandler(ToolHandler):
    """Tool handler for MySQL memory calculations."""

    name = "calculate_memory_usage"
    title = "Memory Calculator"
    read_only_hint = True
    destructive_hint = False
    idempotent_hint = True
    open_world_hint = False
    description = """Calculate MySQL memory usage and provide recommendations.

Analyzes:
- Per-thread memory buffers (read_buffer, sort_buffer, join_buffer, etc.)
- Global server buffers (key_buffer, innodb_buffer_pool, etc.)
- Maximum potential memory usage
- Current memory utilization

Based on MySQLTuner's memory calculation methodology.
Helps identify memory-related configuration issues."""

    def __init__(self, sql_driver: SqlDriver):
        self.sql_driver = sql_driver

    def get_tool_definition(self) -> Tool:
        """Return the MCP tool definition (name, description, input schema)."""
        return Tool(
            name=self.name,
            description=self.description,
            inputSchema={
                "type": "object",
                "properties": {
                    "physical_memory_gb": {
                        "type": "number",
                        "description": "Physical memory in GB (for comparison). "
                                       "If not provided, uses system detection if available."
                    },
                    "detailed": {
                        "type": "boolean",
                        "description": "Include detailed breakdown",
                        "default": True
                    }
                },
                "required": []
            },
            annotations=self.get_annotations()
        )

    async def run_tool(self, arguments: dict[str, Any]) -> Sequence[TextContent]:
        """Estimate global, per-thread and total MySQL memory usage.

        Args:
            arguments: Tool arguments; reads ``physical_memory_gb`` (optional
                number) and ``detailed`` (bool, default True).

        Returns:
            The JSON result produced by ``format_json_result``, or the output
            of ``format_error`` if anything raises.
        """
        try:
            physical_memory_gb = arguments.get("physical_memory_gb")
            detailed = arguments.get("detailed", True)

            output: dict[str, Any] = {
                "server_buffers": {},
                "per_thread_buffers": {},
                "memory_summary": {},
                "recommendations": [],
                "issues": []
            }

            # Get relevant variables
            variables = await self.sql_driver.get_server_variables()
            status = await self.sql_driver.get_server_status()

            def var(name: str, default: int = 0) -> int:
                return _to_int(variables.get(name), default)

            def stat(name: str, default: int = 0) -> int:
                return _to_int(status.get(name), default)

            # --- Server (Global) Buffers ---
            # These are allocated once for the entire server
            key_buffer_size = var("key_buffer_size")  # MyISAM
            # Query cache (deprecated in 5.7, removed in 8.0)
            query_cache_size = var("query_cache_size")
            innodb_buffer_pool_size = var("innodb_buffer_pool_size")
            innodb_log_buffer_size = var("innodb_log_buffer_size")
            innodb_additional_mem_pool_size = var("innodb_additional_mem_pool_size")
            binlog_cache_size = var("binlog_cache_size")

            total_global_buffers = (
                key_buffer_size
                + query_cache_size
                + innodb_buffer_pool_size
                + innodb_log_buffer_size
                + innodb_additional_mem_pool_size
            )

            output["server_buffers"] = {
                "key_buffer_size": self._format_bytes(key_buffer_size),
                "key_buffer_size_bytes": key_buffer_size,
                "query_cache_size": self._format_bytes(query_cache_size),
                "query_cache_size_bytes": query_cache_size,
                "innodb_buffer_pool_size": self._format_bytes(innodb_buffer_pool_size),
                "innodb_buffer_pool_size_bytes": innodb_buffer_pool_size,
                "innodb_log_buffer_size": self._format_bytes(innodb_log_buffer_size),
                "innodb_log_buffer_size_bytes": innodb_log_buffer_size,
                "innodb_additional_mem_pool_size": self._format_bytes(
                    innodb_additional_mem_pool_size
                ),
                "table_definition_cache": var("table_definition_cache"),
                "table_open_cache": var("table_open_cache"),
                "binlog_cache_size": self._format_bytes(binlog_cache_size),
                "binlog_cache_size_bytes": binlog_cache_size,
                "total": self._format_bytes(total_global_buffers),
                "total_bytes": total_global_buffers,
            }

            # --- Per-Thread Buffers ---
            # These are allocated for each connection
            per_thread: dict[str, Any] = {}
            per_thread_total = 0
            for buffer_name in _PER_THREAD_BUFFERS:
                size = var(buffer_name)
                per_thread[buffer_name] = self._format_bytes(size)
                per_thread[f"{buffer_name}_bytes"] = size
                per_thread_total += size

            # Temp-table limits are reported but not part of the per-thread
            # total.  The in-memory temp table limit is the smaller of the two.
            tmp_table_size = var("tmp_table_size")
            max_heap_table_size = var("max_heap_table_size")
            per_thread["tmp_table_size"] = self._format_bytes(tmp_table_size)
            per_thread["tmp_table_size_bytes"] = tmp_table_size
            per_thread["max_heap_table_size"] = self._format_bytes(max_heap_table_size)
            per_thread["max_heap_table_size_bytes"] = max_heap_table_size
            per_thread["effective_tmp_table_size"] = self._format_bytes(
                min(tmp_table_size, max_heap_table_size)
            )

            # Total per-thread buffer size (worst case per connection)
            per_thread["total_per_thread"] = self._format_bytes(per_thread_total)
            per_thread["total_per_thread_bytes"] = per_thread_total
            output["per_thread_buffers"] = per_thread

            # --- Connection limits ---
            max_connections = var("max_connections", 151)
            current_connections = stat("Threads_connected")
            max_used_connections = stat("Max_used_connections")

            summary = output["memory_summary"]
            summary["max_connections"] = max_connections
            summary["current_connections"] = current_connections
            summary["max_used_connections"] = max_used_connections

            # --- Memory calculations ---
            # Maximum: every allowed connection using its full buffers.
            max_total_memory = total_global_buffers + per_thread_total * max_connections
            # Peak: based on the historical connection high-water mark.
            peak_memory = total_global_buffers + per_thread_total * max_used_connections
            # Current: based on the connections open right now.
            current_memory = total_global_buffers + per_thread_total * current_connections

            for label, value in (
                ("max_total_memory", max_total_memory),
                ("peak_memory", peak_memory),
                ("current_memory", current_memory),
                ("global_buffers", total_global_buffers),
            ):
                summary[label] = self._format_bytes(value)
                summary[f"{label}_bytes"] = value
                summary[f"{label}_gb"] = round(value / 1024 / 1024 / 1024, 2)

            # Compare to physical memory if a usable (positive) figure was given
            try:
                physical_gb = (
                    float(physical_memory_gb) if physical_memory_gb is not None else 0.0
                )
            except (TypeError, ValueError):
                physical_gb = 0.0

            if physical_gb > 0:
                physical_memory_bytes = physical_gb * _GIB
                summary["physical_memory_gb"] = physical_memory_gb

                max_memory_pct = (max_total_memory / physical_memory_bytes) * 100
                current_memory_pct = (current_memory / physical_memory_bytes) * 100

                summary["max_memory_pct_of_physical"] = round(max_memory_pct, 2)
                summary["current_memory_pct_of_physical"] = round(current_memory_pct, 2)

                # Recommendations based on physical memory
                if max_memory_pct > 100:
                    output["issues"].append(
                        f"Max possible MySQL memory ({self._format_bytes(max_total_memory)}) "
                        f"exceeds physical memory ({physical_memory_gb}GB)"
                    )
                    output["recommendations"].append(
                        "Reduce max_connections or per-thread buffer sizes to "
                        "prevent potential OOM"
                    )
                elif max_memory_pct > 85:
                    output["issues"].append(
                        f"Max MySQL memory is {max_memory_pct:.1f}% of physical memory"
                    )
                    output["recommendations"].append(
                        "Consider reducing max_connections to leave headroom "
                        "for OS and other processes"
                    )

            if detailed:
                await self._add_detailed_analysis(variables, status, output)

            # General recommendations
            self._generate_recommendations(variables, status, output)

            return self.format_json_result(output)

        except Exception as e:
            return self.format_error(e)

    def _format_bytes(self, bytes_val: int) -> str:
        """Format bytes into human-readable string."""
        return _human_bytes(bytes_val)

    async def _add_detailed_analysis(
        self,
        variables: dict,
        status: dict,
        output: dict
    ) -> None:
        """Add key buffer, query cache and InnoDB buffer pool usage details.

        Results are written under ``output["detailed"]``; a low key buffer
        utilization may also append to ``output["recommendations"]``.
        """

        def var(name: str, default: int = 0) -> int:
            return _to_int(variables.get(name), default)

        def stat(name: str, default: int = 0) -> int:
            return _to_int(status.get(name), default)

        # Key buffer usage (MyISAM)
        key_buffer_size = var("key_buffer_size")
        if key_buffer_size > 0:
            key_blocks_used = stat("Key_blocks_used")
            key_blocks_unused = stat("Key_blocks_unused")
            key_cache_used = key_blocks_used * var("key_cache_block_size", 1024)
            key_usage_pct = (key_cache_used / key_buffer_size) * 100

            output.setdefault("detailed", {})["key_buffer"] = {
                "size": self._format_bytes(key_buffer_size),
                "used": self._format_bytes(key_cache_used),
                "usage_pct": round(key_usage_pct, 2),
                "blocks_used": key_blocks_used,
                "blocks_unused": key_blocks_unused
            }

            if key_usage_pct < 10 and key_buffer_size > 64 * _MIB:
                output["recommendations"].append(
                    f"Key buffer utilization is only {key_usage_pct:.1f}%. "
                    "Consider reducing key_buffer_size if not using MyISAM."
                )

        # Query cache usage (if enabled)
        query_cache_size = var("query_cache_size")
        if query_cache_size > 0:
            qc_free_memory = stat("Qcache_free_memory")
            qc_hits = stat("Qcache_hits")
            qc_inserts = stat("Qcache_inserts", 1)

            output.setdefault("detailed", {})["query_cache"] = {
                "size": self._format_bytes(query_cache_size),
                "free_memory": self._format_bytes(qc_free_memory),
                "usage_pct": round(
                    (query_cache_size - qc_free_memory) / query_cache_size * 100, 2
                ),
                "hit_ratio": round(qc_hits / max(qc_hits + qc_inserts, 1) * 100, 2)
            }

        # InnoDB buffer pool details
        innodb_bp_size = var("innodb_buffer_pool_size")
        if innodb_bp_size > 0:
            bp_pages_total = stat("Innodb_buffer_pool_pages_total", 1)
            bp_pages_free = stat("Innodb_buffer_pool_pages_free")
            read_requests = stat("Innodb_buffer_pool_read_requests")
            reads = stat("Innodb_buffer_pool_reads")

            hit_ratio = (read_requests - reads) / max(read_requests, 1) * 100

            output.setdefault("detailed", {})["innodb_buffer_pool"] = {
                "size": self._format_bytes(innodb_bp_size),
                "pages_total": bp_pages_total,
                "pages_free": bp_pages_free,
                "pages_data": stat("Innodb_buffer_pool_pages_data"),
                "pages_dirty": stat("Innodb_buffer_pool_pages_dirty"),
                # max(..., 1): the server can report 0 total pages, which
                # would otherwise raise ZeroDivisionError.
                "usage_pct": round(
                    (bp_pages_total - bp_pages_free) / max(bp_pages_total, 1) * 100, 2
                ),
                "hit_ratio_pct": round(hit_ratio, 4)
            }

    def _generate_recommendations(
        self,
        variables: dict,
        status: dict,
        output: dict
    ) -> None:
        """Append general memory-related advice to ``output["recommendations"]``."""

        def var(name: str, default: int = 0) -> int:
            return _to_int(variables.get(name), default)

        recommendations = output["recommendations"]

        # Check sort buffer
        sort_buffer = var("sort_buffer_size")
        if sort_buffer > 4 * _MIB:  # > 4MB
            recommendations.append(
                f"sort_buffer_size ({self._format_bytes(sort_buffer)}) is large. "
                "This is allocated per-sort, consider reducing if memory is tight."
            )

        # Check join buffer
        join_buffer = var("join_buffer_size")
        if join_buffer > 4 * _MIB:  # > 4MB
            recommendations.append(
                f"join_buffer_size ({self._format_bytes(join_buffer)}) is large. "
                "Multiple can be allocated per query for nested joins."
            )

        # Check tmp_table_size vs max_heap_table_size mismatch
        tmp_table = var("tmp_table_size")
        max_heap = var("max_heap_table_size")
        if tmp_table != max_heap:
            recommendations.append(
                f"tmp_table_size ({self._format_bytes(tmp_table)}) and "
                f"max_heap_table_size ({self._format_bytes(max_heap)}) differ. "
                "The smaller value is used. Consider aligning them."
            )

        # Check for unused query cache in MySQL 5.7.  The value may arrive as
        # "OFF"/"off" or as the numeric form 0, so compare case-insensitively.
        qc_type = str(variables.get("query_cache_type", "OFF")).strip().upper()
        if qc_type in ("OFF", "0") and var("query_cache_size") > 0:
            recommendations.append(
                "query_cache_type is OFF but query_cache_size is allocated. "
                "Set query_cache_size=0 to free memory."
            )

        # Check connection-related memory
        max_conn = var("max_connections", 151)
        if max_conn > 500:
            recommendations.append(
                f"max_connections is high ({max_conn}). "
                "Consider using connection pooling to reduce memory overhead."
            )

        # Thread cache
        thread_cache_size = var("thread_cache_size")
        threads_created = _to_int(status.get("Threads_created"), 0)
        connections = _to_int(status.get("Connections"), 1)

        if connections > 0:
            thread_cache_miss_pct = (threads_created / connections) * 100
            if thread_cache_miss_pct > 10 and thread_cache_size < 64:
                recommendations.append(
                    f"Thread cache miss rate is {thread_cache_miss_pct:.1f}%. "
                    "Consider increasing thread_cache_size."
                )


class MemoryByHostToolHandler(ToolHandler):
    """Tool handler for memory usage by host/user."""

    name = "get_memory_by_host"
    title = "Memory by Host"
    read_only_hint = True
    destructive_hint = False
    idempotent_hint = True
    open_world_hint = False
    description = """Get memory usage breakdown by host or user.

Uses sys schema or performance_schema to show:
- Memory allocated per host
- Memory allocated per user
- Memory by event/operation type

Requires performance_schema memory instrumentation enabled."""

    def __init__(self, sql_driver: SqlDriver):
        self.sql_driver = sql_driver

    def get_tool_definition(self) -> Tool:
        """Return the MCP tool definition (name, description, input schema)."""
        return Tool(
            name=self.name,
            description=self.description,
            inputSchema={
                "type": "object",
                "properties": {
                    "group_by": {
                        "type": "string",
                        "description": "Group memory by",
                        "enum": ["host", "user", "event_name"],
                        "default": "host"
                    },
                    "limit": {
                        "type": "integer",
                        "description": "Maximum results to return",
                        "default": 25
                    }
                },
                "required": []
            },
            annotations=self.get_annotations()
        )

    async def run_tool(self, arguments: dict[str, Any]) -> Sequence[TextContent]:
        """Report current memory allocation grouped by host, user or event.

        Args:
            arguments: Tool arguments; reads ``group_by`` (default "host";
                any value other than "host"/"user" groups by event name) and
                ``limit`` (default 25; non-positive or non-numeric values
                fall back to 25).

        Returns:
            The JSON result produced by ``format_json_result``.  If the sys
            schema queries fail, the result carries an ``error`` key and a
            recommendation to enable memory instrumentation.
        """
        try:
            group_by = arguments.get("group_by", "host")
            # Coerced to a positive int because it is interpolated into SQL.
            limit = _positive_int(arguments.get("limit", 25), 25)

            output: dict[str, Any] = {
                "memory_usage": [],
                "total_allocated": {},
                "recommendations": []
            }

            # Anything other than host/user is treated as event_name.
            if group_by not in ("host", "user"):
                group_by = "event_name"
            view, label, bytes_col, count_col, where = _MEMORY_SOURCES[group_by]

            # Check if memory instrumentation is available
            try:
                # Every interpolated fragment is a module constant or a
                # validated int.
                query = f"""
                    SELECT
                        {label},
                        {count_col} AS current_count,
                        {bytes_col} AS current_bytes
                    FROM {view}
                    {where}
                    ORDER BY {bytes_col} DESC
                    LIMIT {limit}
                """

                results = await self.sql_driver.execute_query(query)

                for row in results:
                    current_bytes = _to_int(row.get("current_bytes"))
                    output["memory_usage"].append({
                        label: row.get(label),
                        "current_bytes": current_bytes,
                        "current_alloc_readable": self._format_bytes(current_bytes),
                        "current_count": _to_int(row.get("current_count")),
                    })

                # Get total (raw bytes, so SUM is meaningful)
                total_query = """
                    SELECT SUM(current_alloc) as total
                    FROM sys.x$memory_global_by_current_bytes
                """
                total_bytes = _to_int(
                    await self.sql_driver.execute_scalar(total_query)
                )
                output["total_allocated"]["bytes"] = total_bytes
                output["total_allocated"]["readable"] = self._format_bytes(total_bytes)

            except Exception as e:
                # Memory instrumentation may not be enabled
                output["error"] = str(e)
                output["recommendations"].append(
                    "Enable memory instrumentation in performance_schema: "
                    "UPDATE performance_schema.setup_instruments "
                    "SET ENABLED='YES' WHERE NAME LIKE 'memory/%'"
                )

            return self.format_json_result(output)

        except Exception as e:
            return self.format_error(e)

    def _format_bytes(self, bytes_val: int) -> str:
        """Format bytes into human-readable string."""
        return _human_bytes(bytes_val)


class TableMemoryUsageToolHandler(ToolHandler):
    """Tool handler for table memory and cache analysis."""

    name = "get_table_memory_usage"
    title = "Table Memory Usage"
    read_only_hint = True
    destructive_hint = False
    idempotent_hint = True
    open_world_hint = False
    description = """Analyze memory usage for user tables and caches.

Shows:
- Table cache usage and hit rates
- Table definition cache efficiency
- Open tables vs table_open_cache
- InnoDB buffer pool by table

Note: Buffer pool breakdown by table only shows user/custom tables
and excludes MySQL system tables (mysql, information_schema,
performance_schema, sys).

Helps optimize table caching parameters."""

    def __init__(self, sql_driver: SqlDriver):
        self.sql_driver = sql_driver

    def get_tool_definition(self) -> Tool:
        """Return the MCP tool definition (name, description, input schema)."""
        return Tool(
            name=self.name,
            description=self.description,
            inputSchema={
                "type": "object",
                "properties": {
                    "include_buffer_pool": {
                        "type": "boolean",
                        "description": "Include InnoDB buffer pool by table",
                        "default": True
                    },
                    "top_n_tables": {
                        "type": "integer",
                        "description": "Number of top tables to show",
                        "default": 20
                    }
                },
                "required": []
            },
            annotations=self.get_annotations()
        )

    async def run_tool(self, arguments: dict[str, Any]) -> Sequence[TextContent]:
        """Analyze table cache, definition cache and buffer pool per table.

        Args:
            arguments: Tool arguments; reads ``include_buffer_pool`` (bool,
                default True) and ``top_n_tables`` (default 20; non-positive
                or non-numeric values fall back to 20).

        Returns:
            The JSON result produced by ``format_json_result``, or the output
            of ``format_error`` if anything raises.
        """
        try:
            include_bp = arguments.get("include_buffer_pool", True)
            # Coerced to a positive int because it is interpolated into SQL.
            top_n = _positive_int(arguments.get("top_n_tables", 20), 20)

            output: dict[str, Any] = {
                "table_cache": {},
                "definition_cache": {},
                "buffer_pool_by_table": [],
                "recommendations": []
            }

            # Get relevant variables and status
            variables = await self.sql_driver.get_server_variables()
            status = await self.sql_driver.get_server_status()

            def var(name: str, default: int = 0) -> int:
                return _to_int(variables.get(name), default)

            def stat(name: str, default: int = 0) -> int:
                return _to_int(status.get(name), default)

            # Table cache analysis
            table_open_cache = var("table_open_cache")
            open_tables = stat("Open_tables")
            opened_tables = stat("Opened_tables")
            # Opened_tables can be lower than Open_tables after FLUSH STATUS;
            # clamp so misses are never reported as negative.
            cache_misses = max(opened_tables - open_tables, 0)

            output["table_cache"] = {
                "table_open_cache": table_open_cache,
                "table_open_cache_instances": var("table_open_cache_instances", 1),
                "open_tables": open_tables,
                "opened_tables": opened_tables,
                "cache_usage_pct": round(
                    open_tables / max(table_open_cache, 1) * 100, 2
                ),
                "cache_misses": cache_misses
            }

            # Calculate table cache miss ratio
            if opened_tables > 0:
                miss_ratio = cache_misses / opened_tables * 100
                if miss_ratio > 10:
                    output["recommendations"].append(
                        f"Table cache miss ratio is {miss_ratio:.1f}%. "
                        "Consider increasing table_open_cache."
                    )

            # Table definition cache
            table_definition_cache = var("table_definition_cache")
            open_table_definitions = stat("Open_table_definitions")

            output["definition_cache"] = {
                "table_definition_cache": table_definition_cache,
                "open_definitions": open_table_definitions,
                "opened_definitions": stat("Opened_table_definitions"),
                "cache_usage_pct": round(
                    open_table_definitions / max(table_definition_cache, 1) * 100, 2
                )
            }

            # InnoDB buffer pool by table
            if include_bp:
                try:
                    bp_query = f"""
                        SELECT
                            object_schema,
                            object_name,
                            allocated,
                            data,
                            pages,
                            pages_hashed,
                            pages_old,
                            rows_cached
                        FROM sys.innodb_buffer_stats_by_table
                        WHERE object_schema NOT IN {_SYSTEM_SCHEMAS}
                        ORDER BY allocated DESC
                        LIMIT {top_n}
                    """

                    bp_results = await self.sql_driver.execute_query(bp_query)

                    output["buffer_pool_by_table"] = [
                        {
                            "schema": row.get("object_schema"),
                            "table": row.get("object_name"),
                            "allocated": str(row.get("allocated")),
                            "data": str(row.get("data")),
                            "pages": row.get("pages"),
                            "rows_cached": row.get("rows_cached")
                        }
                        for row in bp_results
                    ]

                except Exception as e:
                    # sys schema may not be available; keep the cause visible
                    # instead of discarding it.
                    output["buffer_pool_by_table"] = []
                    output["buffer_pool_error"] = str(e)
                    output["recommendations"].append(
                        "Install sys schema for detailed buffer pool breakdown"
                    )

            # Additional recommendations.  Skipped when table_open_cache is
            # unknown/0, where "0 >= 0" would otherwise report a full cache.
            if table_open_cache > 0 and open_tables >= table_open_cache * 0.9:
                output["recommendations"].append(
                    f"Table cache is {output['table_cache']['cache_usage_pct']:.1f}% full. "
                    "Consider increasing table_open_cache."
                )

            return self.format_json_result(output)

        except Exception as e:
            return self.format_error(e)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/isdaniel/MySQL-Performance-Tuner-Mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.