#!/usr/bin/env python3
"""
MySQL MCP Server - A Model Context Protocol server for MySQL database operations.
Supports multiple named connections and query execution.
"""
import asyncio
import json
import logging
import os
import sqlite3
from contextlib import asynccontextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import aiomysql
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import (
Tool,
TextContent,
INVALID_PARAMS,
INTERNAL_ERROR,
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mysql-mcp")
# Default path for SQLite database
DB_PATH = Path(os.environ.get("MCP_CONFIG_PATH", "/app/config")) / "connections.db"
class ConnectionStore:
"""SQLite-based persistent storage for connection configurations."""
def __init__(self, db_path: Path):
self.db_path = db_path
self._ensure_db()
def _ensure_db(self):
"""Ensure the database and table exist."""
self.db_path.parent.mkdir(parents=True, exist_ok=True)
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS connections (
name TEXT PRIMARY KEY,
host TEXT NOT NULL,
port INTEGER NOT NULL,
user TEXT NOT NULL,
password TEXT NOT NULL,
database TEXT
)
""")
conn.commit()
logger.info(f"Connection store initialized at {self.db_path}")
def save(self, name: str, host: str, port: int, user: str, password: str, database: str | None) -> None:
"""Save or update a connection configuration."""
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
INSERT OR REPLACE INTO connections (name, host, port, user, password, database)
VALUES (?, ?, ?, ?, ?, ?)
""", (name, host, port, user, password, database))
conn.commit()
logger.info(f"Connection '{name}' saved to store")
def delete(self, name: str) -> bool:
"""Delete a connection configuration."""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute("DELETE FROM connections WHERE name = ?", (name,))
conn.commit()
deleted = cursor.rowcount > 0
if deleted:
logger.info(f"Connection '{name}' deleted from store")
return deleted
def get_all(self) -> list[dict[str, Any]]:
"""Get all saved connection configurations."""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute("SELECT * FROM connections")
return [dict(row) for row in cursor.fetchall()]
def get(self, name: str) -> dict[str, Any] | None:
"""Get a specific connection configuration."""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute("SELECT * FROM connections WHERE name = ?", (name,))
row = cursor.fetchone()
return dict(row) if row else None
def update(self, name: str, **updates) -> bool:
"""Update specific fields of a connection. Returns True if updated."""
existing = self.get(name)
if not existing:
return False
# Handle rename case
new_name = updates.pop("new_name", None)
# Build update query for remaining fields
if updates:
set_clauses = []
values = []
for field, value in updates.items():
if field in ("host", "port", "user", "password", "database"):
set_clauses.append(f"{field} = ?")
values.append(value)
if set_clauses:
values.append(name)
with sqlite3.connect(self.db_path) as conn:
conn.execute(
f"UPDATE connections SET {', '.join(set_clauses)} WHERE name = ?",
values
)
conn.commit()
# Handle rename after other updates
if new_name and new_name != name:
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"UPDATE connections SET name = ? WHERE name = ?",
(new_name, name)
)
conn.commit()
logger.info(f"Connection '{name}' renamed to '{new_name}'")
return True
@dataclass
class ConnectionConfig:
host: str
port: int
user: str
password: str
database: str | None = None
class ConnectionManager:
"""Manages multiple MySQL connections."""
def __init__(self, store: ConnectionStore):
self._pools: dict[str, aiomysql.Pool] = {}
self._configs: dict[str, ConnectionConfig] = {}
self._store = store
async def add_connection(
self,
name: str,
host: str,
port: int,
user: str,
password: str,
database: str | None = None,
) -> None:
"""Add a new named connection."""
if name in self._pools:
await self.remove_connection(name)
config = ConnectionConfig(
host=host, port=port, user=user, password=password, database=database
)
self._configs[name] = config
# Persist to SQLite
self._store.save(name, host, port, user, password, database)
pool = await aiomysql.create_pool(
host=host,
port=port,
user=user,
password=password,
db=database,
minsize=1,
maxsize=5,
autocommit=True,
)
self._pools[name] = pool
logger.info(f"Connection '{name}' added successfully")
async def remove_connection(self, name: str) -> bool:
"""Remove a named connection."""
# Remove from store first
deleted_from_store = self._store.delete(name)
if name not in self._pools:
return deleted_from_store
pool = self._pools.pop(name)
self._configs.pop(name, None)
pool.close()
await pool.wait_closed()
logger.info(f"Connection '{name}' removed")
return True
def list_connections(self) -> list[dict[str, Any]]:
"""List all active connections."""
return [
{
"name": name,
"host": config.host,
"port": config.port,
"user": config.user,
"database": config.database,
}
for name, config in self._configs.items()
]
@asynccontextmanager
async def get_connection(self, name: str):
"""Get a connection from the pool."""
if name not in self._pools:
raise ValueError(f"Connection '{name}' not found")
pool = self._pools[name]
async with pool.acquire() as conn:
yield conn
async def close_all(self):
"""Close all connection pools (does not remove from store)."""
for name in list(self._pools.keys()):
pool = self._pools.pop(name)
self._configs.pop(name, None)
pool.close()
await pool.wait_closed()
async def load_from_store(self):
"""Load and connect to all saved connections from the store."""
saved_connections = self._store.get_all()
for conn in saved_connections:
try:
await self.add_connection(
name=conn["name"],
host=conn["host"],
port=conn["port"],
user=conn["user"],
password=conn["password"],
database=conn.get("database"),
)
logger.info(f"Restored connection '{conn['name']}' from store")
except Exception as e:
logger.warning(f"Failed to restore connection '{conn['name']}': {e}")
async def update_connection(self, name: str, **updates) -> str:
"""Update a connection's configuration. Reconnects if needed."""
# Get current config from store (includes password)
current = self._store.get(name)
if not current:
raise ValueError(f"Connection '{name}' not found")
new_name = updates.get("new_name", name)
# Check what's changing
reconnect_needed = any(
updates.get(k) is not None and updates.get(k) != current.get(k)
for k in ("host", "port", "user", "password", "database")
)
# Update the store
self._store.update(name, **updates)
# If connection params changed, reconnect
if reconnect_needed or new_name != name:
# Close old pool if exists
if name in self._pools:
pool = self._pools.pop(name)
self._configs.pop(name, None)
pool.close()
await pool.wait_closed()
# Get updated config and reconnect
updated = self._store.get(new_name)
if updated:
pool = await aiomysql.create_pool(
host=updated["host"],
port=updated["port"],
user=updated["user"],
password=updated["password"],
db=updated.get("database"),
minsize=1,
maxsize=5,
autocommit=True,
)
self._pools[new_name] = pool
self._configs[new_name] = ConnectionConfig(
host=updated["host"],
port=updated["port"],
user=updated["user"],
password=updated["password"],
database=updated.get("database"),
)
logger.info(f"Connection '{new_name}' reconnected with new settings")
elif new_name != name:
# Just rename in memory
if name in self._pools:
self._pools[new_name] = self._pools.pop(name)
if name in self._configs:
self._configs[new_name] = self._configs.pop(name)
return new_name
# Global connection store and manager
conn_store = ConnectionStore(DB_PATH)
conn_manager = ConnectionManager(conn_store)
# Create MCP server
server = Server("mysql-mcp")
@server.list_tools()
async def list_tools() -> list[Tool]:
"""List available tools."""
return [
Tool(
name="add_connection",
description="Add a new MySQL connection with a unique name",
inputSchema={
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Unique name for this connection",
},
"host": {
"type": "string",
"description": "MySQL server hostname",
},
"port": {
"type": "integer",
"description": "MySQL server port (default: 3306)",
"default": 3306,
},
"user": {
"type": "string",
"description": "MySQL username",
},
"password": {
"type": "string",
"description": "MySQL password",
},
"database": {
"type": "string",
"description": "Default database to use (optional)",
},
},
"required": ["name", "host", "user", "password"],
},
),
Tool(
name="remove_connection",
description="Remove an existing MySQL connection",
inputSchema={
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Name of the connection to remove",
},
},
"required": ["name"],
},
),
Tool(
name="list_connections",
description="List all active MySQL connections",
inputSchema={
"type": "object",
"properties": {},
},
),
Tool(
name="execute_query",
description="Execute a SQL query on a named connection",
inputSchema={
"type": "object",
"properties": {
"connection": {
"type": "string",
"description": "Name of the connection to use",
},
"query": {
"type": "string",
"description": "SQL query to execute",
},
"params": {
"type": "array",
"description": "Query parameters for parameterized queries",
"items": {},
},
},
"required": ["connection", "query"],
},
),
Tool(
name="list_databases",
description="List all databases on a connection",
inputSchema={
"type": "object",
"properties": {
"connection": {
"type": "string",
"description": "Name of the connection to use",
},
},
"required": ["connection"],
},
),
Tool(
name="list_tables",
description="List all tables in a database",
inputSchema={
"type": "object",
"properties": {
"connection": {
"type": "string",
"description": "Name of the connection to use",
},
"database": {
"type": "string",
"description": "Database name (uses connection default if not specified)",
},
},
"required": ["connection"],
},
),
Tool(
name="describe_table",
description="Get the schema/structure of a table",
inputSchema={
"type": "object",
"properties": {
"connection": {
"type": "string",
"description": "Name of the connection to use",
},
"table": {
"type": "string",
"description": "Table name",
},
"database": {
"type": "string",
"description": "Database name (uses connection default if not specified)",
},
},
"required": ["connection", "table"],
},
),
Tool(
name="edit_connection",
description="Edit an existing MySQL connection. Only provide fields you want to change. Password can only be updated, not retrieved.",
inputSchema={
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Current name of the connection to edit",
},
"new_name": {
"type": "string",
"description": "New name for the connection (optional)",
},
"host": {
"type": "string",
"description": "New MySQL server hostname (optional)",
},
"port": {
"type": "integer",
"description": "New MySQL server port (optional)",
},
"user": {
"type": "string",
"description": "New MySQL username (optional)",
},
"password": {
"type": "string",
"description": "New MySQL password (optional, write-only)",
},
"database": {
"type": "string",
"description": "New default database (optional)",
},
},
"required": ["name"],
},
),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
"""Handle tool calls."""
try:
if name == "add_connection":
await conn_manager.add_connection(
name=arguments["name"],
host=arguments["host"],
port=arguments.get("port", 3306),
user=arguments["user"],
password=arguments["password"],
database=arguments.get("database"),
)
return [
TextContent(
type="text",
text=f"Connection '{arguments['name']}' added successfully",
)
]
elif name == "remove_connection":
removed = await conn_manager.remove_connection(arguments["name"])
if removed:
return [
TextContent(
type="text",
text=f"Connection '{arguments['name']}' removed successfully",
)
]
else:
return [
TextContent(
type="text",
text=f"Connection '{arguments['name']}' not found",
)
]
elif name == "list_connections":
connections = conn_manager.list_connections()
return [
TextContent(
type="text",
text=json.dumps(connections, indent=2),
)
]
elif name == "execute_query":
conn_name = arguments["connection"]
query = arguments["query"]
params = arguments.get("params")
async with conn_manager.get_connection(conn_name) as conn:
async with conn.cursor(aiomysql.DictCursor) as cursor:
await cursor.execute(query, params)
# Check if this is a SELECT-like query
if cursor.description:
rows = await cursor.fetchall()
result = {
"rows": rows,
"row_count": len(rows),
"columns": [col[0] for col in cursor.description],
}
else:
result = {
"affected_rows": cursor.rowcount,
"last_insert_id": cursor.lastrowid,
}
return [
TextContent(
type="text",
text=json.dumps(result, indent=2, default=str),
)
]
elif name == "list_databases":
conn_name = arguments["connection"]
async with conn_manager.get_connection(conn_name) as conn:
async with conn.cursor() as cursor:
await cursor.execute("SHOW DATABASES")
rows = await cursor.fetchall()
databases = [row[0] for row in rows]
return [
TextContent(
type="text",
text=json.dumps({"databases": databases}, indent=2),
)
]
elif name == "list_tables":
conn_name = arguments["connection"]
database = arguments.get("database")
async with conn_manager.get_connection(conn_name) as conn:
async with conn.cursor() as cursor:
if database:
await cursor.execute(f"SHOW TABLES FROM `{database}`")
else:
await cursor.execute("SHOW TABLES")
rows = await cursor.fetchall()
tables = [row[0] for row in rows]
return [
TextContent(
type="text",
text=json.dumps({"tables": tables}, indent=2),
)
]
elif name == "describe_table":
conn_name = arguments["connection"]
table = arguments["table"]
database = arguments.get("database")
async with conn_manager.get_connection(conn_name) as conn:
async with conn.cursor(aiomysql.DictCursor) as cursor:
if database:
await cursor.execute(f"DESCRIBE `{database}`.`{table}`")
else:
await cursor.execute(f"DESCRIBE `{table}`")
rows = await cursor.fetchall()
return [
TextContent(
type="text",
text=json.dumps({"schema": rows}, indent=2, default=str),
)
]
elif name == "edit_connection":
conn_name = arguments["name"]
updates = {}
for field in ("new_name", "host", "port", "user", "password", "database"):
if field in arguments and arguments[field] is not None:
updates[field] = arguments[field]
if not updates:
return [
TextContent(
type="text",
text="No changes specified",
)
]
new_name = await conn_manager.update_connection(conn_name, **updates)
changes = []
if "new_name" in updates:
changes.append(f"renamed to '{new_name}'")
if "host" in updates:
changes.append(f"host={updates['host']}")
if "port" in updates:
changes.append(f"port={updates['port']}")
if "user" in updates:
changes.append(f"user={updates['user']}")
if "password" in updates:
changes.append("password=****")
if "database" in updates:
changes.append(f"database={updates['database']}")
return [
TextContent(
type="text",
text=f"Connection '{conn_name}' updated: {', '.join(changes)}",
)
]
else:
return [
TextContent(
type="text",
text=f"Unknown tool: {name}",
)
]
except ValueError as e:
return [TextContent(type="text", text=f"Error: {str(e)}")]
except Exception as e:
logger.exception(f"Error executing tool {name}")
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def main():
"""Main entry point."""
logger.info("Starting MySQL MCP Server")
logger.info(f"Using config path: {DB_PATH}")
# Load saved connections from SQLite store
await conn_manager.load_from_store()
# Also load connections from environment if provided (these will be saved to store)
connections_json = os.environ.get("MYSQL_CONNECTIONS")
if connections_json:
try:
connections = json.loads(connections_json)
for conn in connections:
await conn_manager.add_connection(
name=conn["name"],
host=conn["host"],
port=conn.get("port", 3306),
user=conn["user"],
password=conn["password"],
database=conn.get("database"),
)
except Exception as e:
logger.error(f"Failed to load connections from environment: {e}")
async with stdio_server() as (read_stream, write_stream):
await server.run(read_stream, write_stream, server.create_initialization_options())
if __name__ == "__main__":
asyncio.run(main())