QueryNest

by niuzaishu
semantic_analyzer.py (23 kB)
# -*- coding: utf-8 -*-
"""Semantic analyzer."""

import re
from typing import Dict, List, Optional, Any
from datetime import datetime

import structlog

from database.metadata_manager_file import FileBasedMetadataManager
from database.connection_manager import ConnectionManager
from storage.local_semantic_storage import LocalSemanticStorage
from storage.semantic_file_manager import SemanticFileManager
from storage.config import get_config

logger = structlog.get_logger(__name__)


class SemanticAnalyzer:
    """Semantic analyzer."""

    def __init__(self, metadata_manager: FileBasedMetadataManager,
                 connection_manager: ConnectionManager):
        self.metadata_manager = metadata_manager
        self.connection_manager = connection_manager

        # Initialize local storage
        config = get_config()
        self.local_storage = LocalSemanticStorage(str(config.base_path))
        self.file_manager = SemanticFileManager(self.local_storage)

        # Common field-name patterns and the business meanings they suggest
        self.field_patterns = {
            # User-related
            r'user.*id|uid|user_id': 'User ID',
            r'user.*name|username': 'Username',
            r'user.*email|email': 'User email',
            r'user.*phone|phone|mobile': 'User phone number',
            r'user.*avatar|avatar': 'User avatar',
            r'user.*age|age': 'User age',
            r'user.*gender|gender|sex': 'User gender',

            # Order-related
            r'order.*id|order_id': 'Order ID',
            r'order.*no|order_no|order_number': 'Order number',
            r'order.*status|status': 'Order status',
            r'order.*amount|amount|price|total': 'Order amount',
            r'order.*time|order_time|created_time': 'Order time',

            # Product-related
            r'product.*id|prod_id|product_id': 'Product ID',
            r'product.*name|prod_name|product_name': 'Product name',
            r'product.*price|price': 'Product price',
            r'product.*desc|description': 'Product description',
            r'product.*category|category': 'Product category',

            # Time-related
            r'create.*time|created.*at|create_time': 'Creation time',
            r'update.*time|updated.*at|update_time|modify_time': 'Update time',
            r'delete.*time|deleted.*at|delete_time': 'Deletion time',
            r'start.*time|start_time|begin_time': 'Start time',
            r'end.*time|end_time|finish_time': 'End time',

            # Status-related
            r'.*status$|.*state$': 'Status',
            r'.*flag$|.*enabled$|.*disabled$': 'Flag',
            r'is_.*|has_.*': 'Boolean indicator',

            # Address-related
            r'.*address|addr': 'Address',
            r'.*city': 'City',
            r'.*province|.*state': 'Province/state',
            r'.*country': 'Country',
            r'.*zip.*code|.*postal.*code': 'Postal code',

            # Generic identifiers
            r'.*_id$|.*id$': 'ID',
            r'.*_no$|.*no$|.*number$': 'Number',
            r'.*_code$|.*code$': 'Code',
            r'.*_name$|.*name$': 'Name',
            r'.*_type$|.*type$': 'Type',
            r'.*_count$|.*count$': 'Count',
            r'.*_url$|.*url$': 'URL',
        }

        # Business-meaning hints derived from data types
        self.type_hints = {
            'objectId': 'MongoDB document ID',
            'string': 'Text',
            'integer': 'Integer value',
            'double': 'Floating-point value',
            'boolean': 'Boolean (true/false)',
            'date': 'Date/time',
            'array': 'Array/list',
            'object': 'Nested object'
        }

        # Common value patterns and their meanings
        self.value_patterns = {
            r'^\d{11}$': 'Phone number',
            r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$': 'Email address',
            r'^https?://': 'URL',
            r'^\d{4}-\d{2}-\d{2}': 'Date format',
            r'^[0-9a-fA-F]{24}$': 'MongoDB ObjectId',
            r'^\d{13}$': 'Timestamp (milliseconds)',
            r'^\d{10}$': 'Timestamp (seconds)',
            r'^[0-9]{6}$': 'Verification code / postal code',
        }

    async def analyze_field_semantics(self, instance_id: str, database_name: str,
                                      collection_name: str, field_path: str,
                                      field_info: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze the semantics of a single field."""
        analysis_result = {
            "suggested_meaning": "",
            "confidence": 0.0,
            "reasoning": [],
            "suggestions": []
        }

        try:
            # Analyze based on the field name
            name_analysis = self._analyze_field_name(field_path)
            if name_analysis["meaning"]:
                analysis_result["suggested_meaning"] = name_analysis["meaning"]
                analysis_result["confidence"] += name_analysis["confidence"]
                analysis_result["reasoning"].append(f"Field name matched: {name_analysis['pattern']}")

            # Analyze based on the data type
            type_analysis = self._analyze_field_type(field_info.get("type", "unknown"))
            if type_analysis["hint"]:
                analysis_result["reasoning"].append(f"Data type: {type_analysis['hint']}")
                analysis_result["confidence"] += type_analysis["confidence"]

            # Analyze based on example values
            examples = field_info.get("examples", [])
            if examples:
                value_analysis = self._analyze_field_values(examples)
                if value_analysis["meaning"]:
                    if not analysis_result["suggested_meaning"]:
                        analysis_result["suggested_meaning"] = value_analysis["meaning"]
                    analysis_result["confidence"] += value_analysis["confidence"]
                    analysis_result["reasoning"].append(f"Value pattern matched: {value_analysis['pattern']}")

            # Analyze based on context (collection and database names)
            context_analysis = self._analyze_context(database_name, collection_name, field_path)
            if context_analysis["suggestions"]:
                analysis_result["suggestions"].extend(context_analysis["suggestions"])
                analysis_result["reasoning"].append("Inferred from context")

            # Normalize confidence
            analysis_result["confidence"] = min(analysis_result["confidence"], 1.0)

            # If no clear semantic suggestion was found, fall back to a generic one
            if not analysis_result["suggested_meaning"]:
                analysis_result["suggested_meaning"] = self._generate_generic_meaning(field_path, field_info)
                analysis_result["confidence"] = 0.3
                analysis_result["reasoning"].append("Generic inference")

            # Generate improvement suggestions
            analysis_result["suggestions"].extend(self._generate_improvement_suggestions(field_info))

            logger.debug(
                "Field semantic analysis completed",
                field_path=field_path,
                suggested_meaning=analysis_result["suggested_meaning"],
                confidence=analysis_result["confidence"]
            )

        except Exception as e:
            logger.error("Field semantic analysis failed", field_path=field_path, error=str(e))
            analysis_result["reasoning"].append(f"Analysis error: {str(e)}")

        return analysis_result

    def _analyze_field_name(self, field_path: str) -> Dict[str, Any]:
        """Analyze semantics based on the field name."""
        # Normalize to lowercase and unify hyphens with underscores. (Stripping
        # underscores entirely, as an earlier revision did, would make patterns
        # such as r'.*_id$' unmatchable.)
        field_name = field_path.lower().replace('-', '_')

        for pattern, meaning in self.field_patterns.items():
            if re.search(pattern, field_name, re.IGNORECASE):
                return {
                    "meaning": meaning,
                    "confidence": 0.7,
                    "pattern": pattern
                }

        return {"meaning": "", "confidence": 0.0, "pattern": ""}

    def _analyze_field_type(self, field_type: str) -> Dict[str, Any]:
        """Analyze based on the data type."""
        hint = self.type_hints.get(field_type, "")
        confidence = 0.2 if hint else 0.0

        return {
            "hint": hint,
            "confidence": confidence
        }

    def _analyze_field_values(self, examples: List[Any]) -> Dict[str, Any]:
        """Analyze semantics based on example values."""
        if not examples:
            return {"meaning": "", "confidence": 0.0, "pattern": ""}

        # Count how often each pattern matches
        pattern_matches = {}
        for example in examples:
            if example is None:
                continue

            example_str = str(example)
            for pattern, meaning in self.value_patterns.items():
                if re.match(pattern, example_str):
                    if meaning not in pattern_matches:
                        pattern_matches[meaning] = 0
                    pattern_matches[meaning] += 1

        if pattern_matches:
            # Pick the most frequently matched pattern
            best_meaning = max(pattern_matches, key=pattern_matches.get)
            match_rate = pattern_matches[best_meaning] / len(examples)

            return {
                "meaning": best_meaning,
                "confidence": match_rate * 0.8,  # capped at 0.8 confidence
                "pattern": best_meaning
            }

        return {"meaning": "", "confidence": 0.0, "pattern": ""}

    def _analyze_context(self, database_name: str, collection_name: str,
                         field_path: str) -> Dict[str, Any]:
        """Analyze based on context."""
        suggestions = []

        # Infer from the collection name
        collection_lower = collection_name.lower()
        if 'user' in collection_lower:
            if any(keyword in field_path.lower() for keyword in ['name', 'email', 'phone']):
                suggestions.append("User-related information")
        elif 'order' in collection_lower:
            if any(keyword in field_path.lower() for keyword in ['amount', 'price', 'total']):
                suggestions.append("Order-amount related")
        elif 'product' in collection_lower:
            if any(keyword in field_path.lower() for keyword in ['name', 'title', 'desc']):
                suggestions.append("Product-information related")

        # Infer from the database name
        db_lower = database_name.lower()
        if 'ecommerce' in db_lower or 'shop' in db_lower:
            suggestions.append("E-commerce related")
        elif 'cms' in db_lower or 'content' in db_lower:
            suggestions.append("Content-management related")
        elif 'user' in db_lower or 'auth' in db_lower:
            suggestions.append("User-authentication related")

        return {"suggestions": suggestions}

    def _generate_generic_meaning(self, field_path: str, field_info: Dict[str, Any]) -> str:
        """Generate a generic semantic description."""
        field_type = field_info.get("type", "unknown")

        # Use the last segment of the field path
        field_name = field_path.split('.')[-1]

        if field_name.endswith('_id') or field_name.endswith('Id'):
            return f"Identifier of {field_name[:-3]}"
        elif field_name.endswith('_name') or field_name.endswith('Name'):
            return f"Name of {field_name[:-5]}"
        elif field_name.endswith('_time') or field_name.endswith('Time'):
            return f"Time of {field_name[:-5]}"
        elif field_name.endswith('_count') or field_name.endswith('Count'):
            return f"Count of {field_name[:-6]}"
        elif field_type == 'boolean':
            return f"Status flag for {field_name}"
        elif field_type in ['integer', 'double']:
            return f"Numeric value of {field_name}"
        elif field_type == 'date':
            return f"Date/time of {field_name}"
        else:
            return f"Field {field_name}"

    def _generate_improvement_suggestions(self, field_info: Dict[str, Any]) -> List[str]:
        """Generate improvement suggestions."""
        suggestions = []

        # Check whether an index is needed
        if not field_info.get("is_indexed", False):
            occurrence_rate = field_info.get("occurrence_rate", 0)
            if occurrence_rate > 0.8:  # suggest indexing frequently occurring fields
                suggestions.append("Consider adding an index on this field to improve query performance")

        # Check data consistency
        types = field_info.get("types", {})
        if len(types) > 1:
            suggestions.append("The field has multiple data types; consider unifying the data format")

        # Check whether the field should be required
        occurrence_rate = field_info.get("occurrence_rate", 0)
        if occurrence_rate < 0.5:
            suggestions.append("The field occurs in few documents; consider treating it as optional")
        elif occurrence_rate > 0.9 and not field_info.get("is_required", False):
            suggestions.append("The field occurs in most documents; consider marking it as required")

        return suggestions

    async def batch_analyze_collection(self, instance_id: str, database_name: str,
                                       collection_name: str) -> Dict[str, Any]:
        """Batch-analyze all fields in a collection."""
        logger.info(
            "Starting batch semantic analysis of collection fields",
            database=database_name,
            collection=collection_name
        )

        try:
            # Fetch all fields of the collection.
            # First look up the instance record to obtain the actual instance_id.
            instance_info = await self.metadata_manager.get_instance_by_name(instance_id, instance_id)
            if not instance_info:
                return {
                    "total_fields": 0,
                    "analyzed_fields": 0,
                    "updated_fields": 0,
                    "analysis_results": {},
                    "error": f"Instance '{instance_id}' does not exist"
                }

            actual_instance_id = instance_info["_id"]
            fields = await self.metadata_manager.get_fields_by_collection(
                instance_id, actual_instance_id, database_name, collection_name
            )

            analysis_results = {}
            updated_count = 0

            for field in fields:
                field_path = field["field_path"]

                # Skip fields that already have a business meaning
                if field.get("business_meaning"):
                    continue

                # Analyze field semantics
                analysis = await self.analyze_field_semantics(
                    instance_id, database_name, collection_name, field_path, field
                )

                analysis_results[field_path] = analysis

                # Auto-update the field's semantics when confidence is high enough
                if analysis["confidence"] > 0.6 and analysis["suggested_meaning"]:
                    try:
                        # Persist via the local semantic storage
                        success = await self.save_field_semantics(
                            instance_id, database_name, collection_name,
                            field_path, analysis["suggested_meaning"]
                        )
                        if success:
                            updated_count += 1
                    except Exception as e:
                        logger.warning(
                            "Failed to auto-update field semantics; skipping field",
                            field_path=field_path,
                            error=str(e)
                        )

            logger.info(
                "Collection field semantic analysis completed",
                database=database_name,
                collection=collection_name,
                total_fields=len(fields),
                analyzed_fields=len(analysis_results),
                updated_fields=updated_count
            )

            return {
                "total_fields": len(fields),
                "analyzed_fields": len(analysis_results),
                "updated_fields": updated_count,
                "analysis_results": analysis_results
            }

        except Exception as e:
            logger.error(
                "Batch field semantic analysis failed",
                database=database_name,
                collection=collection_name,
                error=str(e)
            )
            return {
                "total_fields": 0,
                "analyzed_fields": 0,
                "updated_fields": 0,
                "analysis_results": {},
                "error": str(e)
            }

    async def suggest_business_domain(self, database_name: str,
                                      collections: List[Dict[str, Any]]) -> List[str]:
        """Infer business domains from database and collection information."""
        suggestions = set()

        # Based on the database name
        db_lower = database_name.lower()
        if any(keyword in db_lower for keyword in ['ecommerce', 'shop', 'store', 'mall']):
            suggestions.add("E-commerce")
        elif any(keyword in db_lower for keyword in ['cms', 'content', 'blog', 'news']):
            suggestions.add("Content management")
        elif any(keyword in db_lower for keyword in ['user', 'auth', 'account']):
            suggestions.add("User management")
        elif any(keyword in db_lower for keyword in ['finance', 'payment', 'billing']):
            suggestions.add("Finance/payments")
        elif any(keyword in db_lower for keyword in ['log', 'analytics', 'stats']):
            suggestions.add("Data analytics")

        # Based on collection names
        collection_names = [coll["collection_name"].lower() for coll in collections]

        if any('user' in name for name in collection_names):
            suggestions.add("User management")
        if any(keyword in ' '.join(collection_names) for keyword in ['order', 'product', 'cart']):
            suggestions.add("E-commerce")
        if any(keyword in ' '.join(collection_names) for keyword in ['article', 'post', 'content']):
            suggestions.add("Content management")
        if any(keyword in ' '.join(collection_names) for keyword in ['payment', 'transaction', 'billing']):
            suggestions.add("Finance/payments")

        return list(suggestions)

    def get_semantic_suggestions_for_query(self, query_description: str,
                                           available_fields: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Provide semantic field suggestions for a query description."""
        suggestions = []
        query_lower = query_description.lower()

        for field in available_fields:
            field_path = field["field_path"]
            business_meaning = field.get("business_meaning", "")

            # Compute a relevance score
            relevance_score = 0.0

            # Match against the business meaning
            if business_meaning:
                meaning_lower = business_meaning.lower()
                common_words = set(query_lower.split()) & set(meaning_lower.split())
                if common_words:
                    relevance_score += len(common_words) * 0.3

            # Match against the field name
            field_words = re.split(r'[._]', field_path.lower())
            query_words = query_lower.split()
            field_matches = set(field_words) & set(query_words)
            if field_matches:
                relevance_score += len(field_matches) * 0.2

            # Match against example values
            examples = field.get("examples", [])
            for example in examples:
                if str(example).lower() in query_lower:
                    relevance_score += 0.1

            if relevance_score > 0:
                suggestions.append({
                    "field_path": field_path,
                    "business_meaning": business_meaning,
                    "relevance_score": relevance_score,
                    "field_type": field.get("field_type", "unknown")
                })

        # Sort by relevance score and return the 10 most relevant fields
        suggestions.sort(key=lambda x: x["relevance_score"], reverse=True)
        return suggestions[:10]

    async def save_field_semantics(self, instance_name: str, database_name: str,
                                   collection_name: str, field_path: str,
                                   business_meaning: str,
                                   examples: Optional[List[str]] = None) -> bool:
        """Persist semantic information for a field."""
        try:
            # Use local file storage
            semantic_data = {
                'business_meaning': business_meaning,
                'examples': examples or [],
                'updated_at': datetime.now().isoformat(),
                'source': 'semantic_analyzer'
            }

            success = await self.local_storage.save_field_semantics(
                instance_name, database_name, collection_name, field_path, semantic_data
            )

            if success:
                logger.info(
                    "Field semantics saved to local file",
                    instance=instance_name,
                    database=database_name,
                    collection=collection_name,
                    field=field_path,
                    meaning=business_meaning
                )

            return success

        except Exception as e:
            logger.error(
                "Failed to save field semantics",
                instance=instance_name,
                database=database_name,
                collection=collection_name,
                field=field_path,
                error=str(e)
            )
            return False

    async def search_semantics(self, instance_name: str, search_term: str) -> List[Dict[str, Any]]:
        """Search stored semantic information."""
        try:
            # Search local files
            results = await self.local_storage.search_semantics(
                instance_name, search_term
            )

            logger.info(
                "Semantic search completed",
                instance=instance_name,
                search_term=search_term,
                total_results_count=len(results)
            )
            return results

        except Exception as e:
            logger.error(
                "Semantic search failed",
                instance=instance_name,
                search_term=search_term,
                error=str(e)
            )
            return []
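
For orientation, a minimal usage sketch follows. It assumes FileBasedMetadataManager and ConnectionManager can be constructed without arguments (their real signatures live elsewhere in the repository, not in this file); the instance, database, and collection names are placeholders, and the field_info dict simply mirrors the keys the analyzer reads ("type" and "examples").

import asyncio

async def demo():
    # Hypothetical setup: constructor arguments are assumptions, not taken from this file.
    metadata_manager = FileBasedMetadataManager()
    connection_manager = ConnectionManager()
    analyzer = SemanticAnalyzer(metadata_manager, connection_manager)

    result = await analyzer.analyze_field_semantics(
        "demo_instance", "shop_db", "users", "user_email",
        {"type": "string", "examples": ["alice@example.com"]},
    )
    # Expected: suggested_meaning "User email"; confidence clamped to 1.0;
    # reasoning entries for the name-pattern match, the string type hint,
    # and the email value-pattern match.
    print(result)

asyncio.run(demo())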
