MCP Code Indexer

  • mcp_code_indexer
""" MCP响应格式化模块 负责将检索结果转换为符合MCP协议的格式 """ import os import json from typing import List, Dict, Any, Optional import logging from datetime import datetime logger = logging.getLogger(__name__) class McpFormatter: """ MCP响应格式化器类 将检索结果转换为符合MCP协议的格式,包含代码内容、位置、关系等信息 """ def __init__(self): """ 初始化MCP响应格式化器 Returns: 无返回值 """ logger.info("MCP响应格式化器初始化完成") def format_search_results(self, results: List[Dict[str, Any]], query: str, confidence_threshold: float = 0.7) -> Dict[str, Any]: """ 格式化搜索结果为MCP响应 Args: results: 搜索结果列表 query: 原始查询字符串 confidence_threshold: 置信度阈值,低于此值的结果将被标记为低置信度 Returns: MCP格式的响应字典 """ if not results: return self._create_empty_response(query) # 格式化代码块 code_blocks = [] for result in results: code_block = self._format_code_block(result, confidence_threshold) if code_block: code_blocks.append(code_block) # 创建MCP响应 response = { "mcp_version": "1.0", "timestamp": datetime.now().isoformat(), "query": query, "result_count": len(code_blocks), "code_blocks": code_blocks } return response def _format_code_block(self, result: Dict[str, Any], confidence_threshold: float) -> Optional[Dict[str, Any]]: """ 格式化单个代码块 Args: result: 代码块搜索结果 confidence_threshold: 置信度阈值 Returns: 格式化后的代码块字典,如果格式化失败则返回None """ try: # 提取基本信息 file_path = result.get('file_path', '') content = result.get('content', '') start_line = result.get('start_line', 1) end_line = result.get('end_line', 1) language = result.get('language', 'text') similarity = result.get('similarity', 0.0) # 计算置信度 confidence = similarity low_confidence = confidence < confidence_threshold # 创建代码块字典 code_block = { "id": self._generate_block_id(file_path, start_line, end_line), "file_path": file_path, "file_name": os.path.basename(file_path), "language": language, "start_line": start_line, "end_line": end_line, "content": content, "confidence": confidence, "low_confidence": low_confidence, "metadata": { "similarity": similarity, "type": result.get('type', 'code') } } return code_block except Exception as e: logger.error(f"格式化代码块失败: {str(e)}") return None def _generate_block_id(self, file_path: str, start_line: int, end_line: int) -> str: """ 生成代码块ID Args: file_path: 文件路径 start_line: 起始行号 end_line: 结束行号 Returns: 代码块ID字符串 """ file_name = os.path.basename(file_path) return f"{file_name}:{start_line}-{end_line}" def _create_empty_response(self, query: str) -> Dict[str, Any]: """ 创建空响应 Args: query: 原始查询字符串 Returns: 空的MCP响应字典 """ return { "mcp_version": "1.0", "timestamp": datetime.now().isoformat(), "query": query, "result_count": 0, "code_blocks": [], "message": "未找到结果" } def format_project_info(self, project_info: Dict[str, Any]) -> Dict[str, Any]: """ 格式化项目信息为MCP响应 Args: project_info: 项目信息字典 Returns: MCP格式的项目信息响应字典 """ return { "mcp_version": "1.0", "timestamp": datetime.now().isoformat(), "project_id": project_info.get("project_id"), "project_path": project_info.get("project_path"), "indexed_at": project_info.get("indexed_at"), "file_count": project_info.get("file_count"), "status": project_info.get("status"), "progress": project_info.get("progress", 0.0), "metadata": project_info.get("metadata", {}) } def format_code_context(self, code_context: Dict[str, Any], related_blocks: List[Dict[str, Any]] = None) -> Dict[str, Any]: """ 格式化代码上下文为MCP响应 Args: code_context: 代码上下文字典 related_blocks: 相关代码块列表 Returns: MCP格式的代码上下文响应字典 """ # 格式化主代码块 main_block = { "id": self._generate_block_id( code_context.get('file_path', ''), code_context.get('start_line', 1), code_context.get('end_line', 1) ), "file_path": code_context.get('file_path', ''), "file_name": os.path.basename(code_context.get('file_path', '')), "start_line": code_context.get('start_line', 1), "end_line": code_context.get('end_line', 1), "target_line": code_context.get('target_line', 1), "content": code_context.get('content', ''), "language": self._guess_language(code_context.get('file_path', '')) } # 格式化相关代码块 related_code_blocks = [] if related_blocks: for block in related_blocks: formatted_block = self._format_code_block(block, 0.0) if formatted_block: related_code_blocks.append(formatted_block) # 创建MCP响应 response = { "mcp_version": "1.0", "timestamp": datetime.now().isoformat(), "code_context": main_block, "related_blocks": related_code_blocks, "related_count": len(related_code_blocks) } return response def _guess_language(self, file_path: str) -> str: """ 根据文件扩展名猜测编程语言 Args: file_path: 文件路径 Returns: 编程语言字符串 """ ext_to_lang = { ".py": "python", ".js": "javascript", ".ts": "typescript", ".jsx": "javascript", ".tsx": "typescript", ".java": "java", ".c": "c", ".cpp": "cpp", ".h": "c", ".hpp": "cpp", ".cs": "csharp", ".go": "go", ".rb": "ruby", ".php": "php", ".swift": "swift", ".kt": "kotlin", ".rs": "rust", ".sh": "bash", ".html": "html", ".css": "css", ".scss": "scss", ".sql": "sql", ".md": "markdown", ".json": "json", ".xml": "xml", ".yaml": "yaml", ".yml": "yaml", ".toml": "toml" } _, ext = os.path.splitext(file_path.lower()) return ext_to_lang.get(ext, "text") def format_error(self, error_message: str, query: str = None) -> Dict[str, Any]: """ 格式化错误为MCP响应 Args: error_message: 错误消息 query: 原始查询字符串 Returns: MCP格式的错误响应字典 """ return { "mcp_version": "1.0", "timestamp": datetime.now().isoformat(), "query": query, "error": True, "error_message": error_message, "result_count": 0, "code_blocks": [] } def format_indexing_status(self, project_id: str, status: str, progress: float, message: str = None) -> Dict[str, Any]: """ 格式化索引状态为MCP响应 Args: project_id: 项目ID status: 索引状态 progress: 索引进度(0.0-1.0) message: 状态消息 Returns: MCP格式的索引状态响应字典 """ return { "mcp_version": "1.0", "timestamp": datetime.now().isoformat(), "project_id": project_id, "indexing_status": { "status": status, "progress": progress, "message": message or f"索引状态:{status} ({progress:.1%})" } }