export_evidence_matrix_v1

Export PaperMatrix and ClaimMatrix tables to analyze academic evidence for literature reviews. Specify community ID or topic to generate structured data in JSON or CSV format.

Instructions

Export the evidence matrix.

Exports two tables: PaperMatrix (paper-level) and ClaimMatrix (claim-level).

Args:
  comm_id: community ID (mutually exclusive with topic)
  topic: topic name or canonical_key (mutually exclusive with comm_id)
  format: output format, "json" or "csv"
  limit_docs: cap on the number of documents

Returns: paper_matrix and claim_matrix

Input Schema

Name        Required  Description                                                    Default
comm_id     No        Community ID (mutually exclusive with topic)
topic       No        Topic name or canonical_key (mutually exclusive with comm_id)
format      No        Output format, "json" or "csv"                                 json
limit_docs  No        Cap on the number of documents
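As a quick illustration of the schema above: either `comm_id` or `topic` must be supplied, a rule the handler below enforces before touching the database. A hypothetical client payload, with illustrative values, might look like this:

```python
# Two example payloads for export_evidence_matrix_v1 (values are illustrative).
by_community = {"comm_id": 42, "format": "json", "limit_docs": 50}
by_topic = {"topic": "minimum wage", "format": "csv"}

def is_valid(payload: dict) -> bool:
    """Mirror the handler's precondition: at least one of
    comm_id/topic must be present, otherwise the tool returns
    a VALIDATION_ERROR."""
    return bool(payload.get("comm_id") or payload.get("topic"))

print(is_valid(by_community), is_valid({}))  # True False
```

Omitted keys fall back to the defaults in the table (`format="json"`, unlimited documents).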

Implementation Reference

  • Core handler function that executes the tool logic: determines relevant documents from community or topic, queries relations and claims to build paper_matrix (doc metadata + attributes) and claim_matrix, returns structured JSON.
    @mcp.tool()
    def export_evidence_matrix_v1(
        comm_id: int | None = None,
        topic: str | None = None,
        format: str = "json",
        limit_docs: int | None = None,
    ) -> dict[str, Any]:
        """导出证据矩阵
        
        导出 PaperMatrix(论文级)和 ClaimMatrix(结论级)两张表。
        
        Args:
            comm_id: 社区 ID(与 topic 二选一)
            topic: 主题名称或 canonical_key(与 comm_id 二选一)
            format: 输出格式,"json" 或 "csv"
            limit_docs: 限制文档数量
            
        Returns:
            paper_matrix 和 claim_matrix
        """
        try:
            if not comm_id and not topic:
                return ExportEvidenceMatrixOut(
                    error=MCPErrorModel(code="VALIDATION_ERROR", message="Must provide either comm_id or topic"),
                ).model_dump()
            
            # Determine the document set
            if comm_id:
                # Fetch documents via community membership
                doc_ids_result = query_all(
                    """
                    SELECT DISTINCT m.doc_id
                    FROM community_members cm
                    JOIN mentions m ON m.entity_id = cm.entity_id
                    WHERE cm.comm_id = %s
                    """,
                    (comm_id,)
                )
            else:
                # Fetch documents via the topic
                # First look up the topic entity
                topic_entity = query_one(
                    """
                    SELECT entity_id FROM entities
                    WHERE type = 'Topic' AND (canonical_name ILIKE %s OR canonical_key ILIKE %s)
                    LIMIT 1
                    """,
                    (f"%{topic}%", f"%{topic}%")
                )
                
                if not topic_entity:
                    return ExportEvidenceMatrixOut(
                        error=MCPErrorModel(code="NOT_FOUND", message=f"Topic '{topic}' not found"),
                    ).model_dump()
                
                doc_ids_result = query_all(
                    """
                    SELECT DISTINCT m.doc_id
                    FROM mentions m
                    WHERE m.entity_id = %s
                    """,
                    (topic_entity["entity_id"],)
                )
            
            doc_ids = [r["doc_id"] for r in doc_ids_result]
            
            if not doc_ids:
                return ExportEvidenceMatrixOut(
                    error=MCPErrorModel(code="NOT_FOUND", message="No documents found"),
                ).model_dump()
            
            if limit_docs:
                doc_ids = doc_ids[:limit_docs]
            
            # ===== PaperMatrix =====
            # Fetch document metadata
            docs = query_all(
                """
                SELECT doc_id, title, authors, year, venue, doi
                FROM documents
                WHERE doc_id = ANY(%s)
                """,
                (doc_ids,)
            )
            doc_meta = {d["doc_id"]: d for d in docs}
            
            # Fetch each document's topics
            doc_topics = query_all(
                """
                SELECT DISTINCT p.canonical_key AS doc_id, x.canonical_name AS topic
                FROM relations r
                JOIN entities p ON p.entity_id = r.subj_entity_id AND p.type = 'Paper'
                JOIN entities x ON x.entity_id = r.obj_entity_id AND x.type = 'Topic'
                WHERE r.predicate = 'PAPER_HAS_TOPIC' AND p.canonical_key = ANY(%s)
                """,
                (doc_ids,)
            )
            topics_by_doc: dict[str, list[str]] = defaultdict(list)
            for r in doc_topics:
                topics_by_doc[r["doc_id"]].append(r["topic"])
            
            # Fetch each document's measures
            doc_measures = query_all(
                """
                SELECT DISTINCT p.canonical_key AS doc_id, x.canonical_name AS measure
                FROM relations r
                JOIN entities p ON p.entity_id = r.subj_entity_id AND p.type = 'Paper'
                JOIN entities x ON x.entity_id = r.obj_entity_id AND x.type = 'MeasureProxy'
                WHERE r.predicate = 'PAPER_USES_MEASURE' AND p.canonical_key = ANY(%s)
                """,
                (doc_ids,)
            )
            measures_by_doc: dict[str, list[str]] = defaultdict(list)
            for r in doc_measures:
                measures_by_doc[r["doc_id"]].append(r["measure"])
            
            # Fetch each document's identification strategies
            doc_ids_strat = query_all(
                """
                SELECT DISTINCT p.canonical_key AS doc_id, x.canonical_name AS id_strategy
                FROM relations r
                JOIN entities p ON p.entity_id = r.subj_entity_id AND p.type = 'Paper'
                JOIN entities x ON x.entity_id = r.obj_entity_id AND x.type = 'IdentificationStrategy'
                WHERE r.predicate = 'PAPER_IDENTIFIES_WITH' AND p.canonical_key = ANY(%s)
                """,
                (doc_ids,)
            )
            ids_by_doc: dict[str, list[str]] = defaultdict(list)
            for r in doc_ids_strat:
                ids_by_doc[r["doc_id"]].append(r["id_strategy"])
            
            # Fetch each document's methods
            doc_methods = query_all(
                """
                SELECT DISTINCT p.canonical_key AS doc_id, x.canonical_name AS method
                FROM relations r
                JOIN entities p ON p.entity_id = r.subj_entity_id AND p.type = 'Paper'
                JOIN entities x ON x.entity_id = r.obj_entity_id AND x.type = 'Method'
                WHERE r.predicate = 'PAPER_USES_METHOD' AND p.canonical_key = ANY(%s)
                """,
                (doc_ids,)
            )
            methods_by_doc: dict[str, list[str]] = defaultdict(list)
            for r in doc_methods:
                methods_by_doc[r["doc_id"]].append(r["method"])
            
            # Fetch each document's settings
            doc_settings = query_all(
                """
                SELECT DISTINCT p.canonical_key AS doc_id, x.canonical_name AS setting
                FROM relations r
                JOIN entities p ON p.entity_id = r.subj_entity_id AND p.type = 'Paper'
                JOIN entities x ON x.entity_id = r.obj_entity_id AND x.type = 'Setting'
                WHERE r.predicate = 'PAPER_IN_SETTING' AND p.canonical_key = ANY(%s)
                """,
                (doc_ids,)
            )
            settings_by_doc: dict[str, list[str]] = defaultdict(list)
            for r in doc_settings:
                settings_by_doc[r["doc_id"]].append(r["setting"])
            
            # Fetch each document's top claims
            doc_claims = query_all(
                """
                SELECT doc_id, claim_text, sign, confidence, chunk_id
                FROM claims
                WHERE doc_id = ANY(%s)
                ORDER BY doc_id, confidence DESC
                """,
                (doc_ids,)
            )
            claims_by_doc: dict[str, list[dict]] = defaultdict(list)
            for c in doc_claims:
                if len(claims_by_doc[c["doc_id"]]) < 3:  # Top 3 claims per doc
                    claims_by_doc[c["doc_id"]].append({
                        "claim_text": c["claim_text"],
                        "sign": c["sign"],
                        "chunk_id": c["chunk_id"],
                    })
            
            # Build the PaperMatrix
            paper_matrix = []
            for doc_id in doc_ids:
                meta = doc_meta.get(doc_id, {})
                paper_matrix.append({
                    "doc_id": doc_id,
                    "title": meta.get("title", ""),
                    "authors": meta.get("authors", ""),
                    "year": meta.get("year"),
                    "venue": meta.get("venue", ""),
                    "topics": topics_by_doc.get(doc_id, []),
                    "measures": measures_by_doc.get(doc_id, []),
                    "identification_strategies": ids_by_doc.get(doc_id, []),
                    "methods": methods_by_doc.get(doc_id, []),
                    "settings": settings_by_doc.get(doc_id, []),
                    "top_claims": claims_by_doc.get(doc_id, []),
                })
            
            # ===== ClaimMatrix =====
            all_claims = query_all(
                """
                SELECT 
                    c.claim_id,
                    c.doc_id,
                    c.chunk_id,
                    c.claim_text,
                    c.sign,
                    c.effect_size_text,
                    c.conditions,
                    c.confidence,
                    ch.page_start,
                    ch.page_end
                FROM claims c
                JOIN chunks ch ON ch.chunk_id = c.chunk_id
                WHERE c.doc_id = ANY(%s)
                ORDER BY c.confidence DESC
                """,
                (doc_ids,)
            )
            
            claim_matrix = []
            for claim in all_claims:
                doc_id = claim["doc_id"]
                claim_matrix.append({
                    "claim_id": claim["claim_id"],
                    "doc_id": doc_id,
                    "chunk_id": claim["chunk_id"],
                    "claim_text": claim["claim_text"],
                    "sign": claim["sign"],
                    "effect_size_text": claim["effect_size_text"],
                    "conditions": claim["conditions"],
                    "confidence": claim["confidence"],
                    "page_start": claim["page_start"],
                    "page_end": claim["page_end"],
                    # Supplement with doc-level information
                    "topics": topics_by_doc.get(doc_id, []),
                    "identification_strategies": ids_by_doc.get(doc_id, []),
                })
            
            return ExportEvidenceMatrixOut(
                paper_matrix=paper_matrix,
                claim_matrix=claim_matrix,
            ).model_dump()
            
        except Exception as e:
            return ExportEvidenceMatrixOut(
                error=MCPErrorModel(code="DB_CONN_ERROR", message=str(e)),
            ).model_dump()
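Note that the handler shown always returns dict-shaped matrices; the advertised `"csv"` format is not handled in this snippet. A caller could flatten `paper_matrix` rows into CSV along these lines (an illustrative sketch, not part of the tool itself):

```python
import csv
import io

def paper_matrix_to_csv(paper_matrix: list[dict]) -> str:
    """Flatten list-valued columns (topics, methods, ...) with '; '
    and emit one CSV row per paper."""
    if not paper_matrix:
        return ""
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=list(paper_matrix[0].keys()))
    writer.writeheader()
    for row in paper_matrix:
        writer.writerow({
            k: "; ".join(map(str, v)) if isinstance(v, list) else v
            for k, v in row.items()
        })
    return buf.getvalue()

rows = [{"doc_id": "d1", "title": "A Paper", "topics": ["wages", "labor"]}]
print(paper_matrix_to_csv(rows))
```

`csv.DictWriter` quotes any embedded commas, so serialized `top_claims` dicts survive round-tripping through spreadsheet tools.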
  • Pydantic models defining the input parameters and output structure (paper_matrix and claim_matrix lists) for validation and typing.
    # export_evidence_matrix_v1 tool models
    # ============================================================
    
    
    class ExportEvidenceMatrixIn(BaseModel):
        """export_evidence_matrix_v1 输入"""
        comm_id: Optional[int] = None
        topic: Optional[str] = None             # topic name or canonical_key
        format: Literal["json", "csv"] = "json"
        limit_docs: Optional[int] = None
    
    
    class ExportEvidenceMatrixOut(BaseModel):
        """export_evidence_matrix_v1 输出"""
        paper_matrix: list[dict[str, Any]] = Field(default_factory=list)
        claim_matrix: list[dict[str, Any]] = Field(default_factory=list)
        error: Optional[MCPErrorModel] = None
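Because failures are reported in-band through the `error` field rather than raised as exceptions, a client must check that field on every dumped result. A minimal sketch of such a check (the helper name is hypothetical):

```python
def unpack_result(result: dict) -> tuple[list, list]:
    """Return (paper_matrix, claim_matrix); raise if the tool
    reported an in-band error such as VALIDATION_ERROR or NOT_FOUND."""
    err = result.get("error")
    if err:
        raise RuntimeError(f"{err['code']}: {err['message']}")
    return result.get("paper_matrix", []), result.get("claim_matrix", [])

ok = {"paper_matrix": [{"doc_id": "d1"}], "claim_matrix": [], "error": None}
papers, claims = unpack_result(ok)
```

This mirrors the three error codes the handler emits: `VALIDATION_ERROR`, `NOT_FOUND`, and `DB_CONN_ERROR`.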
  • Invocation of register_graph_summarize_tools(mcp) in the main MCP server setup, which defines and registers the export_evidence_matrix_v1 tool via the @mcp.tool() decorator.
    register_graph_summarize_tools(mcp)
