# main_fastmcp_fixed.py (15.2 kB)
#!/usr/bin/env python3
"""
MCP Server for MySQL Database Operations using FastMCP
Simple and direct implementation without complex tool registry
"""
from typing import Any, Dict, List
import logging
import json
import os
from mcp.server.fastmcp import FastMCP
from dotenv import load_dotenv
import pymysql
from pymysql.cursors import DictCursor
# Load environment variables
load_dotenv()
# Configure logging to stderr to avoid interfering with stdio communication
# (``logging.StreamHandler()`` created without a stream argument is what
# provides the stderr destination here).
logging.basicConfig(
    handlers=[logging.StreamHandler()],
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
# Initialize FastMCP server
mcp = FastMCP("mysql-mcp-server")
# Database configuration
# Keyword arguments handed to ``pymysql.connect``; the connection target and
# credentials can be overridden through MYSQL_* environment variables.
DB_CONFIG = {
    'host': os.getenv('MYSQL_HOST', 'localhost'),
    # ``or`` also covers a variable that is set but empty (e.g. ``MYSQL_PORT=``
    # in a .env file), which would otherwise make ``int('')`` raise at import.
    'port': int(os.getenv('MYSQL_PORT') or 3306),
    'user': os.getenv('MYSQL_USER', 'root'),
    'password': os.getenv('MYSQL_PASSWORD', ''),
    'database': os.getenv('MYSQL_DATABASE', 'test'),
    'charset': 'utf8mb4',
    # Autocommit is off: the query helpers in this file call commit() themselves.
    'autocommit': False,
    # Rows are returned as dictionaries rather than tuples.
    'cursorclass': DictCursor,
}
def get_db_connection():
    """Open and return a new PyMySQL connection configured from ``DB_CONFIG``."""
    connection = pymysql.connect(**DB_CONFIG)
    return connection
def execute_db_query(query: str, params=None) -> Dict[str, Any]:
    """Execute one SQL statement on a fresh connection and report the outcome.

    Args:
        query: SQL text handed to ``cursor.execute``.
        params: Optional parameters handed to ``cursor.execute`` with the query.

    Returns:
        On success ``{"success": True, "row_count": ..., "data": ...}``. When
        the statement produced a result set, ``data`` holds the fetched rows
        and ``row_count`` their number; otherwise ``data`` is empty and
        ``row_count`` is ``cursor.rowcount``.
        On any exception ``{"success": False, "error": str(exc), "data": []}``;
        the exception is logged and not re-raised.
    """
    try:
        with get_db_connection() as conn:
            with conn.cursor() as cursor:
                cursor.execute(query, params)
                # ``description`` is None for statements that return no rows
                # (DB-API 2.0). Checking it, instead of the statement's first
                # keyword, also keeps the rows of OPTIMIZE/CHECK/EXPLAIN/
                # WITH/CALL/DESC and of statements with a leading comment.
                if cursor.description is not None:
                    data = cursor.fetchall()
                    row_count = len(data)
                else:
                    data = []
                    row_count = cursor.rowcount
                # Committing unconditionally also persists statements that
                # both modify data and return rows; for pure reads there is
                # nothing pending to commit.
                conn.commit()
        return {
            "success": True,
            "row_count": row_count,
            "data": data
        }
    except Exception as e:
        logger.error(f"Database query failed: {e}")
        return {
            "success": False,
            "error": str(e),
            "data": []
        }
@mcp.tool()
async def execute_query(query: str) -> str:
    """Execute a SQL query and return results.

    Args:
        query: The SQL query to execute (SELECT, INSERT, UPDATE, DELETE, etc.)
    """
    # Only the first 100 characters go to the log; the full text is executed.
    preview = query[:100]
    logger.info(f"Executing query: {preview}...")
    outcome = execute_db_query(query)
    # default=str stringifies any value json cannot serialise natively.
    return json.dumps(outcome, indent=2, default=str)
@mcp.tool()
async def get_tables() -> str:
    """List all tables in the database with basic information."""
    logger.info("Getting table list")
    # One row per table of the schema returned by DATABASE(), ordered by name.
    tables_sql = """
        SELECT
            TABLE_NAME as table_name,
            TABLE_TYPE as table_type,
            ENGINE as engine,
            TABLE_ROWS as row_count,
            DATA_LENGTH as data_length,
            INDEX_LENGTH as index_length,
            TABLE_COMMENT as comment
        FROM information_schema.TABLES
        WHERE TABLE_SCHEMA = DATABASE()
        ORDER BY TABLE_NAME
    """
    return json.dumps(execute_db_query(tables_sql), indent=2, default=str)
@mcp.tool()
async def describe_table(table_name: str) -> str:
    """Get detailed information about a specific table including columns, data types, and constraints.

    Args:
        table_name: Name of the table to describe
    """
    logger.info(f"Describing table: {table_name}")
    # The table name is bound as a parameter (%s), never spliced into the SQL.
    columns_sql = """
        SELECT
            COLUMN_NAME as column_name,
            DATA_TYPE as data_type,
            IS_NULLABLE as is_nullable,
            COLUMN_DEFAULT as default_value,
            COLUMN_KEY as key_type,
            EXTRA as extra,
            COLUMN_COMMENT as comment
        FROM information_schema.COLUMNS
        WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = %s
        ORDER BY ORDINAL_POSITION
    """
    bound_args = (table_name,)
    columns = execute_db_query(columns_sql, bound_args)
    return json.dumps(columns, indent=2, default=str)
@mcp.tool()
async def analyze_table(table_name: str) -> str:
    """Analyze table statistics including row count, size, and column information.

    Args:
        table_name: Name of the table to analyze
    """
    logger.info(f"Analyzing table: {table_name}")
    bound_args = (table_name,)

    # First lookup: the table's metadata row from information_schema.TABLES.
    table_info = execute_db_query(
        """
        SELECT
            TABLE_NAME as table_name,
            ENGINE as engine,
            TABLE_ROWS as row_count,
            DATA_LENGTH as data_length,
            INDEX_LENGTH as index_length,
            CREATE_TIME as created,
            UPDATE_TIME as updated
        FROM information_schema.TABLES
        WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = %s
        """,
        bound_args,
    )

    # Second lookup: how many columns the table has.
    column_info = execute_db_query(
        """
        SELECT COUNT(*) as column_count
        FROM information_schema.COLUMNS
        WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = %s
        """,
        bound_args,
    )

    # Each entry is the complete result envelope of its lookup.
    return json.dumps(
        {"table_info": table_info, "column_count": column_info},
        indent=2,
        default=str,
    )
@mcp.tool()
async def insert_data(table_name: str, data: Dict[str, Any]) -> str:
    """Insert data into a table.

    Args:
        table_name: Name of the table to insert into (``schema.table`` is accepted)
        data: Dictionary of column names and values to insert

    Returns:
        JSON text of the ``execute_db_query`` result, or a
        ``{"success": false, "error": ...}`` object when ``data`` is empty.
    """
    logger.info(f"Inserting data into table: {table_name}")
    if not data:
        return json.dumps({"success": False, "error": "No data provided"})

    def quote(identifier: str) -> str:
        # Backtick-quote each dot-separated part so reserved words and special
        # characters are treated as names, not SQL. Backticks supplied by the
        # caller are stripped, embedded ones doubled. "%" is doubled because
        # the driver %-formats the statement when parameters are passed.
        return ".".join(
            "`" + part.strip("`").replace("`", "``").replace("%", "%%") + "`"
            for part in identifier.split(".")
        )

    column_sql = ', '.join(quote(column) for column in data)
    placeholders = ', '.join(['%s'] * len(data))
    query = f"INSERT INTO {quote(table_name)} ({column_sql}) VALUES ({placeholders})"
    # Values always travel as bound parameters.
    result = execute_db_query(query, list(data.values()))
    return json.dumps(result, indent=2, default=str)
@mcp.tool()
async def update_data(table_name: str, data: Dict[str, Any], where_clause: str) -> str:
    """Update data in a table.

    Args:
        table_name: Name of the table to update (``schema.table`` is accepted)
        data: Dictionary of column names and new values
        where_clause: WHERE clause to specify which records to update (e.g., "id = 1")

    Returns:
        JSON text of the ``execute_db_query`` result, or a
        ``{"success": false, "error": ...}`` object when ``data`` or
        ``where_clause`` is empty.
    """
    logger.info(f"Updating data in table: {table_name}")
    if not data:
        return json.dumps({"success": False, "error": "No data provided"})
    if not where_clause or not where_clause.strip():
        # Fail early with a clear message instead of sending "... WHERE " to the server.
        return json.dumps({"success": False, "error": "No WHERE clause provided"})

    def quote(identifier: str) -> str:
        # Backtick-quote each dot-separated part; caller-supplied backticks are
        # stripped and embedded ones doubled. "%" is doubled because the
        # driver %-formats the statement when parameters are passed.
        return ".".join(
            "`" + part.strip("`").replace("`", "``").replace("%", "%%") + "`"
            for part in identifier.split(".")
        )

    set_clause = ', '.join(f"{quote(column)} = %s" for column in data)
    # where_clause is SQL by contract and is inserted as given; only its
    # literal "%" characters (e.g. LIKE 'a%') are escaped so they survive the
    # driver's %-formatting instead of raising a formatting error.
    where_sql = where_clause.replace("%", "%%")
    query = f"UPDATE {quote(table_name)} SET {set_clause} WHERE {where_sql}"
    result = execute_db_query(query, list(data.values()))
    return json.dumps(result, indent=2, default=str)
@mcp.tool()
async def delete_data(table_name: str, where_clause: str) -> str:
    """Delete data from a table.

    Args:
        table_name: Name of the table to delete from (``schema.table`` is accepted)
        where_clause: WHERE clause to specify which records to delete (e.g., "id = 1")

    Returns:
        JSON text of the ``execute_db_query`` result, or a
        ``{"success": false, "error": ...}`` object when ``where_clause`` is empty.
    """
    logger.info(f"Deleting data from table: {table_name}")
    if not where_clause or not where_clause.strip():
        # Fail early with a clear message instead of sending "... WHERE " to the server.
        return json.dumps({"success": False, "error": "No WHERE clause provided"})
    # Backtick-quote each dot-separated part of the table name so it is always
    # parsed as an identifier; caller-supplied backticks are stripped and
    # embedded ones doubled.
    quoted_table = ".".join(
        "`" + part.strip("`").replace("`", "``") + "`"
        for part in table_name.split(".")
    )
    # where_clause is SQL by contract and is inserted as given. No parameters
    # are passed, so "%" needs no escaping here.
    query = f"DELETE FROM {quoted_table} WHERE {where_clause}"
    result = execute_db_query(query)
    return json.dumps(result, indent=2, default=str)
@mcp.tool()
async def optimize_table(table_name: str) -> str:
    """Optimize a table for better performance by rebuilding indexes and updating statistics.

    Args:
        table_name: Name of the table to optimize (``schema.table`` is accepted)

    Returns:
        JSON text of the ``execute_db_query`` result for ``OPTIMIZE TABLE``.
    """
    logger.info(f"Optimizing table: {table_name}")
    # Backtick-quote each dot-separated part of the table name so it is always
    # parsed as an identifier; caller-supplied backticks are stripped and
    # embedded ones doubled.
    quoted_table = ".".join(
        "`" + part.strip("`").replace("`", "``") + "`"
        for part in table_name.split(".")
    )
    query = f"OPTIMIZE TABLE {quoted_table}"
    result = execute_db_query(query)
    return json.dumps(result, indent=2, default=str)
@mcp.tool()
async def backup_table(table_name: str, backup_name: str) -> str:
    """Create a backup copy of a table.

    Args:
        table_name: Name of the table to backup (``schema.table`` is accepted)
        backup_name: Name for the backup table (will be created)

    Returns:
        JSON text of the ``execute_db_query`` result.
    """
    logger.info(f"Backing up table {table_name} to {backup_name}")
    # Backtick-quote each dot-separated part of both names so they are always
    # parsed as identifiers; caller-supplied backticks are stripped and
    # embedded ones doubled.
    quoted_source, quoted_backup = (
        ".".join(
            "`" + part.strip("`").replace("`", "``") + "`"
            for part in name.split(".")
        )
        for name in (table_name, backup_name)
    )
    # NOTE(review): CREATE TABLE ... AS SELECT copies the rows; as far as the
    # MySQL manual describes it, indexes are not recreated on the new table —
    # confirm that is acceptable for "backup" use.
    query = f"CREATE TABLE {quoted_backup} AS SELECT * FROM {quoted_source}"
    result = execute_db_query(query)
    return json.dumps(result, indent=2, default=str)
@mcp.tool()
async def create_database(database_name: str, charset: str = "utf8mb4", collation: str = "utf8mb4_unicode_ci") -> str:
    """Create a new database with full control.

    Args:
        database_name: Name of the database to create
        charset: Character set (default: utf8mb4)
        collation: Collation (default: utf8mb4_unicode_ci)

    Returns:
        JSON text of the ``execute_db_query`` result, or a
        ``{"success": false, "error": ...}`` object when ``charset`` or
        ``collation`` is not a plain ASCII word.
    """
    logger.info(f"Creating database: {database_name}")
    # charset/collation are inserted as bare words, so only ASCII letters,
    # digits and underscores are accepted.
    for label, word in (("charset", charset), ("collation", collation)):
        if not (word.isascii() and word.replace("_", "").isalnum()):
            return json.dumps({"success": False, "error": f"Invalid {label}: {word}"})
    # Backtick-quote the database name so it is always parsed as an
    # identifier; caller-supplied backticks are stripped, embedded ones doubled.
    quoted_database = "`" + database_name.strip("`").replace("`", "``") + "`"
    query = f"CREATE DATABASE {quoted_database} CHARACTER SET {charset} COLLATE {collation}"
    result = execute_db_query(query)
    return json.dumps(result, indent=2, default=str)
@mcp.tool()
async def drop_database(database_name: str, confirm: bool = False) -> str:
    """Drop a database (DANGEROUS - requires confirmation).

    Args:
        database_name: Name of the database to drop
        confirm: Must be True to actually drop the database

    Returns:
        JSON text of the ``execute_db_query`` result, or a
        ``{"success": false, "error": ...}`` object when ``confirm`` is not set.
    """
    # Guard first: nothing is logged or executed without explicit confirmation.
    if not confirm:
        return json.dumps({"success": False, "error": "Must set confirm=True to drop database"})
    logger.info(f"Dropping database: {database_name}")
    # Backtick-quote the database name so it is always parsed as an
    # identifier; caller-supplied backticks are stripped, embedded ones doubled.
    quoted_database = "`" + database_name.strip("`").replace("`", "``") + "`"
    query = f"DROP DATABASE {quoted_database}"
    result = execute_db_query(query)
    return json.dumps(result, indent=2, default=str)
@mcp.tool()
async def show_processlist() -> str:
    """Show all running processes/connections in MySQL."""
    logger.info("Showing process list")
    # The statement is fixed text; nothing caller-supplied goes into it.
    processes = execute_db_query("SHOW PROCESSLIST")
    return json.dumps(processes, indent=2, default=str)
@mcp.tool()
async def show_variables(pattern: str = "%") -> str:
    """Show MySQL system variables.

    Args:
        pattern: Pattern to filter variables (default: % for all)

    Returns:
        JSON text of the ``execute_db_query`` result.
    """
    logger.info(f"Showing variables with pattern: {pattern}")
    # Bind the pattern as a parameter so the driver quotes and escapes it; a
    # pattern containing a quote can then neither break nor extend the statement.
    query = "SHOW VARIABLES LIKE %s"
    result = execute_db_query(query, (pattern,))
    return json.dumps(result, indent=2, default=str)
@mcp.tool()
async def kill_process(process_id: int) -> str:
    """Kill a MySQL process/connection by ID.

    Args:
        process_id: The process ID to kill
    """
    logger.info(f"Killing process: {process_id}")
    # The id is formatted straight into the KILL statement.
    kill_sql = "KILL {}".format(process_id)
    outcome = execute_db_query(kill_sql)
    return json.dumps(outcome, indent=2, default=str)
@mcp.tool()
async def create_user(username: str, password: str, host: str = "%") -> str:
    """Create a new MySQL user.

    Only the account is created; no privileges are granted by this call.

    Args:
        username: Username for the new user
        password: Password for the new user
        host: Host pattern (default: % for all hosts)

    Returns:
        JSON text of the ``execute_db_query`` result.
    """
    # The password is deliberately left out of the log line.
    logger.info(f"Creating user: {username}@{host}")
    # All three values are bound as parameters so the driver quotes and
    # escapes them; a quote inside the password or name no longer breaks (or
    # rewrites) the statement.
    query = "CREATE USER %s@%s IDENTIFIED BY %s"
    result = execute_db_query(query, (username, host, password))
    return json.dumps(result, indent=2, default=str)
@mcp.tool()
async def grant_privileges(username: str, privileges: str, database: str = "*", table: str = "*", host: str = "%") -> str:
    """Grant specific privileges to a user.

    Args:
        username: Username to grant privileges to
        privileges: Privileges to grant (e.g., "SELECT, INSERT" or "ALL PRIVILEGES")
        database: Database name (default: * for all databases)
        table: Table name (default: * for all tables)
        host: Host pattern (default: % for all hosts)

    Returns:
        JSON text of the ``execute_db_query`` result, or a
        ``{"success": false, "error": ...}`` object when ``privileges``
        contains characters outside a privilege list.
    """
    logger.info(f"Granting {privileges} on {database}.{table} to {username}@{host}")
    # The privilege list is SQL syntax and cannot be bound as a parameter, so
    # restrict it to what such a list needs: words, spaces, commas, and
    # parenthesised (optionally backtick-quoted) column lists.
    if not privileges.strip() or not all(
        ch.isalnum() or ch in " ,_()`" for ch in privileges
    ):
        return json.dumps({"success": False, "error": f"Invalid privileges: {privileges}"})

    def quote_level(name: str) -> str:
        # "*" is the wildcard and stays bare; anything else becomes a
        # backtick-quoted identifier (caller backticks stripped, embedded doubled).
        if name == "*":
            return "*"
        return "`" + name.strip("`").replace("`", "``") + "`"

    grant_head = f"GRANT {privileges} ON {quote_level(database)}.{quote_level(table)}"
    # Parameters are passed below, so the driver %-formats the statement:
    # literal "%" in the interpolated part (e.g. a database-name wildcard)
    # must be doubled. The account name and host are bound as parameters.
    query = grant_head.replace("%", "%%") + " TO %s@%s"
    result = execute_db_query(query, (username, host))
    return json.dumps(result, indent=2, default=str)
@mcp.tool()
async def execute_admin_command(command: str) -> str:
    """Execute administrative MySQL commands with full privileges.

    Args:
        command: Admin command (FLUSH, RESET, REPAIR, CHECK, etc.)
    """
    logger.info(f"Executing admin command: {command}")
    # The command text is forwarded unchanged to the shared query helper.
    outcome = execute_db_query(command)
    return json.dumps(outcome, indent=2, default=str)
@mcp.tool()
async def execute_multiple_queries(queries: str) -> str:
    """Execute multiple SQL queries separated by semicolons with full transaction control.

    All statements run in order on one connection, and ``commit()`` is only
    called after the last one succeeded. Execution stops at the first failure.

    Args:
        queries: Multiple SQL queries separated by semicolons

    Returns:
        JSON text. On success: ``success``, ``total_queries`` and per-statement
        ``results``. On failure: ``success`` false, ``error``, ``failed_query``
        (the statement being executed, or null when the failure happened
        outside statement execution) and the ``results`` collected so far.
    """
    logger.info("Executing multiple queries")
    # NOTE: this is a plain split on ";", so a semicolon inside a string
    # literal or comment also splits the text at that point.
    query_list = [q.strip() for q in queries.split(';') if q.strip()]
    results = []
    current_query = None
    try:
        with get_db_connection() as conn:
            with conn.cursor() as cursor:
                for current_query in query_list:
                    cursor.execute(current_query)
                    # ``description`` is None for statements that return no
                    # rows (DB-API 2.0). Checking it, instead of a SELECT/SHOW
                    # prefix, also returns the rows of DESCRIBE, EXPLAIN,
                    # WITH, CALL and similar statements.
                    if cursor.description is not None:
                        data = cursor.fetchall()
                        results.append({
                            "query": current_query,
                            "success": True,
                            "data": data,
                            "row_count": len(data)
                        })
                    else:
                        results.append({
                            "query": current_query,
                            "success": True,
                            "affected_rows": cursor.rowcount
                        })
            # Every statement ran; a failure from here on is not a statement failure.
            current_query = None
            conn.commit()
        return json.dumps({
            "success": True,
            "total_queries": len(query_list),
            "results": results
        }, indent=2, default=str)
    except Exception as e:
        logger.error(f"Multiple queries failed: {e}")
        return json.dumps({
            "success": False,
            "error": str(e),
            "failed_query": current_query,
            "results": results
        }, indent=2, default=str)
@mcp.tool()
async def set_variable(variable_name: str, value: str, scope: str = "SESSION") -> str:
    """Set a MySQL system variable.

    Args:
        variable_name: Name of the variable to set
        value: Value to set; plain integers and decimals are sent as numbers,
            everything else as a string
        scope: Scope (SESSION or GLOBAL; LOCAL, PERSIST and PERSIST_ONLY are
            also accepted). The statement runs through ``execute_db_query``
            on its own connection, so a SESSION value does not carry over to
            later tool calls.

    Returns:
        JSON text of the ``execute_db_query`` result, or a
        ``{"success": false, "error": ...}`` object for an invalid scope or
        variable name.
    """
    logger.info(f"Setting {scope} variable {variable_name} = {value}")

    # scope and variable_name are SQL syntax and cannot be bound as
    # parameters, so both are validated before being interpolated.
    scope_keyword = scope.strip().upper()
    if scope_keyword not in ("SESSION", "GLOBAL", "LOCAL", "PERSIST", "PERSIST_ONLY"):
        return json.dumps({"success": False, "error": f"Invalid scope: {scope}"})
    if not (
        variable_name.isascii()
        and variable_name.replace("_", "").replace(".", "").isalnum()
    ):
        return json.dumps({"success": False, "error": f"Invalid variable name: {variable_name}"})

    # MySQL rejects a quoted string for numeric variables ("Incorrect argument
    # type to variable"), so values that are plainly numeric are bound as
    # int/float; everything else stays a string.
    text = str(value).strip()
    unsigned = text[1:] if text[:1] in ("+", "-") else text
    bound_value = value
    if unsigned.isascii():
        whole, dot, fraction = unsigned.partition(".")
        if unsigned.isdigit():
            bound_value = int(text)
        elif dot and (whole + fraction).isdigit():
            bound_value = float(text)

    query = f"SET {scope_keyword} {variable_name} = %s"
    result = execute_db_query(query, (bound_value,))
    return json.dumps(result, indent=2, default=str)
async def initialize_server():
    """Initialize and test server components.

    Opens one database connection through ``get_db_connection`` to verify
    that the configured database is reachable.

    Returns:
        True when the connection could be opened, False otherwise (the error
        is logged, not raised).
    """
    try:
        # Test database connection; it is used only as a context manager and
        # not kept around.
        with get_db_connection():
            logger.info("Database connection tested successfully")
        # No hard-coded tool count here: the literal "20" had already drifted
        # from the number of @mcp.tool() functions in this file.
        logger.info("MCP MySQL Server initialized successfully")
        return True
    except Exception as e:
        logger.error(f"Failed to initialize server: {e}")
        return False
if __name__ == "__main__":
    import asyncio

    async def initialize_and_run():
        """Run the startup check and abort the process if it fails."""
        logger.info("Starting MCP MySQL Server with FastMCP...")
        # Initialize server components
        if not await initialize_server():
            logger.error("Server initialization failed, exiting...")
            # ``exit`` is injected by the ``site`` module and is not guaranteed
            # to exist (e.g. ``python -S``); raising SystemExit is what it
            # does anyway and needs no import.
            raise SystemExit(1)
        logger.info("Starting server on stdio transport...")

    # Initialize first
    asyncio.run(initialize_and_run())
    # Then run the server normally; mcp.run() is called synchronously, after
    # the asyncio.run() call above has returned.
    mcp.run(transport="stdio")