canonicalize_entities_v1
Standardize and merge duplicate entities in academic literature by processing specified entity types and consolidating entries with identical canonical keys.
Instructions
规范化并合并重复实体
对指定类型的实体进行规范化处理,合并同一 canonical_key 的重复实体。
Args:
- types: 要处理的实体类型列表,默认 ["Topic", "MeasureProxy", "IdentificationStrategy", "Method"]
- suggest_only: 是否只返回建议而不执行合并,默认 False
- max_groups: 最大处理组数,默认 5000
Returns: 合并统计信息和建议列表
Input Schema
Table / JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
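| types | No | Entity types to process | null (uses ["Topic", "MeasureProxy", "IdentificationStrategy", "Method"]) |
| suggest_only | No | Only return merge suggestions without performing merges | false |
| max_groups | No | Maximum number of duplicate groups to process | 5000 |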
Implementation Reference
# The core handler function for the canonicalize_entities_v1 tool. It normalizes
# canonical_keys for entities of specified types, identifies duplicates, and
# optionally merges them into the highest-confidence entity per group, updating
# mentions, relations, and aliases in the database.


def _merge_rank(entity: dict[str, Any]) -> tuple[bool, Any, int]:
    """Sort key that places the preferred merge winner first.

    Highest confidence first; rows whose confidence is NULL sort after every
    scored row; ties are broken by the smaller entity_id.
    """
    confidence = entity["confidence"]
    if confidence is None:
        return (True, 0, entity["entity_id"])
    return (False, -confidence, entity["entity_id"])


def _plan_duplicate_groups(
    entities: list[dict[str, Any]],
    new_keys: dict[int, str],
    max_groups: int,
) -> list[tuple[str, str, list[dict[str, Any]]]]:
    """Group entities by (type, recomputed canonical_key), keeping duplicates only.

    Returns at most ``max_groups`` ``(type, key, members)`` tuples for groups
    with more than one member, largest groups first (ties ordered by type and
    then key so the output is deterministic). ``members`` is ordered by
    ``_merge_rank``, so ``members[0]`` is the merge winner.
    """
    buckets: dict[tuple[str, str], list[dict[str, Any]]] = {}
    for entity in entities:
        bucket_key = (entity["type"], new_keys[entity["entity_id"]])
        buckets.setdefault(bucket_key, []).append(entity)

    duplicates = [
        (type_value, key, sorted(members, key=_merge_rank))
        for (type_value, key), members in buckets.items()
        if len(members) > 1
    ]
    duplicates.sort(key=lambda item: (-len(item[2]), item[0], item[1]))
    # A negative limit is treated as "no groups" rather than an SQL error.
    return duplicates[: max(max_groups, 0)]


@mcp.tool()
def canonicalize_entities_v1(
    types: list[str] | None = None,
    suggest_only: bool = False,
    max_groups: int = 5000,
) -> dict[str, Any]:
    """规范化并合并重复实体

    对指定类型的实体进行规范化处理,合并同一 canonical_key 的重复实体。

    Args:
        types: 要处理的实体类型列表,默认 ["Topic", "MeasureProxy", "IdentificationStrategy", "Method"]
        suggest_only: 是否只返回建议而不执行合并,默认 False
        max_groups: 最大处理组数,默认 5000

    Returns:
        合并统计信息和建议列表
    """
    # English summary (the docstring above is kept verbatim because it is the
    # text this tool registers with; treat it as a runtime string):
    # Recompute canonical keys for unlocked entities of ``types``, group
    # duplicates by the recomputed key, and (unless ``suggest_only``) merge each
    # group into its highest-confidence member, then persist the new keys.
    # Returns a CanonicalizeEntitiesOut dump; failures are reported in ``error``
    # instead of being raised.
    try:
        # Default entity types to canonicalize.
        if types is None:
            types = ["Topic", "MeasureProxy", "IdentificationStrategy", "Method"]

        # Unknown type names are skipped (best effort); only a list with no
        # valid names at all is reported as a validation error.
        valid_types = []
        for t in types:
            try:
                valid_types.append(EntityType(t))
            except ValueError:
                pass
        if not valid_types:
            return CanonicalizeEntitiesOut(
                executed=False,
                merged_groups=0,
                merged_entities=0,
                error=MCPErrorModel(code="VALIDATION_ERROR", message="No valid entity types provided"),
            ).model_dump()

        type_values = [t.value for t in valid_types]
        suggestions: list[MergeSuggestion] = []
        merged_groups = 0
        merged_entities = 0

        with get_db() as conn:
            # 1. Load every unlocked entity of the requested types and compute
            #    its canonical_key under the current normalization rules.
            entities = query_all(
                """
                SELECT entity_id, type, canonical_name, canonical_key, confidence, is_locked
                FROM entities
                WHERE type = ANY(%s) AND is_locked IS NOT TRUE
                ORDER BY entity_id
                """,
                (type_values,),
            )
            new_keys: dict[int, str] = {
                entity["entity_id"]: generate_canonical_key_for_type(
                    EntityType(entity["type"]), entity["canonical_name"]
                )
                for entity in entities
            }

            # 2. Find duplicate groups by the *recomputed* key, in memory.
            #    Grouping on the stored key (after a conditional key update)
            #    could never see the duplicates the new rules create: the
            #    update skips any entity whose new key is already taken, so
            #    exactly those entities kept their old, distinct keys. Grouping
            #    here also makes suggest_only reflect the new rules.
            duplicate_groups = _plan_duplicate_groups(entities, new_keys, max_groups)

            # 3. One suggestion per group; unless suggest_only, merge the
            #    losers into the winner (members[0]).
            merged_away: set[int] = set()
            for type_value, canonical_key, members in duplicate_groups:
                winner_id = members[0]["entity_id"]
                loser_ids = [member["entity_id"] for member in members[1:]]
                suggestions.append(
                    MergeSuggestion(
                        type=EntityType(type_value),
                        canonical_key=canonical_key,
                        winner_entity_id=winner_id,
                        merged_entity_ids=loser_ids,
                    )
                )
                if not suggest_only:
                    execute_merge(conn, winner_id, loser_ids, "auto_canonicalize")
                    merged_away.update(loser_ids)
                    merged_groups += 1
                    merged_entities += len(loser_ids)

            # 4. Persist recomputed keys for the surviving entities. Losers were
            #    just deleted, so a winner can now take its group's key. The
            #    NOT EXISTS guard (kept from the original, which used it to
            #    avoid unique-constraint conflicts) still skips keys held by
            #    another row, e.g. a locked entity or duplicates beyond
            #    max_groups.
            if not suggest_only:
                with conn.cursor() as cur:
                    for entity in entities:
                        entity_id = entity["entity_id"]
                        new_key = new_keys[entity_id]
                        if entity_id in merged_away or new_key == entity["canonical_key"]:
                            continue
                        cur.execute(
                            """
                            UPDATE entities
                            SET canonical_key = %s, updated_at = now()
                            WHERE entity_id = %s
                              AND NOT EXISTS (
                                  SELECT 1 FROM entities e2
                                  WHERE e2.type = (SELECT type FROM entities WHERE entity_id = %s)
                                    AND e2.canonical_key = %s
                                    AND e2.entity_id != %s
                              )
                            """,
                            (new_key, entity_id, entity_id, new_key, entity_id),
                        )

        return CanonicalizeEntitiesOut(
            executed=not suggest_only,
            merged_groups=merged_groups,
            merged_entities=merged_entities,
            suggestions=suggestions,
        ).model_dump()
    except Exception as e:
        # Any failure (connection or query) is reported, not raised, so the
        # tool always returns a well-formed payload.
        return CanonicalizeEntitiesOut(
            executed=False,
            merged_groups=0,
            merged_entities=0,
            error=MCPErrorModel(code="DB_CONN_ERROR", message=str(e)),
        ).model_dump()
# Pydantic schema definitions for canonicalize_entities_v1: input model
# (CanonicalizeEntitiesIn), output model (CanonicalizeEntitiesOut), and
# supporting MergeSuggestion model for validation and serialization.
# NOTE: class docstrings are kept verbatim; pydantic may expose a model's
# docstring as its schema description, so treat them as runtime text.


class MergeSuggestion(BaseModel):
    """合并建议"""

    # Entity type shared by every entity in the duplicate group.
    type: EntityType
    # Canonical key that the group's entities share.
    canonical_key: str
    # Entity that survives the merge.
    winner_entity_id: int
    # Entities folded into the winner.
    merged_entity_ids: list[int]


class CanonicalizeEntitiesIn(BaseModel):
    """canonicalize_entities_v1 输入"""

    # Entity types to canonicalize; the factory yields a fresh list per instance.
    types: list[EntityType] = Field(
        default_factory=lambda: [
            EntityType.Topic,
            EntityType.MeasureProxy,
            EntityType.IdentificationStrategy,
            EntityType.Method,
        ]
    )
    # When True, only report suggestions and leave the data untouched.
    suggest_only: bool = Field(default=False)
    # Maximum number of duplicate groups processed per call.
    max_groups: int = Field(default=5000)


class CanonicalizeEntitiesOut(BaseModel):
    """canonicalize_entities_v1 输出"""

    # Whether merges were actually applied (False for suggest-only or errors).
    executed: bool
    # Number of duplicate groups merged.
    merged_groups: int
    # Number of entities merged away (losers across all groups).
    merged_entities: int
    # One suggestion per duplicate group found.
    suggestions: list[MergeSuggestion] = Field(default_factory=list)
    # Populated only when the tool call failed.
    error: MCPErrorModel | None = None
# src/paperlib_mcp/server.py:41-41 (registration)
# Registration of the graph_canonicalize tools module, which includes the
# canonicalize_entities_v1 tool, by calling register_graph_canonicalize_tools(mcp)
# in the main MCP server initialization.
register_graph_canonicalize_tools(mcp)
# Helper function to generate type-specific canonical keys for entities by
# normalizing the name and prefixing with type/family identifiers.
def generate_canonical_key_for_type(entity_type: EntityType, canonical_name: str) -> str:
    """Build the type-specific canonical key used to detect duplicate entities.

    Most types use ``"<prefix>|<normalized name>"``, where the normalized name
    comes from ``normalize_alias``. The exceptions are:

    * Paper: the raw ``canonical_name`` (the doc_id) is returned unchanged.
    * IdentificationStrategy: ``"id|<family>|<normalized name>"``, with the
      family taken from ``identify_id_family``.
    * Method: ``"method|<first word of the normalized name>"``, or
      ``"method|other"`` when the normalized name contains no words. Only the
      first word is kept, so all methods sharing a leading word collapse to
      the same key.
    * Unlisted types fall back to ``"other|<normalized name>"``.

    Args:
        entity_type: Entity type that selects the key scheme.
        canonical_name: Display name of the entity.

    Returns:
        The canonical key string.
    """
    alias_norm = normalize_alias(canonical_name)

    if entity_type == EntityType.Paper:
        return canonical_name  # A Paper's key is its doc_id.
    elif entity_type == EntityType.Topic:
        return f"topic|{alias_norm}"
    elif entity_type == EntityType.MeasureProxy:
        # Simplified: the normalized name alone identifies the measure.
        return f"measure|{alias_norm}"
    elif entity_type == EntityType.IdentificationStrategy:
        id_family = identify_id_family(canonical_name)
        return f"id|{id_family}|{alias_norm}"
    elif entity_type == EntityType.Method:
        # Use the first word as the method family. Test the split result rather
        # than the string's truthiness: a whitespace-only value is truthy but
        # splits to [], which previously raised IndexError.
        words = alias_norm.split()
        method_family = words[0] if words else "other"
        return f"method|{method_family}"
    elif entity_type == EntityType.Setting:
        return f"setting|{alias_norm}"
    elif entity_type == EntityType.DataSource:
        return f"data|{alias_norm}"
    elif entity_type == EntityType.Mechanism:
        return f"mechanism|{alias_norm}"
    elif entity_type == EntityType.LimitationGap:
        return f"gap|{alias_norm}"
    else:
        return f"other|{alias_norm}"
# Database helper to execute entity merges by remapping mentions, relations,
# aliases to the winner entity, logging the merge, and deleting losers.
def execute_merge(conn, winner_id: int, loser_ids: list[int], reason: str = "auto_canonicalize"):
    """Merge ``loser_ids`` into ``winner_id`` using the caller's connection.

    References in ``mentions``, ``relations`` (subject and object side) and
    ``entity_aliases`` are repointed to the winner, one ``entity_merge_log``
    row is written per loser, and the loser rows are deleted from
    ``entities``. No commit is issued here; the transaction is left to the
    caller (presumably the ``get_db()`` context — confirm).

    Duplicate ids in ``loser_ids`` are ignored, and ``winner_id`` is never
    treated as a loser. When no losers remain, ``conn`` is not touched.

    Args:
        conn: Open DB-API connection whose ``cursor()`` supports ``with``.
        winner_id: Entity that survives the merge.
        loser_ids: Entities to fold into the winner and delete.
        reason: Text stored in ``entity_merge_log.reason``.
    """
    # De-duplicate while preserving order, and drop the winner. Deleting the
    # winner after remapping everything onto it would lose all of its
    # references.
    losers = [entity_id for entity_id in dict.fromkeys(loser_ids) if entity_id != winner_id]
    if not losers:
        return

    # NOTE(review): if relations / entity_aliases carry unique constraints that
    # involve the entity id, these remaps may collide with rows the winner
    # already has, and a winner<->loser relation becomes a self-loop — confirm
    # against the schema.
    with conn.cursor() as cur:
        # 1. Remap mentions.
        cur.execute(
            "UPDATE mentions SET entity_id = %s WHERE entity_id = ANY(%s)",
            (winner_id, losers)
        )
        # 2. Remap relations (both subject and object side).
        cur.execute(
            "UPDATE relations SET subj_entity_id = %s WHERE subj_entity_id = ANY(%s)",
            (winner_id, losers)
        )
        cur.execute(
            "UPDATE relations SET obj_entity_id = %s WHERE obj_entity_id = ANY(%s)",
            (winner_id, losers)
        )
        # 3. Remap aliases.
        cur.execute(
            "UPDATE entity_aliases SET entity_id = %s WHERE entity_id = ANY(%s)",
            (winner_id, losers)
        )
        # 4. Record one merge-log row per loser.
        for loser_id in losers:
            cur.execute(
                """
                INSERT INTO entity_merge_log(from_entity_id, to_entity_id, reason)
                VALUES (%s, %s, %s)
                """,
                (loser_id, winner_id, reason)
            )
        # 5. Delete the merged-away entities.
        cur.execute(
            "DELETE FROM entities WHERE entity_id = ANY(%s)",
            (losers,)
        )