export_evidence_matrix_v1
Export PaperMatrix and ClaimMatrix tables for analyzing academic evidence in literature reviews. Specify a community ID or a topic to generate structured data in JSON or CSV format.
Instructions
Export evidence matrix.
Exports two tables: PaperMatrix (paper-level) and ClaimMatrix (claim-level).
Args:
- comm_id: community ID (provide either comm_id or topic)
- topic: topic name or canonical_key (provide either comm_id or topic)
- format: output format, "json" or "csv"
- limit_docs: maximum number of documents
Returns: paper_matrix and claim_matrix
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| comm_id | No | Community ID (provide either comm_id or topic) | |
| topic | No | Topic name or canonical_key (provide either comm_id or topic) | |
| format | No | Output format, "json" or "csv" | json |
| limit_docs | No | Maximum number of documents to export | |
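For example, a client supplies exactly one of `comm_id` or `topic`. A minimal sketch of two valid argument payloads (the values are illustrative placeholders):

```python
# Export by community: documents reachable from community 12, capped at 50.
args_by_community = {"comm_id": 12, "format": "json", "limit_docs": 50}

# Export by topic: matched case-insensitively against the Topic entity's
# canonical_name or canonical_key (substring match via ILIKE).
args_by_topic = {"topic": "remote work", "format": "csv"}
```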
Implementation Reference
- Core handler function that executes the tool logic: it determines the relevant documents from the community or topic, queries relations and claims to build the paper_matrix (document metadata plus attributes) and claim_matrix, and returns structured JSON.

```python
@mcp.tool()
def export_evidence_matrix_v1(
    comm_id: int | None = None,
    topic: str | None = None,
    format: str = "json",
    limit_docs: int | None = None,
) -> dict[str, Any]:
    """Export evidence matrix.

    Exports two tables: PaperMatrix (paper-level) and ClaimMatrix (claim-level).

    Args:
        comm_id: community ID (provide either comm_id or topic)
        topic: topic name or canonical_key (provide either comm_id or topic)
        format: output format, "json" or "csv"
        limit_docs: maximum number of documents

    Returns:
        paper_matrix and claim_matrix
    """
    try:
        if not comm_id and not topic:
            return ExportEvidenceMatrixOut(
                error=MCPErrorModel(code="VALIDATION_ERROR", message="Must provide either comm_id or topic"),
            ).model_dump()

        # Determine the document set
        if comm_id:
            # Documents from the community
            doc_ids_result = query_all(
                """
                SELECT DISTINCT m.doc_id
                FROM community_members cm
                JOIN mentions m ON m.entity_id = cm.entity_id
                WHERE cm.comm_id = %s
                """,
                (comm_id,)
            )
        else:
            # Documents from the topic: look up the Topic entity first
            topic_entity = query_one(
                """
                SELECT entity_id FROM entities
                WHERE type = 'Topic'
                  AND (canonical_name ILIKE %s OR canonical_key ILIKE %s)
                LIMIT 1
                """,
                (f"%{topic}%", f"%{topic}%")
            )
            if not topic_entity:
                return ExportEvidenceMatrixOut(
                    error=MCPErrorModel(code="NOT_FOUND", message=f"Topic '{topic}' not found"),
                ).model_dump()
            doc_ids_result = query_all(
                """
                SELECT DISTINCT m.doc_id
                FROM mentions m
                WHERE m.entity_id = %s
                """,
                (topic_entity["entity_id"],)
            )

        doc_ids = [r["doc_id"] for r in doc_ids_result]
        if not doc_ids:
            return ExportEvidenceMatrixOut(
                error=MCPErrorModel(code="NOT_FOUND", message="No documents found"),
            ).model_dump()

        if limit_docs:
            doc_ids = doc_ids[:limit_docs]

        # ===== PaperMatrix =====
        # Fetch document metadata
        docs = query_all(
            """
            SELECT doc_id, title, authors, year, venue, doi
            FROM documents
            WHERE doc_id = ANY(%s)
            """,
            (doc_ids,)
        )
        doc_meta = {d["doc_id"]: d for d in docs}

        # Topics per document
        doc_topics = query_all(
            """
            SELECT DISTINCT p.canonical_key AS doc_id, x.canonical_name AS topic
            FROM relations r
            JOIN entities p ON p.entity_id = r.subj_entity_id AND p.type = 'Paper'
            JOIN entities x ON x.entity_id = r.obj_entity_id AND x.type = 'Topic'
            WHERE r.predicate = 'PAPER_HAS_TOPIC' AND p.canonical_key = ANY(%s)
            """,
            (doc_ids,)
        )
        topics_by_doc: dict[str, list[str]] = defaultdict(list)
        for r in doc_topics:
            topics_by_doc[r["doc_id"]].append(r["topic"])

        # Measures per document
        doc_measures = query_all(
            """
            SELECT DISTINCT p.canonical_key AS doc_id, x.canonical_name AS measure
            FROM relations r
            JOIN entities p ON p.entity_id = r.subj_entity_id AND p.type = 'Paper'
            JOIN entities x ON x.entity_id = r.obj_entity_id AND x.type = 'MeasureProxy'
            WHERE r.predicate = 'PAPER_USES_MEASURE' AND p.canonical_key = ANY(%s)
            """,
            (doc_ids,)
        )
        measures_by_doc: dict[str, list[str]] = defaultdict(list)
        for r in doc_measures:
            measures_by_doc[r["doc_id"]].append(r["measure"])

        # Identification strategies per document
        doc_ids_strat = query_all(
            """
            SELECT DISTINCT p.canonical_key AS doc_id, x.canonical_name AS id_strategy
            FROM relations r
            JOIN entities p ON p.entity_id = r.subj_entity_id AND p.type = 'Paper'
            JOIN entities x ON x.entity_id = r.obj_entity_id AND x.type = 'IdentificationStrategy'
            WHERE r.predicate = 'PAPER_IDENTIFIES_WITH' AND p.canonical_key = ANY(%s)
            """,
            (doc_ids,)
        )
        ids_by_doc: dict[str, list[str]] = defaultdict(list)
        for r in doc_ids_strat:
            ids_by_doc[r["doc_id"]].append(r["id_strategy"])

        # Methods per document
        doc_methods = query_all(
            """
            SELECT DISTINCT p.canonical_key AS doc_id, x.canonical_name AS method
            FROM relations r
            JOIN entities p ON p.entity_id = r.subj_entity_id AND p.type = 'Paper'
            JOIN entities x ON x.entity_id = r.obj_entity_id AND x.type = 'Method'
            WHERE r.predicate = 'PAPER_USES_METHOD' AND p.canonical_key = ANY(%s)
            """,
            (doc_ids,)
        )
        methods_by_doc: dict[str, list[str]] = defaultdict(list)
        for r in doc_methods:
            methods_by_doc[r["doc_id"]].append(r["method"])

        # Settings per document
        doc_settings = query_all(
            """
            SELECT DISTINCT p.canonical_key AS doc_id, x.canonical_name AS setting
            FROM relations r
            JOIN entities p ON p.entity_id = r.subj_entity_id AND p.type = 'Paper'
            JOIN entities x ON x.entity_id = r.obj_entity_id AND x.type = 'Setting'
            WHERE r.predicate = 'PAPER_IN_SETTING' AND p.canonical_key = ANY(%s)
            """,
            (doc_ids,)
        )
        settings_by_doc: dict[str, list[str]] = defaultdict(list)
        for r in doc_settings:
            settings_by_doc[r["doc_id"]].append(r["setting"])

        # Top claims per document
        doc_claims = query_all(
            """
            SELECT doc_id, claim_text, sign, confidence, chunk_id
            FROM claims
            WHERE doc_id = ANY(%s)
            ORDER BY doc_id, confidence DESC
            """,
            (doc_ids,)
        )
        claims_by_doc: dict[str, list[dict]] = defaultdict(list)
        for c in doc_claims:
            if len(claims_by_doc[c["doc_id"]]) < 3:  # Top 3 claims per doc
                claims_by_doc[c["doc_id"]].append({
                    "claim_text": c["claim_text"],
                    "sign": c["sign"],
                    "chunk_id": c["chunk_id"],
                })

        # Build the PaperMatrix
        paper_matrix = []
        for doc_id in doc_ids:
            meta = doc_meta.get(doc_id, {})
            paper_matrix.append({
                "doc_id": doc_id,
                "title": meta.get("title", ""),
                "authors": meta.get("authors", ""),
                "year": meta.get("year"),
                "venue": meta.get("venue", ""),
                "topics": topics_by_doc.get(doc_id, []),
                "measures": measures_by_doc.get(doc_id, []),
                "identification_strategies": ids_by_doc.get(doc_id, []),
                "methods": methods_by_doc.get(doc_id, []),
                "settings": settings_by_doc.get(doc_id, []),
                "top_claims": claims_by_doc.get(doc_id, []),
            })

        # ===== ClaimMatrix =====
        all_claims = query_all(
            """
            SELECT c.claim_id, c.doc_id, c.chunk_id, c.claim_text, c.sign,
                   c.effect_size_text, c.conditions, c.confidence,
                   ch.page_start, ch.page_end
            FROM claims c
            JOIN chunks ch ON ch.chunk_id = c.chunk_id
            WHERE c.doc_id = ANY(%s)
            ORDER BY c.confidence DESC
            """,
            (doc_ids,)
        )
        claim_matrix = []
        for claim in all_claims:
            doc_id = claim["doc_id"]
            claim_matrix.append({
                "claim_id": claim["claim_id"],
                "doc_id": doc_id,
                "chunk_id": claim["chunk_id"],
                "claim_text": claim["claim_text"],
                "sign": claim["sign"],
                "effect_size_text": claim["effect_size_text"],
                "conditions": claim["conditions"],
                "confidence": claim["confidence"],
                "page_start": claim["page_start"],
                "page_end": claim["page_end"],
                # Supplemental doc-level info
                "topics": topics_by_doc.get(doc_id, []),
                "identification_strategies": ids_by_doc.get(doc_id, []),
            })

        return ExportEvidenceMatrixOut(
            paper_matrix=paper_matrix,
            claim_matrix=claim_matrix,
        ).model_dump()
    except Exception as e:
        return ExportEvidenceMatrixOut(
            error=MCPErrorModel(code="DB_CONN_ERROR", message=str(e)),
        ).model_dump()
```
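Note that the handler above accepts a `format` argument but, as written, returns the structured JSON payload either way; converting to CSV is left to the caller. A minimal client-side sketch using the standard `csv` module (the helper name and column selection are illustrative assumptions, not part of the server):

```python
import csv

def paper_matrix_to_csv(paper_matrix: list[dict], path: str) -> None:
    """Write paper-level rows to CSV, joining list-valued fields with '; '."""
    fields = [
        "doc_id", "title", "authors", "year", "venue",
        "topics", "measures", "identification_strategies", "methods", "settings",
    ]
    list_fields = {"topics", "measures", "identification_strategies",
                   "methods", "settings"}
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fields)
        writer.writeheader()
        for row in paper_matrix:
            flat = {k: row.get(k, "") for k in fields}
            for k in list_fields:
                flat[k] = "; ".join(flat[k] or [])
            writer.writerow(flat)
```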
- Pydantic models defining the input parameters and the output structure (paper_matrix and claim_matrix lists) for validation and typing.

```python
# ============================================================
# export_evidence_matrix_v1 tool models
# ============================================================

class ExportEvidenceMatrixIn(BaseModel):
    """export_evidence_matrix_v1 input"""
    comm_id: Optional[int] = None
    topic: Optional[str] = None  # topic name or canonical_key
    format: Literal["json", "csv"] = "json"
    limit_docs: Optional[int] = None


class ExportEvidenceMatrixOut(BaseModel):
    """export_evidence_matrix_v1 output"""
    paper_matrix: list[dict[str, Any]] = Field(default_factory=list)
    claim_matrix: list[dict[str, Any]] = Field(default_factory=list)
    error: Optional[MCPErrorModel] = None
```
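As a quick illustration of what these models enforce (Pydantic v2 style, matching the `model_dump()` calls in the handler; the values are placeholders):

```python
params = ExportEvidenceMatrixIn(topic="remote work", limit_docs=10)
print(params.format)  # "json" -- the default output format

# format is constrained by Literal["json", "csv"]; anything else is rejected.
try:
    ExportEvidenceMatrixIn(topic="remote work", format="xlsx")
except Exception as e:  # pydantic.ValidationError
    print(type(e).__name__)
```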
- src/paperlib_mcp/server.py:43 (registration): invocation of `register_graph_summarize_tools(mcp)` in the main MCP server setup; this function defines and registers the export_evidence_matrix_v1 tool via the `@mcp.tool()` decorator.

```python
register_graph_summarize_tools(mcp)
```