Skip to main content
Glama
server.py7.37 kB
"""SQLite MCP server implementation""" import json import sqlite3 from contextlib import closing from pathlib import Path from typing import Optional import mcp.types as types # 获取包信息用于日志命名 from ..base import LOG_NAME, ConnectionServer from ..log import create_logger from .config import SQLiteConfig class SQLiteServer(ConnectionServer): def __init__(self, config: SQLiteConfig, config_path: Optional[str] = None): """初始化 SQLite 服务器 Args: config: SQLite 配置 """ super().__init__(config_path, config.debug) self.config = config self.config_path = config_path self.log = create_logger(f"{LOG_NAME}.db.sqlite", config.debug) # 确保数据库目录存在 db_file = Path(self.config.absolute_path) db_file.parent.mkdir(parents=True, exist_ok=True) # 测试连接 try: self.log("debug", f"正在连接: {self.config.get_masked_connection_info()}") connection_params = self.config.get_connection_params() with closing(sqlite3.connect(**connection_params)) as conn: conn.row_factory = sqlite3.Row self.log("info", "连接测试成功") except sqlite3.Error as e: self.log("error", f"连接失败: {str(e)}") raise def _get_connection(self): """获取数据库连接""" connection_params = self.config.get_connection_params() conn = sqlite3.connect(**connection_params) conn.row_factory = sqlite3.Row return conn async def list_resources(self) -> list[types.Resource]: """列出所有表资源""" try: # 使用默认连接 conn = self._get_connection() with closing(conn) as _: cursor = conn.execute( "SELECT name FROM sqlite_master WHERE type='table'" ) tables = cursor.fetchall() return [ types.Resource( uri=f"sqlite://{table[0]}/schema", name=f"{table[0]} schema", mimeType="application/json" ) for table in tables ] except sqlite3.Error as e: error_msg = f"获取表列表失败: {str(e)}" self.log("error", error_msg) raise async def read_resource(self, uri: str) -> str: """读取表结构信息""" try: table_name = uri.split('/')[-2] with closing(self._get_connection()) as conn: # 获取表结构 cursor = conn.execute(f"PRAGMA table_info({table_name})") columns = cursor.fetchall() # 获取索引信息 cursor = conn.execute(f"PRAGMA index_list({table_name})") indexes = cursor.fetchall() schema_info = { 'columns': [{ 'name': col['name'], 'type': col['type'], 'nullable': not col['notnull'], 'primary_key': bool(col['pk']) } for col in columns], 'indexes': [{ 'name': idx['name'], 'unique': bool(idx['unique']) } for idx in indexes] } return json.dumps(schema_info) except sqlite3.Error as e: error_msg = f"读取表结构失败: {str(e)}" self.log("error", error_msg) raise def get_tools(self) -> list[types.Tool]: """获取可用工具列表""" return [ types.Tool( name="query", description="执行只读SQL查询", inputSchema={ "type": "object", "properties": { "connection": { "type": "string", "description": "数据库连接名称(可选)" }, "sql": { "type": "string", "description": "SQL查询语句(仅支持SELECT)" } }, "required": ["sql"] } ) ] async def call_tool(self, name: str, arguments: dict) -> list[types.TextContent]: """执行工具调用""" if name != "query": raise ValueError(f"未知工具: {name}") sql = arguments.get("sql", "").strip() if not sql: raise ValueError("SQL查询不能为空") # 仅允许SELECT语句 if not sql.lower().startswith("select"): raise ValueError("仅支持SELECT查询") conn = None connection = arguments.get("connection") config_name = connection if isinstance(connection, str) else 'default' results = [] columns = [] formatted_results = [] try: if connection and self.config_path: # 使用指定的数据库连接 config = SQLiteConfig.from_yaml(self.config_path, connection) connection_params = config.get_connection_params() masked_params = config.get_masked_connection_info() self.log("info", f"使用配置 {connection} 连接: {masked_params}") conn = sqlite3.connect(**connection_params) conn.row_factory = sqlite3.Row else: # 使用默认连接 conn = self._get_connection() with closing(conn) as _: self.log("info", f"执行查询: {sql}") cursor = conn.execute(sql) results = cursor.fetchall() columns = [desc[0] for desc in cursor.description] formatted_results = [dict(zip(columns, row)) for row in results] # 处理结果(在连接操作完成后) result_text = json.dumps({ 'type': 'sqlite', 'config_name': config_name, 'query_result': { 'columns': columns, 'rows': formatted_results, 'row_count': len(results) } }) self.log("info", f"查询完成,返回{len(results)}行结果") return [types.TextContent(type="text", text=result_text)] except sqlite3.Error as e: error_msg = json.dumps({ 'type': 'sqlite', 'config_name': config_name, 'error': f"查询执行失败: {str(e)}" }) self.log("error", error_msg) return [types.TextContent(type="text", text=error_msg)] finally: # 确保连接被正确关闭(如果不是使用with语句) if conn and not isinstance(connection, str): try: conn.close() except Exception as e: self.log("warning", f"关闭连接时出错: {str(e)}") async def cleanup(self): """清理资源""" # SQLite不需要特别的清理操作 pass

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/donghao1393/mcp-dbutils'

If you have feedback or need assistance with the MCP directory API, please join our Discord server