
rechunk_document

Extract text from PDF documents, split it into manageable chunks, and generate new embeddings to improve search and analysis in academic literature management.

Instructions

Rechunk a document

Fetches the PDF from MinIO, re-extracts and chunks the text, then generates new embeddings. Old chunks and embeddings are deleted.

Args:
  • doc_id: Unique identifier of the document
  • strategy: Chunking strategy; currently only "page_v1" (per-page chunking) is supported
  • force: Whether to re-chunk even if chunks already exist; defaults to False

Returns: the processing result, including the new chunk count
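
On success the tool returns a result shaped like the example below; the field names come from the handler in the Implementation Reference, while the values here are purely illustrative. If the document already has chunks and force is False, the tool instead returns success: False with an error message asking for force=True.

    {
        "success": True,
        "doc_id": "<doc_id>",
        "strategy": "page_v1",
        "n_pages": 12,
        "n_chunks": 12,
        "embedded_chunks": 12,
    }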

Input Schema

Name        Required   Description                                        Default
doc_id      Yes        Unique identifier of the document                  -
strategy    No         Chunking strategy; "page_v1" (per-page chunking)   page_v1
force       No         Re-chunk even if chunks already exist              False
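
As a usage sketch, a call to this tool from the official MCP Python SDK might look like the following; the session setup is omitted and the doc_id value is a placeholder, so treat it as illustrative rather than a verbatim client example.

    # Assumes an already-initialized mcp.ClientSession named `session`.
    result = await session.call_tool(
        "rechunk_document",
        arguments={
            "doc_id": "<doc_id>",   # required
            "strategy": "page_v1",  # optional; the only supported strategy
            "force": True,          # optional; re-chunk even if chunks already exist
        },
    )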

Implementation Reference

  • The core handler function for the 'rechunk_document' tool. It re-downloads the PDF, extracts text, chunks it, deletes old chunks/embeddings, inserts new ones, and generates embeddings. (A sketch of the database schema this handler assumes appears after this list.)
    @mcp.tool()
    def rechunk_document(
        doc_id: str,
        strategy: str = "page_v1",
        force: bool = False,
    ) -> dict[str, Any]:
        """Rechunk a document.

        Fetches the PDF from MinIO, re-extracts and chunks the text, then
        generates new embeddings. Old chunks and embeddings are deleted.

        Args:
            doc_id: Unique identifier of the document
            strategy: Chunking strategy; currently only "page_v1" (per-page chunking) is supported
            force: Whether to re-chunk even if chunks already exist (default False)

        Returns:
            Processing result, including the new chunk count
        """
        try:
            # Check that the document exists
            doc = query_one(
                "SELECT doc_id, pdf_key FROM documents WHERE doc_id = %s", (doc_id,)
            )
            if not doc:
                return {
                    "success": False,
                    "error": f"Document not found: {doc_id}",
                    "doc_id": doc_id,
                }

            # Check whether chunks already exist
            existing = query_one(
                "SELECT COUNT(*) as count FROM chunks WHERE doc_id = %s", (doc_id,)
            )
            if existing and existing["count"] > 0 and not force:
                return {
                    "success": False,
                    "error": f"Document already has {existing['count']} chunks. Use force=True to rechunk.",
                    "doc_id": doc_id,
                    "existing_chunks": existing["count"],
                }

            settings = get_settings()

            # Fetch the PDF from MinIO
            pdf_content = get_object(doc["pdf_key"])

            # Write it to a temporary file
            with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp:
                tmp.write(pdf_content)
                tmp_path = tmp.name

            try:
                # Extract text
                pdf_result = extract_pdf(tmp_path)

                # Delete old chunks (embeddings are removed by cascade)
                execute("DELETE FROM chunks WHERE doc_id = %s", (doc_id,))

                # Chunk the extracted pages
                pages = [(p.page_num, p.text) for p in pdf_result.pages if not p.is_empty]
                chunks = chunk_document(pages)

                if not chunks:
                    return {
                        "success": True,
                        "doc_id": doc_id,
                        "n_chunks": 0,
                        "message": "No text content extracted from PDF",
                    }

                # Insert into the chunks table
                chunk_ids = []
                with get_db() as conn:
                    with conn.cursor() as cur:
                        for chunk in chunks:
                            cur.execute(
                                """
                                INSERT INTO chunks (doc_id, chunk_index, page_start, page_end, text, token_count)
                                VALUES (%s, %s, %s, %s, %s, %s)
                                RETURNING chunk_id
                                """,
                                (
                                    doc_id,
                                    chunk["chunk_index"],
                                    chunk["page_start"],
                                    chunk["page_end"],
                                    chunk["text"],
                                    chunk["token_count"],
                                ),
                            )
                            result = cur.fetchone()
                            if result:
                                chunk_ids.append(result["chunk_id"])

                # Generate embeddings
                texts = [c["text"] for c in chunks]
                embeddings = get_embeddings_chunked(texts)

                # Insert embeddings
                embedded_count = 0
                with get_db() as conn:
                    with conn.cursor() as cur:
                        for chunk_id, embedding in zip(chunk_ids, embeddings):
                            embedding_str = "[" + ",".join(str(x) for x in embedding) + "]"
                            cur.execute(
                                """
                                INSERT INTO chunk_embeddings (chunk_id, embedding_model, embedding)
                                VALUES (%s, %s, %s::vector)
                                """,
                                (chunk_id, settings.embedding_model, embedding_str),
                            )
                            embedded_count += 1

                return {
                    "success": True,
                    "doc_id": doc_id,
                    "strategy": strategy,
                    "n_pages": pdf_result.total_pages,
                    "n_chunks": len(chunks),
                    "embedded_chunks": embedded_count,
                }
            finally:
                # Clean up the temporary file
                Path(tmp_path).unlink(missing_ok=True)

        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "doc_id": doc_id,
            }
  • Registers the fetch tools, including rechunk_document, by calling register_fetch_tools on the MCP instance.
    register_fetch_tools(mcp)
  • Helper function called by rechunk_document to perform the actual page-based chunking of document text. (A sketch of what the underlying chunk_pages step might look like follows this list.)
    def chunk_document(pages: list[tuple[int, str]]) -> list[dict]:
        """Chunk a document by page (returned as dicts for easy database storage).

        Args:
            pages: List of pages, each item a (page_num, text) tuple

        Returns:
            List of chunk dicts with chunk_index, page_start, page_end, text, token_count
        """
        chunks = chunk_pages(pages)
        return [
            {
                "chunk_index": c.chunk_index,
                "page_start": c.page_start,
                "page_end": c.page_end,
                "text": c.text,
                "token_count": c.estimated_tokens,
            }
            for c in chunks
        ]
  • Import and invocation of the fetch tools registration in the main server file.
    from paperlib_mcp.tools.fetch import register_fetch_tools
    from paperlib_mcp.tools.writing import register_writing_tools

    # M2 GraphRAG tools
    from paperlib_mcp.tools.graph_extract import register_graph_extract_tools
    from paperlib_mcp.tools.graph_canonicalize import register_graph_canonicalize_tools
    from paperlib_mcp.tools.graph_community import register_graph_community_tools
    from paperlib_mcp.tools.graph_summarize import register_graph_summarize_tools
    from paperlib_mcp.tools.graph_maintenance import register_graph_maintenance_tools

    # M3 Review tools
    from paperlib_mcp.tools.review import register_review_tools

    # M4 Canonicalization & Grouping tools
    from paperlib_mcp.tools.graph_relation_canonicalize import register_graph_relation_canonicalize_tools
    from paperlib_mcp.tools.graph_claim_grouping import register_graph_claim_grouping_tools
    from paperlib_mcp.tools.graph_v12 import register_graph_v12_tools

    register_health_tools(mcp)
    register_import_tools(mcp)
    register_search_tools(mcp)
    register_fetch_tools(mcp)
    register_writing_tools(mcp)

    # Register M2 GraphRAG tools
    register_graph_extract_tools(mcp)
    register_graph_canonicalize_tools(mcp)
    register_graph_community_tools(mcp)
    register_graph_summarize_tools(mcp)
    register_graph_maintenance_tools(mcp)

    # Register M3 Review tools
    register_review_tools(mcp)

    # Register M4 Canonicalization & Grouping tools
    register_graph_relation_canonicalize_tools(mcp)
    register_graph_claim_grouping_tools(mcp)
    register_graph_v12_tools(mcp)
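
The handler deletes rows from chunks and relies on the database to cascade-delete the matching chunk_embeddings rows, and it casts each embedding literal to a pgvector column. The snippet below is only a sketch of the schema those statements imply, assembled from the column names in the INSERT statements above; the types, constraints, and vector dimension are assumptions, not the project's actual migration.

    # Sketch only: column names come from the INSERT statements above;
    # types, constraints, and the vector dimension are placeholders.
    ASSUMED_SCHEMA = """
    CREATE TABLE chunks (
        chunk_id    BIGSERIAL PRIMARY KEY,
        doc_id      TEXT NOT NULL REFERENCES documents(doc_id),
        chunk_index INT  NOT NULL,
        page_start  INT,
        page_end    INT,
        text        TEXT NOT NULL,
        token_count INT
    );

    CREATE TABLE chunk_embeddings (
        chunk_id        BIGINT NOT NULL REFERENCES chunks(chunk_id) ON DELETE CASCADE,
        embedding_model TEXT NOT NULL,
        embedding       VECTOR(1536)  -- dimension is a placeholder
    );
    """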
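
chunk_document delegates the actual splitting to chunk_pages, whose implementation is not shown on this page. The sketch below illustrates what a "page_v1" (one chunk per page) strategy could look like, using a rough characters-per-token estimate; the Chunk field names mirror the attributes chunk_document reads, but everything else here is an assumption rather than the project's code.

    from dataclasses import dataclass


    @dataclass
    class Chunk:
        chunk_index: int
        page_start: int
        page_end: int
        text: str
        estimated_tokens: int


    def chunk_pages(pages: list[tuple[int, str]]) -> list[Chunk]:
        """Hypothetical page_v1 strategy: one chunk per non-empty page."""
        chunks: list[Chunk] = []
        for page_num, text in pages:
            text = text.strip()
            if not text:
                continue
            chunks.append(
                Chunk(
                    chunk_index=len(chunks),
                    page_start=page_num,
                    page_end=page_num,
                    text=text,
                    estimated_tokens=max(1, len(text) // 4),  # ~4 chars per token heuristic
                )
            )
        return chunks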
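
register_fetch_tools receives the FastMCP server instance and attaches its tools with the @mcp.tool() decorator, as the handler above illustrates. For context, the sketch below shows how such a server could be assembled and started with the MCP Python SDK; the server name is a placeholder and only one registration call stands in for the fuller block above.

    from mcp.server.fastmcp import FastMCP

    from paperlib_mcp.tools.fetch import register_fetch_tools

    mcp = FastMCP("paperlib-mcp")  # server name is a placeholder
    register_fetch_tools(mcp)

    if __name__ == "__main__":
        mcp.run()  # FastMCP defaults to the stdio transport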

MCP directory API

We provide all the information about MCP servers via our MCP directory API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/h-lu/paperlib-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.