# db_manager.py
"""
MySQL MCP的数据库管理器
处理与元数据数据库和目标数据库的连接
"""
import mysql.connector
from mysql.connector import Error
import configparser
from typing import List, Dict, Any, Optional
from database_models import DatabaseConnection, TableMetadata, ColumnMetadata, QueryHistory
import json
class MetadataDBManager:
    """Manage the connection to, and operations on, the metadata database.

    The metadata database holds three tables used by this class:
    ``database_connections`` (registered target databases),
    ``table_metadata`` and ``column_metadata``.  Call :meth:`connect` first;
    every query method raises ``Exception`` when no live connection exists.
    Write methods commit on success and report ``mysql.connector.Error``
    failures by printing the error and returning ``False``.
    """

    def __init__(self, config_file='config.ini'):
        """Store the config path and load it; no connection is opened here.

        Args:
            config_file: Path of the INI file.  :meth:`connect` reads the
                ``host``, ``port``, ``user``, ``password`` and ``database``
                keys from its ``[metadata_db]`` section.
        """
        self.config_file = config_file
        self.connection = None
        self.config = self._load_config()

    def _load_config(self) -> configparser.ConfigParser:
        """Parse ``self.config_file`` and return the parser.

        ``ConfigParser.read`` silently skips files it cannot open, so a
        missing file produces an empty parser here; the failure surfaces
        later when :meth:`connect` looks up the ``metadata_db`` section.
        """
        config = configparser.ConfigParser()
        config.read(self.config_file)
        return config

    def _require_connection(self, message="未连接到元数据数据库"):
        """Raise ``Exception(message)`` unless a live connection exists."""
        if not self.connection or not self.connection.is_connected():
            raise Exception(message)

    @staticmethod
    def _database_from_row(row) -> "DatabaseConnection":
        """Build a ``DatabaseConnection`` from a dictionary-cursor row."""
        return DatabaseConnection(
            id=row['id'],
            name=row['name'],
            host=row['host'],
            port=row['port'],
            username=row['username'],
            password=row['password'],
            database_name=row['database_name'],
            description=row['description'],
            created_at=row['created_at'],
            updated_at=row['updated_at'],
        )

    @staticmethod
    def _insert_column_row(cursor, column_meta) -> None:
        """Execute the ``column_metadata`` INSERT for ``column_meta``.

        ``column_name``, ``data_type`` and ``column_key`` are stored as an
        empty string when they are ``None``/empty; ``column_comment`` is
        passed through unchanged and may be ``None``.  Does not commit.
        """
        cursor.execute("""
            INSERT INTO column_metadata
            (table_id, column_name, data_type, is_nullable, column_key, column_comment)
            VALUES (%s, %s, %s, %s, %s, %s)
        """, (
            column_meta.table_id,
            column_meta.column_name or "",
            column_meta.data_type or "",
            column_meta.is_nullable,
            column_meta.column_key or "",
            column_meta.column_comment,
        ))

    def connect(self):
        """Open the connection described by the ``[metadata_db]`` section.

        Returns:
            ``True`` on success; ``False`` (after printing the error) when
            the connector raises ``mysql.connector.Error``.
        """
        try:
            self.connection = mysql.connector.connect(
                host=self.config['metadata_db']['host'],
                port=self.config.getint('metadata_db', 'port'),
                user=self.config['metadata_db']['user'],
                password=self.config['metadata_db']['password'],
                database=self.config['metadata_db']['database'],
                charset='utf8mb4',
                collation='utf8mb4_unicode_ci',
                use_unicode=True
            )
            return True
        except Error as e:
            print(f"连接元数据数据库时出错: {e}")
            return False

    def disconnect(self):
        """Close the metadata connection if one is currently open."""
        if self.connection and self.connection.is_connected():
            self.connection.close()

    def get_all_databases(self) -> List["DatabaseConnection"]:
        """Return every row of ``database_connections``, ordered by name.

        Raises:
            Exception: If there is no live connection.
        """
        self._require_connection()
        cursor = self.connection.cursor(dictionary=True)
        try:
            cursor.execute("""
                SELECT id, name, host, port, username, password, database_name, description,
                created_at, updated_at
                FROM database_connections
                ORDER BY name
            """)
            return [self._database_from_row(row) for row in cursor.fetchall()]
        finally:
            cursor.close()

    def get_database_by_id(self, database_id: int) -> Optional["DatabaseConnection"]:
        """Return the connection record with ``database_id``, or ``None``.

        Raises:
            Exception: If there is no live connection.
        """
        self._require_connection()
        cursor = self.connection.cursor(dictionary=True)
        try:
            cursor.execute("""
                SELECT id, name, host, port, username, password, database_name, description,
                created_at, updated_at
                FROM database_connections
                WHERE id = %s
            """, (database_id,))
            row = cursor.fetchone()
            return self._database_from_row(row) if row else None
        finally:
            cursor.close()

    def add_database(self, db_conn: "DatabaseConnection") -> bool:
        """Insert ``db_conn`` into ``database_connections`` and commit.

        Returns:
            ``True`` on success, ``False`` if the connector raised ``Error``.

        Raises:
            Exception: If there is no live connection.
        """
        self._require_connection()
        cursor = self.connection.cursor()
        try:
            cursor.execute("""
                INSERT INTO database_connections
                (name, host, port, username, password, database_name, description)
                VALUES (%s, %s, %s, %s, %s, %s, %s)
            """, (
                db_conn.name,
                db_conn.host,
                db_conn.port,
                db_conn.username,
                db_conn.password,
                db_conn.database_name,
                db_conn.description
            ))
            self.connection.commit()
            return True
        except Error as e:
            print(f"添加数据库时出错: {e}")
            return False
        finally:
            cursor.close()

    def update_database(self, db_conn: "DatabaseConnection") -> bool:
        """Overwrite the row whose id is ``db_conn.id`` and commit.

        Returns:
            ``True`` when the statement ran (even if no row matched),
            ``False`` if the connector raised ``Error``.

        Raises:
            Exception: If there is no live connection.
        """
        self._require_connection()
        cursor = self.connection.cursor()
        try:
            cursor.execute("""
                UPDATE database_connections SET
                name=%s, host=%s, port=%s, username=%s, password=%s, database_name=%s, description=%s,
                updated_at=CURRENT_TIMESTAMP
                WHERE id=%s
            """, (
                db_conn.name,
                db_conn.host,
                db_conn.port,
                db_conn.username,
                db_conn.password,
                db_conn.database_name,
                db_conn.description,
                db_conn.id
            ))
            self.connection.commit()
            return True
        except Error as e:
            print(f"更新数据库时出错: {e}")
            return False
        finally:
            cursor.close()

    def delete_database(self, database_id: int) -> bool:
        """Delete the connection record with ``database_id`` and commit.

        Returns:
            ``True`` only if a row was actually deleted; ``False`` when no
            row matched or the connector raised ``Error``.

        Raises:
            Exception: If there is no live connection.
        """
        self._require_connection()
        cursor = self.connection.cursor()
        try:
            cursor.execute("DELETE FROM database_connections WHERE id = %s", (database_id,))
            self.connection.commit()
            return cursor.rowcount > 0
        except Error as e:
            print(f"删除数据库时出错: {e}")
            return False
        finally:
            cursor.close()

    def add_table_metadata(self, table_meta: "TableMetadata") -> bool:
        """Insert a ``table_metadata`` row for ``table_meta`` and commit.

        ``table_meta.table_schema`` is deliberately ignored: an empty string
        is always stored to avoid JSON issues with that column.

        Returns:
            ``True`` on success, ``False`` if the connector raised ``Error``.

        Raises:
            Exception: If there is no live connection.
        """
        self._require_connection()
        cursor = self.connection.cursor()
        try:
            table_schema = ""  # always stored empty, see docstring
            cursor.execute("""
                INSERT INTO table_metadata
                (database_id, table_name, table_schema, row_count, description)
                VALUES (%s, %s, %s, %s, %s)
            """, (
                table_meta.database_id,
                table_meta.table_name,
                table_schema,
                table_meta.row_count,
                table_meta.description
            ))
            self.connection.commit()
            return True
        except Error as e:
            print(f"Error adding table metadata: {e}")
            return False
        finally:
            cursor.close()

    def get_table_metadata(self, database_id: int, table_name: str) -> Optional["TableMetadata"]:
        """Return the metadata row for one table, or ``None`` if absent.

        The stored ``table_schema`` is returned only when it parses as JSON;
        a NULL/empty or unparsable value is replaced by an empty string.

        Raises:
            Exception: If there is no live connection.
        """
        self._require_connection()
        cursor = self.connection.cursor(dictionary=True)
        try:
            cursor.execute("""
                SELECT id, database_id, table_name, table_schema, row_count, description,
                created_at, updated_at
                FROM table_metadata
                WHERE database_id = %s AND table_name = %s
            """, (database_id, table_name))
            row = cursor.fetchone()
            if not row:
                return None

            table_schema = row['table_schema'] if row['table_schema'] else ""
            if table_schema and table_schema.strip():
                try:
                    # Parsed only to validate; the raw text is what is returned.
                    json.loads(table_schema)
                except (json.JSONDecodeError, TypeError):
                    table_schema = ""

            return TableMetadata(
                id=row['id'],
                database_id=row['database_id'],
                table_name=row['table_name'],
                table_schema=table_schema,
                row_count=row['row_count'],
                description=row['description'],
                created_at=row['created_at'],
                updated_at=row['updated_at']
            )
        finally:
            cursor.close()

    def update_table_metadata(self, table_meta: "TableMetadata") -> bool:
        """Update the ``table_metadata`` row whose id is ``table_meta.id``.

        Only ``row_count`` and ``description`` are taken from ``table_meta``;
        ``table_schema`` is always reset to an empty string to avoid JSON
        issues with that column.

        Returns:
            ``True`` when the statement ran, ``False`` if the connector
            raised ``Error``.

        Raises:
            Exception: If there is no live connection.
        """
        self._require_connection()
        cursor = self.connection.cursor()
        try:
            table_schema = ""  # always stored empty, see docstring
            cursor.execute("""
                UPDATE table_metadata SET
                table_schema=%s, row_count=%s, description=%s, updated_at=CURRENT_TIMESTAMP
                WHERE id=%s
            """, (
                table_schema,
                table_meta.row_count,
                table_meta.description,
                table_meta.id
            ))
            self.connection.commit()
            return True
        except Error as e:
            print(f"Error updating table metadata: {e}")
            return False
        finally:
            cursor.close()

    def upsert_table_metadata(self, table_meta: "TableMetadata") -> bool:
        """Insert ``table_meta``, or update the existing row for that table.

        Existence is decided by ``(database_id, table_name)``.  When a row
        exists, ``table_meta.id`` is overwritten with the stored id before
        the update.

        Raises:
            Exception: If there is no live connection.
        """
        self._require_connection("Not connected to metadata database")
        existing_meta = self.get_table_metadata(table_meta.database_id, table_meta.table_name)
        if existing_meta:
            table_meta.id = existing_meta.id
            return self.update_table_metadata(table_meta)
        return self.add_table_metadata(table_meta)

    def add_column_metadata(self, column_meta: "ColumnMetadata") -> bool:
        """Insert a ``column_metadata`` row for ``column_meta`` and commit.

        Returns:
            ``True`` on success, ``False`` if the connector raised ``Error``.

        Raises:
            Exception: If there is no live connection.
        """
        self._require_connection()
        cursor = self.connection.cursor()
        try:
            self._insert_column_row(cursor, column_meta)
            self.connection.commit()
            return True
        except Error as e:
            print(f"Error adding column metadata: {e}")
            return False
        finally:
            cursor.close()

    def get_column_metadata_by_table(self, table_id: int) -> List["ColumnMetadata"]:
        """Return all column metadata rows for ``table_id``, ordered by id.

        Raises:
            Exception: If there is no live connection.
        """
        self._require_connection()
        cursor = self.connection.cursor(dictionary=True)
        try:
            cursor.execute("""
                SELECT id, table_id, column_name, data_type, is_nullable, column_key, column_comment,
                created_at, updated_at
                FROM column_metadata
                WHERE table_id = %s
                ORDER BY id
            """, (table_id,))
            return [
                ColumnMetadata(
                    id=row['id'],
                    table_id=row['table_id'],
                    column_name=row['column_name'],
                    data_type=row['data_type'],
                    is_nullable=row['is_nullable'],
                    column_key=row['column_key'],
                    column_comment=row['column_comment'],
                    created_at=row['created_at'],
                    updated_at=row['updated_at']
                )
                for row in cursor.fetchall()
            ]
        finally:
            cursor.close()

    def clear_column_metadata_by_table(self, table_id: int) -> bool:
        """Delete every ``column_metadata`` row of ``table_id`` and commit.

        Returns:
            ``True`` when the statement ran (even if nothing was deleted),
            ``False`` if the connector raised ``Error``.

        Raises:
            Exception: If there is no live connection.
        """
        self._require_connection()
        cursor = self.connection.cursor()
        try:
            cursor.execute("DELETE FROM column_metadata WHERE table_id = %s", (table_id,))
            self.connection.commit()
            return True
        except Error as e:
            print(f"Error clearing column metadata: {e}")
            return False
        finally:
            cursor.close()

    def upsert_column_metadata(self, column_meta: "ColumnMetadata") -> bool:
        """Insert ``column_meta``, or update the row for that column.

        Existence is decided by ``(table_id, column_name)``.  The name is
        normalised to ``""`` when empty/``None`` for both the lookup and the
        insert, so a nameless column matches the row it previously created
        instead of being inserted again.

        Returns:
            ``True`` on success, ``False`` if the connector raised ``Error``.

        Raises:
            Exception: If there is no live connection.
        """
        self._require_connection()
        cursor = self.connection.cursor()
        try:
            column_name = column_meta.column_name or ""
            cursor.execute("""
                SELECT id FROM column_metadata
                WHERE table_id = %s AND column_name = %s
            """, (column_meta.table_id, column_name))
            row = cursor.fetchone()
            if row:
                # Plain cursors return tuples, so the id is read by position;
                # indexing the row with 'id' raised TypeError on every update.
                existing_id = row[0]
                cursor.execute("""
                    UPDATE column_metadata SET
                    data_type=%s, is_nullable=%s, column_key=%s, column_comment=%s, updated_at=CURRENT_TIMESTAMP
                    WHERE id=%s
                """, (
                    column_meta.data_type or "",
                    column_meta.is_nullable,
                    column_meta.column_key or "",
                    column_meta.column_comment,
                    existing_id
                ))
            else:
                self._insert_column_row(cursor, column_meta)
            self.connection.commit()
            return True
        except Error as e:
            print(f"Error upserting column metadata: {e}")
            return False
        finally:
            cursor.close()
