Skip to main content
Glama

canonicalize_relations_v1

Normalize and deduplicate academic relationships in knowledge graphs while preserving all evidence sources. Use to maintain consistent connections between research papers and authors.

Instructions

规范化关系并合并重复项,保留所有证据。

Args: scope: 处理范围 — "all"、"doc_id:..." 或 "comm_id:..."; predicate_whitelist: 只处理这些谓词; qualifier_keys_keep: 规范化时保留哪些 qualifier 字段; dry_run: 仅计算建议,不写入数据库。

Input Schema

JSON Schema (table form)

| Name                | Required | Description | Default |
| ------------------- | -------- | ----------- | ------- |
| scope               | No       |             | all     |
| predicate_whitelist | No       |             |         |
| qualifier_keys_keep | No       |             |         |
| dry_run             | No       |             |         |

Implementation Reference

  • The main handler function for canonicalize_relations_v1 tool. It queries relations based on scope, normalizes qualifiers, computes canonical hashes, upserts into canonical_relations table, and inserts evidence records while handling dry_run mode and errors.
    @mcp.tool()
    def canonicalize_relations_v1(
        scope: str = "all",
        predicate_whitelist: list[str] | None = None,
        qualifier_keys_keep: list[str] | None = None,
        dry_run: bool = False,
    ) -> dict[str, Any]:
        """Normalize relations and merge duplicates while preserving all evidence.

        Args:
            scope: Processing scope: "all", "doc_id:...", or "comm_id:...".
            predicate_whitelist: Only process relations with these predicates.
            qualifier_keys_keep: Qualifier fields to keep during normalization.
            dry_run: Only compute suggested counts; do not write to the database.

        Returns:
            A CanonicalizeRelationsOut dict with counts of new canonical
            relations, new evidence records, and skipped relations.
        """
        try:
            if qualifier_keys_keep is None:
                qualifier_keys_keep = ["id_family", "measure_family", "setting_country", "sample_period_bin"]

            # 1. Build the query scope.
            params: list[Any] = []
            where_clauses: list[str] = []

            if scope.startswith("doc_id:"):
                doc_id = scope.split(":", 1)[1]
                # Assumes the relation-to-document link lives in the doc_id
                # field of the evidence JSON (rather than via the subj/obj
                # Paper entities) — TODO confirm against the schema.
                where_clauses.append("r.evidence->>'doc_id' = %s")
                params.append(doc_id)
            elif scope.startswith("comm_id:"):
                comm_id = int(scope.split(":", 1)[1])
                where_clauses.append("""
                    EXISTS (
                        SELECT 1 FROM community_members cm 
                        WHERE cm.comm_id = %s AND (cm.entity_id = r.subj_entity_id OR cm.entity_id = r.obj_entity_id)
                    )
                """)
                params.append(comm_id)

            if predicate_whitelist:
                where_clauses.append("r.predicate = ANY(%s)")
                params.append(predicate_whitelist)

            where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""

            # 2. Fetch the raw relations.
            sql = f"""
                SELECT rel_id, subj_entity_id, predicate, obj_entity_id, qualifiers, confidence, evidence 
                FROM relations r
                {where_sql}
            """
            relations = query_all(sql, tuple(params))

            if not relations:
                return CanonicalizeRelationsOut(
                    new_canonical_relations=0,
                    new_evidence_records=0,
                    skipped_relations=0
                ).model_dump()

            # 3. Normalize and group.
            stats = {"new_canonical": 0, "new_evidence": 0, "skipped": 0}

            if dry_run:
                # Count-only pass: no database writes.
                keys = set()
                for r in relations:
                    q_json = normalize_qualifiers(r["qualifiers"], qualifier_keys_keep)
                    key = compute_canonical_rel_hash(r["subj_entity_id"], r["predicate"], r["obj_entity_id"], q_json)
                    keys.add(key)
                return CanonicalizeRelationsOut(
                    new_canonical_relations=len(keys),
                    new_evidence_records=len(relations),
                    skipped_relations=0
                ).model_dump()

            # 4. Perform the writes.
            with get_db() as conn:
                # Baseline count so new_canonical reflects rows added by THIS
                # call; the old code reported the table's lifetime total.
                with conn.cursor() as cur:
                    cur.execute("SELECT COUNT(*) as count FROM canonical_relations")
                    baseline_canonical = cur.fetchone()["count"]

                for r in relations:
                    # Sub-transaction (SAVEPOINT) per relation: a failure on
                    # one relation does not abort the whole batch.
                    try:
                        with conn.cursor() as cur:
                            with conn.transaction():
                                # Normalize qualifiers (v1.1).
                                q_json = normalize_qualifiers(r["qualifiers"], qualifier_keys_keep)
                                key = compute_canonical_rel_hash(r["subj_entity_id"], r["predicate"], r["obj_entity_id"], q_json)

                                # UPSERT into canonical_relations. The DO UPDATE
                                # is a deliberate no-op assignment so RETURNING
                                # always yields the row id; the previous version
                                # assigned created_at = EXCLUDED.created_at,
                                # which clobbered the original timestamp with
                                # the new row's default on every conflict.
                                cur.execute("""
                                    INSERT INTO canonical_relations (subj_entity_id, predicate_norm, obj_entity_id, qualifiers_norm, canonical_key)
                                    VALUES (%s, %s, %s, %s::jsonb, %s)
                                    ON CONFLICT (canonical_key) DO UPDATE SET canonical_key = EXCLUDED.canonical_key
                                    RETURNING canon_rel_id
                                """, (r["subj_entity_id"], r["predicate"], r["obj_entity_id"], q_json, key))
                                canon_rel_id = cur.fetchone()["canon_rel_id"]

                                # Insert the evidence record (idempotent).
                                evidence_data = r["evidence"] or {}
                                cur.execute("""
                                    INSERT INTO canonical_relation_evidence (canon_rel_id, doc_id, chunk_id, quote, confidence, source_rel_id)
                                    VALUES (%s, %s, %s, %s, %s, %s)
                                    ON CONFLICT (canon_rel_id, source_rel_id) DO NOTHING
                                """, (
                                    canon_rel_id,
                                    evidence_data.get("doc_id"),
                                    evidence_data.get("chunk_id"),
                                    evidence_data.get("quote"),
                                    r["confidence"],
                                    r["rel_id"],
                                ))

                                # rowcount is 0 when DO NOTHING fired, so only
                                # genuinely new evidence rows are counted.
                                stats["new_evidence"] += cur.rowcount
                    except Exception as e:
                        # Best-effort batch: record the failure and move on.
                        print(f"Error processing relation {r['rel_id']}: {e}")
                        stats["skipped"] += 1

                # Count again (before commit) and report only the delta.
                with conn.cursor() as cur:
                    cur.execute("SELECT COUNT(*) as count FROM canonical_relations")
                    stats["new_canonical"] = cur.fetchone()["count"] - baseline_canonical

            return CanonicalizeRelationsOut(
                new_canonical_relations=stats["new_canonical"],
                new_evidence_records=stats["new_evidence"],
                skipped_relations=stats["skipped"]
            ).model_dump()

        except Exception as e:
            # Top-level boundary: convert any failure into a structured error
            # payload instead of raising through the MCP layer.
            return CanonicalizeRelationsOut(
                new_canonical_relations=0,
                new_evidence_records=0,
                skipped_relations=0,
                error=MCPErrorModel(code="SYSTEM_ERROR", message=str(e))
            ).model_dump()
  • Pydantic models defining input parameters (scope, predicate_whitelist, qualifier_keys_keep, dry_run) and output structure (counts of new canonical relations, evidence, skipped, and error) for the tool.
    # ============================================================
    # canonicalize_relations_v1 工具模型
    # ============================================================
    
    
    class CanonicalizeRelationsIn(BaseModel):
        """Input model for canonicalize_relations_v1."""
        # "all", "doc_id:<id>", or "comm_id:<id>". The previous annotation
        # Literal["all"] | str was redundant — the union collapses to str.
        scope: str = "all"
        predicate_whitelist: Optional[list[str]] = None
        # A mutable default is safe on a Pydantic model: Pydantic deep-copies
        # non-hashable defaults per instance (unlike plain function defaults).
        qualifier_keys_keep: list[str] = ["id_family", "measure_family", "setting_country", "sample_period_bin"]
        dry_run: bool = False
    
    
    class CanonicalizeRelationsOut(BaseModel):
        """Output model for canonicalize_relations_v1."""
        new_canonical_relations: int  # canonical relations produced by this call
        new_evidence_records: int  # evidence rows attached to canonical relations
        skipped_relations: int  # relations skipped because processing them failed
        error: Optional[MCPErrorModel] = None  # set only when the call failed as a whole
  • Registers the graph relation canonicalization tools (including canonicalize_relations_v1) with the main FastMCP server instance.
    register_graph_relation_canonicalize_tools(mcp)
  • Helper functions to compute canonical hash for relations using SHA256 with unit separator, and to normalize qualifiers by filtering to whitelist keys, sorting, and compact JSON serialization.
    def compute_canonical_rel_hash(subj_id: int, predicate: str, obj_id: int, qualifiers_json: str) -> str:
        """Return the SHA-256 hex digest identifying a normalized relation (v1.1).

        The four components are joined with \\x1f (ASCII Unit Separator) so
        that distinct tuples can never concatenate to the same string.
        """
        parts = (str(subj_id), predicate, str(obj_id), qualifiers_json)
        joined = "\x1f".join(parts)
        return hashlib.sha256(joined.encode()).hexdigest()
    
    
    def normalize_qualifiers(qualifiers: dict | None, keys_keep: list[str]) -> str:
        """规范化 qualifiers 为稳定的 JSON 字符串 (v1.1)"""
        if not qualifiers:
            return "{}"
        # 只保留白名单字段,递归排序,紧凑格式
        filtered = {k: qualifiers[k] for k in sorted(keys_keep) if k in qualifiers and qualifiers[k] not in (None, "", [], {})}
        return json.dumps(filtered, sort_keys=True, separators=(",", ":"), ensure_ascii=False)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/h-lu/paperlib-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server