# mysql_mcp.py
from fastmcp import FastMCP
import mysql.connector
from mysql.connector import Error
import os
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
# Populate the process environment (python-dotenv) before any os.getenv()
# call further down in this module reads it.
load_dotenv()

# The server object: every tool, resource and prompt defined below registers
# itself on this instance through its decorators.
mcp = FastMCP(
    "MySQL MCP",
    description="MySQL database connector for Claude",
    dependencies=[
        "mysql-connector-python",
        "python-dotenv",
    ],
)
# Settings model for the MySQL connection
class DBConfig(BaseModel):
    """MySQL connection settings, defaulted from ``MYSQL_*`` environment variables.

    The ``os.getenv`` calls run once, when this class body is executed, so
    the defaults reflect the environment as it was at import time.
    """

    # Server host name; "localhost" when MYSQL_HOST is unset.
    host: str = os.getenv("MYSQL_HOST", "localhost")
    # TCP port; MYSQL_PORT is converted with int(), 3306 when unset.
    port: int = int(os.getenv("MYSQL_PORT", "3306"))
    # Login user; "root" when MYSQL_USER is unset.
    user: str = os.getenv("MYSQL_USER", "root")
    # Login password; empty string when MYSQL_PASSWORD is unset.
    password: str = os.getenv("MYSQL_PASSWORD", "")
    # Default database; None when MYSQL_DATABASE is unset.
    database: Optional[str] = os.getenv("MYSQL_DATABASE")
# Global connection state
# current_db holds the selected database name ("" when MYSQL_DATABASE is
# unset). In the code visible in this file it is only written (by
# use_database) and never read.
current_db = os.getenv("MYSQL_DATABASE", "")
# Shared settings instance: read by get_connection() on every call and
# mutated in place by use_database().
config = DBConfig()
def get_connection():
    """Open a new MySQL connection from the module-level ``config``.

    A fresh connection is created on every call; nothing is pooled or cached
    here, so the caller is responsible for closing the returned object.

    Returns:
        The connection object returned by ``mysql.connector.connect``.

    Raises:
        Exception: If the connector reports an error while connecting. The
            original connector error is attached as ``__cause__``.
    """
    try:
        return mysql.connector.connect(
            host=config.host,
            port=config.port,
            user=config.user,
            password=config.password,
            # An empty or missing database name is passed on as None.
            database=config.database or None,
        )
    except Error as e:
        # Chain the cause explicitly so the connector's error stays reachable.
        raise Exception(f"Database connection error: {e}") from e
@mcp.tool()
def query_sql(query: str) -> Dict[str, Any]:
    """Execute a SELECT query and return up to 100 of its rows.

    Args:
        query: SQL text sent to the server unchanged.
            NOTE(review): nothing here checks that it really is a SELECT.

    Returns:
        A dict with:
        - ``"rows"``: at most the first 100 fetched rows, each a dict keyed
          by column name (the cursor is opened with ``dictionary=True``).
        - ``"row_count"``: the cursor's ``rowcount`` after all rows were
          fetched, which is not capped to the 100-row limit.
        - ``"column_names"``: first element of each ``cursor.description``
          entry, or an empty list when there is no description.

    Raises:
        Exception: If the connector reports an error while creating the
            cursor, running the query or fetching; the connector error is
            attached as ``__cause__``.
    """
    max_rows = 100  # cap on returned rows, for safety
    conn = get_connection()
    try:
        # Create the cursor inside the try so the connection is still closed
        # if cursor creation itself fails.
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute(query)
            results = cursor.fetchall()
            description = cursor.description
            return {
                "rows": results[:max_rows],
                "row_count": cursor.rowcount,
                "column_names": [col[0] for col in description] if description else [],
            }
        finally:
            cursor.close()
    except Error as e:
        raise Exception(f"Query error: {e}") from e
    finally:
        conn.close()
@mcp.tool()
def execute_sql(query: str) -> Dict[str, Any]:
    """Execute a non-SELECT statement (INSERT, UPDATE, DELETE, etc.) and commit it.

    Args:
        query: SQL text sent to the server unchanged.

    Returns:
        A dict with:
        - ``"affected_rows"``: the cursor's ``rowcount`` after execution.
        - ``"last_insert_id"``: the cursor's ``lastrowid``, or ``None`` when
          that value is falsy (``0`` or ``None``).

    Raises:
        Exception: If the connector reports an error. The transaction is
            rolled back first and the connector error is attached as
            ``__cause__``.
    """
    conn = get_connection()
    try:
        # Create the cursor inside the try so the connection is still closed
        # if cursor creation itself fails.
        cursor = conn.cursor()
        try:
            cursor.execute(query)
            conn.commit()
            return {
                "affected_rows": cursor.rowcount,
                "last_insert_id": cursor.lastrowid or None,
            }
        except Error:
            # Undo partial work before the cursor is closed, then let the
            # outer handler wrap the error.
            conn.rollback()
            raise
        finally:
            cursor.close()
    except Error as e:
        raise Exception(f"Query error: {e}") from e
    finally:
        conn.close()
