review.py
"""M3 Review Tools: 综述生成与验证工具 实现 M3 阶段的核心功能: 1. generate_review_outline_data_v1 - 生成综述大纲(确定性) 2. build_section_evidence_pack_v1 - 构建章节证据包 3. export_section_packet_v1 - 导出写作输入包 4. lint_section_v1 - 验证章节引用合规 5. compose_full_template_v1 - 生成全文模板 6. lint_review_v1 - 验证全文合规 """ import json import re import uuid from collections import defaultdict from typing import Any from fastmcp import FastMCP from paperlib_mcp.db import get_db, query_all, query_one # ============================================================ # 章节模板定义(经济金融领域标准) # ============================================================ SECTION_TEMPLATES = { "econ_finance_canonical": { "name": "经济金融学经典结构", "sections": [ { "id": "research_question", "title": "研究问题与理论框架", "description": "核心研究问题、理论基础和主要假设", "ord": 1, "keywords": ["research question", "theory", "hypothesis", "framework", "motivation"], "entity_types": ["Topic"], }, { "id": "measurement", "title": "测量与数据", "description": "变量定义、代理变量选择、数据来源", "ord": 2, "keywords": ["measure", "proxy", "variable", "data", "sample", "dataset"], "entity_types": ["MeasureProxy", "DataSource"], }, { "id": "identification", "title": "识别策略", "description": "因果识别方法、内生性处理、工具变量", "ord": 3, "keywords": ["identification", "endogeneity", "instrument", "exogenous", "causal", "IV", "DID", "RDD"], "entity_types": ["IdentificationStrategy", "Method"], }, { "id": "findings", "title": "主要发现", "description": "核心结论、稳健性检验、异质性分析", "ord": 4, "keywords": ["result", "finding", "evidence", "show", "demonstrate", "coefficient", "significant", "effect"], "entity_types": [], # Use claims }, { "id": "debates", "title": "争议与不一致发现", "description": "文献中的分歧、methodological debates", "ord": 5, "keywords": ["debate", "controversy", "inconsistent", "mixed", "contrast", "challenge"], "entity_types": [], # Use claims with conflicting signs }, { "id": "gaps", "title": "研究空白与未来方向", "description": "尚未解决的问题、潜在研究机会", "ord": 6, "keywords": ["gap", "future", "direction", "unexplored", "opportunity", "need", "limitation"], "entity_types": ["LimitationGap"], }, ], }, "general": { "name": "通用文献综述结构", "sections": [ {"id": "background", "title": "背景与动机", "description": "研究领域概述", "ord": 1, "keywords": ["background", "motivation"], "entity_types": []}, {"id": "methodology", "title": "研究方法", "description": "方法论综述", "ord": 2, "keywords": ["method", "approach"], "entity_types": ["Method"]}, {"id": "findings", "title": "主要发现", "description": "核心结论", "ord": 3, "keywords": ["finding", "result"], "entity_types": []}, {"id": "discussion", "title": "讨论", "description": "争议与局限", "ord": 4, "keywords": ["discussion", "limitation"], "entity_types": []}, {"id": "future", "title": "未来方向", "description": "研究空白", "ord": 5, "keywords": ["future", "gap"], "entity_types": ["LimitationGap"]}, ], }, } def get_section_template(outline_style: str) -> dict: """获取章节模板""" return SECTION_TEMPLATES.get(outline_style, SECTION_TEMPLATES["general"]) # ============================================================ # 工具注册 # ============================================================ def register_review_tools(mcp: FastMCP) -> None: """注册 M3 综述工具""" # ---------------------------------------------------------- # Tool 1: generate_review_outline_data_v1 # ---------------------------------------------------------- @mcp.tool() def generate_review_outline_data_v1( topic: str | None = None, comm_ids: list[int] | None = None, outline_style: str = "econ_finance_canonical", rebuild: bool = False, ) -> dict[str, Any]: """生成综述大纲(确定性,无 LLM) 从 topic 或 comm_ids 生成可复现的综述大纲结构,写入数据库。 Args: topic: 

# ============================================================
# Tool registration
# ============================================================


def register_review_tools(mcp: FastMCP) -> None:
    """Register the M3 review tools."""

    # ----------------------------------------------------------
    # Tool 1: generate_review_outline_data_v1
    # ----------------------------------------------------------
    @mcp.tool()
    def generate_review_outline_data_v1(
        topic: str | None = None,
        comm_ids: list[int] | None = None,
        outline_style: str = "econ_finance_canonical",
        rebuild: bool = False,
    ) -> dict[str, Any]:
        """Generate a review outline (deterministic, no LLM).

        Builds a reproducible review outline from a topic or a list of
        community IDs and persists it to the database.

        Args:
            topic: Review topic (provide either this or comm_ids)
            comm_ids: Community ID list (provide either this or topic)
            outline_style: Outline style, defaults to "econ_finance_canonical"
            rebuild: Whether to rebuild an existing outline, defaults to False

        Returns:
            outline_id, topic, and the list of sections
        """
        try:
            if not topic and not comm_ids:
                return {"error": "Must provide either topic or comm_ids"}

            # Resolve the topic
            actual_topic = topic or f"community_{','.join(map(str, comm_ids))}"

            # Reuse an existing outline unless a rebuild was requested
            if not rebuild:
                existing = query_one(
                    """
                    SELECT outline_id, topic, outline_style, sources_json, created_at::text
                    FROM review_outlines
                    WHERE topic = %s AND outline_style = %s
                    ORDER BY created_at DESC
                    LIMIT 1
                    """,
                    (actual_topic, outline_style),
                )
                if existing:
                    # Fetch its sections
                    sections = query_all(
                        """
                        SELECT section_id, title, description, ord, sources_json, keywords
                        FROM review_outline_sections
                        WHERE outline_id = %s
                        ORDER BY ord
                        """,
                        (existing["outline_id"],),
                    )
                    return {
                        "outline_id": existing["outline_id"],
                        "topic": existing["topic"],
                        "outline_style": existing["outline_style"],
                        "sources": existing["sources_json"] or {},
                        "created_at": existing["created_at"],
                        "sections": [
                            {
                                "section_id": s["section_id"],
                                "title": s["title"],
                                "description": s["description"],
                                "ord": s["ord"],
                                "sources": s["sources_json"] or {},
                                "keywords": s["keywords"] or [],
                            }
                            for s in sections
                        ],
                        "reused": True,
                    }

            # With only a topic: find related doc_ids via search, then map to comm_ids
            source_doc_ids = []
            source_comm_ids = comm_ids or []
            if topic and not comm_ids:
                # Use the synchronous FTS search (avoids async warnings)
                from paperlib_mcp.tools.search import search_fts

                fts_results = search_fts(topic, limit=20)
                if fts_results:
                    # Sorted for reproducibility: the outline is meant to be deterministic
                    source_doc_ids = sorted({r["doc_id"] for r in fts_results})

                # Map doc_ids to comm_ids
                if source_doc_ids:
                    comm_rows = query_all(
                        """
                        SELECT DISTINCT cm.comm_id
                        FROM community_members cm
                        JOIN entities e ON cm.entity_id = e.entity_id
                        JOIN mentions m ON e.entity_id = m.entity_id
                        WHERE m.doc_id = ANY(%s)
                        """,
                        (source_doc_ids,),
                    )
                    source_comm_ids = [r["comm_id"] for r in comm_rows]

            # Generate a new outline_id
            outline_id = str(uuid.uuid4())

            # Look up the template
            template = get_section_template(outline_style)

            # Write the outline
            sources_json = {
                "doc_ids": source_doc_ids,
                "comm_ids": source_comm_ids,
            }
            with get_db() as conn:
                with conn.cursor() as cur:
                    # On rebuild, drop the old outline first
                    if rebuild:
                        cur.execute(
                            "DELETE FROM review_outlines WHERE topic = %s AND outline_style = %s",
                            (actual_topic, outline_style),
                        )

                    # Insert the outline
                    cur.execute(
                        """
                        INSERT INTO review_outlines (outline_id, topic, outline_style, sources_json)
                        VALUES (%s, %s, %s, %s)
                        """,
                        (outline_id, actual_topic, outline_style, json.dumps(sources_json)),
                    )

                    # Insert the sections
                    for section in template["sections"]:
                        section_sources = {
                            "entity_types": section.get("entity_types", []),
                        }
                        cur.execute(
                            """
                            INSERT INTO review_outline_sections
                                (outline_id, section_id, title, description, ord, sources_json, keywords)
                            VALUES (%s, %s, %s, %s, %s, %s, %s)
                            """,
                            (
                                outline_id,
                                section["id"],
                                section["title"],
                                section["description"],
                                section["ord"],
                                json.dumps(section_sources),
                                section.get("keywords", []),
                            ),
                        )

            return {
                "outline_id": outline_id,
                "topic": actual_topic,
                "outline_style": outline_style,
                "sources": sources_json,
                "sections": [
                    {
                        "section_id": s["id"],
                        "title": s["title"],
                        "description": s["description"],
                        "ord": s["ord"],
                        "keywords": s.get("keywords", []),
                    }
                    for s in template["sections"]
                ],
                "reused": False,
            }
        except Exception as e:
            return {"error": str(e)}
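
    # A hypothetical round trip (the topic and UUID are illustrative, not real output):
    #   generate_review_outline_data_v1(topic="ESG disclosure and firm value")
    #   -> {"outline_id": "3f2a...", "topic": "ESG disclosure and firm value",
    #       "sections": [{"section_id": "research_question", "ord": 1, ...},
    #                    ..., {"section_id": "gaps", "ord": 6, ...}],
    #       "reused": False}
    # Calling it again with the same arguments returns the stored outline with
    # "reused": True.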

    # ----------------------------------------------------------
    # Tool 2: build_section_evidence_pack_v1
    # ----------------------------------------------------------
    @mcp.tool()
    def build_section_evidence_pack_v1(
        outline_id: str,
        section_id: str,
        max_chunks: int = 60,
        per_doc_limit: int = 4,
        rebuild: bool = False,
    ) -> dict[str, Any]:
        """Build the evidence pack for a section.

        Produces a fixed (reproducible) evidence pack for the given section.

        Args:
            outline_id: Outline ID
            section_id: Section ID
            max_chunks: Maximum number of chunks, defaults to 60
            per_doc_limit: Maximum chunks per document, defaults to 4
            rebuild: Whether to rebuild, defaults to False

        Returns:
            pack_id, chunk_count, doc_count
        """
        try:
            # Check the cache
            if not rebuild:
                cached = query_one(
                    """
                    SELECT pack_id, params, created_at::text
                    FROM review_section_packs
                    WHERE outline_id = %s AND section_id = %s
                    """,
                    (outline_id, section_id),
                )
                if cached:
                    # Fetch pack statistics
                    stats = query_one(
                        """
                        SELECT COUNT(*) as chunk_count, COUNT(DISTINCT doc_id) as doc_count
                        FROM evidence_pack_items
                        WHERE pack_id = %s
                        """,
                        (cached["pack_id"],),
                    )
                    return {
                        "pack_id": cached["pack_id"],
                        "section_id": section_id,
                        "chunk_count": stats["chunk_count"] if stats else 0,
                        "doc_count": stats["doc_count"] if stats else 0,
                        "params": cached["params"],
                        "created_at": cached["created_at"],
                        "reused": True,
                    }

            # Fetch outline and section metadata
            outline = query_one(
                "SELECT topic, sources_json FROM review_outlines WHERE outline_id = %s",
                (outline_id,),
            )
            if not outline:
                return {"error": f"Outline not found: {outline_id}"}

            section = query_one(
                """
                SELECT section_id, title, sources_json, keywords
                FROM review_outline_sections
                WHERE outline_id = %s AND section_id = %s
                """,
                (outline_id, section_id),
            )
            if not section:
                return {"error": f"Section not found: {section_id}"}

            # Select chunks according to the section type
            outline_sources = outline["sources_json"] or {}
            section_sources = section["sources_json"] or {}
            entity_types = section_sources.get("entity_types", [])
            keywords = section["keywords"] or []

            # Candidate chunks
            candidate_chunks = []

            # Strategy 1: chunks mentioned by entities of the section's types
            if entity_types:
                entity_chunks = query_all(
                    """
                    SELECT DISTINCT m.chunk_id, m.doc_id, c.page_start, c.page_end
                    FROM mentions m
                    JOIN entities e ON m.entity_id = e.entity_id
                    JOIN chunks c ON m.chunk_id = c.chunk_id
                    WHERE e.type = ANY(%s)
                    ORDER BY m.chunk_id
                    LIMIT %s
                    """,
                    (entity_types, max_chunks * 2),
                )
                candidate_chunks.extend(entity_chunks)

            # Strategy 2: for findings/debates, use claims
            if section_id in ("findings", "debates"):
                if section_id == "findings":
                    # High-confidence claims; the subquery avoids the
                    # DISTINCT + ORDER BY conflict
                    claim_chunks = query_all(
                        """
                        SELECT chunk_id, doc_id, page_start, page_end
                        FROM (
                            SELECT DISTINCT ON (c.chunk_id)
                                c.chunk_id, c.doc_id, ch.page_start, ch.page_end, c.confidence
                            FROM claims c
                            JOIN chunks ch ON c.chunk_id = ch.chunk_id
                            WHERE c.confidence >= 0.7
                            ORDER BY c.chunk_id, c.confidence DESC
                        ) sub
                        ORDER BY confidence DESC
                        LIMIT %s
                        """,
                        (max_chunks * 2,),
                    )
                else:
                    # debates: look for claims with conflicting signs
                    claim_chunks = query_all(
                        """
                        SELECT DISTINCT c.chunk_id, c.doc_id, ch.page_start, ch.page_end
                        FROM claims c
                        JOIN chunks ch ON c.chunk_id = ch.chunk_id
                        WHERE c.sign IN ('positive', 'negative', 'mixed')
                        ORDER BY c.chunk_id
                        LIMIT %s
                        """,
                        (max_chunks * 2,),
                    )
                candidate_chunks.extend(claim_chunks)

            # Strategy 3: keyword search
            if keywords and len(candidate_chunks) < max_chunks:
                keyword_pattern = "|".join(keywords)
                keyword_chunks = query_all(
                    """
                    SELECT chunk_id, doc_id, page_start, page_end
                    FROM chunks
                    WHERE text ~* %s
                    LIMIT %s
                    """,
                    (keyword_pattern, max_chunks * 2),
                )
                candidate_chunks.extend(keyword_chunks)

            # Deduplicate
            seen = set()
            unique_chunks = []
            for chunk in candidate_chunks:
                if chunk["chunk_id"] not in seen:
                    seen.add(chunk["chunk_id"])
                    unique_chunks.append(chunk)

            # Apply per_doc_limit
            doc_counts: dict[str, int] = defaultdict(int)
            filtered_chunks = []
            for chunk in unique_chunks:
                doc_id = chunk["doc_id"]
                if doc_counts[doc_id] < per_doc_limit:
                    filtered_chunks.append(chunk)
                    doc_counts[doc_id] += 1
                if len(filtered_chunks) >= max_chunks:
                    break

            # Create the evidence pack
            params = {
                "max_chunks": max_chunks,
                "per_doc_limit": per_doc_limit,
                "section_id": section_id,
            }
            with get_db() as conn:
                with conn.cursor() as cur:
                    # Create the pack
                    cur.execute(
                        """
                        INSERT INTO evidence_packs (query, params_json)
                        VALUES (%s, %s)
                        RETURNING pack_id
                        """,
                        (f"section:{section_id}:{outline_id}", json.dumps(params)),
                    )
                    pack_id = cur.fetchone()["pack_id"]

                    # Insert the items
                    for rank, chunk in enumerate(filtered_chunks):
                        cur.execute(
                            """
                            INSERT INTO evidence_pack_items (pack_id, doc_id, chunk_id, rank)
                            VALUES (%s, %s, %s, %s)
                            """,
                            (pack_id, chunk["doc_id"], chunk["chunk_id"], rank),
                        )

                    # Drop the old cache entry
                    cur.execute(
                        "DELETE FROM review_section_packs WHERE outline_id = %s AND section_id = %s",
                        (outline_id, section_id),
                    )
                    # Cache the new mapping
                    cur.execute(
                        """
                        INSERT INTO review_section_packs (outline_id, section_id, pack_id, params)
                        VALUES (%s, %s, %s, %s)
                        """,
                        (outline_id, section_id, pack_id, json.dumps(params)),
                    )

            return {
                "pack_id": pack_id,
                "section_id": section_id,
                "chunk_count": len(filtered_chunks),
                "doc_count": len(doc_counts),
                "params": params,
                "reused": False,
            }
        except Exception as e:
            return {"error": str(e)}
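
    # Sketch of expected usage (IDs are placeholders): building the pack for the
    # "identification" section of an existing outline
    #   build_section_evidence_pack_v1(outline_id="3f2a...", section_id="identification")
    #   -> {"pack_id": 42, "section_id": "identification",
    #       "chunk_count": 60, "doc_count": 18, "reused": False}
    # Candidates accumulate from strategies 1-3 in order, so entity-matched chunks
    # stay ahead of keyword matches when per_doc_limit and max_chunks bind.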

    # ----------------------------------------------------------
    # Tool 3: export_section_packet_v1
    # ----------------------------------------------------------
    @mcp.tool()
    def export_section_packet_v1(pack_id: int) -> dict[str, Any]:
        """Export the writing input packet for a section.

        Produces a JSON payload with everything an agent needs to write the
        section.

        Args:
            pack_id: Evidence pack ID

        Returns:
            evidence[], paper_matrix[], claim_matrix[], doc_citations[]
        """
        try:
            # Fetch pack metadata
            pack = query_one(
                "SELECT pack_id, query, params_json FROM evidence_packs WHERE pack_id = %s",
                (pack_id,),
            )
            if not pack:
                return {"error": f"Pack not found: {pack_id}"}

            # Fetch all chunk contents
            chunks = query_all(
                """
                SELECT epi.doc_id, epi.chunk_id, epi.rank,
                       c.text, c.page_start, c.page_end,
                       d.title, d.authors, d.year
                FROM evidence_pack_items epi
                JOIN chunks c ON epi.chunk_id = c.chunk_id
                JOIN documents d ON epi.doc_id = d.doc_id
                WHERE epi.pack_id = %s
                ORDER BY epi.rank
                """,
                (pack_id,),
            )

            # Build the evidence list
            evidence = []
            doc_ids = set()
            chunk_ids = []
            for chunk in chunks:
                doc_ids.add(chunk["doc_id"])
                chunk_ids.append(chunk["chunk_id"])
                evidence.append({
                    "doc_id": chunk["doc_id"],
                    "chunk_id": chunk["chunk_id"],
                    "page_start": chunk["page_start"],
                    "page_end": chunk["page_end"],
                    "text": chunk["text"],
                    "title": chunk["title"],
                    "authors": chunk["authors"],
                    "year": chunk["year"],
                    "citation_anchor": f"[[chunk:{chunk['chunk_id']}]]",
                })

            doc_ids_list = list(doc_ids)

            # Build the paper_matrix
            paper_matrix = []
            for doc_id in doc_ids_list:
                doc = query_one(
                    "SELECT doc_id, title, authors, year FROM documents WHERE doc_id = %s",
                    (doc_id,),
                )
                if doc:
                    # Entities linked to this document
                    entities = query_all(
                        """
                        SELECT DISTINCT e.type, e.canonical_name
                        FROM entities e
                        JOIN mentions m ON e.entity_id = m.entity_id
                        WHERE m.doc_id = %s
                        """,
                        (doc_id,),
                    )
                    entity_by_type: dict[str, list[str]] = defaultdict(list)
                    for ent in entities:
                        entity_by_type[ent["type"]].append(ent["canonical_name"])

                    paper_matrix.append({
                        "doc_id": doc_id,
                        "title": doc["title"],
                        "authors": doc["authors"],
                        "year": doc["year"],
                        "topics": entity_by_type.get("Topic", []),
                        "measures": entity_by_type.get("MeasureProxy", []),
                        "identification_strategies": entity_by_type.get("IdentificationStrategy", []),
                        "methods": entity_by_type.get("Method", []),
                        "settings": entity_by_type.get("Setting", []),
                        "limitations": entity_by_type.get("LimitationGap", []),
                    })

            # Build the claim_matrix
            claim_matrix = []
            if chunk_ids:
                claims = query_all(
                    """
                    SELECT claim_id, doc_id, chunk_id, claim_text, sign, conditions, confidence
                    FROM claims
                    WHERE chunk_id = ANY(%s)
                    ORDER BY confidence DESC
                    """,
                    (chunk_ids,),
                )
                for claim in claims:
                    claim_matrix.append({
                        "claim_id": claim["claim_id"],
                        "doc_id": claim["doc_id"],
                        "chunk_id": claim["chunk_id"],
                        "claim_text": claim["claim_text"],
                        "sign": claim["sign"],
                        "conditions": claim["conditions"] or {},
                        "confidence": claim["confidence"],
                        "citation_anchor": f"[[chunk:{claim['chunk_id']}]]",
                    })

            # Fetch citation records
            doc_citations = []
            for doc_id in doc_ids_list:
                citation = query_one(
                    """
                    SELECT d.doc_id, d.title, d.authors, d.year, c.bibtex, c.apa
                    FROM documents d
                    LEFT JOIN citations c ON d.doc_id = c.doc_id
                    WHERE d.doc_id = %s
                    """,
                    (doc_id,),
                )
                if citation:
                    doc_citations.append({
                        "doc_id": citation["doc_id"],
                        "title": citation["title"],
                        "authors": citation["authors"],
                        "year": citation["year"],
                        "bibtex": citation["bibtex"],
                        "apa": citation["apa"],
                    })

            return {
                "pack_id": pack_id,
                "query": pack["query"],
                "evidence": evidence,
                "paper_matrix": paper_matrix,
                "claim_matrix": claim_matrix,
                "doc_citations": doc_citations,
                "stats": {
                    "total_chunks": len(evidence),
                    "unique_docs": len(doc_ids_list),
                    "total_claims": len(claim_matrix),
                },
            }
        except Exception as e:
            return {"error": str(e)}
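
    # The "citation_anchor" values are what downstream linting expects: an agent
    # writing from this packet should cite evidence inline as, e.g.,
    #   "... consistent with prior work [[chunk:1234]]."
    # (1234 is a placeholder chunk_id taken from the pack.)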

    # ----------------------------------------------------------
    # Tool 4: lint_section_v1
    # ----------------------------------------------------------
    @mcp.tool()
    def lint_section_v1(
        pack_id: int,
        markdown: str,
        require_citations_per_paragraph: bool = False,
        min_citations_per_paragraph: int = 1,
    ) -> dict[str, Any]:
        """Validate a section's citation compliance.

        Checks whether agent-written markdown follows the citation rules.

        Args:
            pack_id: Evidence pack ID
            markdown: Agent-written markdown content
            require_citations_per_paragraph: Require citations in every paragraph, defaults to False
            min_citations_per_paragraph: Minimum citations per paragraph, defaults to 1

        Returns:
            passed, issues[], stats
        """
        try:
            # All chunk_ids in the pack
            pack_chunks = query_all(
                "SELECT chunk_id FROM evidence_pack_items WHERE pack_id = %s",
                (pack_id,),
            )
            if not pack_chunks:
                return {"error": f"Pack not found or empty: {pack_id}"}

            valid_chunk_ids = {row["chunk_id"] for row in pack_chunks}

            # Parse citations from the markdown
            # Format: [[chunk:<chunk_id>]]
            citation_pattern = r"\[\[chunk:(\d+)\]\]"
            citations = re.findall(citation_pattern, markdown)
            cited_chunk_ids = [int(c) for c in citations]

            issues = []
            valid_citations = 0
            invalid_citations = 0

            # Check every citation
            for chunk_id in cited_chunk_ids:
                # Does the chunk exist?
                exists = query_one(
                    "SELECT chunk_id FROM chunks WHERE chunk_id = %s",
                    (chunk_id,),
                )
                if not exists:
                    issues.append({
                        "severity": "error",
                        "rule": "CHUNK_NOT_FOUND",
                        "chunk_id": chunk_id,
                        "message": f"Chunk {chunk_id} does not exist in database",
                        "suggestion": "Remove this citation or use a valid chunk_id from the evidence pack",
                    })
                    invalid_citations += 1
                    continue

                # Is it inside the pack?
                if chunk_id not in valid_chunk_ids:
                    issues.append({
                        "severity": "error",
                        "rule": "CHUNK_OUT_OF_PACK",
                        "chunk_id": chunk_id,
                        "message": f"Chunk {chunk_id} is not in evidence pack {pack_id}",
                        "suggestion": "Only cite chunks from the provided evidence pack",
                    })
                    invalid_citations += 1
                    continue

                valid_citations += 1

            # Optional paragraph citation-density check
            if require_citations_per_paragraph:
                # Split into paragraphs
                paragraphs = [p.strip() for p in markdown.split("\n\n") if p.strip()]
                for i, para in enumerate(paragraphs):
                    # Skip heading lines
                    if para.startswith("#"):
                        continue
                    # Count this paragraph's citations
                    para_citations = re.findall(citation_pattern, para)
                    if len(para_citations) < min_citations_per_paragraph:
                        issues.append({
                            "severity": "warning",
                            "rule": "LOW_PARAGRAPH_DENSITY",
                            "paragraph_index": i,
                            "message": f"Paragraph {i+1} has {len(para_citations)} citations (minimum: {min_citations_per_paragraph})",
                            "suggestion": f"Add at least {min_citations_per_paragraph - len(para_citations)} more citation(s) to this paragraph",
                        })

            # Check for single-source dominance
            if cited_chunk_ids:
                # Resolve each cited chunk's doc_id
                chunk_docs = {}
                for chunk_id in set(cited_chunk_ids):
                    doc = query_one(
                        "SELECT doc_id FROM chunks WHERE chunk_id = %s",
                        (chunk_id,),
                    )
                    if doc:
                        chunk_docs[chunk_id] = doc["doc_id"]

                # Count citations per document
                doc_cite_counts: dict[str, int] = defaultdict(int)
                for chunk_id in cited_chunk_ids:
                    doc_id = chunk_docs.get(chunk_id)
                    if doc_id:
                        doc_cite_counts[doc_id] += 1

                total = len(cited_chunk_ids)
                for doc_id, count in doc_cite_counts.items():
                    if count / total > 0.5:
                        issues.append({
                            "severity": "warning",
                            "rule": "SINGLE_SOURCE_DOMINANT",
                            "doc_id": doc_id,
                            "message": f"Document {doc_id[:16]}... accounts for {count}/{total} ({count*100//total}%) of citations",
                            "suggestion": "Consider diversifying citations across multiple sources",
                        })

            # Pass/fail decision
            has_errors = any(issue["severity"] == "error" for issue in issues)

            return {
                "passed": not has_errors,
                "issues": issues,
                "stats": {
                    "total_citations": len(cited_chunk_ids),
                    "valid_citations": valid_citations,
                    "invalid_citations": invalid_citations,
                    "unique_chunks_cited": len(set(cited_chunk_ids)),
                    "pack_chunk_count": len(valid_chunk_ids),
                    # Coverage is measured over unique pack chunks, so repeated
                    # citations of one chunk cannot push it above 1.0
                    "citation_coverage": (
                        len(set(cited_chunk_ids) & valid_chunk_ids) / len(valid_chunk_ids)
                        if valid_chunk_ids
                        else 0
                    ),
                },
            }
        except Exception as e:
            return {"error": str(e), "passed": False}
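
    # A hypothetical lint of a draft that cites one out-of-pack chunk:
    #   lint_section_v1(pack_id=42, markdown="... [[chunk:9999]] ...")
    #   -> {"passed": False,
    #       "issues": [{"severity": "error", "rule": "CHUNK_OUT_OF_PACK", ...}],
    #       "stats": {...}}
    # Only "error"-severity issues fail the lint; the density and dominance rules
    # emit warnings.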

    # ----------------------------------------------------------
    # Tool 5: compose_full_template_v1
    # ----------------------------------------------------------
    @mcp.tool()
    def compose_full_template_v1(outline_id: str) -> dict[str, Any]:
        """Generate the full-text structure template.

        Returns the ordered sections plus a markdown template with placeholders.

        Args:
            outline_id: Outline ID

        Returns:
            ordered_sections[], template_markdown
        """
        try:
            # Fetch the outline
            outline = query_one(
                "SELECT outline_id, topic, outline_style FROM review_outlines WHERE outline_id = %s",
                (outline_id,),
            )
            if not outline:
                return {"error": f"Outline not found: {outline_id}"}

            # Fetch the sections
            sections = query_all(
                """
                SELECT section_id, title, description, ord
                FROM review_outline_sections
                WHERE outline_id = %s
                ORDER BY ord
                """,
                (outline_id,),
            )

            ordered_sections = [
                {
                    "section_id": s["section_id"],
                    "title": s["title"],
                    "description": s["description"],
                    "ord": s["ord"],
                }
                for s in sections
            ]

            # Build the markdown template
            template_lines = [
                f"# {outline['topic']}",
                "",
            ]
            for section in sections:
                template_lines.extend([
                    f"## {section['title']}",
                    "",
                    f"<!-- SECTION: {section['section_id']} -->",
                    f"<!-- {section['description']} -->",
                    "",
                    "[Insert section content here]",
                    "",
                ])

            template_lines.extend([
                "## References",
                "",
                "<!-- REFERENCES -->",
                "",
            ])

            return {
                "outline_id": outline_id,
                "topic": outline["topic"],
                "outline_style": outline["outline_style"],
                "ordered_sections": ordered_sections,
                "template_markdown": "\n".join(template_lines),
            }
        except Exception as e:
            return {"error": str(e)}
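
    # For an outline with sections, template_markdown looks roughly like:
    #   # <topic>
    #   ## Measurement and Data
    #   <!-- SECTION: measurement -->
    #   [Insert section content here]
    #   ...
    #   ## References
    #   <!-- REFERENCES -->
    # The HTML comments let later tooling locate each section slot.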

    # ----------------------------------------------------------
    # Tool 6: lint_review_v1
    # ----------------------------------------------------------
    @mcp.tool()
    def lint_review_v1(
        pack_ids: list[int],
        markdown: str,
    ) -> dict[str, Any]:
        """Validate full-text compliance.

        Checks whether the complete review obeys all citation rules.

        Args:
            pack_ids: Allowed evidence-pack IDs (whitelist)
            markdown: The complete review markdown

        Returns:
            passed, issues[], stats
        """
        try:
            # Collect all allowed chunk_ids
            all_valid_chunk_ids: set[int] = set()
            pack_chunk_counts: dict[int, int] = {}
            for pack_id in pack_ids:
                pack_chunks = query_all(
                    "SELECT chunk_id FROM evidence_pack_items WHERE pack_id = %s",
                    (pack_id,),
                )
                chunk_ids = {row["chunk_id"] for row in pack_chunks}
                all_valid_chunk_ids.update(chunk_ids)
                pack_chunk_counts[pack_id] = len(chunk_ids)

            if not all_valid_chunk_ids:
                return {"error": "No valid chunks in provided pack_ids"}

            # Parse citations
            citation_pattern = r"\[\[chunk:(\d+)\]\]"
            citations = re.findall(citation_pattern, markdown)
            cited_chunk_ids = [int(c) for c in citations]

            issues = []
            valid_citations = 0
            invalid_citations = 0

            # Check every citation
            for chunk_id in cited_chunk_ids:
                # Does the chunk exist?
                exists = query_one(
                    "SELECT chunk_id FROM chunks WHERE chunk_id = %s",
                    (chunk_id,),
                )
                if not exists:
                    issues.append({
                        "severity": "error",
                        "rule": "CHUNK_NOT_FOUND",
                        "chunk_id": chunk_id,
                        "message": f"Chunk {chunk_id} does not exist",
                    })
                    invalid_citations += 1
                    continue

                # Is it in the whitelist?
                if chunk_id not in all_valid_chunk_ids:
                    issues.append({
                        "severity": "error",
                        "rule": "CHUNK_OUT_OF_PACK",
                        "chunk_id": chunk_id,
                        "message": f"Chunk {chunk_id} is not in whitelisted packs",
                    })
                    invalid_citations += 1
                    continue

                valid_citations += 1

            # Pass/fail decision
            has_errors = any(issue["severity"] == "error" for issue in issues)

            return {
                "passed": not has_errors,
                "issues": issues,
                "stats": {
                    "total_citations": len(cited_chunk_ids),
                    "unique_chunks_cited": len(set(cited_chunk_ids)),
                    "valid_citations": valid_citations,
                    "invalid_citations": invalid_citations,
                    "pack_count": len(pack_ids),
                    "total_allowed_chunks": len(all_valid_chunk_ids),
                    "citation_coverage_pct": (
                        len(set(cited_chunk_ids) & all_valid_chunk_ids)
                        / len(all_valid_chunk_ids) * 100
                        if all_valid_chunk_ids
                        else 0
                    ),
                },
            }
        except Exception as e:
            return {"error": str(e), "passed": False}
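
# A minimal wiring sketch (the module path is assumed from the sibling import of
# paperlib_mcp.tools.search above; adapt to the actual package layout):
#
#   from fastmcp import FastMCP
#   from paperlib_mcp.tools.review import register_review_tools
#
#   mcp = FastMCP("paperlib")
#   register_review_tools(mcp)
#   mcp.run()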