class TargetDBManager:
"""管理与目标数据库的连接和操作"""
def __init__(self, db_connection: DatabaseConnection):
self.db_connection = db_connection
self.connection = None
def connect(self) -> bool:
"""连接到目标数据库"""
try:
self.connection = mysql.connector.connect(
host=self.db_connection.host,
port=self.db_connection.port,
user=self.db_connection.username,
password=self.db_connection.password,
database=self.db_connection.database_name,
charset='utf8mb4',
collation='utf8mb4_unicode_ci',
use_unicode=True
)
return True
except Error as e:
print(f"连接目标数据库时出错: {e}")
return False
def disconnect(self):
"""Disconnect from target database"""
if self.connection and self.connection.is_connected():
self.connection.close()
def execute_query(self, query: str) -> List[Dict[str, Any]]:
"""执行SQL查询并返回结果"""
if not self.connection or not self.connection.is_connected():
raise Exception("未连接到目标数据库")
cursor = self.connection.cursor(dictionary=True)
try:
cursor.execute(query)
if query.strip().upper().startswith('SELECT'):
return cursor.fetchall()
else:
self.connection.commit()
return []
except Exception as e:
print(f"执行查询时出错: {e}")
raise
finally:
cursor.close()
def get_tables(self) -> List[str]:
"""获取数据库中的表列表"""
if not self.connection or not self.connection.is_connected():
raise Exception("未连接到目标数据库")
cursor = self.connection.cursor()
try:
cursor.execute("SHOW TABLES")
return [row[0] for row in cursor.fetchall()]
finally:
cursor.close()
def get_table_schema(self, table_name: str) -> Dict[str, Any]:
"""获取表的模式信息"""
if not self.connection or not self.connection.is_connected():
raise Exception("未连接到目标数据库")
cursor = self.connection.cursor(dictionary=True)
try:
cursor.execute(f"DESCRIBE `{table_name}`")
columns = cursor.fetchall()
# Get table creation statement for more detailed info
cursor.execute(f"SHOW CREATE TABLE `{table_name}`")
create_statement = cursor.fetchone()
return {
'columns': columns,
'create_statement': create_statement.get('Create Table') if create_statement else None
}
finally:
cursor.close()