Skip to main content
Glama

MCP MySQL Server

by kami2k1
main_fastmcp_fixed.py15.2 kB
#!/usr/bin/env python3 """ MCP Server for MySQL Database Operations using FastMCP Simple and direct implementation without complex tool registry """ from typing import Any, Dict, List import logging import json import os from mcp.server.fastmcp import FastMCP from dotenv import load_dotenv import pymysql from pymysql.cursors import DictCursor # Load environment variables load_dotenv() # Configure logging to stderr to avoid interfering with stdio communication logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[logging.StreamHandler()] ) logger = logging.getLogger(__name__) # Initialize FastMCP server mcp = FastMCP("mysql-mcp-server") # Database configuration DB_CONFIG = { 'host': os.getenv('MYSQL_HOST', 'localhost'), 'port': int(os.getenv('MYSQL_PORT', 3306)), 'user': os.getenv('MYSQL_USER', 'root'), 'password': os.getenv('MYSQL_PASSWORD', ''), 'database': os.getenv('MYSQL_DATABASE', 'test'), 'charset': 'utf8mb4', 'autocommit': False, 'cursorclass': DictCursor } def get_db_connection(): """Get database connection""" return pymysql.connect(**DB_CONFIG) def execute_db_query(query: str, params=None) -> Dict[str, Any]: """Execute database query and return results""" try: with get_db_connection() as conn: with conn.cursor() as cursor: cursor.execute(query, params) if query.strip().upper().startswith('SELECT') or query.strip().upper().startswith('SHOW') or query.strip().upper().startswith('DESCRIBE'): data = cursor.fetchall() row_count = len(data) else: conn.commit() data = [] row_count = cursor.rowcount return { "success": True, "row_count": row_count, "data": data } except Exception as e: logger.error(f"Database query failed: {e}") return { "success": False, "error": str(e), "data": [] } @mcp.tool() async def execute_query(query: str) -> str: """Execute a SQL query and return results. Args: query: The SQL query to execute (SELECT, INSERT, UPDATE, DELETE, etc.) """ logger.info(f"Executing query: {query[:100]}...") result = execute_db_query(query) return json.dumps(result, indent=2, default=str) @mcp.tool() async def get_tables() -> str: """List all tables in the database with basic information.""" logger.info("Getting table list") query = """ SELECT TABLE_NAME as table_name, TABLE_TYPE as table_type, ENGINE as engine, TABLE_ROWS as row_count, DATA_LENGTH as data_length, INDEX_LENGTH as index_length, TABLE_COMMENT as comment FROM information_schema.TABLES WHERE TABLE_SCHEMA = DATABASE() ORDER BY TABLE_NAME """ result = execute_db_query(query) return json.dumps(result, indent=2, default=str) @mcp.tool() async def describe_table(table_name: str) -> str: """Get detailed information about a specific table including columns, data types, and constraints. Args: table_name: Name of the table to describe """ logger.info(f"Describing table: {table_name}") query = """ SELECT COLUMN_NAME as column_name, DATA_TYPE as data_type, IS_NULLABLE as is_nullable, COLUMN_DEFAULT as default_value, COLUMN_KEY as key_type, EXTRA as extra, COLUMN_COMMENT as comment FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = %s ORDER BY ORDINAL_POSITION """ result = execute_db_query(query, (table_name,)) return json.dumps(result, indent=2, default=str) @mcp.tool() async def analyze_table(table_name: str) -> str: """Analyze table statistics including row count, size, and column information. Args: table_name: Name of the table to analyze """ logger.info(f"Analyzing table: {table_name}") # Get table info table_info_query = """ SELECT TABLE_NAME as table_name, ENGINE as engine, TABLE_ROWS as row_count, DATA_LENGTH as data_length, INDEX_LENGTH as index_length, CREATE_TIME as created, UPDATE_TIME as updated FROM information_schema.TABLES WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = %s """ # Get column count column_count_query = """ SELECT COUNT(*) as column_count FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = %s """ table_info = execute_db_query(table_info_query, (table_name,)) column_info = execute_db_query(column_count_query, (table_name,)) result = { "table_info": table_info, "column_count": column_info } return json.dumps(result, indent=2, default=str) @mcp.tool() async def insert_data(table_name: str, data: Dict[str, Any]) -> str: """Insert data into a table. Args: table_name: Name of the table to insert into data: Dictionary of column names and values to insert """ logger.info(f"Inserting data into table: {table_name}") if not data: return json.dumps({"success": False, "error": "No data provided"}) columns = list(data.keys()) values = list(data.values()) placeholders = ', '.join(['%s'] * len(values)) query = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({placeholders})" result = execute_db_query(query, values) return json.dumps(result, indent=2, default=str) @mcp.tool() async def update_data(table_name: str, data: Dict[str, Any], where_clause: str) -> str: """Update data in a table. Args: table_name: Name of the table to update data: Dictionary of column names and new values where_clause: WHERE clause to specify which records to update (e.g., "id = 1") """ logger.info(f"Updating data in table: {table_name}") if not data: return json.dumps({"success": False, "error": "No data provided"}) set_clause = ', '.join([f"{col} = %s" for col in data.keys()]) query = f"UPDATE {table_name} SET {set_clause} WHERE {where_clause}" result = execute_db_query(query, list(data.values())) return json.dumps(result, indent=2, default=str) @mcp.tool() async def delete_data(table_name: str, where_clause: str) -> str: """Delete data from a table. Args: table_name: Name of the table to delete from where_clause: WHERE clause to specify which records to delete (e.g., "id = 1") """ logger.info(f"Deleting data from table: {table_name}") query = f"DELETE FROM {table_name} WHERE {where_clause}" result = execute_db_query(query) return json.dumps(result, indent=2, default=str) @mcp.tool() async def optimize_table(table_name: str) -> str: """Optimize a table for better performance by rebuilding indexes and updating statistics. Args: table_name: Name of the table to optimize """ logger.info(f"Optimizing table: {table_name}") query = f"OPTIMIZE TABLE {table_name}" result = execute_db_query(query) return json.dumps(result, indent=2, default=str) @mcp.tool() async def backup_table(table_name: str, backup_name: str) -> str: """Create a backup copy of a table. Args: table_name: Name of the table to backup backup_name: Name for the backup table (will be created) """ logger.info(f"Backing up table {table_name} to {backup_name}") query = f"CREATE TABLE {backup_name} AS SELECT * FROM {table_name}" result = execute_db_query(query) return json.dumps(result, indent=2, default=str) @mcp.tool() async def create_database(database_name: str, charset: str = "utf8mb4", collation: str = "utf8mb4_unicode_ci") -> str: """Create a new database with full control. Args: database_name: Name of the database to create charset: Character set (default: utf8mb4) collation: Collation (default: utf8mb4_unicode_ci) """ logger.info(f"Creating database: {database_name}") query = f"CREATE DATABASE {database_name} CHARACTER SET {charset} COLLATE {collation}" result = execute_db_query(query) return json.dumps(result, indent=2, default=str) @mcp.tool() async def drop_database(database_name: str, confirm: bool = False) -> str: """Drop a database (DANGEROUS - requires confirmation). Args: database_name: Name of the database to drop confirm: Must be True to actually drop the database """ if not confirm: return json.dumps({"success": False, "error": "Must set confirm=True to drop database"}) logger.info(f"Dropping database: {database_name}") query = f"DROP DATABASE {database_name}" result = execute_db_query(query) return json.dumps(result, indent=2, default=str) @mcp.tool() async def show_processlist() -> str: """Show all running processes/connections in MySQL.""" logger.info("Showing process list") query = "SHOW PROCESSLIST" result = execute_db_query(query) return json.dumps(result, indent=2, default=str) @mcp.tool() async def show_variables(pattern: str = "%") -> str: """Show MySQL system variables. Args: pattern: Pattern to filter variables (default: % for all) """ logger.info(f"Showing variables with pattern: {pattern}") query = f"SHOW VARIABLES LIKE '{pattern}'" result = execute_db_query(query) return json.dumps(result, indent=2, default=str) @mcp.tool() async def kill_process(process_id: int) -> str: """Kill a MySQL process/connection by ID. Args: process_id: The process ID to kill """ logger.info(f"Killing process: {process_id}") query = f"KILL {process_id}" result = execute_db_query(query) return json.dumps(result, indent=2, default=str) @mcp.tool() async def create_user(username: str, password: str, host: str = "%") -> str: """Create a new MySQL user with full administrative capabilities. Args: username: Username for the new user password: Password for the new user host: Host pattern (default: % for all hosts) """ logger.info(f"Creating user: {username}@{host}") query = f"CREATE USER '{username}'@'{host}' IDENTIFIED BY '{password}'" result = execute_db_query(query) return json.dumps(result, indent=2, default=str) @mcp.tool() async def grant_privileges(username: str, privileges: str, database: str = "*", table: str = "*", host: str = "%") -> str: """Grant specific privileges to a user. Args: username: Username to grant privileges to privileges: Privileges to grant (e.g., "SELECT, INSERT" or "ALL PRIVILEGES") database: Database name (default: * for all databases) table: Table name (default: * for all tables) host: Host pattern (default: % for all hosts) """ logger.info(f"Granting {privileges} on {database}.{table} to {username}@{host}") query = f"GRANT {privileges} ON {database}.{table} TO '{username}'@'{host}'" result = execute_db_query(query) return json.dumps(result, indent=2, default=str) @mcp.tool() async def execute_admin_command(command: str) -> str: """Execute administrative MySQL commands with full privileges. Args: command: Admin command (FLUSH, RESET, REPAIR, CHECK, etc.) """ logger.info(f"Executing admin command: {command}") result = execute_db_query(command) return json.dumps(result, indent=2, default=str) @mcp.tool() async def execute_multiple_queries(queries: str) -> str: """Execute multiple SQL queries separated by semicolons with full transaction control. Args: queries: Multiple SQL queries separated by semicolons """ logger.info("Executing multiple queries") query_list = [q.strip() for q in queries.split(';') if q.strip()] results = [] try: with get_db_connection() as conn: with conn.cursor() as cursor: for query in query_list: cursor.execute(query) if query.upper().startswith('SELECT') or query.upper().startswith('SHOW'): data = cursor.fetchall() results.append({ "query": query, "success": True, "data": data, "row_count": len(data) }) else: results.append({ "query": query, "success": True, "affected_rows": cursor.rowcount }) conn.commit() return json.dumps({ "success": True, "total_queries": len(query_list), "results": results }, indent=2, default=str) except Exception as e: logger.error(f"Multiple queries failed: {e}") return json.dumps({ "success": False, "error": str(e), "results": results }, indent=2, default=str) @mcp.tool() async def set_variable(variable_name: str, value: str, scope: str = "SESSION") -> str: """Set a MySQL system variable. Args: variable_name: Name of the variable to set value: Value to set scope: Scope (SESSION or GLOBAL) """ logger.info(f"Setting {scope} variable {variable_name} = {value}") query = f"SET {scope} {variable_name} = '{value}'" result = execute_db_query(query) return json.dumps(result, indent=2, default=str) async def initialize_server(): """Initialize and test server components""" try: # Test database connection with get_db_connection() as conn: logger.info("Database connection tested successfully") logger.info("MCP MySQL Server initialized successfully with 20 tools") return True except Exception as e: logger.error(f"Failed to initialize server: {e}") return False if __name__ == "__main__": async def initialize_and_run(): """Initialize and run server""" logger.info("Starting MCP MySQL Server with FastMCP...") # Initialize server components if not await initialize_server(): logger.error("Server initialization failed, exiting...") exit(1) logger.info("Starting server on stdio transport...") # Initialize first import asyncio asyncio.run(initialize_and_run()) # Then run the server normally mcp.run(transport="stdio")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kami2k1/MCP-MYSQL'

If you have feedback or need assistance with the MCP directory API, please join our Discord server