Skip to main content
Glama
EMBEDDING_SYSTEM.md12.7 kB
# 文本嵌入与向量检索系统 本文档详细描述 Paperlib MCP 的文本嵌入生成和向量检索实现。 ## 系统概览 ```mermaid graph LR subgraph "输入层" PDF[PDF 文档] Query[搜索查询] end subgraph "处理层" Extract[PyMuPDF4LLM<br/>文本提取] Chunk[按页分块] Embed[OpenRouter API<br/>Embedding] end subgraph "存储层" PG[(PostgreSQL)] Vector[(pgvector<br/>HNSW 索引)] FTS[(tsvector<br/>GIN 索引)] end subgraph "检索层" VecSearch[向量相似度搜索] FTSSearch[全文搜索] Hybrid[RRF 融合排序] end PDF --> Extract --> Chunk --> Embed Embed --> Vector Chunk --> FTS Query --> Embed Embed --> VecSearch Query --> FTSSearch VecSearch --> Hybrid FTSSearch --> Hybrid ``` --- ## 1. 文本提取与分块 ### 1.1 PDF 文本提取 (`pdf_extract.py`) 使用 **PyMuPDF4LLM** 进行文本提取,优势: - 输出 LLM 优化的 Markdown 格式 - 智能表格检测和格式化 - 保留文档结构(标题、列表) - 多栏布局处理 ```python def extract_pdf( file_path: str | Path, *, table_strategy: str = "lines_strict", ignore_images: bool = True, ) -> PdfExtractResult: """从 PDF 文件提取文本""" page_data = pymupdf4llm.to_markdown( str(file_path), page_chunks=True, # 按页返回 table_strategy=table_strategy, ignore_images=ignore_images, ) # 返回 PdfExtractResult 包含所有页面 ``` **表格检测策略**: | 策略 | 说明 | 适用场景 | |------|------|---------| | `lines_strict` | 仅检测有可见线条的表格 | 推荐,精度高 | | `lines` | 检测有线条的表格(宽松) | 表格边框不完整 | | `text` | 基于文本对齐检测 | 无边框表格 | | `explicit` | 仅检测明确标记的表格 | 最保守 | ### 1.2 文本分块 (`chunking.py`) 采用 **按页分块** 策略,每页形成一个独立 chunk: ```python @dataclass class Chunk: chunk_index: int # 从 0 开始 page_start: int # 起始页码(从 1 开始) page_end: int # 结束页码(从 1 开始) text: str char_count: int @property def estimated_tokens(self) -> int: """估算 token 数(约 4 字符 = 1 token)""" return self.char_count // 4 ``` **分块特点**: - 保持页面语义完整性 - 便于引用定位(精确到页码) - 适合学术论文结构 **文本清理**: ```python def sanitize_text(text: str) -> str: """移除 null 字节等问题字符""" return text.replace('\x00', '') # PostgreSQL JSON 兼容 ``` --- ## 2. Embedding 生成 (`embeddings.py`) ### 2.1 API 配置 | 参数 | 默认值 | 说明 | |------|--------|------| | `embedding_model` | `openai/text-embedding-3-small` | 模型标识 | | `embedding_batch_size` | 64 | 批处理大小 | | `openrouter_base_url` | `https://openrouter.ai/api/v1` | API 端点 | ### 2.2 单条 Embedding ```python def get_embedding(text: str) -> list[float]: """获取单个文本的 embedding 向量 (1536 维)""" embeddings = get_embeddings_batch([text]) return embeddings[0] ``` ### 2.3 批量 Embedding ```python def get_embeddings_batch(texts: list[str]) -> list[list[float]]: """批量获取文本的 embedding 向量""" url = f"{settings.openrouter_base_url}/embeddings" headers = { "Authorization": f"Bearer {settings.openrouter_api_key}", "Content-Type": "application/json", } payload = { "model": settings.embedding_model, "input": texts, } with httpx.Client(timeout=60.0) as client: response = client.post(url, json=payload, headers=headers) response.raise_for_status() # 按 index 排序确保顺序 embeddings_data = sorted(data["data"], key=lambda x: x["index"]) return [item["embedding"] for item in embeddings_data] ``` ### 2.4 异步并发批处理 ```python async def aget_embeddings_chunked( texts: list[str], batch_size: int | None = None, concurrency: int = 5 ) -> list[list[float]]: """分批并行获取大量文本的 embedding 向量""" actual_batch_size = batch_size or settings.embedding_batch_size # 准备批次 batches = [texts[i:i + actual_batch_size] for i in range(0, len(texts), actual_batch_size)] # 并发控制 sem = asyncio.Semaphore(concurrency) results = [None] * len(batches) async def process_batch(idx, batch_texts): async with sem: embeddings = await aget_embeddings_batch(batch_texts) results[idx] = embeddings tasks = [process_batch(idx, b) for idx, b in enumerate(batches)] await asyncio.gather(*tasks) # 展平结果 return [e for batch in results if batch for e in batch] ``` **并发参数**: - `batch_size`: 每批文本数量(默认 64) - `concurrency`: 最大并发请求数(默认 5) --- ## 3. 数据库存储 ### 3.1 表结构 ```sql -- 文本块表 CREATE TABLE chunks ( chunk_id BIGSERIAL PRIMARY KEY, doc_id TEXT NOT NULL REFERENCES documents(doc_id), chunk_index INT NOT NULL, page_start INT, page_end INT, text TEXT NOT NULL, token_count INT, -- 全文搜索向量(自动生成) tsv TSVECTOR GENERATED ALWAYS AS (to_tsvector('english', text)) STORED, UNIQUE (doc_id, chunk_index) ); -- Embedding 表 CREATE TABLE chunk_embeddings ( chunk_id BIGINT PRIMARY KEY REFERENCES chunks(chunk_id), embedding_model TEXT NOT NULL, embedding VECTOR(1536) NOT NULL -- pgvector 类型 ); ``` ### 3.2 索引策略 ```sql -- HNSW 向量索引(余弦相似度) CREATE INDEX chunk_emb_hnsw_cos ON chunk_embeddings USING hnsw (embedding vector_cosine_ops); -- GIN 全文索引 CREATE INDEX chunks_tsv_gin ON chunks USING GIN (tsv); -- 业务索引 CREATE INDEX chunks_doc_idx ON chunks(doc_id); ``` **HNSW 索引参数**: - `m`: 最大连接数(默认 16) - `ef_construction`: 构建时搜索深度(默认 64) - 适合百万级向量的 ANN 搜索 --- ## 4. 混合检索 (`search.py`) ### 4.1 检索架构 ```mermaid flowchart TB Query[查询文本] --> Split{并行执行} Split --> FTS[全文搜索<br/>PostgreSQL FTS] Split --> Vec[向量搜索<br/>pgvector HNSW] FTS --> FTS_Norm[BM25 分数归一化] Vec --> Vec_Norm[余弦距离归一化] FTS_Norm --> RRF[RRF 融合排序<br/>k=60] Vec_Norm --> RRF RRF --> PerDoc[Per-doc 限制] PerDoc --> TopK[Top-K 结果] ``` ### 4.2 全文搜索 ```python def search_fts(query: str, limit: int = 50) -> list[dict]: """PostgreSQL 全文搜索""" sql = """ SELECT c.chunk_id, c.doc_id, c.page_start, c.page_end, LEFT(c.text, 500) AS text, ts_rank_cd(c.tsv, plainto_tsquery('english', %s)) AS rank FROM chunks c WHERE c.tsv @@ plainto_tsquery('english', %s) ORDER BY rank DESC LIMIT %s """ return query_all(sql, (query, query, limit)) ``` **FTS 特点**: - 使用 PostgreSQL 内置 `tsvector` 和 `tsquery` - `ts_rank_cd` 计算覆盖密度排名 - 支持布尔运算符(AND, OR, NOT) ### 4.3 向量搜索 ```python def search_vector(query_embedding: list[float], limit: int = 50) -> list[dict]: """pgvector 向量搜索""" sql = """ SELECT c.chunk_id, c.doc_id, c.page_start, c.page_end, LEFT(c.text, 500) AS text, 1 - (ce.embedding <=> %s::vector) AS similarity FROM chunk_embeddings ce JOIN chunks c ON c.chunk_id = ce.chunk_id ORDER BY ce.embedding <=> %s::vector LIMIT %s """ embedding_str = "[" + ",".join(map(str, query_embedding)) + "]" return query_all(sql, (embedding_str, embedding_str, limit)) ``` **向量搜索特点**: - 使用 `<=>` 运算符计算余弦距离 - HNSW 索引提供近似最近邻搜索 - 相似度 = 1 - 余弦距离 ### 4.4 混合搜索融合 ```python def hybrid_search( query: str, k: int = 10, alpha: float = 0.6, # 向量权重 fts_topn: int = 50, vec_topn: int = 50, per_doc_limit: int | None = None, ) -> SearchResponse: """混合搜索(FTS + 向量)""" # 并行执行 FTS 和向量搜索 fts_results = search_fts(query, fts_topn) query_embedding = get_embedding(query) vec_results = search_vector(query_embedding, vec_topn) # 分数归一化 fts_scores = normalize_scores(fts_results, "rank") vec_scores = normalize_scores(vec_results, "similarity") # RRF 融合 combined = {} for chunk_id, score in fts_scores.items(): combined[chunk_id] = (1 - alpha) * score for chunk_id, score in vec_scores.items(): combined[chunk_id] = combined.get(chunk_id, 0) + alpha * score # 排序和截取 sorted_results = sorted(combined.items(), key=lambda x: x[1], reverse=True) # 应用 per_doc_limit if per_doc_limit: results = apply_per_doc_limit(sorted_results, per_doc_limit) return results[:k] ``` **参数说明**: | 参数 | 默认值 | 说明 | |------|--------|------| | `k` | 10 | 返回结果数量 | | `alpha` | 0.6 | 向量搜索权重 (0-1) | | `fts_topn` | 50 | FTS 候选数量 | | `vec_topn` | 50 | 向量候选数量 | | `per_doc_limit` | 3 | 每文档最大 chunk 数 | **Alpha 选择建议**: - `alpha=0.8`: 偏重语义相似(适合概念性查询) - `alpha=0.6`: 平衡(推荐默认值) - `alpha=0.4`: 偏重关键词匹配(适合精确术语搜索) ### 4.5 Per-doc 限制 ```python def apply_per_doc_limit( results: list[SearchResult], per_doc_limit: int | None, ) -> list[SearchResult]: """每文档最多返回 N 个 chunk(避免单篇论文刷屏)""" if not per_doc_limit: return results doc_counts = defaultdict(int) filtered = [] for result in results: if doc_counts[result.doc_id] < per_doc_limit: filtered.append(result) doc_counts[result.doc_id] += 1 return filtered ``` --- ## 5. 搜索解释工具 ```python def explain_search(query: str, ...) -> ExplainSearchResponse: """详细搜索解释(用于调试)""" return { "final_results": [...], # 最终 Top-K "fts_only_hits": [...], # 仅 FTS 命中 "vec_only_hits": [...], # 仅向量命中 "intersection_hits": [...], # 两者都命中 "stats": { "fts_candidates": 50, "vec_candidates": 50, "intersection_count": 15, } } ``` --- ## 6. 性能优化 ### 6.1 Embedding 缓存 - 文档导入时一次性生成所有 chunk embeddings - 存储在 `chunk_embeddings` 表中复用 - 搜索时只需生成查询 embedding ### 6.2 批量处理 ```python # PDF 导入时的批量 embedding embeddings = await aget_embeddings_chunked( texts=[chunk["text"] for chunk in chunks], batch_size=64, concurrency=5 ) ``` ### 6.3 索引优化 ```sql -- 调整 HNSW 参数(构建时间 vs 精度权衡) CREATE INDEX chunk_emb_hnsw_cos ON chunk_embeddings USING hnsw (embedding vector_cosine_ops) WITH (m = 16, ef_construction = 64); -- 搜索时设置 ef_search SET hnsw.ef_search = 100; ``` ### 6.4 内存估算 | 数据规模 | 文档数 | Chunks | Embedding 存储 | |----------|--------|--------|---------------| | 小型 | 100 | ~2000 | ~12 MB | | 中型 | 500 | ~10000 | ~60 MB | | 大型 | 2000 | ~40000 | ~240 MB | *按每 chunk 1536 维 × 4 字节 ≈ 6KB 估算* --- ## 7. 常见问题 ### Q1: 为什么使用 OpenRouter 而不是直接调用 OpenAI? A: OpenRouter 提供统一接口访问多种模型,便于切换和比较不同 embedding 模型。 ### Q2: 如何更换 Embedding 模型? A: 修改 `.env` 中的 `EMBEDDING_MODEL`,然后使用 `reembed_document` 工具重新生成 embeddings。 ```bash # .env EMBEDDING_MODEL=openai/text-embedding-3-large ``` ### Q3: 向量维度不匹配怎么办? A: 需要重建 `chunk_embeddings` 表和索引: ```sql ALTER TABLE chunk_embeddings ALTER COLUMN embedding TYPE vector(3072); DROP INDEX chunk_emb_hnsw_cos; CREATE INDEX chunk_emb_hnsw_cos ON chunk_embeddings USING hnsw (embedding vector_cosine_ops); ``` ### Q4: 搜索结果不理想怎么调优? A: 使用 `explain_search` 工具分析: - 如果 FTS 命中多但向量命中少 → 降低 alpha - 如果向量命中多但 FTS 命中少 → 提高 alpha - 如果交集很小 → 检查查询词是否在文档中出现

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/h-lu/paperlib-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server