# AutoCAD MCP Server
# by zh19980811
# Verified
from sqlalchemy import Column, MetaData, Table, create_engine, inspect, text
from sqlalchemy.exc import SQLAlchemyError
class DatabaseManager:
    """Thin wrapper around a SQLAlchemy engine for schema inspection and queries.

    Call :meth:`connect` before using the query/inspection methods. Those
    methods report failures by returning a human-readable error string
    instead of raising, so callers should check the type of the result.
    """

    def __init__(self, connection_string):
        """Store the connection string; no connection is opened yet.

        Args:
            connection_string: SQLAlchemy database URL passed to
                ``create_engine``.
        """
        self.connection_string = connection_string
        self.engine = None
        self.metadata = None
        self.inspector = None

    def connect(self) -> bool:
        """Create the engine, reflect the schema and build an inspector.

        The manager's state is only replaced when every step succeeds, so a
        failed attempt never leaves a half-initialised engine/inspector pair
        behind.

        Returns:
            True on success, False on any failure (the error is printed).
        """
        engine = None
        try:
            engine = create_engine(self.connection_string)
            metadata = MetaData()
            metadata.reflect(bind=engine)
            inspector = inspect(engine)
        except Exception as e:
            # Release the pool of the engine we just created; otherwise it
            # would linger even though the connection attempt failed.
            if engine is not None:
                engine.dispose()
            print(f"数据库连接失败: {e}")
            return False

        # Reconnecting: release the previous engine before replacing it.
        if self.engine is not None:
            self.engine.dispose()
        self.engine = engine
        self.metadata = metadata
        self.inspector = inspector
        return True

    def disconnect(self) -> None:
        """Dispose of the engine and reset the manager to the unconnected state."""
        if self.engine is not None:
            self.engine.dispose()
        # Drop stale references so later calls report "not connected"
        # instead of working against a disposed engine.
        self.engine = None
        self.metadata = None
        self.inspector = None

    def get_all_tables(self):
        """Return the list of table names, or an error string on failure."""
        if self.inspector is None:
            return "获取表列表失败: 数据库未连接"
        try:
            return self.inspector.get_table_names()
        except SQLAlchemyError as e:
            return f"获取表列表失败: {e}"

    def get_table_schema(self, table_name):
        """Describe the columns, primary key, foreign keys and indexes of a table.

        Args:
            table_name: Name of a table present in the reflected metadata.

        Returns:
            A dict with the keys ``table_name``, ``columns``, ``primary_key``,
            ``foreign_keys`` and ``indices``; or an error string when the
            manager is not connected, the table is unknown, or inspection
            fails.
        """
        if self.inspector is None or self.metadata is None:
            return f"获取表 '{table_name}' 结构失败: 数据库未连接"
        try:
            if table_name not in self.metadata.tables:
                return f"表 '{table_name}' 不存在"

            columns = [
                {
                    "name": column["name"],
                    "type": str(column["type"]),
                    "nullable": column["nullable"],
                    "default": str(column["default"]) if column["default"] else None,
                }
                for column in self.inspector.get_columns(table_name)
            ]

            # Primary key constraint, returned exactly as the inspector reports it.
            primary_keys = self.inspector.get_pk_constraint(table_name)

            # Foreign keys.
            foreign_keys = [
                {
                    "name": fk["name"],
                    "referred_table": fk["referred_table"],
                    "referred_columns": fk["referred_columns"],
                    "constrained_columns": fk["constrained_columns"],
                }
                for fk in self.inspector.get_foreign_keys(table_name)
            ]

            # Indexes.
            indices = [
                {
                    "name": index["name"],
                    "columns": index["column_names"],
                    "unique": index["unique"],
                }
                for index in self.inspector.get_indexes(table_name)
            ]

            return {
                "table_name": table_name,
                "columns": columns,
                "primary_key": primary_keys,
                "foreign_keys": foreign_keys,
                "indices": indices,
            }
        except SQLAlchemyError as e:
            return f"获取表 '{table_name}' 结构失败: {e}"

    def execute_query(self, query, params=None):
        """Execute a statement and return its rows or its affected-row count.

        Args:
            query: SQL text or an already-built SQLAlchemy executable. Plain
                strings are wrapped in ``text()``, so they use ``:name``
                style bind parameters.
            params: Optional bind parameters forwarded to ``execute``.

        Returns:
            ``{"columns": [...], "rows": [{...}, ...]}`` when the statement
            returns rows, ``{"affected_rows": n}`` otherwise, or an error
            string on failure.
        """
        if self.engine is None:
            return "执行查询失败: 数据库未连接"

        # SQLAlchemy 2.x no longer accepts raw strings in Connection.execute.
        statement = text(query) if isinstance(query, str) else query
        try:
            # begin() commits on success and rolls back on error, so
            # INSERT/UPDATE/DELETE statements are actually persisted.
            with self.engine.begin() as connection:
                if params:
                    result = connection.execute(statement, params)
                else:
                    result = connection.execute(statement)

                if result.returns_rows:
                    # Materialise the key view into a plain list so the
                    # payload is made of ordinary, serialisable containers.
                    columns = list(result.keys())
                    rows = [dict(zip(columns, row)) for row in result]
                    return {"columns": columns, "rows": rows}
                return {"affected_rows": result.rowcount}
        except SQLAlchemyError as e:
            return f"执行查询失败: {e}"