@mcp.tool()
def explain_sql(query: str) -> Dict[str, Any]:
    """Get the execution plan for a query.

    Args:
        query: SQL statement to explain. It is appended verbatim after the
            ``EXPLAIN`` keyword; it is a whole statement, not a value, so it
            cannot be bound as a parameter.

    Returns:
        A dict with ``"plan"``: the rows returned by the ``EXPLAIN``
        statement, each a dict keyed by column name.

    Raises:
        Exception: If the connector reports an error; the connector error is
            attached as ``__cause__``.
    """
    conn = get_connection()
    try:
        # Create the cursor inside the try so the connection is still closed
        # if cursor creation itself fails.
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute(f"EXPLAIN {query}")
            return {"plan": cursor.fetchall()}
        finally:
            cursor.close()
    except Error as e:
        raise Exception(f"EXPLAIN error: {e}") from e
    finally:
        conn.close()
@mcp.tool()
def show_databases() -> Dict[str, Any]:
    """List all databases reported by ``SHOW DATABASES``.

    Returns:
        A dict with ``"databases"``: the first column of every row returned
        by ``SHOW DATABASES``.

    Raises:
        Exception: If the connector reports an error; the connector error is
            attached as ``__cause__``.
    """
    conn = get_connection()
    try:
        # Create the cursor inside the try so the connection is still closed
        # if cursor creation itself fails.
        cursor = conn.cursor()
        try:
            cursor.execute("SHOW DATABASES")
            return {"databases": [row[0] for row in cursor.fetchall()]}
        finally:
            cursor.close()
    except Error as e:
        raise Exception(f"Error listing databases: {e}") from e
    finally:
        conn.close()
@mcp.tool()
def use_database(database: str) -> Dict[str, Any]:
    """Switch the module-level configuration to a different database.

    The name is checked against the output of ``SHOW DATABASES`` before the
    shared ``config`` object and ``current_db`` are updated, so later calls
    to ``get_connection`` connect with the new database.

    Args:
        database: Name of the database to select; it must match an entry of
            ``SHOW DATABASES`` exactly.

    Returns:
        A dict with ``"current_database"`` (the selected name) and
        ``"status"`` (always ``"success"``).

    Raises:
        ValueError: If ``database`` is not among the listed databases.
        Exception: If the connector reports an error while listing
            databases; the connector error is attached as ``__cause__``.
    """
    # Only current_db is rebound; config is mutated in place and therefore
    # needs no global declaration.
    global current_db

    conn = get_connection()
    try:
        # Create the cursor inside the try so the connection is still closed
        # if cursor creation itself fails.
        cursor = conn.cursor()
        try:
            cursor.execute("SHOW DATABASES")
            available = {row[0] for row in cursor.fetchall()}
        finally:
            cursor.close()
    except Error as e:
        raise Exception(f"Error changing database: {e}") from e
    finally:
        conn.close()

    if database not in available:
        raise ValueError(f"Database '{database}' does not exist")

    # Update configuration
    config.database = database
    current_db = database
    return {
        "current_database": database,
        "status": "success",
    }
@mcp.tool()
def show_tables() -> Dict[str, Any]:
    """List all tables in the currently configured database.

    Returns:
        A dict with ``"database"`` (the configured database name) and
        ``"tables"`` (the first column of every row from ``SHOW TABLES``).

    Raises:
        ValueError: If no database is configured.
        Exception: If the connector reports an error; the connector error is
            attached as ``__cause__``.
    """
    if not config.database:
        raise ValueError("No database selected. Use 'use_database' first.")

    conn = get_connection()
    try:
        # Create the cursor inside the try so the connection is still closed
        # if cursor creation itself fails.
        cursor = conn.cursor()
        try:
            cursor.execute("SHOW TABLES")
            return {
                "database": config.database,
                "tables": [row[0] for row in cursor.fetchall()],
            }
        finally:
            cursor.close()
    except Error as e:
        raise Exception(f"Error listing tables: {e}") from e
    finally:
        conn.close()
def _quote_table_reference(name: str) -> str:
    """Return *name* as a backtick-quoted, dot-separated MySQL identifier.

    Accepts ``table`` or ``db.table``. Parts that are already backtick-quoted
    are kept as they are; unquoted parts are stripped of surrounding
    whitespace and wrapped in backticks. Anything else (a stray backtick, an
    empty part) is rejected.

    Raises:
        ValueError: If *name* is not a well-formed identifier reference.
    """
    import re  # local import keeps this block self-contained

    # One identifier part: a backtick-quoted run (`` escapes a backtick) or
    # an unquoted run containing neither a backtick nor a dot.
    part = r"`(?:[^`]|``)+`|[^`.]+"
    if not re.fullmatch(rf"(?:{part})(?:\.(?:{part}))*", name):
        raise ValueError(f"Invalid table name: {name!r}")

    quoted = []
    for piece in re.findall(part, name):
        if not piece.startswith("`"):
            piece = piece.strip()
            if not piece:
                raise ValueError(f"Invalid table name: {name!r}")
            piece = f"`{piece}`"
        quoted.append(piece)
    return ".".join(quoted)


