import_pdf

Add PDF academic papers to your knowledge base by specifying file path, title, authors, and publication year for organized literature management.

Instructions

导入 PDF 文献到知识库 ("Import PDF documents into the knowledge base")

Input Schema

Name       Required  Description  Default
file_path  Yes
title      No
authors    No
year       No
force      No
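None of the fields carry schema descriptions, so for orientation, a call might pass arguments like these. All values are illustrative; the constraints in the comments are inferred from the input validator shown later on this page.

```python
# Illustrative import_pdf arguments; only file_path is required.
args = {
    "file_path": "/data/papers/example-paper.pdf",  # must exist and end in .pdf
    "title": "An Example Paper",   # optional; PDF metadata fills gaps
    "authors": "A. Author",        # optional
    "year": 2020,                  # optional
    "force": False,                # True re-imports, replacing existing chunks
}
```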

Output Schema


No arguments

Implementation Reference

  • Registration of import_pdf tool via register_import_tools call in the main MCP server initialization.
    register_import_tools(mcp)
  • The FastMCP tool handler for 'import_pdf', decorated with @mcp.tool(). Delegates to the core import_pdf_run implementation.
    @mcp.tool()
    async def import_pdf(
        file_path: str,
        title: str | None = None,
        authors: str | None = None,
        year: int | None = None,
        force: bool = False,
    ) -> dict[str, Any]:
        """导入 PDF 文献到知识库"""  # lit. "Import PDF documents into the knowledge base"
        return await import_pdf_run(
            file_path=file_path,
            title=title,
            authors=authors,
            year=year,
            force=force,
        )
  • Core implementation of PDF import logic using a 6-stage state machine with job management, supporting resume and force options.
    async def import_pdf_run(
        file_path: str,
        title: str | None = None,
        authors: str | None = None,
        year: int | None = None,
        force: bool = False,
    ) -> dict[str, Any]:
        """Import a PDF document into the knowledge base (core implementation)."""
        job_manager = None
        doc_id = ""
        pdf_key = ""
        
        try:
            # 1. Validate input parameters
            params = ImportPdfInput(
                file_path=file_path,
                title=title,
                authors=authors,
                year=year,
                force=force,
            )
            
            settings = get_settings()
            
            # ==================== STAGE 1: HASHED ====================
            doc_id = compute_file_sha256(params.file_path)
            pdf_key = f"papers/{doc_id}.pdf"
            
            job_manager = IngestJobManager(doc_id)
            
            # Check whether a completed import already exists
            existing_doc = query_one(
                "SELECT doc_id FROM documents WHERE doc_id = %s",
                (doc_id,)
            )
            
            # Check existing job status (for resume support)
            latest_job = job_manager.get_latest_job()
            completed_stages: set[str] = set()
            resumed_from: str | None = None
            
            if existing_doc and not params.force:
                # Document exists and is complete; check for missing embeddings
                stats = query_one(
                    """
                    SELECT COUNT(c.chunk_id) as chunks, COUNT(ce.chunk_id) as embedded
                    FROM chunks c
                    LEFT JOIN chunk_embeddings ce ON c.chunk_id = ce.chunk_id
                    WHERE c.doc_id = %s
                    """,
                    (doc_id,)
                )
                
                if stats and stats["chunks"] == stats["embedded"]:
                    return ImportPdfResult(
                        success=True,
                        doc_id=doc_id,
                        pdf_key=pdf_key,
                        n_pages=0,
                        n_chunks=stats["chunks"],
                        embedded_chunks=stats["embedded"],
                        empty_pages=0,
                        skipped=True,
                        message="Document already exists with complete embeddings. Use force=True to reimport.",
                    ).model_dump()
                elif stats and stats["chunks"] > stats["embedded"]:
                    # Embeddings are missing; resume from the chunked stage
                    completed_stages = {
                        IngestStage.HASHED.value,
                        IngestStage.UPLOADED.value,
                        IngestStage.EXTRACTED.value,
                        IngestStage.CHUNKED.value,
                    }
                    resumed_from = IngestStage.CHUNKED.value
            
            elif latest_job and latest_job["status"] == IngestStatus.FAILED.value and not params.force:
                # A previous job failed; collect its completed stages
                completed_stages = job_manager.get_completed_stages(latest_job["job_id"])
                if completed_stages:
                    # Find the last completed stage
                    stage_order = [s.value for s in IngestStage]
                    for stage in reversed(stage_order):
                        if stage in completed_stages:
                            resumed_from = stage
                            break
            
            # Create a new import job
            job_manager.create_job()
            
            # Record completion of the HASHED stage
            job_manager.update_stage(
                IngestStage.HASHED,
                IngestStatus.COMPLETED,
                f"SHA256: {doc_id}"
            )
            
            # ==================== STAGE 2: UPLOADED ====================
            if IngestStage.UPLOADED.value not in completed_stages:
                job_manager.update_stage(IngestStage.UPLOADED, IngestStatus.RUNNING)
                
                if not object_exists(pdf_key) or params.force:
                    upload_file(params.file_path, pdf_key)
                
                job_manager.update_stage(
                    IngestStage.UPLOADED,
                    IngestStatus.COMPLETED,
                    f"Uploaded to {pdf_key}"
                )
            
            # ==================== STAGE 3: EXTRACTED ====================
            pdf_result = None
            if IngestStage.EXTRACTED.value not in completed_stages:
                job_manager.update_stage(IngestStage.EXTRACTED, IngestStatus.RUNNING)
                
                pdf_result = extract_pdf(params.file_path)
                
                # Fill fields the user did not specify from PDF metadata
                if pdf_result.metadata:
                    if not params.title and pdf_result.metadata.title:
                        params.title = pdf_result.metadata.title
                    if not params.authors and pdf_result.metadata.authors:
                        params.authors = pdf_result.metadata.authors
                    if not params.year and pdf_result.metadata.year:
                        params.year = pdf_result.metadata.year
                
                job_manager.update_stage(
                    IngestStage.EXTRACTED,
                    IngestStatus.COMPLETED,
                    f"Extracted {pdf_result.total_pages} pages, {pdf_result.empty_pages} empty"
                )
            else:
                # Extraction is still needed even when this stage is skipped (used by later chunking)
                pdf_result = extract_pdf(params.file_path)
                
                # Fill fields the user did not specify from PDF metadata
                if pdf_result.metadata:
                    if not params.title and pdf_result.metadata.title:
                        params.title = pdf_result.metadata.title
                    if not params.authors and pdf_result.metadata.authors:
                        params.authors = pdf_result.metadata.authors
                    if not params.year and pdf_result.metadata.year:
                        params.year = pdf_result.metadata.year
            
            # ==================== STAGE 4: CHUNKED ====================
            chunks = []
            chunk_ids = []
            
            if IngestStage.CHUNKED.value not in completed_stages:
                job_manager.update_stage(IngestStage.CHUNKED, IngestStatus.RUNNING)
                
                # Insert/update the documents table
                with get_db() as conn:
                    with conn.cursor() as cur:
                        if existing_doc and params.force:
                            # Delete old chunks and embeddings (cascading delete)
                            cur.execute(
                                "DELETE FROM chunks WHERE doc_id = %s",
                                (doc_id,)
                            )
                            # Update documents
                            cur.execute(
                                """
                                UPDATE documents 
                                SET title = %s, authors = %s, year = %s, 
                                    pdf_key = %s, pdf_sha256 = %s, updated_at = now()
                                WHERE doc_id = %s
                                """,
                                (
                                    params.title, params.authors, params.year,
                                    pdf_key, doc_id, doc_id
                                )
                            )
                        else:
                            # Insert a new document
                            cur.execute(
                                """
                                INSERT INTO documents (doc_id, title, authors, year, pdf_bucket, pdf_key, pdf_sha256)
                                VALUES (%s, %s, %s, %s, %s, %s, %s)
                                ON CONFLICT (doc_id) DO UPDATE SET
                                    title = COALESCE(EXCLUDED.title, documents.title),
                                    authors = COALESCE(EXCLUDED.authors, documents.authors),
                                    year = COALESCE(EXCLUDED.year, documents.year),
                                    pdf_key = EXCLUDED.pdf_key,
                                    pdf_sha256 = EXCLUDED.pdf_sha256,
                                    updated_at = now()
                                """,
                                (
                                    doc_id, params.title, params.authors, params.year,
                                    settings.s3_bucket, pdf_key, doc_id
                                )
                            )
                
                # Chunk the extracted pages
                pages = [(p.page_num, p.text) for p in pdf_result.pages if not p.is_empty]
                chunks = chunk_document(pages)
                
                if not chunks:
                    job_manager.update_stage(
                        IngestStage.CHUNKED,
                        IngestStatus.COMPLETED,
                        "No text content"
                    )
                    job_manager.complete_job()
                    
                    return ImportPdfResult(
                        success=True,
                        doc_id=doc_id,
                        job_id=job_manager.job_id,
                        pdf_key=pdf_key,
                        n_pages=pdf_result.total_pages,
                        n_chunks=0,
                        embedded_chunks=0,
                        empty_pages=pdf_result.empty_pages,
                        resumed_from=resumed_from,
                        message="No text content extracted from PDF",
                    ).model_dump()
                
                # Write the chunks table
                with get_db() as conn:
                    with conn.cursor() as cur:
                        for chunk in chunks:
                            cur.execute(
                                """
                                INSERT INTO chunks (doc_id, chunk_index, page_start, page_end, text, token_count)
                                VALUES (%s, %s, %s, %s, %s, %s)
                                RETURNING chunk_id
                                """,
                                (
                                    doc_id,
                                    chunk["chunk_index"],
                                    chunk["page_start"],
                                    chunk["page_end"],
                                    chunk["text"],
                                    chunk["token_count"],
                                )
                            )
                            result = cur.fetchone()
                            if result:
                                chunk_ids.append(result["chunk_id"])
                
                job_manager.update_stage(
                    IngestStage.CHUNKED,
                    IngestStatus.COMPLETED,
                    f"Created {len(chunks)} chunks"
                )
            else:
                # Chunks already exist; load them
                existing_chunks = query_all(
                    "SELECT chunk_id, text FROM chunks WHERE doc_id = %s ORDER BY chunk_index",
                    (doc_id,)
                )
                chunk_ids = [c["chunk_id"] for c in existing_chunks]
                chunks = [{"text": c["text"]} for c in existing_chunks]
            
            # ==================== STAGE 5: EMBEDDED ====================
            embedded_count = 0
            
            if IngestStage.EMBEDDED.value not in completed_stages:
                job_manager.update_stage(IngestStage.EMBEDDED, IngestStatus.RUNNING)
                
                # Find chunks that are missing embeddings
                missing = query_all(
                    """
                    SELECT c.chunk_id, c.text
                    FROM chunks c
                    LEFT JOIN chunk_embeddings ce ON c.chunk_id = ce.chunk_id
                    WHERE c.doc_id = %s AND ce.chunk_id IS NULL
                    ORDER BY c.chunk_index
                    """,
                    (doc_id,)
                )
                
                if missing:
                    missing_ids = [m["chunk_id"] for m in missing]
                    missing_texts = [m["text"] for m in missing]
                    
                    # Generate embeddings
                    embeddings = await aget_embeddings_chunked(missing_texts)
                    
                    # Write the chunk_embeddings table
                    with get_db() as conn:
                        with conn.cursor() as cur:
                            for chunk_id, embedding in zip(missing_ids, embeddings):
                                embedding_str = "[" + ",".join(str(x) for x in embedding) + "]"
                                cur.execute(
                                    """
                                    INSERT INTO chunk_embeddings (chunk_id, embedding_model, embedding)
                                    VALUES (%s, %s, %s::vector)
                                    ON CONFLICT (chunk_id) DO UPDATE SET
                                        embedding_model = EXCLUDED.embedding_model,
                                        embedding = EXCLUDED.embedding
                                    """,
                                    (chunk_id, settings.embedding_model, embedding_str)
                                )
                                embedded_count += 1
                
                # Get the total embedding count
                total_embedded = query_one(
                    """
                    SELECT COUNT(*) as count
                    FROM chunk_embeddings ce
                    JOIN chunks c ON ce.chunk_id = c.chunk_id
                    WHERE c.doc_id = %s
                    """,
                    (doc_id,)
                )
                embedded_count = total_embedded["count"] if total_embedded else embedded_count
                
                job_manager.update_stage(
                    IngestStage.EMBEDDED,
                    IngestStatus.COMPLETED,
                    f"Generated {embedded_count} embeddings"
                )
            else:
                # Get the existing embedding count
                total_embedded = query_one(
                    """
                    SELECT COUNT(*) as count
                    FROM chunk_embeddings ce
                    JOIN chunks c ON ce.chunk_id = c.chunk_id
                    WHERE c.doc_id = %s
                    """,
                    (doc_id,)
                )
                embedded_count = total_embedded["count"] if total_embedded else 0
            
            # ==================== STAGE 6: COMMITTED ====================
            job_manager.update_stage(
                IngestStage.COMMITTED,
                IngestStatus.COMPLETED,
                "Import completed"
            )
            job_manager.complete_job()
            
            return ImportPdfResult(
                success=True,
                doc_id=doc_id,
                job_id=job_manager.job_id,
                pdf_key=pdf_key,
                n_pages=pdf_result.total_pages if pdf_result else 0,
                n_chunks=len(chunks) if chunks else len(chunk_ids),
                embedded_chunks=embedded_count,
                empty_pages=pdf_result.empty_pages if pdf_result else 0,
                resumed_from=resumed_from,
                message="Import completed successfully" + (f" (resumed from {resumed_from})" if resumed_from else ""),
            ).model_dump()
            
        except Exception as e:
            error_msg = str(e)
            if job_manager and job_manager.job_id:
                job_manager.fail_job(error_msg)
            
            return {
                "success": False,
                "error": error_msg,
                "doc_id": doc_id,
                "job_id": job_manager.job_id if job_manager else None,
                "pdf_key": pdf_key,
                "n_pages": 0,
                "n_chunks": 0,
                "embedded_chunks": 0,
                "empty_pages": 0,
            }
  • Pydantic schemas for ImportPdfInput (with file validation) and ImportPdfResult (output structure).
    class ImportPdfInput(BaseModel):
        """Input parameters for PDF import."""
        file_path: str
        title: str | None = None
        authors: str | None = None
        year: int | None = None
        force: bool = False
        
        @field_validator("file_path")
        @classmethod
        def validate_file_path(cls, v: str) -> str:
            path = Path(v)
            if not path.exists():
                raise ValueError(f"File not found: {v}")
            if path.suffix.lower() != ".pdf":
                raise ValueError(f"Not a PDF file: {v}")
            return str(path.absolute())
    
    
    class ImportPdfResult(BaseModel):
        """Result of a PDF import."""
        success: bool
        doc_id: str
        job_id: int | None = None
        pdf_key: str
        n_pages: int
        n_chunks: int
        embedded_chunks: int
        empty_pages: int
        skipped: bool = False
        resumed_from: str | None = None
        message: str = ""
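Stage 5 of the core implementation serializes each embedding into a pgvector text literal before the ::vector cast. The inline string-building can be factored into a small helper; the name to_vector_literal is mine, not the project's:

```python
def to_vector_literal(embedding: list[float]) -> str:
    # pgvector parses a bracketed, comma-separated literal when cast with ::vector
    return "[" + ",".join(str(x) for x in embedding) + "]"
```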
  • Helper class managing ingest jobs, tracking stages, status updates, and resume capabilities for the import process.
    class IngestJobManager:
        """Manager for ingest jobs."""
        
        def __init__(self, doc_id: str):
            self.doc_id = doc_id
            self.job_id: int | None = None
        
        def create_job(self) -> int:
            """Create a new ingest job."""
            with get_db() as conn:
                with conn.cursor() as cur:
                    cur.execute(
                        """
                        INSERT INTO ingest_jobs (doc_id, status, current_stage)
                        VALUES (%s, %s, NULL)
                        RETURNING job_id
                        """,
                        (self.doc_id, IngestStatus.RUNNING.value)
                    )
                    result = cur.fetchone()
                    self.job_id = result["job_id"]
                    return self.job_id
        
        def get_latest_job(self) -> dict[str, Any] | None:
            """Fetch the latest ingest job for this document."""
            return query_one(
                """
                SELECT job_id, status, current_stage, error
                FROM ingest_jobs
                WHERE doc_id = %s
                ORDER BY started_at DESC
                LIMIT 1
                """,
                (self.doc_id,)
            )
        
        def get_completed_stages(self, job_id: int) -> set[str]:
            """Return the set of completed stages for a job."""
            items = query_all(
                """
                SELECT stage FROM ingest_job_items
                WHERE job_id = %s AND status = %s
                """,
                (job_id, IngestStatus.COMPLETED.value)
            )
            return {item["stage"] for item in items}
        
        def update_stage(self, stage: IngestStage, status: IngestStatus, message: str = ""):
            """Record a stage status update."""
            if not self.job_id:
                return
            
            with get_db() as conn:
                with conn.cursor() as cur:
                    # Record stage detail
                    cur.execute(
                        """
                        INSERT INTO ingest_job_items (job_id, stage, status, message)
                        VALUES (%s, %s, %s, %s)
                        """,
                        (self.job_id, stage.value, status.value, message)
                    )
                    
                    # Update the job's current stage
                    if status == IngestStatus.COMPLETED:
                        cur.execute(
                            """
                            UPDATE ingest_jobs 
                            SET current_stage = %s
                            WHERE job_id = %s
                            """,
                            (stage.value, self.job_id)
                        )
        
        def complete_job(self):
            """Mark the job as completed."""
            if not self.job_id:
                return
            
            with get_db() as conn:
                with conn.cursor() as cur:
                    cur.execute(
                        """
                        UPDATE ingest_jobs 
                        SET status = %s, finished_at = now()
                        WHERE job_id = %s
                        """,
                        (IngestStatus.COMPLETED.value, self.job_id)
                    )
        
        def fail_job(self, error: str):
            """Mark the job as failed."""
            if not self.job_id:
                return
            
            with get_db() as conn:
                with conn.cursor() as cur:
                    cur.execute(
                        """
                        UPDATE ingest_jobs 
                        SET status = %s, finished_at = now(), error = %s
                        WHERE job_id = %s
                        """,
                        (IngestStatus.FAILED.value, error, self.job_id)
                    )
Behavior: 2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

No annotations are provided, so the description carries the full burden. It states 'import' but doesn't disclose behavioral traits such as whether this is a write operation, what permissions are needed, if it overwrites existing documents, or how errors are handled. This is inadequate for a tool that likely modifies the knowledge base.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.
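The MCP specification defines tool annotations (readOnlyHint, destructiveHint, idempotentHint, openWorldHint) that could disclose exactly this. A plain-dict sketch of what import_pdf might declare; the values are my reading of the code above, not anything the project ships:

```python
# Hypothetical annotation values for import_pdf, inferred from its implementation.
import_pdf_annotations = {
    "readOnlyHint": False,    # writes documents, chunks, and embeddings
    "destructiveHint": True,  # force=True deletes existing chunks before re-import
    "idempotentHint": True,   # same file skips or resumes via the SHA-256 doc_id
    "openWorldHint": False,   # touches only the configured bucket and database
}
```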

Conciseness: 5/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is a single, efficient sentence in Chinese with no wasted words. It's front-loaded and appropriately sized for a basic tool, though it could benefit from more detail without losing conciseness.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 2/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the tool has 5 parameters with 0% schema coverage, no annotations, and likely performs a write operation (importing into a knowledge base), the description is incomplete. It doesn't cover parameter meanings, behavioral context, or usage guidelines. An output schema exists, but the description doesn't hint at what the tool returns, making it insufficient for effective use.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 2/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

Schema description coverage is 0%, so the description must compensate. It mentions 'PDF 文献' (PDF literature) but doesn't explain any parameters like 'file_path', 'title', 'authors', 'year', or 'force'. Without this, users won't know what inputs are expected or their purposes, leaving significant gaps.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 3/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description '导入 PDF 文献到知识库' (Import PDF literature into knowledge base) states the verb (import) and resource (PDF literature), but it's vague about what 'import' entails—whether it's uploading, parsing, indexing, or storing. It doesn't distinguish from siblings like 'delete_document' or 'get_document', which is a missed opportunity for clarity.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

No guidance is provided on when to use this tool versus alternatives. For example, it doesn't mention prerequisites like having a PDF file available, or how it differs from other document-related tools like 'rechunk_document' or 'update_document'. The description lacks any context for usage.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.
