Skip to main content
Glama

QueryNest

by niuzaishu
metadata_manager.py43.4 kB
# -*- coding: utf-8 -*- """元数据管理器""" from typing import Dict, List, Optional, Any from motor.motor_asyncio import AsyncIOMotorDatabase, AsyncIOMotorCollection from datetime import datetime import structlog import asyncio from bson import ObjectId from database.connection_manager import ConnectionManager from storage.local_semantic_storage import LocalSemanticStorage from storage.semantic_file_manager import SemanticFileManager from storage.config import get_config logger = structlog.get_logger(__name__) class MetadataManager: """元数据管理器""" def __init__(self, connection_manager: ConnectionManager): self.connection_manager = connection_manager # 每个实例都有自己的元数据库,不再使用单一的_db self._instance_collections: Dict[str, Dict[str, AsyncIOMotorCollection]] = {} self.metadata_cache: Dict[str, Dict] = {} self.last_scan_time: Dict[str, datetime] = {} self._lock = asyncio.Lock() self._scan_stats = { 'total_scans': 0, 'incremental_scans': 0, 'full_scans': 0, 'cache_hits': 0, 'cache_misses': 0 } self._collection_timestamps: Dict[str, Dict[str, datetime]] = {} # instance -> {collection: last_modified} # 初始化本地存储组件 self.config = get_config() self.local_storage = LocalSemanticStorage() self.file_manager = SemanticFileManager(self.local_storage) def _should_perform_full_scan(self, instance_name: str) -> bool: """判断是否应该执行全量扫描""" last_scan = self.last_scan_time.get(instance_name) if not last_scan: return True # 如果距离上次扫描超过24小时,执行全量扫描 time_diff = datetime.now() - last_scan return time_diff.total_seconds() > 86400 # 24小时 async def scan_all_metadata(self, force_full_scan: bool = False) -> bool: """扫描所有实例的元数据""" try: scan_tasks = [] for instance_name in self.connection_manager.get_instance_names(): if force_full_scan or self._should_perform_full_scan(instance_name): scan_tasks.append(self.scan_instance_metadata(instance_name, full_scan=True)) else: scan_tasks.append(self.scan_instance_metadata_incremental(instance_name)) # 并发执行扫描任务 results = await asyncio.gather(*scan_tasks, return_exceptions=True) # 检查结果 success_count = sum(1 for result in results if result is True) total_count = len(results) self._scan_stats['total_scans'] += 1 if force_full_scan: self._scan_stats['full_scans'] += 1 else: self._scan_stats['incremental_scans'] += 1 logger.info(f"元数据扫描完成: {success_count}/{total_count} 个实例成功") return success_count == total_count except Exception as e: logger.error(f"扫描所有元数据失败: {e}") return False async def scan_instance_metadata(self, instance_name: str, full_scan: bool = False) -> bool: """扫描指定实例的元数据""" try: # 确保实例元数据已初始化 if not await self.init_instance_metadata(instance_name): logger.error(f"实例 {instance_name} 元数据初始化失败") return False # 获取实例连接 instance_connection = self.connection_manager.get_instance_connection(instance_name) if not instance_connection or not instance_connection.client: logger.error(f"无法获取实例 {instance_name} 的连接") return False # 获取实例信息 instance_doc = await self.get_instance_by_name(instance_name, instance_name) if not instance_doc: # 创建实例记录 instance_config = { "name": instance_name, "alias": instance_name, "connection_string": "auto_discovered", "description": "自动发现的实例", "environment": "unknown" } instance_id = await self.save_instance(instance_name, instance_config) else: instance_id = instance_doc["_id"] # 获取数据库列表 database_names = await instance_connection.client.list_database_names() scan_count = 0 for db_name in database_names: # 跳过系统数据库 if db_name in ['admin', 'config', 'local']: continue try: # 扫描数据库 if await self._scan_database_metadata(instance_name, instance_id, db_name, full_scan): scan_count += 1 except Exception as e: logger.warning(f"扫描数据库 {db_name} 失败", error=str(e)) continue # 更新扫描时间 self.last_scan_time[instance_name] = datetime.now() logger.info(f"实例 {instance_name} 元数据扫描完成,处理了 {scan_count} 个数据库") return True except Exception as e: logger.error(f"扫描实例 {instance_name} 元数据失败", error=str(e)) return False async def scan_instance_metadata_incremental(self, instance_name: str) -> bool: """增量扫描实例元数据""" try: # 检查缓存 cache_key = f"metadata_{instance_name}" if cache_key in self.metadata_cache: self._scan_stats['cache_hits'] += 1 logger.debug(f"使用缓存的元数据: {instance_name}") return True self._scan_stats['cache_misses'] += 1 # 执行增量扫描(只扫描变更的集合) return await self.scan_instance_metadata(instance_name, full_scan=False) except Exception as e: logger.error(f"增量扫描实例 {instance_name} 失败", error=str(e)) return False async def _scan_database_metadata(self, instance_name: str, instance_id: ObjectId, database_name: str, full_scan: bool = False) -> bool: """扫描数据库元数据""" try: # 保存数据库信息 db_info = { "name": database_name, "description": f"自动发现的数据库: {database_name}" } await self.save_database(instance_name, instance_id, db_info) # 获取数据库连接 database = self.connection_manager.get_instance_database(instance_name, database_name) if not database: return False # 获取集合列表 collection_names = await database.list_collection_names() for collection_name in collection_names: # 跳过系统集合 if collection_name.startswith('system.') or collection_name.startswith('_querynest'): continue try: await self._scan_collection_metadata( instance_name, instance_id, database_name, collection_name, database, full_scan ) except Exception as e: logger.warning(f"扫描集合 {collection_name} 失败", error=str(e)) continue return True except Exception as e: logger.error(f"扫描数据库 {database_name} 失败", error=str(e)) return False async def _scan_collection_metadata(self, instance_name: str, instance_id: ObjectId, database_name: str, collection_name: str, database, full_scan: bool = False) -> bool: """扫描集合元数据""" try: collection = database[collection_name] # 获取文档数量 doc_count = await collection.count_documents({}) # 获取样本文档 sample_docs = [] if doc_count > 0: cursor = collection.find().limit(3) async for doc in cursor: # 转换ObjectId为字符串以便序列化 sample_doc = self._serialize_document(doc) sample_docs.append(sample_doc) # 保存集合信息 collection_info = { "database": database_name, "name": collection_name, "description": f"自动发现的集合: {collection_name}", "document_count": doc_count, "sample_documents": sample_docs } await self.save_collection(instance_name, instance_id, collection_info) # 分析字段结构 if full_scan or doc_count <= 1000: # 小集合或全量扫描时分析字段 await self._analyze_collection_fields( instance_name, instance_id, database_name, collection_name, collection ) return True except Exception as e: logger.error(f"扫描集合 {collection_name} 失败", error=str(e)) return False async def _analyze_collection_fields(self, instance_name: str, instance_id: ObjectId, database_name: str, collection_name: str, collection) -> bool: """分析集合字段结构""" try: # 使用聚合管道分析字段 pipeline = [ {"$sample": {"size": 100}}, # 采样100个文档 {"$project": {"arrayofkeyvalue": {"$objectToArray": "$$ROOT"}}}, {"$unwind": "$arrayofkeyvalue"}, {"$group": { "_id": { "field": "$arrayofkeyvalue.k", "type": {"$type": "$arrayofkeyvalue.v"} }, "count": {"$sum": 1}, "examples": {"$addToSet": "$arrayofkeyvalue.v"} }}, {"$sort": {"count": -1}} ] field_stats = {} async for result in collection.aggregate(pipeline): field_name = result["_id"]["field"] field_type = result["_id"]["type"] count = result["count"] examples = result["examples"][:5] # 只保留前5个示例 if field_name not in field_stats: field_stats[field_name] = { "types": {}, "total_count": 0, "examples": [] } field_stats[field_name]["types"][field_type] = count field_stats[field_name]["total_count"] += count field_stats[field_name]["examples"].extend(examples) # 保存字段信息 for field_name, stats in field_stats.items(): # 确定主要类型 main_type = max(stats["types"], key=stats["types"].get) field_info = { "database": database_name, "collection": collection_name, "path": field_name, "type": main_type, "examples": list(set(str(ex) for ex in stats["examples"][:5])), "is_indexed": False, # 需要额外查询索引信息 "is_required": stats["total_count"] > 0 } await self.save_field(instance_name, instance_id, field_info) return True except Exception as e: logger.error(f"分析集合 {collection_name} 字段失败", error=str(e)) return False def _serialize_document(self, doc: Dict) -> Dict: """序列化文档,处理特殊类型""" if isinstance(doc, dict): result = {} for key, value in doc.items(): if isinstance(value, ObjectId): result[key] = str(value) elif isinstance(value, datetime): result[key] = value.isoformat() elif isinstance(value, (dict, list)): result[key] = self._serialize_document(value) else: result[key] = value return result elif isinstance(doc, list): return [self._serialize_document(item) for item in doc] else: return doc async def initialize(self) -> bool: """初始化元数据管理器(不预先初始化实例元数据)""" try: # 元数据管理器初始化完成,实例元数据将按需初始化 logger.info("元数据管理器初始化完成(按需模式)") return True except Exception as e: logger.error("元数据管理器初始化失败", error=str(e)) return False async def _create_indexes_for_instance(self, instance_name: str): """为指定实例创建必要的索引""" try: collections = self._instance_collections[instance_name] # instances集合索引 await collections['instances'].create_index("instance_name", unique=True) await collections['instances'].create_index("environment") # databases集合索引 await collections['databases'].create_index( [("instance_id", 1), ("database_name", 1)], unique=True ) # collections集合索引 await collections['collections'].create_index( [("instance_id", 1), ("database_name", 1), ("collection_name", 1)], unique=True ) # fields集合索引 await collections['fields'].create_index( [("instance_id", 1), ("database_name", 1), ("collection_name", 1), ("field_path", 1)], unique=True ) # query_history集合索引 await collections['query_history'].create_index("session_id") await collections['query_history'].create_index("created_at") # semantic_learning集合索引(用于语义学习历史) await collections['semantic_learning'].create_index("instance_id") await collections['semantic_learning'].create_index("created_at") await collections['semantic_learning'].create_index([ ("instance_id", 1), ("database_name", 1), ("collection_name", 1) ]) logger.info("实例元数据索引创建完成", instance=instance_name) except Exception as e: logger.warning("创建实例索引时出现警告", instance=instance_name, error=str(e)) async def init_instance_metadata(self, instance_name: str) -> bool: """按需初始化指定实例的元数据集合""" try: # 先确保连接管理器中的元数据库已初始化 if not await self.connection_manager.init_instance_metadata_on_demand(instance_name): return False # 获取元数据库连接 metadata_db = self.connection_manager.get_metadata_database(instance_name) if metadata_db is None: logger.error("无法获取实例元数据库连接", instance=instance_name) return False # 初始化该实例的集合引用 self._instance_collections[instance_name] = { 'instances': metadata_db['instances'], 'databases': metadata_db['databases'], 'collections': metadata_db['collections'], 'fields': metadata_db['fields'], 'query_history': metadata_db['query_history'], 'semantic_learning': metadata_db['semantic_learning'] } # 为该实例创建索引 await self._create_indexes_for_instance(instance_name) logger.info("实例元数据按需初始化成功", instance=instance_name) return True except Exception as e: logger.error("实例元数据按需初始化失败", instance=instance_name, error=str(e)) return False def _get_instance_collections(self, instance_name: str) -> Optional[Dict[str, AsyncIOMotorCollection]]: """获取指定实例的集合引用""" return self._instance_collections.get(instance_name) # ==================== 实例管理 ==================== async def save_instance(self, target_instance_name: str, instance_config: Dict[str, Any]) -> ObjectId: """在指定实例的元数据库中保存实例配置""" collections = self._get_instance_collections(target_instance_name) if collections is None: raise ValueError(f"实例 {target_instance_name} 的元数据库不可用") instance_doc = { "instance_name": instance_config["name"], "instance_alias": instance_config["alias"], "connection_string": instance_config["connection_string"], "description": instance_config.get("description", ""), "environment": instance_config.get("environment", "dev"), "status": instance_config.get("status", "active"), "created_at": datetime.now(), "updated_at": datetime.now() } # 使用upsert更新或插入 result = await collections['instances'].replace_one( {"instance_name": instance_config["name"]}, instance_doc, upsert=True ) if result.upserted_id: logger.info("实例配置已保存", target_instance=target_instance_name, instance_name=instance_config["name"]) return result.upserted_id else: # 获取现有文档的ID existing = await collections['instances'].find_one( {"instance_name": instance_config["name"]} ) logger.info("实例配置已更新", target_instance=target_instance_name, instance_name=instance_config["name"]) return existing["_id"] async def get_instance_by_name(self, target_instance_name: str, instance_name: str) -> Optional[Dict[str, Any]]: """根据名称获取实例信息""" collections = self._get_instance_collections(target_instance_name) if not collections: raise ValueError(f"实例 {target_instance_name} 的元数据库不可用") return await collections['instances'].find_one( {"instance_name": instance_name} ) async def get_all_instances(self, target_instance_name: str, environment: Optional[str] = None) -> List[Dict[str, Any]]: """获取所有实例""" collections = self._get_instance_collections(target_instance_name) if not collections: raise ValueError(f"实例 {target_instance_name} 的元数据库不可用") filter_query = {} if environment: filter_query["environment"] = environment cursor = collections['instances'].find(filter_query) return await cursor.to_list(length=None) # ==================== 数据库管理 ==================== async def save_database(self, target_instance_name: str, instance_id: ObjectId, db_info: Dict[str, Any]) -> ObjectId: """保存数据库信息""" collections = self._get_instance_collections(target_instance_name) if not collections: raise ValueError(f"实例 {target_instance_name} 的元数据库不可用") db_doc = { "instance_id": instance_id, "database_name": db_info["name"], "description": db_info.get("description", ""), "business_domain": db_info.get("business_domain", ""), "created_at": datetime.now(), "updated_at": datetime.now(), "status": "active" } result = await collections['databases'].replace_one( {"instance_id": instance_id, "database_name": db_info["name"]}, db_doc, upsert=True ) if result.upserted_id: return result.upserted_id else: existing = await collections['databases'].find_one( {"instance_id": instance_id, "database_name": db_info["name"]} ) return existing["_id"] async def get_databases_by_instance(self, target_instance_name: str, instance_id: ObjectId) -> List[Dict[str, Any]]: """获取实例的所有数据库""" collections = self._get_instance_collections(target_instance_name) if collections is None: raise ValueError(f"实例 {target_instance_name} 的元数据库不可用") cursor = collections['databases'].find({"instance_id": instance_id}) return await cursor.to_list(length=None) async def search_databases(self, target_instance_name: str, query: str, instance_id: Optional[ObjectId] = None) -> List[Dict[str, Any]]: """搜索数据库""" collections = self._get_instance_collections(target_instance_name) if not collections: raise ValueError(f"实例 {target_instance_name} 的元数据库不可用") filter_query = { "$or": [ {"database_name": {"$regex": query, "$options": "i"}}, {"description": {"$regex": query, "$options": "i"}}, {"business_domain": {"$regex": query, "$options": "i"}} ] } if instance_id: filter_query["instance_id"] = instance_id cursor = collections['databases'].find(filter_query) return await cursor.to_list(length=None) # ==================== 集合管理 ==================== async def save_collection(self, target_instance_name: str, instance_id: ObjectId, collection_info: Dict[str, Any]) -> ObjectId: """保存集合信息""" collections = self._get_instance_collections(target_instance_name) if not collections: raise ValueError(f"实例 {target_instance_name} 的元数据库不可用") collection_doc = { "instance_id": instance_id, "database_name": collection_info["database"], "collection_name": collection_info["name"], "description": collection_info.get("description", ""), "business_purpose": collection_info.get("business_purpose", ""), "sample_documents": collection_info.get("sample_documents", []), "document_count": collection_info.get("document_count", 0), "created_at": datetime.now(), "updated_at": datetime.now() } result = await collections['collections'].replace_one( { "instance_id": instance_id, "database_name": collection_info["database"], "collection_name": collection_info["name"] }, collection_doc, upsert=True ) if result.upserted_id: return result.upserted_id else: existing = await collections['collections'].find_one({ "instance_id": instance_id, "database_name": collection_info["database"], "collection_name": collection_info["name"] }) return existing["_id"] async def get_collections_by_database(self, target_instance_name: str, instance_id: ObjectId, database_name: str) -> List[Dict[str, Any]]: """获取数据库的所有集合""" collections = self._get_instance_collections(target_instance_name) if collections is None: raise ValueError(f"实例 {target_instance_name} 的元数据库不可用") cursor = collections['collections'].find({ "instance_id": instance_id, "database_name": database_name }) return await cursor.to_list(length=None) # ==================== 字段管理 ==================== async def save_field(self, target_instance_name: str, instance_id: ObjectId, field_info: Dict[str, Any]) -> ObjectId: """保存字段信息""" collections = self._get_instance_collections(target_instance_name) if not collections: raise ValueError(f"实例 {target_instance_name} 的元数据库不可用") field_doc = { "instance_id": instance_id, "database_name": field_info["database"], "collection_name": field_info["collection"], "field_path": field_info["path"], "field_type": field_info.get("type", "unknown"), "business_meaning": field_info.get("business_meaning", ""), "examples": field_info.get("examples", []), "is_indexed": field_info.get("is_indexed", False), "is_required": field_info.get("is_required", False), "created_at": datetime.now(), "updated_at": datetime.now() } result = await collections['fields'].replace_one( { "instance_id": instance_id, "database_name": field_info["database"], "collection_name": field_info["collection"], "field_path": field_info["path"] }, field_doc, upsert=True ) if result.upserted_id: return result.upserted_id else: existing = await collections['fields'].find_one({ "instance_id": instance_id, "database_name": field_info["database"], "collection_name": field_info["collection"], "field_path": field_info["path"] }) return existing["_id"] async def get_fields_by_collection(self, target_instance_name: str, instance_id: ObjectId, database_name: str, collection_name: str) -> List[Dict[str, Any]]: """获取集合的所有字段""" collections = self._get_instance_collections(target_instance_name) if collections is None: raise ValueError(f"实例 {target_instance_name} 的元数据库不可用") cursor = collections['fields'].find({ "instance_id": instance_id, "database_name": database_name, "collection_name": collection_name }) return await cursor.to_list(length=None) async def update_field_semantics(self, target_instance_name: str, instance_id: ObjectId, database_name: str, collection_name: str, field_path: str, business_meaning: str, examples: List[str] = None) -> bool: """更新字段业务语义,使用本地文件存储""" try: success = await self.local_storage.save_field_semantics( target_instance_name, database_name, collection_name, field_path, business_meaning, examples or [] ) if success: logger.info( "字段语义已保存到本地文件", instance=target_instance_name, database=database_name, collection=collection_name, field=field_path, meaning=business_meaning ) return success except Exception as e: logger.error("本地文件语义保存失败", error=str(e)) return False async def _update_metadata_semantics(self, target_instance_name: str, instance_id: ObjectId, database_name: str, collection_name: str, field_path: str, business_meaning: str, examples: List[str] = None) -> bool: """在元数据库中更新字段语义""" try: collections = self._get_instance_collections(target_instance_name) if not collections: logger.warning("元数据库集合不可用", instance=target_instance_name) return False update_doc = { "business_meaning": business_meaning, "updated_at": datetime.now() } if examples: update_doc["examples"] = examples result = await collections['fields'].update_one( { "instance_id": instance_id, "database_name": database_name, "collection_name": collection_name, "field_path": field_path }, {"$set": update_doc} ) return result.modified_count > 0 except Exception as e: logger.warning( "元数据库语义更新失败,将尝试业务库存储", instance=target_instance_name, error=str(e) ) return False async def _update_business_semantics(self, target_instance_name: str, database_name: str, collection_name: str, field_path: str, business_meaning: str, examples: List[str] = None) -> bool: """在业务库中存储字段语义""" try: # 获取业务数据库连接 business_db = self.connection_manager.get_instance_database(target_instance_name, database_name) if business_db is None: logger.error("无法获取业务数据库连接", instance=target_instance_name, database=database_name) return False # 在业务库中创建或更新语义集合 semantics_collection = business_db['_querynest_semantics'] semantic_doc = { "collection_name": collection_name, "field_path": field_path, "business_meaning": business_meaning, "updated_at": datetime.now(), "source": "querynest_analyzer" } if examples: semantic_doc["examples"] = examples # 使用 upsert 更新或插入语义信息 result = await semantics_collection.replace_one( { "collection_name": collection_name, "field_path": field_path }, semantic_doc, upsert=True ) logger.info( "字段语义已存储至业务库", instance=target_instance_name, database=database_name, collection=collection_name, field=field_path, business_collection="_querynest_semantics" ) return True except Exception as e: logger.error( "业务库语义存储失败", instance=target_instance_name, database=database_name, error=str(e) ) return False # ==================== 查询历史管理 ==================== async def save_query_history(self, target_instance_name: str, query_info: Dict[str, Any]) -> ObjectId: """保存查询历史""" collections = self._get_instance_collections(target_instance_name) if not collections: raise ValueError(f"实例 {target_instance_name} 的元数据库不可用") query_doc = { "session_id": query_info["session_id"], "instance_id": query_info["instance_id"], "user_intent": query_info["user_intent"], "selected_database": query_info.get("selected_database"), "selected_collection": query_info.get("selected_collection"), "generated_query": query_info.get("generated_query"), "execution_result": query_info.get("execution_result"), "user_feedback": query_info.get("user_feedback", ""), "created_at": datetime.now() } result = await collections['query_history'].insert_one(query_doc) return result.inserted_id async def get_query_history(self, target_instance_name: str, session_id: str, limit: int = 10) -> List[Dict[str, Any]]: """获取查询历史""" collections = self._get_instance_collections(target_instance_name) if not collections: raise ValueError(f"实例 {target_instance_name} 的元数据库不可用") cursor = collections['query_history'].find( {"session_id": session_id} ).sort("created_at", -1).limit(limit) return await cursor.to_list(length=None) # ==================== 搜索和推荐 ==================== async def search_by_business_meaning(self, target_instance_name: str, query: str, instance_id: Optional[ObjectId] = None) -> Dict[str, List[Dict[str, Any]]]: """根据业务含义搜索""" collections = self._get_instance_collections(target_instance_name) if not collections: raise ValueError(f"实例 {target_instance_name} 的元数据库不可用") results = { "databases": [], "collections": [], "fields": [] } search_filter = { "$or": [ {"business_meaning": {"$regex": query, "$options": "i"}}, {"description": {"$regex": query, "$options": "i"}} ] } if instance_id: search_filter["instance_id"] = instance_id # 搜索数据库 db_cursor = collections['databases'].find(search_filter) results["databases"] = await db_cursor.to_list(length=None) # 搜索集合 collection_filter = search_filter.copy() collection_filter["$or"].append({"business_purpose": {"$regex": query, "$options": "i"}}) collection_cursor = collections['collections'].find(collection_filter) results["collections"] = await collection_cursor.to_list(length=None) # 搜索字段 field_cursor = collections['fields'].find(search_filter) results["fields"] = await field_cursor.to_list(length=None) return results async def search_fields_by_meaning(self, target_instance_name: str, search_term: str) -> List[Dict[str, Any]]: """根据业务含义搜索字段,使用本地文件存储""" try: results = await self.local_storage.search_semantics(target_instance_name, search_term) logger.info( "本地文件语义搜索完成", target_instance=target_instance_name, search_term=search_term, total_results=len(results) ) return results except Exception as e: logger.error("本地文件语义搜索失败", error=str(e)) return [] async def _search_metadata_semantics(self, target_instance_name: str, search_term: str) -> List[Dict[str, Any]]: """在元数据库中搜索语义信息""" collections = self._get_instance_collections(target_instance_name) if not collections: return [] # 构建搜索条件:在字段路径、业务含义、示例中搜索 search_filter = { "$or": [ {"field_path": {"$regex": search_term, "$options": "i"}}, {"business_meaning": {"$regex": search_term, "$options": "i"}}, {"examples": {"$elemMatch": {"$regex": search_term, "$options": "i"}}} ] } field_cursor = collections['fields'].find(search_filter) results = await field_cursor.to_list(length=None) # 为结果添加来源标记 for result in results: result["semantic_source"] = "metadata_db" return results async def _search_business_semantics(self, target_instance_name: str, search_term: str) -> List[Dict[str, Any]]: """在业务库中搜索语义信息""" results = [] # 获取实例的所有数据库 try: # 这里需要从配置或连接管理器获取数据库列表 # 暂时使用一个简化的方式 instance_connection = self.connection_manager.get_instance_connection(target_instance_name) if not instance_connection or not instance_connection.client: return [] # 获取所有数据库名称 database_names = await instance_connection.client.list_database_names() # 搜索每个数据库的语义集合 for db_name in database_names: # 跳过系统数据库 if db_name in ['admin', 'config', 'local', 'querynest_metadata']: continue try: business_db = instance_connection.get_database(db_name) if business_db is None: continue # 检查是否存在语义集合 collection_names = await business_db.list_collection_names() if '_querynest_semantics' not in collection_names: continue semantics_collection = business_db['_querynest_semantics'] # 构建搜索条件 - 修复examples字段的搜索语法 search_filter = { "$or": [ {"field_path": {"$regex": search_term, "$options": "i"}}, {"business_meaning": {"$regex": search_term, "$options": "i"}}, {"examples": {"$elemMatch": {"$regex": search_term, "$options": "i"}}} ] } cursor = semantics_collection.find(search_filter) db_results = await cursor.to_list(length=None) # 为结果添加数据库信息和来源标记 for result in db_results: result["database_name"] = db_name result["semantic_source"] = "business_db" # 确保有实例ID字段(兼容性) if "instance_id" not in result: result["instance_id"] = target_instance_name results.append(result) except Exception as e: logger.debug(f"搜索业务库 {db_name} 语义信息时出错", error=str(e)) continue except Exception as e: logger.warning("获取业务库列表失败", error=str(e)) return results def _deduplicate_semantic_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """去重语义搜索结果,优先级:本地文件 > 元数据库 > 业务库""" seen = set() unique_results = [] # 优先级1:本地文件的结果 for result in results: if result.get("semantic_source") == "local_file": key = (result.get("database_name", ""), result.get("collection_name", ""), result.get("field_path", "")) if key not in seen: seen.add(key) unique_results.append(result) # 优先级2:元数据库的结果 for result in results: if result.get("semantic_source") == "metadata_db": key = (result.get("database_name", ""), result.get("collection_name", ""), result.get("field_path", "")) if key not in seen: seen.add(key) unique_results.append(result) # 优先级3:业务库的结果 for result in results: if result.get("semantic_source") == "business_db": key = (result.get("database_name", ""), result.get("collection_name", ""), result.get("field_path", "")) if key not in seen: seen.add(key) unique_results.append(result) return unique_results async def get_statistics(self, target_instance_name: str) -> Dict[str, Any]: """获取统计信息""" collections = self._get_instance_collections(target_instance_name) if not collections: raise ValueError(f"实例 {target_instance_name} 的元数据库不可用") stats = {} # 实例统计 stats["instances_count"] = await collections['instances'].count_documents({}) stats["active_instances_count"] = await collections['instances'].count_documents({"status": "active"}) # 数据库统计 stats["databases_count"] = await collections['databases'].count_documents({}) # 集合统计 stats["collections_count"] = await collections['collections'].count_documents({}) # 字段统计 stats["fields_count"] = await collections['fields'].count_documents({}) stats["fields_with_semantics_count"] = await collections['fields'].count_documents({ "business_meaning": {"$ne": ""} }) # 查询历史统计 stats["query_history_count"] = await collections['query_history'].count_documents({}) # 扫描统计 stats["scan_stats"] = self._scan_stats.copy() return stats

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/niuzaishu/QueryNest'

If you have feedback or need assistance with the MCP directory API, please join our Discord server