@mcp.tool()
def describe_table(table: str) -> Dict[str, Any]:
    """Get column and index definitions for a table.

    Args:
        table: Table name, optionally qualified as ``db.table``. Each part
            may be given bare or already backtick-quoted. The name is quoted
            before it is placed in the SQL text, so it cannot carry
            additional SQL and reserved-word names work.

    Returns:
        A dict with ``"table"`` (the name as passed in), ``"columns"`` (rows
        of ``DESCRIBE``) and ``"indexes"`` (rows of ``SHOW INDEX FROM``);
        rows are dicts keyed by column name.

    Raises:
        ValueError: If no database is configured, or ``table`` is not a
            well-formed identifier.
        Exception: If the connector reports an error; the connector error is
            attached as ``__cause__``.
    """
    if not config.database:
        raise ValueError("No database selected. Use 'use_database' first.")
    # Identifiers cannot be bound as query parameters, so quote instead.
    table_ref = _quote_table_reference(table)

    conn = get_connection()
    try:
        # Create the cursor inside the try so the connection is still closed
        # if cursor creation itself fails.
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute(f"DESCRIBE {table_ref}")
            columns = cursor.fetchall()
            # Get index information
            cursor.execute(f"SHOW INDEX FROM {table_ref}")
            indexes = cursor.fetchall()
            return {
                "table": table,
                "columns": columns,
                "indexes": indexes,
            }
        finally:
            cursor.close()
    except Error as e:
        raise Exception(f"Error describing table: {e}") from e
    finally:
        conn.close()
@mcp.resource("schema://{database}")
def get_database_schema(database: Optional[str] = None) -> str:
    """Get the full schema of a database as a resource.

    Args:
        database: Unquoted database name. When omitted or empty, the
            currently configured database is used.

    Returns:
        The second column of ``SHOW CREATE TABLE`` for every entry listed by
        ``SHOW TABLES``, joined by blank lines. An empty string when the
        database has no tables.

    Raises:
        ValueError: If no database is given and none is configured.
        Exception: If the connector reports an error; the connector error is
            attached as ``__cause__``.
    """
    db_to_use = database or config.database
    if not db_to_use:
        raise ValueError("No database specified or selected")

    def quote(identifier: str) -> str:
        # Identifiers cannot be bound as parameters; backtick-quote them and
        # double any embedded backtick so the name cannot end the quoting.
        return "`" + identifier.replace("`", "``") + "`"

    conn = get_connection()
    try:
        # Create the cursor inside the try so the connection is still closed
        # if cursor creation itself fails.
        cursor = conn.cursor()
        try:
            # Switch to the specified database
            cursor.execute(f"USE {quote(db_to_use)}")
            # Get all tables
            cursor.execute("SHOW TABLES")
            tables = [row[0] for row in cursor.fetchall()]
            # Get the CREATE statement for each table
            schema = []
            for table in tables:
                cursor.execute(f"SHOW CREATE TABLE {quote(table)}")
                schema.append(cursor.fetchone()[1])
            return "\n\n".join(schema)
        finally:
            cursor.close()
    except Error as e:
        raise Exception(f"Error getting schema: {e}") from e
    finally:
        conn.close()
@mcp.prompt()
def write_query_for_task(task: str) -> str:
    """Build the prompt text asking for an efficient SQL query for *task*.

    Args:
        task: Description of what the query should accomplish; inserted on
            the first line of the prompt.

    Returns:
        The prompt: the task line, a fixed list of guidelines and a pointer
        to the ``schema://`` resource, one item per line, ending with a
        newline.
    """
    prompt_lines = [
        f"Task: {task}",
        "Please write an SQL query that accomplishes this task efficiently.",
        "Some guidelines:",
        "1. Use appropriate JOINs (INNER, LEFT, RIGHT) based on the data relationships",
        "2. Filter data in the WHERE clause to minimize data processing",
        "3. Consider using indexes for better performance",
        "4. Use appropriate aggregation functions when needed",
        "5. Format the query with clear indentation for readability",
        "If you need to see the database schema first, you can access it using the schema:// resource.",
    ]
    return "\n".join(prompt_lines) + "\n"
@mcp.prompt()
def analyze_query_performance(query: str) -> str:
    """Build the prompt text asking for a performance review of *query*.

    Args:
        query: The SQL text to analyze; inserted on the first line of the
            prompt.

    Returns:
        The prompt: the query line followed by a fixed checklist of analysis
        steps, one item per line, ending with a newline.
    """
    prompt_lines = [
        f"Query: {query}",
        "Please analyze this query for performance issues:",
        "1. First, use the explain_sql tool to get the execution plan",
        "2. Look for table scans instead of index usage",
        "3. Check if the joins are efficient",
        "4. Identify if the query can be optimized with better indexes",
        "5. Suggest concrete improvements to make the query more efficient",
    ]
    return "\n".join(prompt_lines) + "\n"
if __name__ == "__main__":
    # Executed as a script rather than imported: start the MCP server.
    mcp.run()