ingest_status
Check import status for documents in Paperlib MCP, showing progress stages and error details to monitor PDF processing and troubleshoot issues.
Instructions
查看导入状态
查看指定文档或作业的导入状态,包括各阶段进度和错误信息。
Args: doc_id: 文档 ID(通过 doc_id 查询最新作业) job_id: 作业 ID(直接查询特定作业)
Returns: 导入状态信息,包含各阶段状态、错误摘要和建议修复动作
Input Schema
Table | JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
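| doc_id | No | Document ID; looks up the latest ingest job for this document | |
| job_id | No | Job ID; looks up that specific ingest job directly | |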
Implementation Reference
- The handler function for the 'ingest_status' tool. It retrieves the status of an import job or document from the database, compiles stage statuses, document statistics, and suggests next actions.def ingest_status( doc_id: str | None = None, job_id: int | None = None, ) -> dict[str, Any]: """查看导入状态 查看指定文档或作业的导入状态,包括各阶段进度和错误信息。 Args: doc_id: 文档 ID(通过 doc_id 查询最新作业) job_id: 作业 ID(直接查询特定作业) Returns: 导入状态信息,包含各阶段状态、错误摘要和建议修复动作 """ try: if not doc_id and not job_id: return { "error": "Must provide either doc_id or job_id", } # 获取作业信息 if job_id: job = query_one( """ SELECT job_id, doc_id, status, current_stage, started_at::text, finished_at::text, error FROM ingest_jobs WHERE job_id = %s """, (job_id,) ) else: job = query_one( """ SELECT job_id, doc_id, status, current_stage, started_at::text, finished_at::text, error FROM ingest_jobs WHERE doc_id = %s ORDER BY started_at DESC LIMIT 1 """, (doc_id,) ) if not job: return { "error": f"No ingest job found for {'job_id=' + str(job_id) if job_id else 'doc_id=' + doc_id}", "doc_id": doc_id, "job_id": job_id, } # 获取各阶段详情 stages = query_all( """ SELECT stage, status, message, created_at::text FROM ingest_job_items WHERE job_id = %s ORDER BY created_at """, (job["job_id"],) ) # 构建阶段状态映射 stage_status = {} for stage in IngestStage: stage_status[stage.value] = { "status": "pending", "message": None, "timestamp": None, } for item in stages: stage_status[item["stage"]] = { "status": item["status"], "message": item["message"], "timestamp": item["created_at"], } # 生成建议修复动作 suggested_action = None if job["status"] == IngestStatus.FAILED.value: if job["current_stage"] == IngestStage.EMBEDDED.value or \ stage_status[IngestStage.EMBEDDED.value]["status"] == IngestStatus.FAILED.value: suggested_action = f"Use reembed_document(doc_id='{job['doc_id']}') to retry embedding generation" elif job["current_stage"] == IngestStage.CHUNKED.value: suggested_action = f"Use rechunk_document(doc_id='{job['doc_id']}', force=True) to retry chunking" else: 
suggested_action = f"Use import_pdf(file_path=..., force=True) to reimport from scratch" elif job["status"] == IngestStatus.RUNNING.value: suggested_action = "Job is still running. Wait for completion or check for stuck process." # 检查文档的实际状态 doc_stats = None if job["doc_id"]: stats = query_one( """ SELECT (SELECT COUNT(*) FROM chunks WHERE doc_id = %s) as chunk_count, (SELECT COUNT(*) FROM chunk_embeddings ce JOIN chunks c ON ce.chunk_id = c.chunk_id WHERE c.doc_id = %s) as embedded_count """, (job["doc_id"], job["doc_id"]) ) if stats: doc_stats = { "chunk_count": stats["chunk_count"], "embedded_count": stats["embedded_count"], "missing_embeddings": stats["chunk_count"] - stats["embedded_count"], } if doc_stats["missing_embeddings"] > 0 and job["status"] == IngestStatus.COMPLETED.value: suggested_action = f"Use reembed_document(doc_id='{job['doc_id']}') to fill missing embeddings" return { "job_id": job["job_id"], "doc_id": job["doc_id"], "status": job["status"], "current_stage": job["current_stage"], "started_at": job["started_at"], "finished_at": job["finished_at"], "error": job["error"], "stages": stage_status, "document_stats": doc_stats, "suggested_action": suggested_action, } except Exception as e: return { "error": str(e), "doc_id": doc_id, "job_id": job_id, }
# src/paperlib_mcp/server.py:13-34 (registration)
# Imports and calls register_import_tools(mcp), which registers the
# ingest_status tool (along with import_pdf).
# NOTE(review): `mcp` and `register_health_tools` are not defined in this
# excerpt; presumably they come from lines of server.py outside it.
from paperlib_mcp.tools.import_pdf import register_import_tools
from paperlib_mcp.tools.search import register_search_tools
from paperlib_mcp.tools.fetch import register_fetch_tools
from paperlib_mcp.tools.writing import register_writing_tools

# M2 GraphRAG tools
from paperlib_mcp.tools.graph_extract import register_graph_extract_tools
from paperlib_mcp.tools.graph_canonicalize import register_graph_canonicalize_tools
from paperlib_mcp.tools.graph_community import register_graph_community_tools
from paperlib_mcp.tools.graph_summarize import register_graph_summarize_tools
from paperlib_mcp.tools.graph_maintenance import register_graph_maintenance_tools

# M3 Review tools
from paperlib_mcp.tools.review import register_review_tools

# M4 Canonicalization & Grouping tools
from paperlib_mcp.tools.graph_relation_canonicalize import register_graph_relation_canonicalize_tools
from paperlib_mcp.tools.graph_claim_grouping import register_graph_claim_grouping_tools
from paperlib_mcp.tools.graph_v12 import register_graph_v12_tools

register_health_tools(mcp)
register_import_tools(mcp)
# Enum definitions for IngestStatus and IngestStage used by the ingest_status
# tool to categorize job statuses and stages.
class IngestStage(str, Enum):
    """Import (ingest) stage.

    Members also subclass ``str``, so each one compares equal to its raw
    string value.
    """

    HASHED = "HASHED"  # compute the SHA256
    UPLOADED = "UPLOADED"  # upload to MinIO
    EXTRACTED = "EXTRACTED"  # extract text
    CHUNKED = "CHUNKED"  # split into chunks
    EMBEDDED = "EMBEDDED"  # generate embeddings
    COMMITTED = "COMMITTED"  # commit finished


class IngestStatus(str, Enum):
    """Status.

    Members also subclass ``str``, so each one compares equal to its raw
    string value.
    """

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
# src/paperlib_mcp/tools/import_pdf.py:553-572 (registration)
# The register_import_tools function that defines and registers both
# import_pdf and ingest_status tools using @mcp.tool() decorators.
# NOTE(review): this excerpt ends after import_pdf; the ingest_status
# registration is not visible here.
def register_import_tools(mcp: FastMCP) -> None:
    """Register the PDF import tools."""

    @mcp.tool()
    async def import_pdf(
        file_path: str,
        title: str | None = None,
        authors: str | None = None,
        year: int | None = None,
        force: bool = False,
    ) -> dict[str, Any]:
        """Import a PDF document into the knowledge base."""
        # Thin wrapper: forwards every argument unchanged to import_pdf_run.
        return await import_pdf_run(
            file_path=file_path,
            title=title,
            authors=authors,
            year=year,
            force=force,
        )