MCP Code Indexer

""" 嵌入向量工具模块 提供嵌入向量生成、处理和比较的工具函数 """ import numpy as np import logging from typing import List, Dict, Any, Optional, Union, Tuple import math from concurrent.futures import ThreadPoolExecutor import os logger = logging.getLogger(__name__) try: from sentence_transformers import SentenceTransformer SENTENCE_TRANSFORMERS_AVAILABLE = True except ImportError: SENTENCE_TRANSFORMERS_AVAILABLE = False logger.warning("sentence-transformers 库未安装,某些嵌入功能将不可用") def create_embeddings(texts: List[str], model_name: str = "sentence-transformers/all-MiniLM-L6-v2", batch_size: int = 32) -> Optional[np.ndarray]: """ 使用指定模型为文本创建嵌入向量 Args: texts: 文本列表 model_name: 嵌入模型名称 batch_size: 批处理大小 Returns: 嵌入向量数组,如果生成失败则返回None """ if not SENTENCE_TRANSFORMERS_AVAILABLE: logger.error("无法创建嵌入:sentence-transformers 库未安装") return None if not texts: return np.array([]) try: model = SentenceTransformer(model_name) embeddings = model.encode(texts, batch_size=batch_size, show_progress_bar=False) return embeddings except Exception as e: logger.error(f"创建嵌入向量失败: {str(e)}") return None def batch_encode_texts(texts: List[str], model: Any, batch_size: int = 32) -> np.ndarray: """ 批量编码文本为嵌入向量 Args: texts: 文本列表 model: 嵌入模型对象 batch_size: 批处理大小 Returns: 嵌入向量数组 """ if not texts: return np.array([]) # 分批处理 batches = [texts[i:i + batch_size] for i in range(0, len(texts), batch_size)] all_embeddings = [] for batch in batches: batch_embeddings = model.encode(batch, show_progress_bar=False) all_embeddings.append(batch_embeddings) # 合并所有批次的结果 return np.vstack(all_embeddings) if all_embeddings else np.array([]) def cosine_similarity(vec1: np.ndarray, vec2: np.ndarray) -> float: """ 计算两个向量的余弦相似度 Args: vec1: 第一个向量 vec2: 第二个向量 Returns: 余弦相似度值,范围为[-1, 1] """ if len(vec1.shape) > 1 or len(vec2.shape) > 1: raise ValueError("输入必须是一维向量") norm1 = np.linalg.norm(vec1) norm2 = np.linalg.norm(vec2) if norm1 == 0 or norm2 == 0: return 0.0 return np.dot(vec1, vec2) / (norm1 * norm2) def normalize_embeddings(embeddings: np.ndarray) -> np.ndarray: """ 归一化嵌入向量 Args: embeddings: 嵌入向量数组 Returns: 归一化后的嵌入向量数组 """ # 计算每个向量的范数 norms = np.linalg.norm(embeddings, axis=1, keepdims=True) # 避免除以零 norms[norms == 0] = 1.0 # 归一化 return embeddings / norms def parallel_encode(texts: List[str], model: Any, batch_size: int = 32, max_workers: Optional[int] = None) -> np.ndarray: """ 并行编码文本为嵌入向量 Args: texts: 文本列表 model: 嵌入模型对象 batch_size: 批处理大小 max_workers: 最大工作线程数,默认为CPU核心数 Returns: 嵌入向量数组 """ if not texts: return np.array([]) if max_workers is None: max_workers = os.cpu_count() or 4 # 分批处理 batches = [texts[i:i + batch_size] for i in range(0, len(texts), batch_size)] results = [None] * len(batches) def encode_batch(batch_idx: int, batch: List[str]) -> Tuple[int, np.ndarray]: """编码单个批次并返回批次索引和结果""" embeddings = model.encode(batch, show_progress_bar=False) return batch_idx, embeddings # 使用线程池并行处理 with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = [executor.submit(encode_batch, i, batch) for i, batch in enumerate(batches)] for future in futures: batch_idx, embeddings = future.result() results[batch_idx] = embeddings # 合并所有批次的结果 return np.vstack(results) if results else np.array([]) def calculate_similarity_matrix(embeddings1: np.ndarray, embeddings2: np.ndarray) -> np.ndarray: """ 计算两组嵌入向量之间的相似度矩阵 Args: embeddings1: 第一组嵌入向量 embeddings2: 第二组嵌入向量 Returns: 相似度矩阵,形状为 (len(embeddings1), len(embeddings2)) """ # 归一化嵌入向量 norm_embeddings1 = normalize_embeddings(embeddings1) norm_embeddings2 = normalize_embeddings(embeddings2) # 计算点积(即余弦相似度) return np.dot(norm_embeddings1, norm_embeddings2.T) def find_top_k_similar(query_embedding: np.ndarray, corpus_embeddings: np.ndarray, k: int = 5) -> Tuple[List[int], List[float]]: """ 找出与查询向量最相似的k个语料向量 Args: query_embedding: 查询嵌入向量 corpus_embeddings: 语料库嵌入向量 k: 返回的最相似向量数量 Returns: 元组(索引列表, 相似度列表) """ # 确保查询向量是二维的 if len(query_embedding.shape) == 1: query_embedding = query_embedding.reshape(1, -1) # 计算相似度 similarities = calculate_similarity_matrix(query_embedding, corpus_embeddings)[0] # 找出前k个最相似的索引 k = min(k, len(similarities)) top_k_indices = np.argsort(similarities)[-k:][::-1] top_k_scores = similarities[top_k_indices] return top_k_indices.tolist(), top_k_scores.tolist() def chunk_text(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[str]: """ 将文本分割成重叠的块 Args: text: 输入文本 chunk_size: 块大小(字符数) chunk_overlap: 块重叠大小(字符数) Returns: 文本块列表 """ if not text: return [] chunks = [] start = 0 text_len = len(text) while start < text_len: end = min(start + chunk_size, text_len) # 如果不是最后一块,尝试在空白处分割 if end < text_len: # 在chunk_size范围内找最后一个空白字符 while end > start + chunk_size - chunk_overlap and not text[end].isspace(): end -= 1 # 如果没找到合适的分割点,就直接在chunk_size处分割 if end <= start + chunk_size - chunk_overlap: end = start + chunk_size chunks.append(text[start:end]) start = end - chunk_overlap return chunks