Skip to main content
Glama
by ideaxy
mysql_mcp_server.py6.62 kB
#!/usr/bin/env python3 """ MySQL MCP Server using FastMCP Framework 允许AI助手通过自然语言查询与多个MySQL数据库交互 """ from fastmcp import FastMCP import mysql.connector from mysql.connector import Error import configparser import json import os from typing import List, Dict, Any, Optional from nlp_processor import NLPProcessor from db_manager import MetadataDBManager # 初始化FastMCP应用 app = FastMCP("MySQL MCP Server 🚀") def load_config(config_file='config.ini'): """从文件加载配置""" config = configparser.ConfigParser() config.read(config_file) return config def get_metadata_db_connection(): """创建到元数据数据库的连接""" config = load_config() try: connection = mysql.connector.connect( host=config['metadata_db']['host'], port=config.getint('metadata_db', 'port'), user=config['metadata_db']['user'], password=config['metadata_db']['password'], database=config['metadata_db']['database'], charset='utf8mb4', collation='utf8mb4_unicode_ci', use_unicode=True ) return connection except Error as e: raise Exception(f"连接元数据数据库时出错: {e}") def get_target_db_connection(database_id: int): """根据database_id创建到目标数据库的连接""" # 首先从元数据数据库获取数据库连接信息 metadata_conn = get_metadata_db_connection() try: cursor = metadata_conn.cursor(dictionary=True) cursor.execute( "SELECT host, port, username, password, database_name FROM database_connections WHERE id = %s", (database_id,) ) db_info = cursor.fetchone() if not db_info: raise Exception(f"未找到ID为 {database_id} 的数据库") # 现在连接到目标数据库 target_conn = mysql.connector.connect( host=db_info['host'], port=db_info['port'], user=db_info['username'], password=db_info['password'], database=db_info['database_name'], charset='utf8mb4', collation='utf8mb4_unicode_ci', use_unicode=True ) return target_conn finally: if metadata_conn.is_connected(): metadata_conn.close() @app.tool def list_databases() -> List[Dict[str, Any]]: """ 列出元数据数据库中所有已注册的数据库 Returns: 包含数据库信息的字典列表 """ connection = get_metadata_db_connection() try: cursor = connection.cursor(dictionary=True) cursor.execute(""" SELECT id, name, host, port, username, database_name, description FROM database_connections ORDER BY name """) databases = cursor.fetchall() return databases except Error as e: raise Exception(f"列出数据库时出错: {e}") finally: if connection.is_connected(): connection.close() @app.tool def execute_query(database_id: int, query: str) -> List[Dict[str, Any]]: """ 在指定数据库上执行SQL查询 Args: database_id: 要查询的数据库ID query: 要执行的SQL查询语句 Returns: 查询结果,以字典列表形式返回 """ connection = get_target_db_connection(database_id) try: cursor = connection.cursor(dictionary=True) cursor.execute(query) if query.strip().upper().startswith('SELECT'): results = cursor.fetchall() return results else: connection.commit() return [{"message": f"查询执行成功。影响行数: {cursor.rowcount}"}] except Error as e: raise Exception(f"执行查询时出错: {e}") finally: if connection.is_connected(): connection.close() @app.tool def natural_language_query(database_id: int, natural_query: str) -> List[Dict[str, Any]]: """ 在指定数据库上执行自然语言查询 将自然语言转换为SQL并执行 Args: database_id: 要查询的数据库ID natural_query: 自然语言查询 (例如, "显示所有用户", "统计产品记录数") Returns: 查询结果,以字典列表形式返回 """ # 使用NLPProcessor将自然语言转换为SQL metadata_manager = MetadataDBManager() # 确保连接到元数据数据库 if not metadata_manager.connect(): raise Exception("无法连接到元数据数据库") nlp_processor = NLPProcessor(metadata_manager) try: # 将自然语言转换为SQL sql_query = nlp_processor.process_query(natural_query, database_id) # 执行转换后的查询 connection = get_target_db_connection(database_id) try: cursor = connection.cursor(dictionary=True) cursor.execute(sql_query) if sql_query.strip().upper().startswith('SELECT'): results = cursor.fetchall() return results else: connection.commit() return [{"message": f"查询执行成功。影响行数: {cursor.rowcount}"}] except Error as e: raise Exception(f"执行查询时出错: {e}") finally: if connection.is_connected(): connection.close() except ValueError as e: raise Exception(str(e)) finally: # 清理连接 metadata_manager.disconnect() @app.tool def get_database_tables(database_id: int) -> List[str]: """ 获取指定数据库中的表列表 Args: database_id: 要查询的数据库ID Returns: 表名列表 """ try: connection = get_target_db_connection(database_id) except Exception as e: # 返回更详细的错误信息 raise Exception(f"无法连接到ID为 {database_id} 的数据库。错误: {str(e)}") try: cursor = connection.cursor() cursor.execute("SHOW TABLES") tables = [row[0] for row in cursor.fetchall()] return tables except Error as e: raise Exception(f"获取表列表时出错: {e}") finally: if connection.is_connected(): connection.close() if __name__ == "__main__": # 运行FastMCP服务器 app.run(transport='sse')

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ideaxy/mysql-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server