Skip to main content
Glama

QueryNest

by niuzaishu
connection_manager.py14.9 kB
# -*- coding: utf-8 -*- """多实例MongoDB连接管理器""" import asyncio from typing import Dict, List, Optional, Any from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError import structlog from datetime import datetime, timedelta from config import QueryNestConfig, MongoInstanceConfig logger = structlog.get_logger(__name__) class InstanceConnection: """单个MongoDB实例连接""" def __init__(self, config: MongoInstanceConfig): self.config = config self.client: Optional[AsyncIOMotorClient] = None self.last_health_check: Optional[datetime] = None self.is_healthy = False self._lock = asyncio.Lock() self._connection_stats = { 'total_connections': 0, 'active_connections': 0, 'failed_connections': 0, 'last_connection_time': None } def _get_optimal_pool_config(self) -> dict: """获取最优连接池配置""" # 根据环境和负载动态调整连接池大小 if self.config.environment == "production": return { "maxPoolSize": 20, "minPoolSize": 5, "maxIdleTimeMS": 30000, "waitQueueTimeoutMS": 10000 } elif self.config.environment == "staging": return { "maxPoolSize": 10, "minPoolSize": 2, "maxIdleTimeMS": 60000, "waitQueueTimeoutMS": 5000 } else: # development return { "maxPoolSize": 5, "minPoolSize": 1, "maxIdleTimeMS": 120000, "waitQueueTimeoutMS": 3000 } async def connect(self) -> bool: """连接到MongoDB实例""" async with self._lock: if self.client is not None: return True try: # 动态连接池配置 pool_config = self._get_optimal_pool_config() self.client = AsyncIOMotorClient( self.config.connection_string, serverSelectionTimeoutMS=5000, connectTimeoutMS=5000, **pool_config ) # 测试连接 await self.client.admin.command('ping') self.is_healthy = True self.last_health_check = datetime.now() # 更新连接统计 self._connection_stats['total_connections'] += 1 self._connection_stats['active_connections'] += 1 self._connection_stats['last_connection_time'] = datetime.now() logger.info( "MongoDB实例连接成功", instance_name=self.config.name, environment=self.config.environment, pool_config=pool_config ) return True except (ConnectionFailure, ServerSelectionTimeoutError) as e: self.is_healthy = False self._connection_stats['failed_connections'] += 1 logger.error( "MongoDB实例连接失败", instance_name=self.config.name, error=str(e) ) return False except Exception as e: self.is_healthy = False self._connection_stats['failed_connections'] += 1 logger.error( "MongoDB实例连接异常", instance_name=self.config.name, error=str(e) ) return False async def disconnect(self): """断开连接""" async with self._lock: if self.client: self.client.close() self.client = None self.is_healthy = False # 更新连接统计 if self._connection_stats['active_connections'] > 0: self._connection_stats['active_connections'] -= 1 logger.info("MongoDB实例连接已断开", instance_name=self.config.name) async def health_check(self) -> bool: """健康检查""" try: if self.client is None: return False await self.client.admin.command('ping') self.is_healthy = True self.last_health_check = datetime.now() return True except Exception as e: self.is_healthy = False logger.warning( "MongoDB实例健康检查失败", instance_name=self.config.name, error=str(e) ) return False def get_database(self, db_name: str) -> Optional[AsyncIOMotorDatabase]: """获取数据库连接(支持连接复用)""" if not self.is_healthy or self.client is None: return None # 连接复用:直接返回已存在的客户端数据库引用 database = self.client[db_name] # 记录数据库访问(可用于后续优化) if not hasattr(self, '_db_access_count'): self._db_access_count = {} self._db_access_count[db_name] = self._db_access_count.get(db_name, 0) + 1 return database def needs_health_check(self, interval_minutes: int = 5) -> bool: """检查是否需要健康检查""" if self.last_health_check is None: return True return datetime.now() - self.last_health_check > timedelta(minutes=interval_minutes) def get_connection_stats(self) -> dict: """获取连接统计信息""" stats = self._connection_stats.copy() stats['db_access_count'] = getattr(self, '_db_access_count', {}) stats['pool_config'] = self._get_optimal_pool_config() return stats class ConnectionManager: """多实例MongoDB连接管理器""" def __init__(self, config: QueryNestConfig): self.config = config self.connections: Dict[str, InstanceConnection] = {} # 移除单独的元数据客户端,每个实例都管理自己的元数据库 self._health_check_task: Optional[asyncio.Task] = None self._shutdown = False async def initialize(self) -> bool: """初始化所有连接""" logger.info("开始初始化MongoDB连接管理器") # 初始化业务实例连接(不立即初始化元数据库) success_count = 0 for instance_key, instance_config in self.config.mongo_instances.items(): if instance_config.status != "active": continue connection = InstanceConnection(instance_config) if await connection.connect(): self.connections[instance_key] = connection # 使用配置键名而非name字段 success_count += 1 else: logger.warning( "实例连接失败,将跳过", instance_name=instance_config.name, instance_key=instance_key ) if success_count == 0: logger.error("没有成功连接的MongoDB实例") return False # 启动健康检查任务 self._health_check_task = asyncio.create_task(self._health_check_loop()) logger.info( "MongoDB连接管理器初始化完成", total_instances=len([i for i in self.config.mongo_instances.values() if i.status == "active"]), connected_instances=success_count ) return True async def init_instance_metadata_on_demand(self, instance_name: str) -> bool: """按需初始化实例元数据(基于文件存储)""" connection = self.get_instance_connection(instance_name) if connection is None: logger.error("实例连接不存在", instance=instance_name) return False try: # 基于文件存储,元数据初始化由FileMetadataManager处理 # 这里只需要确认实例连接正常即可 logger.info("实例元数据初始化成功(使用文件存储)", instance=instance_name) return True except Exception as e: logger.error("实例元数据初始化失败", instance=instance_name, error=str(e)) return False async def shutdown(self): """关闭所有连接""" logger.info("开始关闭MongoDB连接管理器") self._shutdown = True # 停止健康检查任务 if self._health_check_task: self._health_check_task.cancel() try: await self._health_check_task except asyncio.CancelledError: pass # 关闭所有实例连接 for connection in self.connections.values(): await connection.disconnect() self.connections.clear() # 元数据库连接已随各实例连接一起关闭 logger.info("MongoDB连接管理器已关闭") def get_instance_connection(self, instance_name: str) -> Optional[InstanceConnection]: """获取实例连接""" return self.connections.get(instance_name) def get_client(self, instance_name: str) -> Optional[AsyncIOMotorClient]: """获取MongoDB客户端 此方法作为兼容层,直接返回实例连接的client属性。 用于与期望直接获取client的代码兼容。 """ connection = self.get_instance_connection(instance_name) if connection: return connection.client return None def get_instance_database(self, instance_name: str, db_name: str) -> Optional[AsyncIOMotorDatabase]: """获取实例数据库连接""" connection = self.get_instance_connection(instance_name) if connection: return connection.get_database(db_name) return None def get_metadata_database(self, instance_name: str) -> Optional[str]: """获取指定实例的元数据存储路径(文件存储)""" # 返回实例名称,用于文件存储路径生成 connection = self.get_instance_connection(instance_name) if connection is None: return None return instance_name def get_available_instances(self) -> List[str]: """获取可用实例列表""" return [ name for name, connection in self.connections.items() if connection.is_healthy ] def get_instance_info(self, instance_name: str) -> Optional[Dict[str, Any]]: """获取实例信息""" connection = self.get_instance_connection(instance_name) if connection: return { "name": connection.config.name, "alias": connection.config.alias, "environment": connection.config.environment, "description": connection.config.description, "is_healthy": connection.is_healthy, "last_health_check": connection.last_health_check } return None def get_all_instances_info(self) -> List[Dict[str, Any]]: """获取所有实例信息""" instances_info = [] for instance_name in self.connections.keys(): info = self.get_instance_info(instance_name) if info: instances_info.append(info) return instances_info async def get_all_instances(self) -> Dict[str, Any]: """获取所有实例配置(用于MCP工具)""" instances = {} for instance_name, connection in self.connections.items(): instances[instance_name] = connection.config return instances def has_instance(self, instance_name: str) -> bool: """检查实例是否存在""" return instance_name in self.connections async def check_instance_health(self, instance_name: str) -> Dict[str, Any]: """检查实例健康状态""" connection = self.get_instance_connection(instance_name) if not connection: return { "healthy": False, "error": "实例连接不存在", "last_check": None } is_healthy = await connection.health_check() return { "healthy": is_healthy, "last_check": connection.last_health_check, "error": None if is_healthy else "连接检查失败" } async def _health_check_loop(self): """健康检查循环""" while not self._shutdown: try: # 检查所有实例连接 for connection in self.connections.values(): if connection.needs_health_check(): await connection.health_check() # 元数据库连接已随各实例连接一起管理,无需单独检查 # 等待下次检查 health_check_interval = self.config.connection_pool.health_check_interval await asyncio.sleep(health_check_interval) except asyncio.CancelledError: break except Exception as e: logger.error("健康检查循环异常", error=str(e)) health_check_interval = self.config.connection_pool.health_check_interval await asyncio.sleep(health_check_interval) async def validate_query_permissions(self, instance_name: str, operation: str) -> bool: """验证查询权限""" # 检查操作是否在允许列表中 if operation not in self.config.security.allowed_operations: logger.warning( "不允许的操作类型", instance_name=instance_name, operation=operation ) return False # 检查实例是否可用 connection = self.get_instance_connection(instance_name) if not connection or not connection.is_healthy: logger.warning( "实例不可用", instance_name=instance_name ) return False return True

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/niuzaishu/QueryNest'

If you have feedback or need assistance with the MCP directory API, please join our Discord server