Skip to main content
Glama
jeweis

Jewei MSSQL MCP Server

by jeweis
core.py17.2 kB
# core.py """ 核心模块,包含数据库连接和核心功能 """ import json from typing import Dict, List, Optional, Union, Any from sqlalchemy import create_engine, text, MetaData, Table, Column from sqlalchemy.exc import SQLAlchemyError from .app_config import config # 数据库连接管理 engine = None def get_db_connection(): """获取数据库连接,如果不存在则创建新连接""" global engine if engine is None: try: print("正在创建数据库连接...") print(f"连接到: {config.DB_HOST}:{config.DB_PORT}, 数据库: {config.DB_NAME}, 用户: {config.DB_USER}") print(f"连接字符串: {config.CONNECTION_STRING}") # 创建引擎时设置连接池选项 engine = create_engine( config.CONNECTION_STRING, pool_pre_ping=True, # 检查连接是否有效 pool_recycle=3600, # 每小时回收连接 connect_args={ 'timeout': 30 # 连接超时时间(秒) } ) # 测试连接 with engine.connect() as conn: result = conn.execute(text("SELECT 1")).fetchone() print(f"测试连接结果: {result}") print("数据库连接创建成功") except SQLAlchemyError as e: error_msg = f"数据库连接失败: {str(e)}" print(error_msg) # 尝试获取更详细的错误信息 if hasattr(e, 'orig') and e.orig: print(f"原始错误: {e.orig}") if hasattr(e.orig, 'args') and e.orig.args: print(f"错误参数: {e.orig.args}") raise Exception(error_msg) return engine def execute_query(sql: str) -> Dict[str, Any]: """执行SQL查询并返回结果集 Args: sql: SQL查询语句(必须是SELECT语句) Returns: 包含查询结果的字典 """ try: print(f"执行SQL查询: {sql[:100]}{'...' if len(sql) > 100 else ''}") # 安全检查:确保只执行SELECT语句 sql_lower = sql.lower().strip() if not sql_lower.startswith("select"): error_msg = "安全限制:只允许执行SELECT语句" print(f"{error_msg}, SQL: {sql}") return { "error": error_msg, "sql": sql } # 检查是否包含危险操作 dangerous_keywords = ["insert", "update", "delete", "drop", "alter", "create", "truncate", "exec", "execute"] for keyword in dangerous_keywords: if f" {keyword} " in f" {sql_lower} ": error_msg = f"安全限制:查询中包含禁止的关键字 '{keyword}'" print(f"{error_msg}, SQL: {sql}") return { "error": error_msg, "sql": sql } engine = get_db_connection() with engine.connect() as conn: result = conn.execute(text(sql)) # 获取列名 columns = list(result.keys()) # 转换结果为字典列表 result_rows = [] for row in result: try: # 尝试使用字典推导式创建字典 row_dict = {col: row[i] for i, col in enumerate(columns)} result_rows.append(row_dict) except Exception as row_err: print(f"处理行数据时出错: {row_err}") # 如果出错,尝试使用其他方式 try: # 尝试直接将行转换为字典 row_dict = {} for i, col in enumerate(columns): try: row_dict[col] = row[i] except: row_dict[col] = None result_rows.append(row_dict) except Exception as e: print(f"处理行数据的备选方法也失败: {e}") # 最后的备选方案,只添加原始值 result_rows.append({"value": str(row)}) print(f"查询成功,返回 {len(result_rows)} 条记录") return { "columns": columns, "rows": result_rows, "row_count": len(result_rows) } except Exception as e: error_msg = f"查询执行失败: {str(e)}" print(f"{error_msg}, SQL: {sql}") return { "error": str(e), "sql": sql } def get_table_info(table_name: str, schema: str = "dbo") -> Dict[str, Any]: """获取指定表的结构信息 Args: table_name: 表名 schema: 架构名,默认为dbo Returns: 包含表结构信息的字典 """ try: print(f"获取表结构信息: {schema}.{table_name}") engine = get_db_connection() # 查询列信息 columns_sql = f""" SELECT c.name AS column_name, t.name AS data_type, c.max_length, c.precision, c.scale, c.is_nullable, CAST(ISNULL(CAST(ep.value AS NVARCHAR(MAX)), '') AS NVARCHAR(MAX)) AS description FROM sys.columns c JOIN sys.types t ON c.user_type_id = t.user_type_id JOIN sys.tables tb ON c.object_id = tb.object_id JOIN sys.schemas s ON tb.schema_id = s.schema_id LEFT JOIN sys.extended_properties ep ON c.object_id = ep.major_id AND c.column_id = ep.minor_id AND ep.name = 'MS_Description' WHERE tb.name = '{table_name}' AND s.name = '{schema}' ORDER BY c.column_id """ # 查询主键信息 primary_keys_sql = f""" SELECT c.name AS column_name FROM sys.indexes i JOIN sys.index_columns ic ON i.object_id = ic.object_id AND i.index_id = ic.index_id JOIN sys.columns c ON ic.object_id = c.object_id AND ic.column_id = c.column_id JOIN sys.tables t ON i.object_id = t.object_id JOIN sys.schemas s ON t.schema_id = s.schema_id WHERE i.is_primary_key = 1 AND t.name = '{table_name}' AND s.name = '{schema}' ORDER BY ic.key_ordinal """ # 查询外键信息 foreign_keys_sql = f""" SELECT fk.name AS fk_name, COL_NAME(fc.parent_object_id, fc.parent_column_id) AS column_name, OBJECT_NAME(fc.referenced_object_id) AS referenced_table, COL_NAME(fc.referenced_object_id, fc.referenced_column_id) AS referenced_column FROM sys.foreign_keys fk JOIN sys.foreign_key_columns fc ON fk.object_id = fc.constraint_object_id JOIN sys.tables t ON fk.parent_object_id = t.object_id JOIN sys.schemas s ON t.schema_id = s.schema_id WHERE t.name = '{table_name}' AND s.name = '{schema}' ORDER BY fk.name, fc.constraint_column_id """ # 查询索引信息 indexes_sql = f""" SELECT i.name AS index_name, i.type_desc AS index_type, i.is_unique, i.is_primary_key, i.is_unique_constraint, STUFF(( SELECT ', ' + c2.name FROM sys.index_columns ic2 JOIN sys.columns c2 ON ic2.object_id = c2.object_id AND ic2.column_id = c2.column_id WHERE ic2.object_id = i.object_id AND ic2.index_id = i.index_id ORDER BY ic2.key_ordinal FOR XML PATH(''), TYPE ).value('.', 'NVARCHAR(MAX)'), 1, 2, '') AS columns FROM sys.indexes i JOIN sys.tables t ON i.object_id = t.object_id JOIN sys.schemas s ON t.schema_id = s.schema_id WHERE t.name = '{table_name}' AND s.name = '{schema}' GROUP BY i.object_id, i.index_id, i.name, i.type_desc, i.is_unique, i.is_primary_key, i.is_unique_constraint ORDER BY i.name """ # Execute and process columns_sql columns = [] with engine.connect() as conn: columns_result = conn.execute(text(columns_sql)) # 处理列信息 for row in columns_result: column = { "name": row.column_name, "type": row.data_type, "max_length": row.max_length, "precision": row.precision, "scale": row.scale, "is_nullable": bool(row.is_nullable), "description": row.description } columns.append(column) # Execute and process primary_keys_sql primary_keys = [] with engine.connect() as conn: primary_keys_result = conn.execute(text(primary_keys_sql)) # 处理主键信息 primary_keys = [row.column_name for row in primary_keys_result] # Execute and process foreign_keys_sql foreign_keys = [] with engine.connect() as conn: foreign_keys_result = conn.execute(text(foreign_keys_sql)) # 处理外键信息 for row in foreign_keys_result: fk = { "name": row.fk_name, "column": row.column_name, "referenced_table": row.referenced_table, "referenced_column": row.referenced_column } foreign_keys.append(fk) # Execute and process indexes_sql indexes = [] with engine.connect() as conn: indexes_result = conn.execute(text(indexes_sql)) # 处理索引信息 for row in indexes_result: index = { "name": row.index_name, "type": row.index_type, "is_unique": bool(row.is_unique), "is_primary_key": bool(row.is_primary_key), "is_unique_constraint": bool(row.is_unique_constraint), "columns": row.columns.split(", ") if row.columns else [] } indexes.append(index) return { "columns": columns, "primary_keys": primary_keys, "foreign_keys": foreign_keys, "indexes": indexes } except Exception as e: error_msg = f"获取表结构失败: {str(e)}" print(error_msg) return { "error": str(e) } def list_show_tables(schema: str = "dbo") -> Dict[str, Any]: """列出数据库中的所有表 Args: schema: 架构名,默认为dbo Returns: 包含表列表的字典 """ try: print(f"列出架构 '{schema}' 中的所有表") engine = get_db_connection() # 修改SQL查询,避免使用可能导致类型不兼容的字段 # 使用CAST将可能有问题的字段转换为兼容的类型 sql = f""" SELECT t.name AS table_name, CAST(ISNULL(CAST(ep.value AS NVARCHAR(MAX)), '') AS NVARCHAR(MAX)) AS description, s.name AS schema_name FROM sys.tables t JOIN sys.schemas s ON t.schema_id = s.schema_id LEFT JOIN sys.extended_properties ep ON t.object_id = ep.major_id AND ep.minor_id = 0 AND ep.name = 'MS_Description' WHERE s.name = '{schema}' ORDER BY t.name """ # 如果上面的查询仍然不起作用,尝试使用更简单的查询 simple_sql = f""" SELECT t.name AS table_name, s.name AS schema_name FROM sys.tables t JOIN sys.schemas s ON t.schema_id = s.schema_id WHERE s.name = '{schema}' ORDER BY t.name """ try: with engine.connect() as conn: print("尝试执行带有表描述的查询...") result = conn.execute(text(sql)) col_names = list(result.keys()) tables = [] for row in result: try: # 安全地将行转换为字典 row_dict = {} for i, col in enumerate(col_names): try: # 尝试将每个值转换为字符串,避免类型问题 value = row[i] if value is not None: row_dict[col] = str(value) else: row_dict[col] = "" except Exception as val_err: print(f"处理列 {col} 的值时出错: {val_err}") row_dict[col] = "" tables.append(row_dict) except Exception as e: print(f"处理表信息时出错: {e}") tables.append({"table_name": str(row[0]) if row and len(row) > 0 else "unknown"}) except Exception as complex_query_error: print(f"复杂查询失败,尝试简单查询: {complex_query_error}") # 如果复杂查询失败,尝试简单查询 with engine.connect() as conn: print("执行简化的表查询...") result = conn.execute(text(simple_sql)) col_names = list(result.keys()) tables = [] for row in result: try: row_dict = {} for i, col in enumerate(col_names): try: value = row[i] row_dict[col] = str(value) if value is not None else "" except: row_dict[col] = "" # 添加空的描述字段 if "description" not in row_dict: row_dict["description"] = "" tables.append(row_dict) except Exception as e: print(f"处理简化表信息时出错: {e}") tables.append({"table_name": str(row[0]) if row and len(row) > 0 else "unknown"}) print(f"成功获取 {len(tables)} 个表") return { "tables": tables, "count": len(tables) } except Exception as e: error_msg = f"列出表失败: {str(e)}" print(f"{error_msg}, 架构: {schema}") return { "error": str(e), "schema": schema } def get_database_info() -> Dict[str, Any]: """获取数据库基本信息 Returns: 包含数据库信息的字典 """ try: print("获取数据库基本信息") engine = get_db_connection() # 获取数据库版本信息 version_sql = "SELECT @@VERSION AS version" # 获取数据库名称 db_name_sql = "SELECT DB_NAME() AS database_name" # 获取架构信息 schema_sql = "SELECT name AS schema_name FROM sys.schemas ORDER BY name" with engine.connect() as conn: # 获取版本信息 version_result = conn.execute(text(version_sql)).fetchone() version_info = version_result[0] if version_result else None # 获取数据库名称 db_name_result = conn.execute(text(db_name_sql)).fetchone() database_name = db_name_result[0] if db_name_result else None # 获取架构信息 schema_result = conn.execute(text(schema_sql)) schemas = [row[0] for row in schema_result] print(f"成功获取数据库信息: {database_name}") return { "database_name": database_name, "version": version_info, "schemas": schemas, "connection": { "host": config.DB_HOST, "port": config.DB_PORT, "database": config.DB_NAME, "user": config.DB_USER } } except Exception as e: error_msg = f"获取数据库信息失败: {str(e)}" print(error_msg) return { "error": str(e) }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jeweis/jewei-mssql-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server