Skip to main content
Glama

MySQL-Performance-Tuner-Mcp

sql_driver.py13.1 kB
""" SQL Driver for MySQL query execution. """ from __future__ import annotations import logging from typing import Any, Optional, Sequence import aiomysql from .db_pool import DbConnPool logger = logging.getLogger("mysqltuner_mcp") class SqlDriver: """ MySQL SQL driver for executing queries. Provides a high-level interface for executing queries and managing database interactions using the connection pool. """ def __init__(self, pool: DbConnPool): """ Initialize the SQL driver. Args: pool: Database connection pool instance """ self.pool = pool async def execute_query( self, query: str, params: Optional[Sequence[Any]] = None, fetch_all: bool = True ) -> list[dict[str, Any]]: """ Execute a SQL query and return results as a list of dictionaries. Args: query: SQL query string params: Optional query parameters fetch_all: Whether to fetch all results (default: True) Returns: List of dictionaries with column names as keys """ async with self.pool.get_pool().acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cursor: try: await cursor.execute(query, params) if fetch_all: rows = await cursor.fetchall() return list(rows) if rows else [] else: row = await cursor.fetchone() return [row] if row else [] except Exception as e: logger.error(f"Query execution failed: {str(e)}") logger.debug(f"Query: {query}") raise async def execute_one( self, query: str, params: Optional[Sequence[Any]] = None ) -> Optional[dict[str, Any]]: """ Execute a query and return a single result. Args: query: SQL query string params: Optional query parameters Returns: Single result dictionary or None """ results = await self.execute_query(query, params, fetch_all=False) return results[0] if results else None async def execute_scalar( self, query: str, params: Optional[Sequence[Any]] = None ) -> Any: """ Execute a query and return the first column of the first row. Args: query: SQL query string params: Optional query parameters Returns: Scalar value or None """ result = await self.execute_one(query, params) if result: # Return the first value from the dictionary return next(iter(result.values()), None) return None async def execute_many( self, query: str, params_list: Sequence[Sequence[Any]] ) -> int: """ Execute a query multiple times with different parameters. Args: query: SQL query string params_list: List of parameter sequences Returns: Total number of affected rows """ async with self.pool.get_pool().acquire() as conn: async with conn.cursor() as cursor: try: await cursor.executemany(query, params_list) return cursor.rowcount except Exception as e: logger.error(f"Batch execution failed: {str(e)}") raise async def get_server_version(self) -> str: """ Get the MySQL server version. Returns: Server version string """ result = await self.execute_scalar("SELECT VERSION()") return str(result) if result else "unknown" async def get_server_variables(self, like_pattern: Optional[str] = None) -> dict[str, Any]: """ Get MySQL server variables. Args: like_pattern: Optional LIKE pattern to filter variables Returns: Dictionary of variable names to values """ if like_pattern: query = "SHOW VARIABLES LIKE %s" results = await self.execute_query(query, [like_pattern]) else: query = "SHOW VARIABLES" results = await self.execute_query(query) return {row["Variable_name"]: row["Value"] for row in results} async def get_server_status(self, like_pattern: Optional[str] = None) -> dict[str, Any]: """ Get MySQL server status variables. Args: like_pattern: Optional LIKE pattern to filter status Returns: Dictionary of status variable names to values """ if like_pattern: query = "SHOW GLOBAL STATUS LIKE %s" results = await self.execute_query(query, [like_pattern]) else: query = "SHOW GLOBAL STATUS" results = await self.execute_query(query) return {row["Variable_name"]: row["Value"] for row in results} async def get_databases(self) -> list[str]: """ Get list of all databases. Returns: List of database names """ results = await self.execute_query("SHOW DATABASES") return [row["Database"] for row in results] async def get_tables(self, database: Optional[str] = None) -> list[str]: """ Get list of tables in a database. Args: database: Database name (uses current database if None) Returns: List of table names """ if database: query = f"SHOW TABLES FROM `{database}`" else: query = "SHOW TABLES" results = await self.execute_query(query) if results: # The column name varies based on the database name key = list(results[0].keys())[0] return [row[key] for row in results] return [] async def get_innodb_status(self) -> dict[str, Any]: """ Get InnoDB engine status. Returns: Dictionary containing InnoDB status information """ try: results = await self.execute_query("SHOW ENGINE INNODB STATUS") if results: return { "type": results[0].get("Type"), "name": results[0].get("Name"), "status": results[0].get("Status") } except Exception as e: logger.error(f"Failed to get InnoDB status: {str(e)}") return {} async def get_storage_engines(self) -> list[dict[str, Any]]: """ Get list of available storage engines. Returns: List of storage engine information dictionaries """ query = """ SELECT ENGINE, SUPPORT, COMMENT, TRANSACTIONS, XA, SAVEPOINTS FROM information_schema.ENGINES ORDER BY ENGINE """ return await self.execute_query(query) async def get_processlist( self, full: bool = True, exclude_system: bool = True ) -> list[dict[str, Any]]: """ Get current process list. Args: full: Include full query text exclude_system: Exclude system threads Returns: List of process information """ query = "SHOW FULL PROCESSLIST" if full else "SHOW PROCESSLIST" results = await self.execute_query(query) if exclude_system: results = [ r for r in results if r.get("User") not in ("system user", "event_scheduler") ] return results async def get_binary_logs(self) -> list[dict[str, Any]]: """ Get list of binary logs. Returns: List of binary log information """ try: return await self.execute_query("SHOW BINARY LOGS") except Exception as e: logger.debug(f"Binary logs not available: {str(e)}") return [] async def get_master_status(self) -> Optional[dict[str, Any]]: """ Get master/source replication status. Returns: Master status dictionary or None """ try: # Try MySQL 8.0.22+ syntax first try: results = await self.execute_query("SHOW BINARY LOG STATUS") except Exception: results = await self.execute_query("SHOW MASTER STATUS") return results[0] if results else None except Exception as e: logger.debug(f"Master status not available: {str(e)}") return None async def get_slave_status(self) -> list[dict[str, Any]]: """ Get slave/replica replication status. Returns: List of slave status dictionaries (for multi-source replication) """ try: # Try MySQL 8.0.22+ syntax first try: return await self.execute_query("SHOW REPLICA STATUS") except Exception: return await self.execute_query("SHOW SLAVE STATUS") except Exception as e: logger.debug(f"Slave status not available: {str(e)}") return [] async def get_table_stats( self, schema: Optional[str] = None, table: Optional[str] = None ) -> list[dict[str, Any]]: """ Get table statistics from information_schema. Args: schema: Optional schema filter table: Optional table filter Returns: List of table statistics """ query = """ SELECT TABLE_SCHEMA, TABLE_NAME, ENGINE, TABLE_ROWS, DATA_LENGTH, INDEX_LENGTH, DATA_FREE, AUTO_INCREMENT, CREATE_TIME, UPDATE_TIME FROM information_schema.TABLES WHERE TABLE_TYPE = 'BASE TABLE' """ conditions = [] params = [] if schema: conditions.append("TABLE_SCHEMA = %s") params.append(schema) else: conditions.append( "TABLE_SCHEMA NOT IN " "('mysql', 'information_schema', 'performance_schema', 'sys')" ) if table: conditions.append("TABLE_NAME = %s") params.append(table) if conditions: query += " AND " + " AND ".join(conditions) query += " ORDER BY DATA_LENGTH DESC" return await self.execute_query(query, params if params else None) async def get_index_stats( self, schema: Optional[str] = None, table: Optional[str] = None ) -> list[dict[str, Any]]: """ Get index statistics. Args: schema: Optional schema filter table: Optional table filter Returns: List of index statistics """ query = """ SELECT TABLE_SCHEMA, TABLE_NAME, INDEX_NAME, NON_UNIQUE, SEQ_IN_INDEX, COLUMN_NAME, CARDINALITY, SUB_PART, NULLABLE, INDEX_TYPE FROM information_schema.STATISTICS WHERE 1=1 """ params = [] if schema: query += " AND TABLE_SCHEMA = %s" params.append(schema) else: query += ( " AND TABLE_SCHEMA NOT IN " "('mysql', 'information_schema', 'performance_schema', 'sys')" ) if table: query += " AND TABLE_NAME = %s" params.append(table) query += " ORDER BY TABLE_SCHEMA, TABLE_NAME, INDEX_NAME, SEQ_IN_INDEX" return await self.execute_query(query, params if params else None) async def check_performance_schema(self) -> bool: """ Check if performance_schema is enabled. Returns: True if performance_schema is enabled """ try: result = await self.execute_scalar("SELECT @@performance_schema") return result == 1 or result == "1" except Exception: return False async def check_sys_schema(self) -> bool: """ Check if sys schema is available. Returns: True if sys schema exists """ try: result = await self.execute_scalar( "SELECT COUNT(*) FROM information_schema.SCHEMATA " "WHERE SCHEMA_NAME = 'sys'" ) return result and result > 0 except Exception: return False

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/isdaniel/MySQL-Performance-Tuner-Mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server