Skip to main content
Glama

canonicalize_entities_v1

Standardize and merge duplicate entities in academic literature by processing specified entity types and consolidating entries with identical canonical keys.

Instructions

规范化并合并重复实体

对指定类型的实体进行规范化处理,合并同一 canonical_key 的重复实体。

Args: types — 要处理的实体类型列表,默认 ["Topic", "MeasureProxy", "IdentificationStrategy", "Method"];suggest_only — 是否只返回建议而不执行合并,默认 False;max_groups — 最大处理组数,默认 5000

Returns: 合并统计信息和建议列表

Input Schema

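Table | JSON Schema

| Name | Required | Description | Default |
|---|---|---|---|
| types | No | Entity types to process | ["Topic", "MeasureProxy", "IdentificationStrategy", "Method"] |
| suggest_only | No | Return merge suggestions only, without executing merges | False |
| max_groups | No | Maximum number of duplicate groups to process | 5000 |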

Implementation Reference

  • The core handler function for the canonicalize_entities_v1 tool. It normalizes canonical_keys for entities of specified types, identifies duplicates, and optionally merges them into the highest-confidence entity per group, updating mentions, relations, and aliases in the database.
    @mcp.tool()
    def canonicalize_entities_v1(
        types: list[str] | None = None,
        suggest_only: bool = False,
        max_groups: int = 5000,
    ) -> dict[str, Any]:
        """Normalize canonical keys and merge duplicate entities.

        Recomputes the canonical key of every unlocked entity of the requested
        types from its canonical name, groups entities that share the same
        (type, recomputed key), and merges each group into a single winner:
        the entity with the highest confidence, ties broken by the lowest
        entity_id. Entities without a confidence value rank last.

        Duplicates are grouped by the *recomputed* key rather than the stored
        one. Entities whose new key collides with another entity are exactly
        the duplicates this tool exists to merge, so they must not be skipped,
        and ``suggest_only`` previews the same groups a real run would merge.

        Args:
            types: Entity type names to process. Defaults to
                ["Topic", "MeasureProxy", "IdentificationStrategy", "Method"].
                Unknown names are ignored (and logged); if none are valid a
                VALIDATION_ERROR is returned.
            suggest_only: When True, only return merge suggestions and do not
                modify the database.
            max_groups: Maximum number of duplicate groups to process, largest
                groups first. Must be >= 0.

        Returns:
            ``CanonicalizeEntitiesOut.model_dump()``: whether merges were
            executed, merge counters, the list of suggestions and, on failure,
            an error object.
        """
        import logging

        log = logging.getLogger(__name__)

        def _rank(row: dict[str, Any]) -> tuple[bool, Any, int]:
            # Highest confidence first, missing confidence last, then lowest id.
            confidence = row["confidence"]
            return (confidence is None, -(confidence or 0), row["entity_id"])

        try:
            # Default set of types to process.
            if types is None:
                types = ["Topic", "MeasureProxy", "IdentificationStrategy", "Method"]

            # Validate type names; unknown names are dropped on a best-effort
            # basis but no longer silently.
            valid_types = []
            ignored_types = []
            for t in types:
                try:
                    valid_types.append(EntityType(t))
                except ValueError:
                    ignored_types.append(t)
            if ignored_types:
                log.warning(
                    "canonicalize_entities_v1: ignoring unknown entity types: %s",
                    ignored_types,
                )

            if not valid_types:
                return CanonicalizeEntitiesOut(
                    executed=False,
                    merged_groups=0,
                    merged_entities=0,
                    error=MCPErrorModel(code="VALIDATION_ERROR", message="No valid entity types provided"),
                ).model_dump()

            if max_groups < 0:
                return CanonicalizeEntitiesOut(
                    executed=False,
                    merged_groups=0,
                    merged_entities=0,
                    error=MCPErrorModel(code="VALIDATION_ERROR", message="max_groups must be >= 0"),
                ).model_dump()

            type_values = [t.value for t in valid_types]

            suggestions: list[MergeSuggestion] = []
            merged_groups = 0
            merged_entities = 0

            with get_db() as conn:
                # 1. Load every unlocked entity of the requested types.
                entities = query_all(
                    """
                    SELECT entity_id, type, canonical_name, canonical_key, confidence, is_locked
                    FROM entities
                    WHERE type = ANY(%s) AND is_locked IS NOT TRUE
                    ORDER BY entity_id
                    """,
                    (type_values,)
                )

                # 2. Recompute keys and group by (type, recomputed key).
                groups: dict[tuple[str, str], list[dict[str, Any]]] = {}
                stale_keys: dict[int, str] = {}
                for entity in entities:
                    new_key = generate_canonical_key_for_type(
                        EntityType(entity["type"]), entity["canonical_name"]
                    )
                    groups.setdefault((entity["type"], new_key), []).append(entity)
                    if new_key != entity["canonical_key"]:
                        stale_keys[entity["entity_id"]] = new_key

                # Largest groups first; (type, key) makes the order deterministic.
                duplicate_groups = sorted(
                    (item for item in groups.items() if len(item[1]) > 1),
                    key=lambda item: (-len(item[1]), item[0]),
                )[:max_groups]

                # 3. Build a suggestion per group and optionally merge it.
                for (type_value, new_key), members in duplicate_groups:
                    ids = [row["entity_id"] for row in sorted(members, key=_rank)]
                    winner_id, loser_ids = ids[0], ids[1:]

                    suggestions.append(
                        MergeSuggestion(
                            type=EntityType(type_value),
                            canonical_key=new_key,
                            winner_entity_id=winner_id,
                            merged_entity_ids=loser_ids,
                        )
                    )

                    if not suggest_only:
                        execute_merge(conn, winner_id, loser_ids, "auto_canonicalize")
                        merged_groups += 1
                        merged_entities += len(loser_ids)
                        # Losers are gone; they need no key refresh.
                        for loser_id in loser_ids:
                            stale_keys.pop(loser_id, None)

                # 4. Refresh stored keys of the surviving entities. This runs
                # after the merges so that a loser which already held the new
                # key no longer blocks its winner. The NOT EXISTS guard still
                # skips rows that would collide with an entity outside this
                # run (e.g. a locked one or a group beyond max_groups).
                if not suggest_only and stale_keys:
                    skipped_conflicts = 0
                    with conn.cursor() as cur:
                        for entity_id, new_key in stale_keys.items():
                            cur.execute(
                                """
                                UPDATE entities SET canonical_key = %s, updated_at = now()
                                WHERE entity_id = %s
                                AND NOT EXISTS (
                                    SELECT 1 FROM entities e2
                                    WHERE e2.type = (SELECT type FROM entities WHERE entity_id = %s)
                                    AND e2.canonical_key = %s
                                    AND e2.entity_id != %s
                                )
                                """,
                                (new_key, entity_id, entity_id, new_key, entity_id)
                            )
                            if cur.rowcount == 0:
                                skipped_conflicts += 1
                    if skipped_conflicts:
                        log.warning(
                            "canonicalize_entities_v1: skipped %d canonical_key updates due to key conflicts",
                            skipped_conflicts,
                        )

            return CanonicalizeEntitiesOut(
                executed=not suggest_only,
                merged_groups=merged_groups,
                merged_entities=merged_entities,
                suggestions=suggestions,
            ).model_dump()

        except Exception as e:
            # Tool boundary: every failure is reported to the caller as a
            # structured error. The code string is kept as-is because callers
            # may match on it, even though not every failure is a connection
            # error.
            return CanonicalizeEntitiesOut(
                executed=False,
                merged_groups=0,
                merged_entities=0,
                error=MCPErrorModel(code="DB_CONN_ERROR", message=str(e)),
            ).model_dump()
  • Pydantic schema definitions for canonicalize_entities_v1: input model (CanonicalizeEntitiesIn), output model (CanonicalizeEntitiesOut), and supporting MergeSuggestion model for validation and serialization.
    class CanonicalizeEntitiesIn(BaseModel):
        """Input model for canonicalize_entities_v1."""
        # Entity types to process; a fresh list of the four default types
        # (Topic, MeasureProxy, IdentificationStrategy, Method) is built per
        # instance via default_factory.
        types: list[EntityType] = Field(default_factory=lambda: [
            EntityType.Topic,
            EntityType.MeasureProxy,
            EntityType.IdentificationStrategy,
            EntityType.Method,
        ])
        # When True, only suggestions are returned and no merge is executed.
        suggest_only: bool = False
        # Upper bound on the number of duplicate groups handled in one call.
        max_groups: int = 5000
    
    
    class MergeSuggestion(BaseModel):
        """A proposed merge of one duplicate group into a single entity."""
        # Entity type shared by every member of the group.
        type: EntityType
        # Canonical key shared by every member of the group.
        canonical_key: str
        # Entity that survives the merge.
        winner_entity_id: int
        # Entities that are folded into the winner.
        merged_entity_ids: list[int]
    
    
    class CanonicalizeEntitiesOut(BaseModel):
        """Output model for canonicalize_entities_v1."""
        # True when merges were actually executed (i.e. not suggest-only and no error).
        executed: bool
        # Number of duplicate groups that were merged.
        merged_groups: int
        # Number of entities folded into a winner across all groups.
        merged_entities: int
        # One suggestion per duplicate group considered; empty by default.
        suggestions: list[MergeSuggestion] = Field(default_factory=list)
        # Populated only when the call failed.
        error: Optional[MCPErrorModel] = None
  • Registration of the graph_canonicalize tools module, which includes the canonicalize_entities_v1 tool, by calling register_graph_canonicalize_tools(mcp) in the main MCP server initialization.
    # Register the graph-canonicalize tool module on the MCP server instance;
    # per the page's description this module includes canonicalize_entities_v1.
    register_graph_canonicalize_tools(mcp)
  • Helper function to generate type-specific canonical keys for entities by normalizing the name and prefixing with type/family identifiers.
    def generate_canonical_key_for_type(entity_type: EntityType, canonical_name: str) -> str:
        """Build the type-specific canonical key for an entity name.

        The name is normalized with ``normalize_alias`` and prefixed with a
        short tag that depends on the entity type. Three types are special:

        * Paper: the canonical name is returned unchanged (per the original
          note, a Paper's key is its doc_id).
        * IdentificationStrategy: the key also embeds the family returned by
          ``identify_id_family``.
        * Method: the key holds only the first word of the normalized name
          (the method "family"), or "other" when the normalized name is empty.

        Any type without a dedicated prefix falls back to "other".
        """
        normalized = normalize_alias(canonical_name)

        if entity_type == EntityType.Paper:
            return canonical_name

        if entity_type == EntityType.IdentificationStrategy:
            return f"id|{identify_id_family(canonical_name)}|{normalized}"

        if entity_type == EntityType.Method:
            # Only the leading word is kept, so methods sharing it share a key.
            family = normalized.split()[0] if normalized else "other"
            return f"method|{family}"

        # Types whose key is simply "<prefix>|<normalized name>".
        plain_prefixes = (
            (EntityType.Topic, "topic"),
            (EntityType.MeasureProxy, "measure"),
            (EntityType.Setting, "setting"),
            (EntityType.DataSource, "data"),
            (EntityType.Mechanism, "mechanism"),
            (EntityType.LimitationGap, "gap"),
        )
        prefix = next(
            (label for member, label in plain_prefixes if entity_type == member),
            "other",
        )
        return f"{prefix}|{normalized}"
  • Database helper to execute entity merges by remapping mentions, relations, aliases to the winner entity, logging the merge, and deleting losers.
    def execute_merge(conn, winner_id: int, loser_ids: list[int], reason: str = "auto_canonicalize"):
        """Merge the loser entities into the winner entity.

        Mentions, relations (subject and object side) and aliases that point
        at a loser are re-pointed to the winner, one ``entity_merge_log`` row
        is written per loser, and the loser rows are deleted from ``entities``.

        The winner id and repeated ids are removed from ``loser_ids`` before
        any statement runs: a winner listed among the losers would otherwise
        be deleted together with everything just re-pointed to it. If nothing
        remains to merge the function returns without touching the database.

        Args:
            conn: Database connection whose ``cursor()`` is usable as a
                context manager. Committing is left to the caller.
            winner_id: Entity that survives the merge.
            loser_ids: Entities to fold into the winner.
            reason: Free-text reason stored in the merge log.

        NOTE(review): re-pointing rows may create duplicate aliases or
        relations whose subject and object are both the winner; whether the
        schema rejects those cannot be determined from this function — verify
        against the table constraints.
        """
        # Drop the winner and duplicates while preserving the caller's order.
        losers = list(dict.fromkeys(i for i in loser_ids if i != winner_id))
        if not losers:
            return

        with conn.cursor() as cur:
            # 1. Remap mentions
            cur.execute(
                "UPDATE mentions SET entity_id = %s WHERE entity_id = ANY(%s)",
                (winner_id, losers)
            )

            # 2. Remap relations (both subj and obj)
            cur.execute(
                "UPDATE relations SET subj_entity_id = %s WHERE subj_entity_id = ANY(%s)",
                (winner_id, losers)
            )
            cur.execute(
                "UPDATE relations SET obj_entity_id = %s WHERE obj_entity_id = ANY(%s)",
                (winner_id, losers)
            )

            # 3. Remap aliases
            cur.execute(
                "UPDATE entity_aliases SET entity_id = %s WHERE entity_id = ANY(%s)",
                (winner_id, losers)
            )

            # 4. Record one merge-log row per loser
            for loser_id in losers:
                cur.execute(
                    """
                    INSERT INTO entity_merge_log(from_entity_id, to_entity_id, reason)
                    VALUES (%s, %s, %s)
                    """,
                    (loser_id, winner_id, reason)
                )

            # 5. Delete the merged-away entities
            cur.execute(
                "DELETE FROM entities WHERE entity_id = ANY(%s)",
                (losers,)
            )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/h-lu/paperlib-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.