Skip to main content
Glama

QueryNest

by niuzaishu
collection_analysis.py18.2 kB
# -*- coding: utf-8 -*- """集合结构分析工具""" from typing import Dict, List, Any, Optional import structlog from mcp.types import Tool, TextContent from database.connection_manager import ConnectionManager from database.metadata_manager import MetadataManager from scanner.semantic_analyzer import SemanticAnalyzer from utils.error_handler import with_error_handling, with_retry, RetryConfig logger = structlog.get_logger(__name__) class CollectionAnalysisTool: """集合结构分析工具""" def __init__(self, connection_manager: ConnectionManager, metadata_manager: MetadataManager, semantic_analyzer: SemanticAnalyzer): self.connection_manager = connection_manager self.metadata_manager = metadata_manager self.semantic_analyzer = semantic_analyzer def get_tool_definition(self) -> Tool: """获取工具定义""" return Tool( name="analyze_collection", description="分析指定集合的结构、字段类型和业务语义", inputSchema={ "type": "object", "properties": { "instance_id": { "type": "string", "description": "MongoDB实例ID(注意:参数名为instance_id但实际使用实例名称)" }, "database_name": { "type": "string", "description": "数据库名称" }, "collection_name": { "type": "string", "description": "集合名称" }, "include_semantics": { "type": "boolean", "description": "是否包含语义分析", "default": True }, "include_examples": { "type": "boolean", "description": "是否包含字段示例值", "default": True }, "include_indexes": { "type": "boolean", "description": "是否包含索引信息", "default": True }, "rescan": { "type": "boolean", "description": "是否重新扫描集合结构", "default": False } }, "required": ["instance_id", "database_name", "collection_name"] } ) @with_error_handling("集合分析失败") async def execute(self, arguments: Dict[str, Any]) -> List[TextContent]: """执行集合分析""" instance_id = arguments["instance_id"] database_name = arguments["database_name"] collection_name = arguments["collection_name"] include_semantics = arguments.get("include_semantics", True) include_examples = arguments.get("include_examples", True) include_indexes = arguments.get("include_indexes", True) rescan = arguments.get("rescan", False) logger.info( "开始分析集合结构", instance_id=instance_id, database=database_name, collection=collection_name ) # 验证实例和连接 if not self.connection_manager.has_instance(instance_id): return [TextContent( type="text", text=f"实例 '{instance_id}' 不存在。请使用 discover_instances 工具查看可用实例。" )] # 检查实例健康状态 health_status = await self.connection_manager.check_instance_health(instance_id) if not health_status["healthy"]: return [TextContent( type="text", text=f"实例 '{instance_id}' 不健康: {health_status.get('error', 'Unknown error')}" )] # 验证集合是否存在 collection_exists = await self._check_collection_exists(instance_id, database_name, collection_name) if not collection_exists: return [TextContent( type="text", text=f"集合 '{database_name}.{collection_name}' 不存在。" )] # 如果需要重新扫描或集合信息不存在,进行扫描 if rescan or not await self._has_collection_metadata(instance_id, database_name, collection_name): await self._scan_collection(instance_id, database_name, collection_name) # 获取集合基本信息 from bson import ObjectId if isinstance(instance_id, str): # 如果是字符串,需要先获取实例信息来得到ObjectId instance_info = await self.metadata_manager.get_instance_by_name(instance_id, instance_id) if not instance_info: return [TextContent( type="text", text=f"无法找到实例 '{instance_id}'。" )] instance_obj_id = instance_info["_id"] else: instance_obj_id = instance_id collections = await self.metadata_manager.get_collections_by_database( instance_id, instance_obj_id, database_name ) # 查找指定的集合 collection_info = None for collection in collections: if collection.get("collection_name") == collection_name: collection_info = collection break if not collection_info: return [TextContent( type="text", text=f"无法获取集合 '{database_name}.{collection_name}' 的信息。请尝试重新扫描。" )] # 获取字段信息 # 先获取实例信息来得到正确的instance_id instance_info = await self.metadata_manager.get_instance_by_name(instance_id, instance_id) if not instance_info: return [TextContent( type="text", text=f"无法找到实例 '{instance_id}'。" )] actual_instance_id = instance_info["_id"] fields = await self.metadata_manager.get_fields_by_collection( instance_id, actual_instance_id, database_name, collection_name ) # 构建分析结果 result_text = await self._build_analysis_result( collection_info, fields, instance_id, database_name, collection_name, include_semantics, include_examples, include_indexes ) logger.info( "集合分析完成", instance_id=instance_id, database=database_name, collection=collection_name, field_count=len(fields) ) return [TextContent(type="text", text=result_text)] @with_retry(RetryConfig(max_attempts=3, base_delay=1.0)) @with_error_handling("检查集合存在性失败") async def _check_collection_exists(self, instance_id: str, database_name: str, collection_name: str) -> bool: """检查集合是否存在""" connection = self.connection_manager.get_instance_connection(instance_id) if connection is None: return False db = connection.get_database(database_name) collection_names = await db.list_collection_names() return collection_name in collection_names @with_error_handling("检查集合元数据失败") async def _has_collection_metadata(self, instance_id: str, database_name: str, collection_name: str) -> bool: """检查是否已有集合元数据""" # 获取实例的ObjectId from bson import ObjectId if isinstance(instance_id, str): # 如果是字符串,需要先获取实例信息来得到ObjectId instance_info = await self.metadata_manager.get_instance_by_name(instance_id, instance_id) if not instance_info: return False instance_obj_id = instance_info["_id"] else: instance_obj_id = instance_id collections = await self.metadata_manager.get_collections_by_database( instance_id, instance_obj_id, database_name ) # 检查是否存在指定的集合 for collection in collections: if collection.get("collection_name") == collection_name: return True return False async def _scan_collection(self, instance_id: str, database_name: str, collection_name: str) -> Dict[str, Any]: """扫描集合结构""" try: # 获取连接 connection = self.connection_manager.get_instance_connection(instance_id) if connection is None: return {"error": f"无法连接到实例 {instance_id}"} # 获取数据库 db = connection.get_database(database_name) # 检查集合是否存在 if collection_name not in await db.list_collection_names(): return {"error": f"集合 '{database_name}.{collection_name}' 不存在。"} # 获取实例信息 from bson import ObjectId try: instance_obj_id = ObjectId(instance_id) except: instance_obj_id = instance_id # 获取实例名称 # 直接使用 instance_id 作为 instance_name,因为在测试环境中它们通常是相同的 instance_name = instance_id # 扫描集合结构 from ..scanner.structure_scanner import StructureScanner from ..config import QueryNestConfig config = QueryNestConfig() scanner = StructureScanner(self.connection_manager, self.metadata_manager, config) structure = await scanner.scan_collection_structure(instance_name, instance_obj_id, database_name, collection_name) # 获取索引信息 indexes = await self._get_collection_indexes(instance_id, database_name, collection_name) return { "name": collection_name, "structure": structure, "indexes": indexes } except Exception as e: logger.error("扫描集合结构失败", instance_id=instance_id, database=database_name, collection=collection_name, error=str(e)) return {"error": f"扫描集合结构失败: {str(e)}"} logger.info( "集合结构扫描完成", instance_id=instance_id, database=database_name, collection=collection_name ) except Exception as e: logger.error( "扫描集合结构失败", instance_id=instance_id, database=database_name, collection=collection_name, error=str(e) ) raise async def _build_analysis_result(self, collection_info: Dict[str, Any], fields: List[Dict[str, Any]], instance_id: str, database_name: str, collection_name: str, include_semantics: bool, include_examples: bool, include_indexes: bool) -> str: """构建分析结果文本""" result_text = f"## 集合分析: {database_name}.{collection_name}\n\n" # 基本信息 result_text += "### 基本信息\n" result_text += f"- **实例**: {instance_id}\n" result_text += f"- **数据库**: {database_name}\n" result_text += f"- **集合**: {collection_name}\n" result_text += f"- **文档数量**: {collection_info.get('document_count', '未知')}\n" result_text += f"- **平均文档大小**: {self._format_size(collection_info.get('avg_doc_size', 0))}\n" if collection_info.get("description"): result_text += f"- **描述**: {collection_info['description']}\n" result_text += "\n" # 索引信息 if include_indexes: indexes = await self._get_collection_indexes(instance_id, database_name, collection_name) if indexes: result_text += "### 索引信息\n" for idx in indexes: idx_name = idx.get("name", "未知") idx_keys = idx.get("key", {}) key_desc = ", ".join([f"{k}: {v}" for k, v in idx_keys.items()]) result_text += f"- **{idx_name}**: {key_desc}\n" result_text += "\n" # 字段结构 if fields: result_text += "### 字段结构\n\n" # 按字段路径排序 fields.sort(key=lambda x: x["field_path"]) for field in fields: field_path = field["field_path"] field_type = field.get("field_type", "unknown") occurrence_rate = field.get("occurrence_rate", 0) result_text += f"#### {field_path}\n" result_text += f"- **类型**: {field_type}\n" result_text += f"- **出现率**: {occurrence_rate:.1%}\n" else: result_text += "### 字段结构\n\n" result_text += "集合 '{}.{}' 没有字段信息。请先使用 analyze_collection 工具扫描集合结构。\n\n".format(database_name, collection_name) # 语义分析总结 if include_semantics: semantic_summary = await self._get_semantic_summary(instance_id, database_name, collection_name, fields) if semantic_summary: result_text += semantic_summary # 使用建议 result_text += "### 使用建议\n\n" result_text += "- 使用 `manage_semantics` 工具来添加或更新字段的业务含义\n" result_text += "- 使用 `generate_query` 工具来生成针对此集合的查询\n" result_text += "- 对于高频查询字段,建议添加索引以提高性能\n" return result_text @with_retry(RetryConfig(max_attempts=2, base_delay=0.5)) @with_error_handling("获取索引信息失败") async def _get_collection_indexes(self, instance_id: str, database_name: str, collection_name: str) -> List[Dict[str, Any]]: """获取集合索引信息""" connection = self.connection_manager.get_instance_connection(instance_id) if connection is None: return [] db = connection.get_database(database_name) collection = db[collection_name] indexes = [] async for index in collection.list_indexes(): indexes.append(index) return indexes @with_error_handling("生成语义总结失败") async def _get_semantic_summary(self, instance_id: str, database_name: str, collection_name: str, fields: List[Dict[str, Any]]) -> str: """获取语义分析总结""" # 统计语义信息 total_fields = len(fields) fields_with_meaning = sum(1 for field in fields if field.get("business_meaning")) if total_fields == 0: return "" semantic_coverage = fields_with_meaning / total_fields summary = "### 语义分析总结\n\n" summary += f"- **字段总数**: {total_fields}\n" summary += f"- **已定义语义**: {fields_with_meaning} ({semantic_coverage:.1%})\n" if semantic_coverage < 0.5: summary += "- **建议**: 语义覆盖率较低,建议使用 `manage_semantics` 工具完善字段含义\n" elif semantic_coverage < 0.8: summary += "- **建议**: 语义覆盖率中等,可以进一步完善剩余字段的含义\n" else: summary += "- **状态**: ✅ 语义覆盖率良好\n" # 推荐业务领域 business_domains = await self.semantic_analyzer.suggest_business_domain( database_name, [{"collection_name": collection_name}] ) if business_domains: summary += f"- **推荐业务领域**: {', '.join(business_domains)}\n" summary += "\n" return summary def _format_size(self, size_bytes: int) -> str: """格式化字节大小""" if size_bytes == 0: return "0 B" units = ['B', 'KB', 'MB', 'GB', 'TB'] unit_index = 0 size = float(size_bytes) while size >= 1024 and unit_index < len(units) - 1: size /= 1024 unit_index += 1 return f"{size:.1f} {units[unit_index]}" @with_error_handling("获取字段建议失败") async def get_field_suggestions(self, instance_id: str, database_name: str, collection_name: str, query_description: str) -> List[Dict[str, Any]]: """根据查询描述获取字段建议""" # 先获取实例信息来得到正确的instance_id instance_info = await self.metadata_manager.get_instance_by_name(instance_id, instance_id) if not instance_info: return [] actual_instance_id = instance_info["_id"] fields = await self.metadata_manager.get_fields_by_collection( instance_id, actual_instance_id, database_name, collection_name ) if not fields: return [] # 使用语义分析器获取建议 suggestions = self.semantic_analyzer.get_semantic_suggestions_for_query( query_description, fields ) return suggestions

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/niuzaishu/QueryNest'

If you have feedback or need assistance with the MCP directory API, please join our Discord server