Skip to main content
Glama
by ideaxy
db_manager.py20.2 kB
""" MySQL MCP的数据库管理器 处理与元数据数据库和目标数据库的连接 """ import mysql.connector from mysql.connector import Error import configparser from typing import List, Dict, Any, Optional from database_models import DatabaseConnection, TableMetadata, ColumnMetadata, QueryHistory import json class MetadataDBManager: """管理与元数据数据库的连接和操作""" def __init__(self, config_file='config.ini'): self.config_file = config_file self.connection = None self.config = self._load_config() def _load_config(self) -> configparser.ConfigParser: """Load configuration from file""" config = configparser.ConfigParser() config.read(self.config_file) return config def connect(self): """连接到元数据数据库""" try: self.connection = mysql.connector.connect( host=self.config['metadata_db']['host'], port=self.config.getint('metadata_db', 'port'), user=self.config['metadata_db']['user'], password=self.config['metadata_db']['password'], database=self.config['metadata_db']['database'], charset='utf8mb4', collation='utf8mb4_unicode_ci', use_unicode=True ) return True except Error as e: print(f"连接元数据数据库时出错: {e}") return False def disconnect(self): """Disconnect from metadata database""" if self.connection and self.connection.is_connected(): self.connection.close() def get_all_databases(self) -> List[DatabaseConnection]: """从元数据数据库检索所有数据库连接""" if not self.connection or not self.connection.is_connected(): raise Exception("未连接到元数据数据库") cursor = self.connection.cursor(dictionary=True) try: cursor.execute(""" SELECT id, name, host, port, username, password, database_name, description, created_at, updated_at FROM database_connections ORDER BY name """) rows = cursor.fetchall() databases = [] for row in rows: db_conn = DatabaseConnection( id=row['id'], name=row['name'], host=row['host'], port=row['port'], username=row['username'], password=row['password'], database_name=row['database_name'], description=row['description'], created_at=row['created_at'], updated_at=row['updated_at'] ) databases.append(db_conn) return databases finally: cursor.close() def get_database_by_id(self, database_id: int) -> Optional[DatabaseConnection]: """根据ID检索特定的数据库连接""" if not self.connection or not self.connection.is_connected(): raise Exception("未连接到元数据数据库") cursor = self.connection.cursor(dictionary=True) try: cursor.execute(""" SELECT id, name, host, port, username, password, database_name, description, created_at, updated_at FROM database_connections WHERE id = %s """, (database_id,)) row = cursor.fetchone() if row: return DatabaseConnection( id=row['id'], name=row['name'], host=row['host'], port=row['port'], username=row['username'], password=row['password'], database_name=row['database_name'], description=row['description'], created_at=row['created_at'], updated_at=row['updated_at'] ) return None finally: cursor.close() def add_database(self, db_conn: DatabaseConnection) -> bool: """添加新的数据库连接到元数据数据库""" if not self.connection or not self.connection.is_connected(): raise Exception("未连接到元数据数据库") cursor = self.connection.cursor() try: cursor.execute(""" INSERT INTO database_connections (name, host, port, username, password, database_name, description) VALUES (%s, %s, %s, %s, %s, %s, %s) """, ( db_conn.name, db_conn.host, db_conn.port, db_conn.username, db_conn.password, db_conn.database_name, db_conn.description )) self.connection.commit() return True except Error as e: print(f"添加数据库时出错: {e}") return False finally: cursor.close() def update_database(self, db_conn: DatabaseConnection) -> bool: """更新元数据数据库中的现有数据库连接""" if not self.connection or not self.connection.is_connected(): raise Exception("未连接到元数据数据库") cursor = self.connection.cursor() try: cursor.execute(""" UPDATE database_connections SET name=%s, host=%s, port=%s, username=%s, password=%s, database_name=%s, description=%s, updated_at=CURRENT_TIMESTAMP WHERE id=%s """, ( db_conn.name, db_conn.host, db_conn.port, db_conn.username, db_conn.password, db_conn.database_name, db_conn.description, db_conn.id )) self.connection.commit() return True except Error as e: print(f"更新数据库时出错: {e}") return False finally: cursor.close() def delete_database(self, database_id: int) -> bool: """从元数据数据库中删除数据库连接""" if not self.connection or not self.connection.is_connected(): raise Exception("未连接到元数据数据库") cursor = self.connection.cursor() try: cursor.execute("DELETE FROM database_connections WHERE id = %s", (database_id,)) self.connection.commit() return cursor.rowcount > 0 except Error as e: print(f"删除数据库时出错: {e}") return False finally: cursor.close() def add_table_metadata(self, table_meta: TableMetadata) -> bool: """向table_metadata表添加表元数据""" if not self.connection or not self.connection.is_connected(): raise Exception("未连接到元数据数据库") cursor = self.connection.cursor() try: # Always store empty string for table_schema to avoid JSON issues table_schema = "" cursor.execute(""" INSERT INTO table_metadata (database_id, table_name, table_schema, row_count, description) VALUES (%s, %s, %s, %s, %s) """, ( table_meta.database_id, table_meta.table_name, table_schema, # Always use empty string table_meta.row_count, table_meta.description )) self.connection.commit() return True except Error as e: print(f"Error adding table metadata: {e}") return False finally: cursor.close() def get_table_metadata(self, database_id: int, table_name: str) -> Optional[TableMetadata]: """获取特定表的表元数据""" if not self.connection or not self.connection.is_connected(): raise Exception("未连接到元数据数据库") cursor = self.connection.cursor(dictionary=True) try: cursor.execute(""" SELECT id, database_id, table_name, table_schema, row_count, description, created_at, updated_at FROM table_metadata WHERE database_id = %s AND table_name = %s """, (database_id, table_name)) row = cursor.fetchone() if row: # Ensure table_schema is valid or empty table_schema = row['table_schema'] if row['table_schema'] else "" # Validate JSON if it's not empty if table_schema and table_schema.strip(): try: json.loads(table_schema) except (json.JSONDecodeError, TypeError): # If it's not valid JSON, use empty string table_schema = "" return TableMetadata( id=row['id'], database_id=row['database_id'], table_name=row['table_name'], table_schema=table_schema, # Use validated schema row_count=row['row_count'], description=row['description'], created_at=row['created_at'], updated_at=row['updated_at'] ) return None finally: cursor.close() def update_table_metadata(self, table_meta: TableMetadata) -> bool: """更新现有的表元数据""" if not self.connection or not self.connection.is_connected(): raise Exception("未连接到元数据数据库") cursor = self.connection.cursor() try: # Always store empty string for table_schema to avoid JSON issues table_schema = "" cursor.execute(""" UPDATE table_metadata SET table_schema=%s, row_count=%s, description=%s, updated_at=CURRENT_TIMESTAMP WHERE id=%s """, ( table_schema, # Always use empty string table_meta.row_count, table_meta.description, table_meta.id )) self.connection.commit() return True except Error as e: print(f"Error updating table metadata: {e}") return False finally: cursor.close() def upsert_table_metadata(self, table_meta: TableMetadata) -> bool: """ 插入或更新表元数据。 如果表的元数据已存在,则更新它;否则插入新记录。 """ if not self.connection or not self.connection.is_connected(): raise Exception("Not connected to metadata database") # Check if table metadata already exists existing_meta = self.get_table_metadata(table_meta.database_id, table_meta.table_name) if existing_meta: # Update existing metadata table_meta.id = existing_meta.id return self.update_table_metadata(table_meta) else: # Insert new metadata return self.add_table_metadata(table_meta) def add_column_metadata(self, column_meta: ColumnMetadata) -> bool: """向column_metadata表添加列元数据""" if not self.connection or not self.connection.is_connected(): raise Exception("未连接到元数据数据库") cursor = self.connection.cursor() try: cursor.execute(""" INSERT INTO column_metadata (table_id, column_name, data_type, is_nullable, column_key, column_comment) VALUES (%s, %s, %s, %s, %s, %s) """, ( column_meta.table_id, column_meta.column_name or "", # Ensure non-None value column_meta.data_type or "", # Ensure non-None value column_meta.is_nullable, column_meta.column_key or "", # Ensure non-None value column_meta.column_comment # Can be None )) self.connection.commit() return True except Error as e: print(f"Error adding column metadata: {e}") return False finally: cursor.close() def get_column_metadata_by_table(self, table_id: int) -> List[ColumnMetadata]: """获取特定表的所有列元数据""" if not self.connection or not self.connection.is_connected(): raise Exception("未连接到元数据数据库") cursor = self.connection.cursor(dictionary=True) try: cursor.execute(""" SELECT id, table_id, column_name, data_type, is_nullable, column_key, column_comment, created_at, updated_at FROM column_metadata WHERE table_id = %s ORDER BY id """, (table_id,)) rows = cursor.fetchall() columns = [] for row in rows: col_meta = ColumnMetadata( id=row['id'], table_id=row['table_id'], column_name=row['column_name'], data_type=row['data_type'], is_nullable=row['is_nullable'], column_key=row['column_key'], column_comment=row['column_comment'], created_at=row['created_at'], updated_at=row['updated_at'] ) columns.append(col_meta) return columns finally: cursor.close() def clear_column_metadata_by_table(self, table_id: int) -> bool: """清除特定表的所有列元数据""" if not self.connection or not self.connection.is_connected(): raise Exception("未连接到元数据数据库") cursor = self.connection.cursor() try: cursor.execute("DELETE FROM column_metadata WHERE table_id = %s", (table_id,)) self.connection.commit() return True except Error as e: print(f"Error clearing column metadata: {e}") return False finally: cursor.close() def upsert_column_metadata(self, column_meta: ColumnMetadata) -> bool: """插入或更新列元数据""" if not self.connection or not self.connection.is_connected(): raise Exception("未连接到元数据数据库") cursor = self.connection.cursor() try: # Check if column metadata already exists cursor.execute(""" SELECT id FROM column_metadata WHERE table_id = %s AND column_name = %s """, (column_meta.table_id, column_meta.column_name)) row = cursor.fetchone() if row: # Update existing column metadata cursor.execute(""" UPDATE column_metadata SET data_type=%s, is_nullable=%s, column_key=%s, column_comment=%s, updated_at=CURRENT_TIMESTAMP WHERE id=%s """, ( column_meta.data_type or "", # Ensure non-None value column_meta.is_nullable, column_meta.column_key or "", # Ensure non-None value column_meta.column_comment, # Can be None row['id'] )) else: # Insert new column metadata cursor.execute(""" INSERT INTO column_metadata (table_id, column_name, data_type, is_nullable, column_key, column_comment) VALUES (%s, %s, %s, %s, %s, %s) """, ( column_meta.table_id, column_meta.column_name or "", # Ensure non-None value column_meta.data_type or "", # Ensure non-None value column_meta.is_nullable, column_meta.column_key or "", # Ensure non-None value column_meta.column_comment # Can be None )) self.connection.commit() return True except Error as e: print(f"Error upserting column metadata: {e}") return False finally: cursor.close() class TargetDBManager: """管理与目标数据库的连接和操作""" def __init__(self, db_connection: DatabaseConnection): self.db_connection = db_connection self.connection = None def connect(self) -> bool: """连接到目标数据库""" try: self.connection = mysql.connector.connect( host=self.db_connection.host, port=self.db_connection.port, user=self.db_connection.username, password=self.db_connection.password, database=self.db_connection.database_name, charset='utf8mb4', collation='utf8mb4_unicode_ci', use_unicode=True ) return True except Error as e: print(f"连接目标数据库时出错: {e}") return False def disconnect(self): """Disconnect from target database""" if self.connection and self.connection.is_connected(): self.connection.close() def execute_query(self, query: str) -> List[Dict[str, Any]]: """执行SQL查询并返回结果""" if not self.connection or not self.connection.is_connected(): raise Exception("未连接到目标数据库") cursor = self.connection.cursor(dictionary=True) try: cursor.execute(query) if query.strip().upper().startswith('SELECT'): return cursor.fetchall() else: self.connection.commit() return [] except Exception as e: print(f"执行查询时出错: {e}") raise finally: cursor.close() def get_tables(self) -> List[str]: """获取数据库中的表列表""" if not self.connection or not self.connection.is_connected(): raise Exception("未连接到目标数据库") cursor = self.connection.cursor() try: cursor.execute("SHOW TABLES") return [row[0] for row in cursor.fetchall()] finally: cursor.close() def get_table_schema(self, table_name: str) -> Dict[str, Any]: """获取表的模式信息""" if not self.connection or not self.connection.is_connected(): raise Exception("未连接到目标数据库") cursor = self.connection.cursor(dictionary=True) try: cursor.execute(f"DESCRIBE `{table_name}`") columns = cursor.fetchall() # Get table creation statement for more detailed info cursor.execute(f"SHOW CREATE TABLE `{table_name}`") create_statement = cursor.fetchone() return { 'columns': columns, 'create_statement': create_statement.get('Create Table') if create_statement else None } finally: cursor.close()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ideaxy/mysql-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server