import_pdf

Adds a PDF academic paper to your knowledge base. You supply the file path; the title, authors, and publication year are optional and fall back to the PDF's own metadata. Designed for organized literature management.

Instructions

Import a PDF document into the knowledge base.

Input Schema

Name        Required  Default  Description
file_path   Yes       -        Path of the PDF file to import
title       No        None     Document title; filled from the PDF's metadata when omitted
authors     No        None     Authors; filled from the PDF's metadata when omitted
year        No        None     Publication year; filled from the PDF's metadata when omitted
force       No        False    Reimport even if the document already exists, rebuilding chunks and embeddings
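
A minimal sketch of exercising the tool's core implementation in-process; the module path, file path, and metadata values are assumptions for illustration, not taken from this page:

    # Hedged sketch: calling the core implementation directly, e.g. for testing.
    import asyncio

    from paperlib_mcp.tools.import_pdf import import_pdf_run  # hypothetical module path

    result = asyncio.run(
        import_pdf_run(
            file_path="/data/papers/example.pdf",  # hypothetical path
            title="An Example Paper",
            authors="A. Author; B. Author",
            year=2024,
        )
    )
    print(result["doc_id"], result["n_chunks"], result["embedded_chunks"])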

Implementation Reference

  • Registration of the import_pdf tool via the register_import_tools call in the main MCP server initialization.
    register_import_tools(mcp)
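
    A minimal sketch of where that call might sit in the server entrypoint; the FastMCP import path, server name, and module path are assumptions, not taken from this page:

    # Hedged sketch of the MCP server entrypoint; names here are illustrative.
    from mcp.server.fastmcp import FastMCP  # import path depends on the MCP SDK in use

    from paperlib_mcp.tools.import_pdf import register_import_tools  # hypothetical module path

    mcp = FastMCP("paperlib-mcp")  # server name is an assumption

    # Attach import_pdf (and any sibling import tools) to the server instance.
    register_import_tools(mcp)

    if __name__ == "__main__":
        mcp.run()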
  • The FastMCP tool handler for 'import_pdf', decorated with @mcp.tool(). Delegates to the core import_pdf_run implementation.
    @mcp.tool()
    async def import_pdf(
        file_path: str,
        title: str | None = None,
        authors: str | None = None,
        year: int | None = None,
        force: bool = False,
    ) -> dict[str, Any]:
        """Import a PDF document into the knowledge base."""
        return await import_pdf_run(
            file_path=file_path,
            title=title,
            authors=authors,
            year=year,
            force=force,
        )
  • Core implementation of PDF import logic using a 6-stage state machine with job management, supporting resume and force options.
    async def import_pdf_run(
        file_path: str,
        title: str | None = None,
        authors: str | None = None,
        year: int | None = None,
        force: bool = False,
    ) -> dict[str, Any]:
        """Import a PDF document into the knowledge base (core implementation)."""
        job_manager = None
        doc_id = ""
        pdf_key = ""
        try:
            # 1. Validate input parameters
            params = ImportPdfInput(
                file_path=file_path,
                title=title,
                authors=authors,
                year=year,
                force=force,
            )
            settings = get_settings()

            # ==================== STAGE 1: HASHED ====================
            doc_id = compute_file_sha256(params.file_path)
            pdf_key = f"papers/{doc_id}.pdf"
            job_manager = IngestJobManager(doc_id)

            # Check whether a completed import already exists
            existing_doc = query_one(
                "SELECT doc_id FROM documents WHERE doc_id = %s", (doc_id,)
            )
            # Check the existing job state (used for resuming)
            latest_job = job_manager.get_latest_job()
            completed_stages: set[str] = set()
            resumed_from: str | None = None

            if existing_doc and not params.force:
                # Document already exists and completed; check for missing embeddings
                stats = query_one(
                    """
                    SELECT COUNT(c.chunk_id) as chunks, COUNT(ce.chunk_id) as embedded
                    FROM chunks c
                    LEFT JOIN chunk_embeddings ce ON c.chunk_id = ce.chunk_id
                    WHERE c.doc_id = %s
                    """,
                    (doc_id,)
                )
                if stats and stats["chunks"] == stats["embedded"]:
                    return ImportPdfResult(
                        success=True,
                        doc_id=doc_id,
                        pdf_key=pdf_key,
                        n_pages=0,
                        n_chunks=stats["chunks"],
                        embedded_chunks=stats["embedded"],
                        empty_pages=0,
                        skipped=True,
                        message="Document already exists with complete embeddings. Use force=True to reimport.",
                    ).model_dump()
                elif stats and stats["chunks"] > stats["embedded"]:
                    # Some embeddings are missing; resume from the chunked stage
                    completed_stages = {
                        IngestStage.HASHED.value,
                        IngestStage.UPLOADED.value,
                        IngestStage.EXTRACTED.value,
                        IngestStage.CHUNKED.value,
                    }
                    resumed_from = IngestStage.CHUNKED.value
            elif latest_job and latest_job["status"] == IngestStatus.FAILED.value and not params.force:
                # A previous job failed; fetch the stages it completed
                completed_stages = job_manager.get_completed_stages(latest_job["job_id"])
                if completed_stages:
                    # Find the last completed stage
                    stage_order = [s.value for s in IngestStage]
                    for stage in reversed(stage_order):
                        if stage in completed_stages:
                            resumed_from = stage
                            break

            # Create a new import job
            job_manager.create_job()
            # Record the HASHED stage as completed
            job_manager.update_stage(
                IngestStage.HASHED, IngestStatus.COMPLETED, f"SHA256: {doc_id}"
            )

            # ==================== STAGE 2: UPLOADED ====================
            if IngestStage.UPLOADED.value not in completed_stages:
                job_manager.update_stage(IngestStage.UPLOADED, IngestStatus.RUNNING)
                if not object_exists(pdf_key) or params.force:
                    upload_file(params.file_path, pdf_key)
                job_manager.update_stage(
                    IngestStage.UPLOADED, IngestStatus.COMPLETED, f"Uploaded to {pdf_key}"
                )

            # ==================== STAGE 3: EXTRACTED ====================
            pdf_result = None
            if IngestStage.EXTRACTED.value not in completed_stages:
                job_manager.update_stage(IngestStage.EXTRACTED, IngestStatus.RUNNING)
                pdf_result = extract_pdf(params.file_path)
                # Fill in fields the user did not specify from the PDF metadata
                if pdf_result.metadata:
                    if not params.title and pdf_result.metadata.title:
                        params.title = pdf_result.metadata.title
                    if not params.authors and pdf_result.metadata.authors:
                        params.authors = pdf_result.metadata.authors
                    if not params.year and pdf_result.metadata.year:
                        params.year = pdf_result.metadata.year
                job_manager.update_stage(
                    IngestStage.EXTRACTED,
                    IngestStatus.COMPLETED,
                    f"Extracted {pdf_result.total_pages} pages, {pdf_result.empty_pages} empty"
                )
            else:
                # Extraction is still needed even when the stage is skipped (chunking uses it)
                pdf_result = extract_pdf(params.file_path)
                # Fill in fields the user did not specify from the PDF metadata
                if pdf_result.metadata:
                    if not params.title and pdf_result.metadata.title:
                        params.title = pdf_result.metadata.title
                    if not params.authors and pdf_result.metadata.authors:
                        params.authors = pdf_result.metadata.authors
                    if not params.year and pdf_result.metadata.year:
                        params.year = pdf_result.metadata.year

            # ==================== STAGE 4: CHUNKED ====================
            chunks = []
            chunk_ids = []
            if IngestStage.CHUNKED.value not in completed_stages:
                job_manager.update_stage(IngestStage.CHUNKED, IngestStatus.RUNNING)
                # Insert/update the documents table
                with get_db() as conn:
                    with conn.cursor() as cur:
                        if existing_doc and params.force:
                            # Delete the old chunks and embeddings (cascading delete)
                            cur.execute(
                                "DELETE FROM chunks WHERE doc_id = %s", (doc_id,)
                            )
                            # Update documents
                            cur.execute(
                                """
                                UPDATE documents
                                SET title = %s, authors = %s, year = %s,
                                    pdf_key = %s, pdf_sha256 = %s, updated_at = now()
                                WHERE doc_id = %s
                                """,
                                (params.title, params.authors, params.year, pdf_key, doc_id, doc_id)
                            )
                        else:
                            # Insert a new document
                            cur.execute(
                                """
                                INSERT INTO documents (doc_id, title, authors, year, pdf_bucket, pdf_key, pdf_sha256)
                                VALUES (%s, %s, %s, %s, %s, %s, %s)
                                ON CONFLICT (doc_id) DO UPDATE SET
                                    title = COALESCE(EXCLUDED.title, documents.title),
                                    authors = COALESCE(EXCLUDED.authors, documents.authors),
                                    year = COALESCE(EXCLUDED.year, documents.year),
                                    pdf_key = EXCLUDED.pdf_key,
                                    pdf_sha256 = EXCLUDED.pdf_sha256,
                                    updated_at = now()
                                """,
                                (doc_id, params.title, params.authors, params.year, settings.s3_bucket, pdf_key, doc_id)
                            )

                # Chunk the extracted pages
                pages = [(p.page_num, p.text) for p in pdf_result.pages if not p.is_empty]
                chunks = chunk_document(pages)

                if not chunks:
                    job_manager.update_stage(
                        IngestStage.CHUNKED, IngestStatus.COMPLETED, "No text content"
                    )
                    job_manager.complete_job()
                    return ImportPdfResult(
                        success=True,
                        doc_id=doc_id,
                        job_id=job_manager.job_id,
                        pdf_key=pdf_key,
                        n_pages=pdf_result.total_pages,
                        n_chunks=0,
                        embedded_chunks=0,
                        empty_pages=pdf_result.empty_pages,
                        resumed_from=resumed_from,
                        message="No text content extracted from PDF",
                    ).model_dump()

                # Write to the chunks table
                with get_db() as conn:
                    with conn.cursor() as cur:
                        for chunk in chunks:
                            cur.execute(
                                """
                                INSERT INTO chunks (doc_id, chunk_index, page_start, page_end, text, token_count)
                                VALUES (%s, %s, %s, %s, %s, %s)
                                RETURNING chunk_id
                                """,
                                (
                                    doc_id,
                                    chunk["chunk_index"],
                                    chunk["page_start"],
                                    chunk["page_end"],
                                    chunk["text"],
                                    chunk["token_count"],
                                )
                            )
                            result = cur.fetchone()
                            if result:
                                chunk_ids.append(result["chunk_id"])

                job_manager.update_stage(
                    IngestStage.CHUNKED, IngestStatus.COMPLETED, f"Created {len(chunks)} chunks"
                )
            else:
                # Chunks already exist; fetch them
                existing_chunks = query_all(
                    "SELECT chunk_id, text FROM chunks WHERE doc_id = %s ORDER BY chunk_index",
                    (doc_id,)
                )
                chunk_ids = [c["chunk_id"] for c in existing_chunks]
                chunks = [{"text": c["text"]} for c in existing_chunks]

            # ==================== STAGE 5: EMBEDDED ====================
            embedded_count = 0
            if IngestStage.EMBEDDED.value not in completed_stages:
                job_manager.update_stage(IngestStage.EMBEDDED, IngestStatus.RUNNING)
                # Find chunks that are missing an embedding
                missing = query_all(
                    """
                    SELECT c.chunk_id, c.text
                    FROM chunks c
                    LEFT JOIN chunk_embeddings ce ON c.chunk_id = ce.chunk_id
                    WHERE c.doc_id = %s AND ce.chunk_id IS NULL
                    ORDER BY c.chunk_index
                    """,
                    (doc_id,)
                )
                if missing:
                    missing_ids = [m["chunk_id"] for m in missing]
                    missing_texts = [m["text"] for m in missing]
                    # Generate embeddings
                    embeddings = await aget_embeddings_chunked(missing_texts)
                    # Write to the chunk_embeddings table
                    with get_db() as conn:
                        with conn.cursor() as cur:
                            for chunk_id, embedding in zip(missing_ids, embeddings):
                                embedding_str = "[" + ",".join(str(x) for x in embedding) + "]"
                                cur.execute(
                                    """
                                    INSERT INTO chunk_embeddings (chunk_id, embedding_model, embedding)
                                    VALUES (%s, %s, %s::vector)
                                    ON CONFLICT (chunk_id) DO UPDATE
                                    SET embedding_model = EXCLUDED.embedding_model,
                                        embedding = EXCLUDED.embedding
                                    """,
                                    (chunk_id, settings.embedding_model, embedding_str)
                                )
                                embedded_count += 1

                # Get the total embedding count
                total_embedded = query_one(
                    """
                    SELECT COUNT(*) as count
                    FROM chunk_embeddings ce
                    JOIN chunks c ON ce.chunk_id = c.chunk_id
                    WHERE c.doc_id = %s
                    """,
                    (doc_id,)
                )
                embedded_count = total_embedded["count"] if total_embedded else embedded_count
                job_manager.update_stage(
                    IngestStage.EMBEDDED, IngestStatus.COMPLETED, f"Generated {embedded_count} embeddings"
                )
            else:
                # Get the existing embedding count
                total_embedded = query_one(
                    """
                    SELECT COUNT(*) as count
                    FROM chunk_embeddings ce
                    JOIN chunks c ON ce.chunk_id = c.chunk_id
                    WHERE c.doc_id = %s
                    """,
                    (doc_id,)
                )
                embedded_count = total_embedded["count"] if total_embedded else 0

            # ==================== STAGE 6: COMMITTED ====================
            job_manager.update_stage(
                IngestStage.COMMITTED, IngestStatus.COMPLETED, "Import completed"
            )
            job_manager.complete_job()

            return ImportPdfResult(
                success=True,
                doc_id=doc_id,
                job_id=job_manager.job_id,
                pdf_key=pdf_key,
                n_pages=pdf_result.total_pages if pdf_result else 0,
                n_chunks=len(chunks) if chunks else len(chunk_ids),
                embedded_chunks=embedded_count,
                empty_pages=pdf_result.empty_pages if pdf_result else 0,
                resumed_from=resumed_from,
                message="Import completed successfully"
                + (f" (resumed from {resumed_from})" if resumed_from else ""),
            ).model_dump()

        except Exception as e:
            error_msg = str(e)
            if job_manager and job_manager.job_id:
                job_manager.fail_job(error_msg)
            return {
                "success": False,
                "error": error_msg,
                "doc_id": doc_id,
                "job_id": job_manager.job_id if job_manager else None,
                "pdf_key": pdf_key,
                "n_pages": 0,
                "n_chunks": 0,
                "embedded_chunks": 0,
                "empty_pages": 0,
            }
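
    The IngestStage and IngestStatus enums the state machine relies on are not shown on this page; a sketch consistent with how the code above uses them (the exact string values are assumptions):

    from enum import Enum

    class IngestStage(str, Enum):
        # Declaration order matters: the resume logic walks reversed(IngestStage)
        # to find the last completed stage.
        HASHED = "hashed"
        UPLOADED = "uploaded"
        EXTRACTED = "extracted"
        CHUNKED = "chunked"
        EMBEDDED = "embedded"
        COMMITTED = "committed"

    class IngestStatus(str, Enum):
        RUNNING = "running"
        COMPLETED = "completed"
        FAILED = "failed"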
  • Pydantic schemas for ImportPdfInput (with file validation) and ImportPdfResult (output structure).
    class ImportPdfInput(BaseModel):
        """Input parameters for PDF import."""

        file_path: str
        title: str | None = None
        authors: str | None = None
        year: int | None = None
        force: bool = False

        @field_validator("file_path")
        @classmethod
        def validate_file_path(cls, v: str) -> str:
            path = Path(v)
            if not path.exists():
                raise ValueError(f"File not found: {v}")
            if path.suffix.lower() != ".pdf":
                raise ValueError(f"Not a PDF file: {v}")
            return str(path.absolute())

    class ImportPdfResult(BaseModel):
        """PDF import result."""

        success: bool
        doc_id: str
        job_id: int | None = None
        pdf_key: str
        n_pages: int
        n_chunks: int
        embedded_chunks: int
        empty_pages: int
        skipped: bool = False
        resumed_from: str | None = None
        message: str = ""
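
    A short usage sketch of the input validation (the path is hypothetical):

    from pydantic import ValidationError

    # The field validator runs before any pipeline work: it rejects paths that do
    # not exist or lack a .pdf suffix, and normalizes valid paths to absolute form.
    try:
        ImportPdfInput(file_path="/tmp/does-not-exist.pdf")  # hypothetical path
    except ValidationError as e:
        print(e)  # includes "File not found: /tmp/does-not-exist.pdf"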
  • Helper class managing ingest jobs, tracking stages, status updates, and resume capabilities for the import process.
    class IngestJobManager:
        """Manager for import (ingest) jobs."""

        def __init__(self, doc_id: str):
            self.doc_id = doc_id
            self.job_id: int | None = None

        def create_job(self) -> int:
            """Create a new import job."""
            with get_db() as conn:
                with conn.cursor() as cur:
                    cur.execute(
                        """
                        INSERT INTO ingest_jobs (doc_id, status, current_stage)
                        VALUES (%s, %s, NULL)
                        RETURNING job_id
                        """,
                        (self.doc_id, IngestStatus.RUNNING.value)
                    )
                    result = cur.fetchone()
                    self.job_id = result["job_id"]
                    return self.job_id

        def get_latest_job(self) -> dict[str, Any] | None:
            """Fetch the most recent import job for this document."""
            return query_one(
                """
                SELECT job_id, status, current_stage, error
                FROM ingest_jobs
                WHERE doc_id = %s
                ORDER BY started_at DESC
                LIMIT 1
                """,
                (self.doc_id,)
            )

        def get_completed_stages(self, job_id: int) -> set[str]:
            """Fetch the set of completed stages for a job."""
            items = query_all(
                """
                SELECT stage FROM ingest_job_items
                WHERE job_id = %s AND status = %s
                """,
                (job_id, IngestStatus.COMPLETED.value)
            )
            return {item["stage"] for item in items}

        def update_stage(self, stage: IngestStage, status: IngestStatus, message: str = ""):
            """Update a stage's status."""
            if not self.job_id:
                return
            with get_db() as conn:
                with conn.cursor() as cur:
                    # Record the stage detail
                    cur.execute(
                        """
                        INSERT INTO ingest_job_items (job_id, stage, status, message)
                        VALUES (%s, %s, %s, %s)
                        """,
                        (self.job_id, stage.value, status.value, message)
                    )
                    # Update the job's current stage
                    if status == IngestStatus.COMPLETED:
                        cur.execute(
                            """
                            UPDATE ingest_jobs SET current_stage = %s
                            WHERE job_id = %s
                            """,
                            (stage.value, self.job_id)
                        )

        def complete_job(self):
            """Mark the job as completed."""
            if not self.job_id:
                return
            with get_db() as conn:
                with conn.cursor() as cur:
                    cur.execute(
                        """
                        UPDATE ingest_jobs
                        SET status = %s, finished_at = now()
                        WHERE job_id = %s
                        """,
                        (IngestStatus.COMPLETED.value, self.job_id)
                    )

        def fail_job(self, error: str):
            """Mark the job as failed."""
            if not self.job_id:
                return
            with get_db() as conn:
                with conn.cursor() as cur:
                    cur.execute(
                        """
                        UPDATE ingest_jobs
                        SET status = %s, finished_at = now(), error = %s
                        WHERE job_id = %s
                        """,
                        (IngestStatus.FAILED.value, error, self.job_id)
                    )
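
    A hedged sketch of the manager's lifecycle as import_pdf_run drives it (the doc_id value is illustrative):

    # Illustrative lifecycle, mirroring how import_pdf_run drives the manager.
    manager = IngestJobManager(doc_id="e3b0c44298fc1c14")  # illustrative, truncated SHA-256
    latest = manager.get_latest_job()

    done: set[str] = set()
    if latest and latest["status"] == IngestStatus.FAILED.value:
        # Stages completed by the failed attempt can be skipped on this run.
        done = manager.get_completed_stages(latest["job_id"])

    manager.create_job()
    if IngestStage.UPLOADED.value not in done:
        manager.update_stage(IngestStage.UPLOADED, IngestStatus.RUNNING)
        # ... upload work happens here ...
        manager.update_stage(IngestStage.UPLOADED, IngestStatus.COMPLETED, "Uploaded")
    manager.complete_job()  # or manager.fail_job("reason") if an exception escapes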
