Skip to main content
Glama
test_m3_review.py23.3 kB
#!/usr/bin/env python3 """M3 Review Tools 扩展测试脚本 测试 M3 里程碑实现的所有 Review 工具(扩展版): 1. generate_review_outline_data_v1 - 生成综述大纲 2. build_section_evidence_pack_v1 - 构建章节证据包 3. export_section_packet_v1 - 导出写作输入包 4. lint_section_v1 - 验证章节引用 5. compose_full_template_v1 - 生成全文模板 6. lint_review_v1 - 验证全文合规 扩展测试: - 多章节证据包构建 - 不同大纲样式 - Lint 边界情况 - 完整写作流程模拟 """ import sys from pathlib import Path # 添加 src 目录到 path sys.path.insert(0, str(Path(__file__).parent.parent / "src")) from paperlib_mcp.db import query_one, query_all from paperlib_mcp.tools.review import register_review_tools from fastmcp import FastMCP class Colors: """终端颜色""" GREEN = "\033[92m" RED = "\033[91m" YELLOW = "\033[93m" BLUE = "\033[94m" CYAN = "\033[96m" RESET = "\033[0m" BOLD = "\033[1m" def print_header(title: str): """打印测试标题""" print(f"\n{Colors.BOLD}{Colors.BLUE}{'='*60}{Colors.RESET}") print(f"{Colors.BOLD}{Colors.BLUE}{title}{Colors.RESET}") print(f"{Colors.BOLD}{Colors.BLUE}{'='*60}{Colors.RESET}") def print_subheader(title: str): """打印子标题""" print(f"\n{Colors.CYAN}--- {title} ---{Colors.RESET}") def print_test(name: str, passed: bool, details: str = ""): """打印测试结果""" status = f"{Colors.GREEN}✓ PASS{Colors.RESET}" if passed else f"{Colors.RED}✗ FAIL{Colors.RESET}" print(f" {status} {name}") if details: print(f" {Colors.YELLOW}{details}{Colors.RESET}") def print_info(msg: str): """打印信息""" print(f" {Colors.BLUE}ℹ{Colors.RESET} {msg}") class M3ExtendedTester: """M3 Review Tools 扩展测试器""" def __init__(self): self.mcp = FastMCP("test") self.passed = 0 self.failed = 0 self.outline_id = None self.section_packs = {} # section_id -> pack_id # 注册工具 register_review_tools(self.mcp) def call_tool(self, name: str, **kwargs): """调用 MCP 工具""" tool = self.mcp._tool_manager._tools.get(name) if not tool: raise ValueError(f"Tool not found: {name}") return tool.fn(**kwargs) def test(self, name: str, condition: bool, details: str = ""): """记录测试结果""" if condition: self.passed += 1 else: self.failed += 1 print_test(name, condition, details) # ==================== 基础测试 ==================== def test_generate_outline(self): """测试生成大纲""" print_header("1. 生成综述大纲 (generate_review_outline_data_v1)") # 使用 rebuild=True 确保新建 result = self.call_tool( "generate_review_outline_data_v1", topic="earnings management and corporate governance", outline_style="econ_finance_canonical", rebuild=True, ) self.test("返回 outline_id", result.get("outline_id") is not None) self.test("返回 sections", "sections" in result) self.test("包含 6 个章节", len(result.get("sections", [])) == 6) if result.get("outline_id"): self.outline_id = result["outline_id"] print_info(f"Outline ID: {self.outline_id[:8]}...") print_info(f"Topic: {result.get('topic')}") if result.get("sections"): section_ids = [s["section_id"] for s in result["sections"]] expected = ["research_question", "measurement", "identification", "findings", "debates", "gaps"] self.test("章节 ID 正确", section_ids == expected, str(section_ids)) def test_outline_reproducibility(self): """测试大纲可复现性""" print_header("2. 大纲可复现性测试") # 同一 topic 再次生成 result = self.call_tool( "generate_review_outline_data_v1", topic="earnings management and corporate governance", outline_style="econ_finance_canonical", rebuild=False, ) self.test("返回已存在的大纲", result.get("reused", False) is True) self.test("outline_id 一致", result.get("outline_id") == self.outline_id) def test_different_outline_style(self): """测试不同大纲样式""" print_header("3. 不同大纲样式测试") # 使用 general 样式 result = self.call_tool( "generate_review_outline_data_v1", topic="test topic for general style", outline_style="general", rebuild=True, ) self.test("返回 outline_id", result.get("outline_id") is not None) self.test("使用 general 样式", result.get("outline_style") == "general") if result.get("sections"): section_ids = [s["section_id"] for s in result["sections"]] expected = ["background", "methodology", "findings", "discussion", "future"] self.test("general 样式章节正确", section_ids == expected, str(section_ids)) print_info(f"General style sections: {len(result['sections'])}") # ==================== 证据包测试 ==================== def test_build_all_section_packs(self): """测试构建所有章节证据包""" print_header("4. 构建所有章节证据包") if not self.outline_id: print_info("跳过:没有 outline_id") return sections = ["research_question", "measurement", "identification", "findings", "debates", "gaps"] for section_id in sections: print_subheader(f"Section: {section_id}") result = self.call_tool( "build_section_evidence_pack_v1", outline_id=self.outline_id, section_id=section_id, max_chunks=30, per_doc_limit=3, rebuild=True, ) has_pack = result.get("pack_id") is not None self.test(f"{section_id} 返回 pack_id", has_pack) if has_pack: self.section_packs[section_id] = result["pack_id"] print_info(f"Pack ID: {result['pack_id']}, Chunks: {result.get('chunk_count', 0)}, Docs: {result.get('doc_count', 0)}") def test_pack_caching(self): """测试证据包缓存""" print_header("5. 证据包缓存测试") if not self.outline_id or not self.section_packs: print_info("跳过:没有数据") return # 选一个有数据的 section test_section = None test_pack_id = None for section_id, pack_id in self.section_packs.items(): if pack_id: test_section = section_id test_pack_id = pack_id break if not test_section: print_info("跳过:没有有效的证据包") return # 再次请求同一 section result = self.call_tool( "build_section_evidence_pack_v1", outline_id=self.outline_id, section_id=test_section, rebuild=False, ) self.test("复用已有 pack", result.get("reused", False) is True) self.test("pack_id 一致", result.get("pack_id") == test_pack_id) print_info(f"Section: {test_section}, Pack ID: {result.get('pack_id')}") # ==================== 导出测试 ==================== def test_export_packets(self): """测试导出多个写作输入包""" print_header("6. 导出写作输入包") if not self.section_packs: print_info("跳过:没有证据包") return for section_id, pack_id in self.section_packs.items(): if not pack_id: continue print_subheader(f"Section: {section_id}") result = self.call_tool("export_section_packet_v1", pack_id=pack_id) if "error" in result: print_info(f"Error: {result['error']}") continue self.test(f"{section_id} 返回 evidence", "evidence" in result) stats = result.get("stats", {}) print_info(f"Chunks: {stats.get('total_chunks', 0)}, Docs: {stats.get('unique_docs', 0)}, Claims: {stats.get('total_claims', 0)}") # 验证 evidence 结构 if result.get("evidence"): first = result["evidence"][0] has_anchor = "citation_anchor" in first self.test(f"{section_id} evidence 包含 citation_anchor", has_anchor) # ==================== Lint 测试 ==================== def test_lint_valid_citations(self): """测试 Lint - 有效引用""" print_header("7. Lint Section - 有效引用") # 找一个有数据的 pack test_pack_id = None for pack_id in self.section_packs.values(): if pack_id: test_pack_id = pack_id break if not test_pack_id: print_info("跳过:没有证据包") return # 获取 pack 中的所有 chunk_ids chunks = query_all( "SELECT chunk_id FROM evidence_pack_items WHERE pack_id = %s LIMIT 3", (test_pack_id,), ) if not chunks: print_info("跳过:pack 为空") return # 构造有效 markdown citations = " ".join([f"[[chunk:{c['chunk_id']}]]" for c in chunks]) markdown = f""" ## 测量与数据 本节讨论相关测量方法。{citations} 这是详细分析段落,引用了多个来源。 """ result = self.call_tool( "lint_section_v1", pack_id=test_pack_id, markdown=markdown, ) self.test("返回 passed", "passed" in result) self.test("校验通过", result.get("passed", False) is True) stats = result.get("stats", {}) print_info(f"Valid: {stats.get('valid_citations', 0)}, Invalid: {stats.get('invalid_citations', 0)}") def test_lint_invalid_chunk(self): """测试 Lint - 不存在的 chunk_id""" print_header("8. Lint Section - 不存在的 chunk_id") test_pack_id = None for pack_id in self.section_packs.values(): if pack_id: test_pack_id = pack_id break if not test_pack_id: print_info("跳过:没有证据包") return markdown = """ ## 测试 这是引用了不存在的 chunk。[[chunk:999999999]] """ result = self.call_tool( "lint_section_v1", pack_id=test_pack_id, markdown=markdown, ) self.test("校验失败", result.get("passed") is False) issues = result.get("issues", []) has_not_found = any(i["rule"] == "CHUNK_NOT_FOUND" for i in issues) self.test("检测到 CHUNK_NOT_FOUND", has_not_found) print_info(f"Issues: {len(issues)}") def test_lint_out_of_pack(self): """测试 Lint - pack 外引用""" print_header("9. Lint Section - Pack 外引用") test_pack_id = None for pack_id in self.section_packs.values(): if pack_id: test_pack_id = pack_id break if not test_pack_id: print_info("跳过:没有证据包") return # 找一个存在但不在 pack 中的 chunk_id out_chunk = query_one( """ SELECT c.chunk_id FROM chunks c WHERE c.chunk_id NOT IN ( SELECT chunk_id FROM evidence_pack_items WHERE pack_id = %s ) LIMIT 1 """, (test_pack_id,), ) if not out_chunk: print_info("跳过:无法找到 pack 外的 chunk") return out_chunk_id = out_chunk["chunk_id"] markdown = f""" ## 测试 这是引用了 pack 外的 chunk。[[chunk:{out_chunk_id}]] """ result = self.call_tool( "lint_section_v1", pack_id=test_pack_id, markdown=markdown, ) self.test("校验失败", result.get("passed") is False) issues = result.get("issues", []) has_out_of_pack = any(i["rule"] == "CHUNK_OUT_OF_PACK" for i in issues) self.test("检测到 CHUNK_OUT_OF_PACK", has_out_of_pack) print_info(f"Out-of-pack chunk_id: {out_chunk_id}") def test_lint_no_citations(self): """测试 Lint - 无引用""" print_header("10. Lint Section - 无引用") test_pack_id = None for pack_id in self.section_packs.values(): if pack_id: test_pack_id = pack_id break if not test_pack_id: print_info("跳过:没有证据包") return markdown = """ ## 测试 这是一段没有任何引用的文字。 完全没有使用证据包中的任何内容。 """ result = self.call_tool( "lint_section_v1", pack_id=test_pack_id, markdown=markdown, ) # 无引用应该通过(因为 require_citations_per_paragraph 默认 False) self.test("无引用默认通过", result.get("passed", False) is True) stats = result.get("stats", {}) self.test("total_citations = 0", stats.get("total_citations", -1) == 0) def test_lint_paragraph_density(self): """测试 Lint - 段落密度检查""" print_header("11. Lint Section - 段落密度检查(可选功能)") test_pack_id = None for pack_id in self.section_packs.values(): if pack_id: test_pack_id = pack_id break if not test_pack_id: print_info("跳过:没有证据包") return markdown = """ ## 测试 这是一段没有引用的文字。 另一段也没有引用。 """ # 开启段落密度检查 result = self.call_tool( "lint_section_v1", pack_id=test_pack_id, markdown=markdown, require_citations_per_paragraph=True, min_citations_per_paragraph=1, ) # 应该有 warning issues = result.get("issues", []) has_low_density = any(i["rule"] == "LOW_PARAGRAPH_DENSITY" for i in issues) self.test("检测到 LOW_PARAGRAPH_DENSITY", has_low_density) # 但仍然通过(因为是 warning 不是 error) self.test("仍然通过(warning不阻止)", result.get("passed", False) is True) # ==================== 全文测试 ==================== def test_compose_template(self): """测试生成全文模板""" print_header("12. 生成全文模板 (compose_full_template_v1)") if not self.outline_id: print_info("跳过:没有 outline_id") return result = self.call_tool("compose_full_template_v1", outline_id=self.outline_id) self.test("返回 ordered_sections", "ordered_sections" in result) self.test("返回 template_markdown", "template_markdown" in result) if result.get("ordered_sections"): self.test("包含 6 个章节", len(result["ordered_sections"]) == 6) if result.get("template_markdown"): template = result["template_markdown"] self.test("模板包含占位符", "<!-- SECTION:" in template) self.test("模板包含参考文献", "参考文献" in template) print_info(f"Template length: {len(template)} chars") def test_lint_review_valid(self): """测试全文合规检查 - 有效""" print_header("13. 全文合规检查 - 有效引用 (lint_review_v1)") valid_pack_ids = [p for p in self.section_packs.values() if p] if not valid_pack_ids: print_info("跳过:没有证据包") return # 获取一些有效 chunk chunks = [] for pack_id in valid_pack_ids[:2]: pack_chunks = query_all( "SELECT chunk_id FROM evidence_pack_items WHERE pack_id = %s LIMIT 2", (pack_id,), ) chunks.extend([c["chunk_id"] for c in pack_chunks]) if not chunks: print_info("跳过:没有 chunks") return citations = " ".join([f"[[chunk:{c}]]" for c in chunks]) markdown = f""" # 综述 ## 研究问题 这是研究问题章节。{citations} ## 参考文献 """ result = self.call_tool( "lint_review_v1", pack_ids=valid_pack_ids, markdown=markdown, ) self.test("返回 passed", "passed" in result) self.test("校验通过", result.get("passed", False) is True) stats = result.get("stats", {}) print_info(f"Valid: {stats.get('valid_citations', 0)}, Coverage: {stats.get('citation_coverage_pct', 0):.1f}%") def test_lint_review_cross_pack(self): """测试全文合规检查 - 跨 pack 引用""" print_header("14. 全文合规检查 - 跨 Pack 引用") valid_pack_ids = [p for p in self.section_packs.values() if p] if len(valid_pack_ids) < 1: print_info("跳过:没有证据包") return # 只允许第一个 pack,但引用其他 pack 的 chunk first_pack_id = valid_pack_ids[0] # 找一个不在第一个 pack 的 chunk out_chunk = query_one( """ SELECT c.chunk_id FROM chunks c WHERE c.chunk_id NOT IN ( SELECT chunk_id FROM evidence_pack_items WHERE pack_id = %s ) LIMIT 1 """, (first_pack_id,), ) if not out_chunk: print_info("跳过:无法找到外部 chunk") return markdown = f""" # 综述 ## 测试 引用了不在白名单 pack 中的 chunk。[[chunk:{out_chunk['chunk_id']}]] """ result = self.call_tool( "lint_review_v1", pack_ids=[first_pack_id], # 只允许第一个 pack markdown=markdown, ) self.test("校验失败", result.get("passed") is False) issues = result.get("issues", []) has_out_of_pack = any(i["rule"] == "CHUNK_OUT_OF_PACK" for i in issues) self.test("检测到 CHUNK_OUT_OF_PACK", has_out_of_pack) # ==================== 完整流程模拟 ==================== def test_full_workflow(self): """测试完整写作流程""" print_header("15. 完整写作流程模拟") # Step 1: 生成大纲 print_subheader("Step 1: 生成大纲") outline_result = self.call_tool( "generate_review_outline_data_v1", topic="full workflow test", outline_style="econ_finance_canonical", rebuild=True, ) outline_id = outline_result.get("outline_id") self.test("大纲生成成功", outline_id is not None) print_info(f"Outline ID: {outline_id[:8] if outline_id else 'N/A'}...") if not outline_id: return # Step 2: 为 measurement 构建证据包 print_subheader("Step 2: 构建 measurement 证据包") pack_result = self.call_tool( "build_section_evidence_pack_v1", outline_id=outline_id, section_id="measurement", max_chunks=20, ) pack_id = pack_result.get("pack_id") self.test("证据包构建成功", pack_id is not None) print_info(f"Pack ID: {pack_id}, Chunks: {pack_result.get('chunk_count', 0)}") if not pack_id: return # Step 3: 导出写作输入包 print_subheader("Step 3: 导出写作输入包") packet = self.call_tool("export_section_packet_v1", pack_id=pack_id) evidence = packet.get("evidence", []) self.test("写作输入包导出成功", len(evidence) > 0) print_info(f"Evidence items: {len(evidence)}") if not evidence: return # Step 4: 模拟 Agent 写作(使用真实 chunk_id) print_subheader("Step 4: 模拟 Agent 写作") chunk_ids = [e["chunk_id"] for e in evidence[:3]] agent_markdown = f""" ## 测量与数据 本研究使用了多种测量方法。[[chunk:{chunk_ids[0]}]] 我们参考了以下数据来源进行分析。[[chunk:{chunk_ids[1] if len(chunk_ids) > 1 else chunk_ids[0]}]] """ print_info(f"Agent 使用了 {len(chunk_ids)} 个引用") # Step 5: Lint 验证 print_subheader("Step 5: Lint 验证") lint_result = self.call_tool( "lint_section_v1", pack_id=pack_id, markdown=agent_markdown, ) self.test("Lint 验证通过", lint_result.get("passed", False) is True) print_info(f"Issues: {len(lint_result.get('issues', []))}") # Step 6: 生成全文模板 print_subheader("Step 6: 生成全文模板") template_result = self.call_tool("compose_full_template_v1", outline_id=outline_id) self.test("模板生成成功", "template_markdown" in template_result) print_info("完整流程测试完成!") # ==================== 运行所有测试 ==================== def run_all(self): """运行所有测试""" print(f"\n{Colors.BOLD}M3 Review Tools 扩展测试套件{Colors.RESET}") print("测试 M3 里程碑实现的所有 Review 工具(扩展版)\n") # 基础测试 self.test_generate_outline() self.test_outline_reproducibility() self.test_different_outline_style() # 证据包测试 self.test_build_all_section_packs() self.test_pack_caching() # 导出测试 self.test_export_packets() # Lint 测试 self.test_lint_valid_citations() self.test_lint_invalid_chunk() self.test_lint_out_of_pack() self.test_lint_no_citations() self.test_lint_paragraph_density() # 全文测试 self.test_compose_template() self.test_lint_review_valid() self.test_lint_review_cross_pack() # 完整流程 self.test_full_workflow() # 打印总结 print_header("测试总结") total = self.passed + self.failed print(f" 总测试数: {total}") print(f" {Colors.GREEN}通过: {self.passed}{Colors.RESET}") print(f" {Colors.RED}失败: {self.failed}{Colors.RESET}") print(f" 通过率: {self.passed/total*100:.1f}%") if self.failed == 0: print(f"\n{Colors.GREEN}{Colors.BOLD}✓ 所有测试通过!{Colors.RESET}") else: print(f"\n{Colors.RED}{Colors.BOLD}✗ 有 {self.failed} 个测试失败{Colors.RESET}") return self.failed == 0 def main(): """主函数""" tester = M3ExtendedTester() success = tester.run_all() sys.exit(0 if success else 1) if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/h-lu/paperlib-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server