extract_graph_missing
Identify documents lacking mentions and process them with extract_graph_v1 to build knowledge graphs from academic papers.
Instructions
Backfill extraction for unprocessed documents: find documents that have no mentions and run extract_graph_v1 on each of them.

Args:
- limit_docs: maximum number of documents to process (default 50)
- llm_model: LLM model; defaults to the LLM_MODEL environment variable
- min_confidence: minimum confidence threshold

Returns: the number of documents processed and the list of their document IDs.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| limit_docs | No | Maximum number of documents to process | 50 |
| llm_model | No | LLM model; falls back to the `LLM_MODEL` environment variable | None |
| min_confidence | No | Minimum confidence threshold for extracted mentions | 0.8 |
| concurrency | No | Chunk-level concurrency forwarded to `extract_graph_v1_run` | 30 |
| doc_concurrency | No | Maximum number of documents processed in parallel | 15 |
| max_chunks | No | Maximum number of chunks selected per document | 60 |
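For example, a call that narrows the run could pass a payload like the following (the values here are illustrative, not defaults from the source; omitted fields keep the defaults in the table above):

```json
{
  "limit_docs": 20,
  "min_confidence": 0.85,
  "doc_concurrency": 10
}
```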
Implementation Reference
- Main handler function that implements the tool logic: finds documents with no mentions, selects high-value chunks for each, and extracts the graph via parallel calls to `extract_graph_v1_run`.

```python
@mcp.tool()
async def extract_graph_missing(
    limit_docs: int = 50,
    llm_model: str | None = None,
    min_confidence: float = 0.8,
    concurrency: int = 30,
    doc_concurrency: int = 15,
    max_chunks: int = 60,
) -> dict[str, Any]:
    """Backfill extraction for unprocessed documents.

    Find documents that have no mentions and run extract_graph_v1 on them.

    Args:
        limit_docs: Maximum number of documents to process (default 50).
        llm_model: LLM model; defaults to the LLM_MODEL environment variable.
        min_confidence: Minimum confidence threshold.

    Returns:
        The number of documents processed and the list of their document IDs.
    """
    try:
        settings = get_settings()
        actual_llm_model = llm_model or settings.llm_model

        # Find documents that have not been extracted yet
        missing_docs = query_all(
            """
            SELECT d.doc_id
            FROM documents d
            LEFT JOIN mentions m ON m.doc_id = d.doc_id
            GROUP BY d.doc_id
            HAVING COUNT(m.mention_id) = 0
            ORDER BY d.created_at DESC
            LIMIT %s
            """,
            (limit_docs,),
        )

        if not missing_docs:
            return ExtractGraphMissingOut(
                processed_docs=0,
                doc_ids=[],
            ).model_dump()

        # Import the extraction tool
        from paperlib_mcp.tools.graph_extract import (
            extract_graph_v1_run,
            HIGH_VALUE_KEYWORDS_DEFAULT,
        )

        # Limit document-level concurrency with a semaphore
        sem = asyncio.Semaphore(doc_concurrency)

        async def process_single_doc(doc_row):
            async with sem:
                doc_id = doc_row["doc_id"]

                # Fetch this document's high-value chunks via full-text search
                fts_query = " OR ".join(f"'{kw}'" for kw in HIGH_VALUE_KEYWORDS_DEFAULT)
                chunks = query_all(
                    """
                    SELECT chunk_id, doc_id, page_start, page_end, text
                    FROM chunks
                    WHERE doc_id = %s
                      AND tsv @@ websearch_to_tsquery('english', %s)
                    ORDER BY ts_rank(tsv, websearch_to_tsquery('english', %s)) DESC
                    LIMIT %s
                    """,
                    (doc_id, fts_query, fts_query, max_chunks),
                )

                if not chunks:
                    # No high-value chunks; fall back to all chunks
                    chunks = query_all(
                        """
                        SELECT chunk_id, doc_id, page_start, page_end, text
                        FROM chunks
                        WHERE doc_id = %s
                        ORDER BY chunk_index
                        LIMIT %s
                        """,
                        (doc_id, max_chunks),
                    )

                if not chunks:
                    return None

                # Call the optimized extract_graph_v1_run directly (supports parallelism)
                chunk_ids = [c["chunk_id"] for c in chunks]
                await extract_graph_v1_run(
                    doc_id=doc_id,
                    chunk_ids=chunk_ids,
                    mode="custom",  # use the chunk_ids passed in
                    max_chunks=len(chunks),
                    llm_model=actual_llm_model,
                    min_confidence=min_confidence,
                    concurrency=concurrency,
                )
                return doc_id

        # Run all documents concurrently
        tasks = [process_single_doc(doc) for doc in missing_docs]
        results = await asyncio.gather(*tasks)

        # Collect the doc_ids that were processed successfully
        processed_doc_ids = [r for r in results if r is not None]

        return ExtractGraphMissingOut(
            processed_docs=len(processed_doc_ids),
            doc_ids=processed_doc_ids,
        ).model_dump()
    except Exception as e:
        return ExtractGraphMissingOut(
            processed_docs=0,
            doc_ids=[],
            error=MCPErrorModel(code="LLM_ERROR", message=str(e)),
        ).model_dump()
```
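The document-level throttling in the handler (a semaphore capping in-flight coroutines while `asyncio.gather` still schedules one task per item and preserves input order) can be isolated into a small runnable sketch. All names here are illustrative, not from the source:

```python
import asyncio


async def run_bounded(items, worker, limit):
    # Same pattern as process_single_doc: a semaphore caps how many
    # coroutines execute the guarded section at once, while gather
    # schedules one task per item and keeps results in input order.
    sem = asyncio.Semaphore(limit)

    async def guarded(item):
        async with sem:
            return await worker(item)

    return await asyncio.gather(*(guarded(i) for i in items))


async def demo():
    active = 0
    peak = 0

    async def worker(i):
        nonlocal active, peak
        active += 1
        peak = max(peak, active)  # record observed concurrency
        await asyncio.sleep(0.01)
        active -= 1
        return None if i % 2 else i  # mimic skipped docs returning None

    results = await run_bounded(range(10), worker, limit=3)
    processed = [r for r in results if r is not None]  # drop skipped items
    return peak, processed


peak, processed = asyncio.run(demo())
```

With a limit of 3, at most three workers are ever active at once, and the `None` results (skipped items) are filtered out afterwards, exactly as the handler filters documents with no chunks.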
- Pydantic input and output schema definitions for the extract_graph_missing tool.

```python
class ExtractGraphMissingIn(BaseModel):
    """extract_graph_missing input"""

    limit_docs: int = 50
    llm_model: Optional[str] = None  # defaults to the LLM_MODEL environment variable
    min_confidence: float = 0.8


class ExtractGraphMissingOut(BaseModel):
    """extract_graph_missing output"""

    processed_docs: int
    doc_ids: list[str] = Field(default_factory=list)
    error: Optional[MCPErrorModel] = None
```
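The output envelope always carries either data or an error object, never an exception. The same shape can be sketched with stdlib dataclasses standing in for the Pydantic models (`asdict` playing the role of `model_dump`); field names match the source, but this is an illustration, not the project's code:

```python
from dataclasses import dataclass, field, asdict
from typing import Optional


# Stdlib stand-ins for the Pydantic models: same fields, same envelope
# shape (a result dict that carries either data or an error object).
@dataclass
class MCPError:
    code: str
    message: str


@dataclass
class ExtractGraphMissingOut:
    processed_docs: int
    doc_ids: list[str] = field(default_factory=list)
    error: Optional[MCPError] = None


# Success path: doc_ids populated, error stays None.
ok = asdict(ExtractGraphMissingOut(processed_docs=2, doc_ids=["d1", "d2"]))

# Failure path: zero docs processed, error object filled in.
failed = asdict(ExtractGraphMissingOut(
    processed_docs=0,
    error=MCPError(code="LLM_ERROR", message="timeout"),
))
```

Returning this envelope from the `except` branch means MCP clients always receive a well-formed result and can branch on the `error` field instead of handling transport-level failures.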
- src/paperlib_mcp/server.py:44 (registration): Registration call in the main MCP server setup that registers the graph maintenance tools, including extract_graph_missing.

```python
register_graph_maintenance_tools(mcp)
```
- src/paperlib_mcp/tools/graph_maintenance.py:22-24 (registration): The registration function that defines and registers the tool via the `@mcp.tool()` decorator.

```python
def register_graph_maintenance_tools(mcp: FastMCP) -> None:
    """Register the GraphRAG maintenance tools."""
```