
build_section_evidence_pack_v1

Generate reproducible evidence packages for academic sections by compiling relevant document chunks based on outline and section IDs.

Instructions

Build Section Evidence Pack

Generates a fixed (reproducible) evidence pack for the specified section.

Args:
    outline_id: outline ID
    section_id: section ID
    max_chunks: maximum number of chunks, default 60
    per_doc_limit: maximum chunks per document, default 4
    rebuild: whether to rebuild the pack, default False

Returns: pack_id, chunk_count, doc_count
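For orientation, a minimal sketch of invoking the tool from a Python MCP client; the initialized ClientSession, the async context, and the outline ID are assumptions, not part of the server's code:

    # Assumes an initialized mcp.ClientSession named `session` inside an
    # async context; "ol_123" is a hypothetical outline ID.
    result = await session.call_tool(
        "build_section_evidence_pack_v1",
        arguments={
            "outline_id": "ol_123",
            "section_id": "findings",
            "max_chunks": 60,
            "per_doc_limit": 4,
            "rebuild": False,
        },
    )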

Input Schema

Name           Required  Description                    Default
outline_id     Yes       Outline ID
section_id     Yes       Section ID
max_chunks     No        Maximum number of chunks       60
per_doc_limit  No        Maximum chunks per document    4
rebuild        No        Whether to rebuild the pack    False
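
The handler actually returns a few more fields than the three listed under Returns. A sketch of the two payload shapes, with hypothetical values:

    # Fresh build (rebuild=True, or no cached pack for this outline/section):
    {
        "pack_id": 42,            # hypothetical ID from the evidence_packs insert
        "section_id": "findings",
        "chunk_count": 60,
        "doc_count": 18,
        "params": {"max_chunks": 60, "per_doc_limit": 4, "section_id": "findings"},
        "reused": False,
    }

    # Cache hit (rebuild=False and a review_section_packs row exists); note the
    # extra created_at field and reused=True:
    {
        "pack_id": 42,
        "section_id": "findings",
        "chunk_count": 60,
        "doc_count": 18,
        "params": {"max_chunks": 60, "per_doc_limit": 4, "section_id": "findings"},
        "created_at": "2025-01-01 12:00:00",  # hypothetical timestamp
        "reused": True,
    }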

Implementation Reference

  • The core handler function for the 'build_section_evidence_pack_v1' tool. It builds a reproducible evidence pack for a specific section by selecting relevant chunks from the database based on entity types, claims, or keywords, applying the chunk limits, and storing the result in the evidence_packs table.
    # Assumed module-level context from review.py (not shown in this excerpt):
    #   import json, re
    #   from collections import defaultdict
    #   from typing import Any
    #   query_one, query_all, and get_db are the project's Postgres helpers.
    @mcp.tool()
    def build_section_evidence_pack_v1(
        outline_id: str,
        section_id: str,
        max_chunks: int = 60,
        per_doc_limit: int = 4,
        rebuild: bool = False,
    ) -> dict[str, Any]:
        """构建章节证据包
    
        为指定章节生成固定的证据包(可复现)。
    
        Args:
            outline_id: 大纲 ID
            section_id: 章节 ID
            max_chunks: 最大 chunk 数量,默认 60
            per_doc_limit: 每篇文档最多 chunk 数,默认 4
            rebuild: 是否重建,默认 False
    
        Returns:
            pack_id, chunk_count, doc_count
        """
        try:
            # Check the cache first
            if not rebuild:
                cached = query_one(
                    """
                    SELECT pack_id, params, created_at::text
                    FROM review_section_packs
                    WHERE outline_id = %s AND section_id = %s
                    """,
                    (outline_id, section_id),
                )
                if cached:
                    # Fetch the pack's statistics
                    stats = query_one(
                        """
                        SELECT COUNT(*) as chunk_count, COUNT(DISTINCT doc_id) as doc_count
                        FROM evidence_pack_items
                        WHERE pack_id = %s
                        """,
                        (cached["pack_id"],),
                    )
                    return {
                        "pack_id": cached["pack_id"],
                        "section_id": section_id,
                        "chunk_count": stats["chunk_count"] if stats else 0,
                        "doc_count": stats["doc_count"] if stats else 0,
                        "params": cached["params"],
                        "created_at": cached["created_at"],
                        "reused": True,
                    }
    
            # Load the outline and section records
            outline = query_one(
                "SELECT topic, sources_json FROM review_outlines WHERE outline_id = %s",
                (outline_id,),
            )
            if not outline:
                return {"error": f"Outline not found: {outline_id}"}
    
            section = query_one(
                """
                SELECT section_id, title, sources_json, keywords
                FROM review_outline_sections
                WHERE outline_id = %s AND section_id = %s
                """,
                (outline_id, section_id),
            )
            if not section:
                return {"error": f"Section not found: {section_id}"}
    
            # Select chunks according to the section's configuration
            outline_sources = outline["sources_json"] or {}  # loaded but unused below
            section_sources = section["sources_json"] or {}
            entity_types = section_sources.get("entity_types", [])
            keywords = section["keywords"] or []
    
            # Candidate chunks
            candidate_chunks = []
    
            # Strategy 1: chunks from mentions of the section's entity types
            if entity_types:
                entity_chunks = query_all(
                    """
                    SELECT DISTINCT m.chunk_id, m.doc_id, c.page_start, c.page_end
                    FROM mentions m
                    JOIN entities e ON m.entity_id = e.entity_id
                    JOIN chunks c ON m.chunk_id = c.chunk_id
                    WHERE e.type = ANY(%s)
                    ORDER BY m.chunk_id
                    LIMIT %s
                    """,
                    (entity_types, max_chunks * 2),
                )
                candidate_chunks.extend(entity_chunks)
    
            # Strategy 2: for the findings/debates sections, use claims
            if section_id in ("findings", "debates"):
                if section_id == "findings":
                    # High-confidence claims; the subquery avoids the DISTINCT ON + ORDER BY conflict
                    claim_chunks = query_all(
                        """
                        SELECT chunk_id, doc_id, page_start, page_end FROM (
                            SELECT DISTINCT ON (c.chunk_id) 
                                c.chunk_id, c.doc_id, ch.page_start, ch.page_end, c.confidence
                            FROM claims c
                            JOIN chunks ch ON c.chunk_id = ch.chunk_id
                            WHERE c.confidence >= 0.7
                            ORDER BY c.chunk_id, c.confidence DESC
                        ) sub
                        ORDER BY confidence DESC
                        LIMIT %s
                        """,
                        (max_chunks * 2,),
                    )
                else:  # debates: look for conflicting claim signs
                    claim_chunks = query_all(
                        """
                        SELECT DISTINCT c.chunk_id, c.doc_id, ch.page_start, ch.page_end
                        FROM claims c
                        JOIN chunks ch ON c.chunk_id = ch.chunk_id
                        WHERE c.sign IN ('positive', 'negative', 'mixed')
                        ORDER BY c.chunk_id
                        LIMIT %s
                        """,
                        (max_chunks * 2,),
                    )
                candidate_chunks.extend(claim_chunks)
    
            # Strategy 3: keyword search over chunk text
            if keywords and len(candidate_chunks) < max_chunks:
                # Escape each keyword so regex metacharacters cannot break the ~* match
                keyword_pattern = "|".join(re.escape(k) for k in keywords)
                keyword_chunks = query_all(
                    """
                    SELECT chunk_id, doc_id, page_start, page_end
                    FROM chunks
                    WHERE text ~* %s
                    LIMIT %s
                    """,
                    (keyword_pattern, max_chunks * 2),
                )
                candidate_chunks.extend(keyword_chunks)
    
            # Deduplicate by chunk_id, preserving order
            seen = set()
            unique_chunks = []
            for chunk in candidate_chunks:
                if chunk["chunk_id"] not in seen:
                    seen.add(chunk["chunk_id"])
                    unique_chunks.append(chunk)
    
            # Apply per_doc_limit and cap the total at max_chunks
            doc_counts: dict[str, int] = defaultdict(int)
            filtered_chunks = []
            for chunk in unique_chunks:
                doc_id = chunk["doc_id"]
                if doc_counts[doc_id] < per_doc_limit:
                    filtered_chunks.append(chunk)
                    doc_counts[doc_id] += 1
                if len(filtered_chunks) >= max_chunks:
                    break
    
            # Create the evidence pack
            params = {
                "max_chunks": max_chunks,
                "per_doc_limit": per_doc_limit,
                "section_id": section_id,
            }
    
            with get_db() as conn:
                with conn.cursor() as cur:
                    # Create the pack row
                    cur.execute(
                        """
                        INSERT INTO evidence_packs (query, params_json)
                        VALUES (%s, %s)
                        RETURNING pack_id
                        """,
                        (f"section:{section_id}:{outline_id}", json.dumps(params)),
                    )
                    pack_id = cur.fetchone()["pack_id"]
    
                    # Insert the items with their rank
                    for rank, chunk in enumerate(filtered_chunks):
                        cur.execute(
                            """
                            INSERT INTO evidence_pack_items (pack_id, doc_id, chunk_id, rank)
                            VALUES (%s, %s, %s, %s)
                            """,
                            (pack_id, chunk["doc_id"], chunk["chunk_id"], rank),
                        )
    
                    # Drop any stale cache entry
                    cur.execute(
                        "DELETE FROM review_section_packs WHERE outline_id = %s AND section_id = %s",
                        (outline_id, section_id),
                    )
    
                    # Cache the new section-to-pack mapping
                    cur.execute(
                        """
                        INSERT INTO review_section_packs (outline_id, section_id, pack_id, params)
                        VALUES (%s, %s, %s, %s)
                        """,
                        (outline_id, section_id, pack_id, json.dumps(params)),
                    )
    
            return {
                "pack_id": pack_id,
                "section_id": section_id,
                "chunk_count": len(filtered_chunks),
                "doc_count": len(doc_counts),
                "params": params,
                "reused": False,
            }
    
        except Exception as e:
            return {"error": str(e)}
  • The registration point where review tools, including 'build_section_evidence_pack_v1', are registered on the MCP server instance by calling register_review_tools(mcp). The actual tool definitions with @mcp.tool() decorators live inside register_review_tools in review.py.
    register_review_tools(mcp)
  • The function that defines and registers all review tools using @mcp.tool() decorators when called.
    def register_review_tools(mcp: FastMCP) -> None:
        """注册 M3 综述工具"""
