"""
上下文管理器模块
提供代码上下文的缓存、压缩和优先级管理功能
"""
import os
from typing import Dict, Set, Optional
from dataclasses import dataclass
from enum import Enum
import json
import logging
from pathlib import Path
import time
from .code_optimizer import CodeOptimizer
from .code_compressor import CodeCompressor, NormalizationLevel
logger = logging.getLogger(__name__)
class ContextType(Enum):
"""上下文类型"""
FUNCTION = "function"
CLASS = "class"
MODULE = "module"
FILE = "file"
DIRECTORY = "directory"
class ContextPriority(Enum):
"""上下文优先级"""
CRITICAL = 0 # 关键上下文
HIGH = 1 # 高优先级
NORMAL = 2 # 普通优先级
LOW = 3 # 低优先级
BACKGROUND = 4 # 背景上下文
@dataclass
class ContextItem:
"""上下文项"""
content: str
context_type: ContextType
priority: ContextPriority
file_path: str
start_line: int
end_line: int
last_used: float
access_count: int
dependencies: Set[str]
compressed: bool = False
    def __lt__(self, other):
        """Order items from most to least valuable.

        The key is (priority value, -access_count, -last_used): CRITICAL,
        frequently used, recently used items compare smallest.
        """
        if not isinstance(other, ContextItem):
            return NotImplemented
        return (self.priority.value, -self.access_count, -self.last_used) < \
               (other.priority.value, -other.access_count, -other.last_used)
class ContextCache:
"""上下文缓存"""
def __init__(self, cache_dir: Optional[str] = None, max_size: int = 1000):
"""
初始化上下文缓存
Args:
cache_dir: 缓存目录
max_size: 最大缓存项数
"""
self.cache_dir = Path(cache_dir) if cache_dir else Path.home() / ".mcp_cache" / "context"
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.max_size = max_size
self.items: Dict[str, ContextItem] = {}
self.compressed_items: Dict[str, ContextItem] = {}
self._load_cache()
def _load_cache(self):
"""加载缓存"""
try:
cache_file = self.cache_dir / "context_cache.json"
if cache_file.exists():
with open(cache_file, 'r', encoding='utf-8') as f:
data = json.load(f)
for item_data in data['items']:
item = ContextItem(
content=item_data['content'],
context_type=ContextType(item_data['type']),
priority=ContextPriority(item_data['priority']),
file_path=item_data['file_path'],
start_line=item_data['start_line'],
end_line=item_data['end_line'],
last_used=item_data['last_used'],
access_count=item_data['access_count'],
dependencies=set(item_data['dependencies']),
compressed=item_data['compressed']
)
if item.compressed:
self.compressed_items[self._get_key(item)] = item
else:
self.items[self._get_key(item)] = item
except Exception as e:
logger.error(f"加载上下文缓存失败: {str(e)}")
def _save_cache(self):
"""保存缓存"""
try:
cache_file = self.cache_dir / "context_cache.json"
items_data = []
for item in list(self.items.values()) + list(self.compressed_items.values()):
items_data.append({
'content': item.content,
'type': item.context_type.value,
'priority': item.priority.value,
'file_path': item.file_path,
'start_line': item.start_line,
'end_line': item.end_line,
'last_used': item.last_used,
'access_count': item.access_count,
'dependencies': list(item.dependencies),
'compressed': item.compressed
})
with open(cache_file, 'w', encoding='utf-8') as f:
json.dump({'items': items_data}, f, indent=2)
except Exception as e:
logger.error(f"保存上下文缓存失败: {str(e)}")
def _get_key(self, item: ContextItem) -> str:
"""生成缓存键"""
return f"{item.file_path}:{item.start_line}-{item.end_line}"
def get(self, file_path: str, start_line: int, end_line: int) -> Optional[ContextItem]:
"""获取上下文项"""
key = f"{file_path}:{start_line}-{end_line}"
        # Check the uncompressed cache first
if key in self.items:
item = self.items[key]
item.last_used = time.time()
item.access_count += 1
return item
        # Then check the compressed cache
if key in self.compressed_items:
item = self.compressed_items[key]
item.last_used = time.time()
item.access_count += 1
return item
return None
    def put(self, item: ContextItem):
        """Store a context item, evicting first if the cache is full."""
        key = self._get_key(item)
        # Evict before inserting if the cache is at capacity
        if len(self.items) + len(self.compressed_items) >= self.max_size:
            self._evict_items()
        # Store the item in the matching map
        if item.compressed:
            self.compressed_items[key] = item
        else:
            self.items[key] = item
        # Persist periodically rather than on every insert
        if (len(self.items) + len(self.compressed_items)) % 100 == 0:
            self._save_cache()
    def _evict_items(self):
        """Evict the least valuable items."""
        # Merge compressed and uncompressed items
        all_items = list(self.items.values()) + list(self.compressed_items.values())
        # Sort by priority, access count, and recency (see ContextItem.__lt__)
        sorted_items = sorted(all_items)
        # Drop items from the tail until the cache is back to 80% of capacity
        while len(sorted_items) > self.max_size * 0.8:
            item = sorted_items.pop()
            key = self._get_key(item)
            if item.compressed:
                self.compressed_items.pop(key, None)
            else:
                self.items.pop(key, None)
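
# Illustrative example of the eviction order (hypothetical items, other
# fields elided): priority dominates, then access count, then recency.
#
#   a = ContextItem(..., priority=ContextPriority.CRITICAL, access_count=1, ...)
#   b = ContextItem(..., priority=ContextPriority.LOW, access_count=50, ...)
#   sorted([b, a]) == [a, b]   # _evict_items pops from the tail, so b goes first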
class ContextManager:
"""上下文管理器"""
def __init__(self, cache_dir: Optional[str] = None):
"""
初始化上下文管理器
Args:
cache_dir: 缓存目录
"""
self.cache = ContextCache(cache_dir)
self.optimizer = CodeOptimizer()
self.compressor = CodeCompressor()
def get_context(self, file_path: str, line_number: int,
context_type: ContextType = ContextType.FUNCTION,
priority: ContextPriority = ContextPriority.NORMAL) -> Optional[str]:
"""
获取代码上下文
Args:
file_path: 文件路径
line_number: 行号
context_type: 上下文类型
priority: 上下文优先级
Returns:
上下文内容
"""
        try:
            # Read the file so its structure can be analyzed
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            # Determine the language from the file extension
            ext = os.path.splitext(file_path)[1].lower()
            language = self._get_language(ext)
            # Analyze the code structure
            analysis = self.optimizer.analyze_code(content, file_path, language)
            # Find the code block containing the target line
            target_block = None
            for block in analysis['blocks']:
                if block.start_line <= line_number <= block.end_line:
                    target_block = block
                    break
            if not target_block:
                return None
            # Check the cache first
context_item = self.cache.get(
file_path,
target_block.start_line,
target_block.end_line
)
if context_item:
return context_item.content
            # Build a new context item
context_item = ContextItem(
content=target_block.content,
context_type=context_type,
priority=priority,
file_path=file_path,
start_line=target_block.start_line,
end_line=target_block.end_line,
last_used=time.time(),
access_count=1,
dependencies=target_block.dependencies
)
            # Compress low-priority context to save space
if priority in [ContextPriority.LOW, ContextPriority.BACKGROUND]:
context_item.content = self.compressor.compress(
context_item.content,
language
)
context_item.compressed = True
            # Cache the context
self.cache.put(context_item)
return context_item.content
except Exception as e:
logger.error(f"获取上下文失败 {file_path}:{line_number}: {str(e)}")
return None
def get_module_context(self, file_path: str,
priority: ContextPriority = ContextPriority.NORMAL) -> Optional[str]:
"""
获取模块级上下文
Args:
file_path: 文件路径
priority: 上下文优先级
Returns:
上下文内容
"""
try:
            # Check the cache first (end_line=-1 is the sentinel for whole-module context)
            context_item = self.cache.get(file_path, 1, -1)
if context_item:
return context_item.content
            # Read the file content
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
            # Determine the language from the file extension
ext = os.path.splitext(file_path)[1].lower()
language = self._get_language(ext)
            # Build a new context item; end_line=-1 is the same sentinel used by
            # the cache lookup above, so subsequent calls get a cache hit
            context_item = ContextItem(
                content=content,
                context_type=ContextType.MODULE,
                priority=priority,
                file_path=file_path,
                start_line=1,
                end_line=-1,
                last_used=time.time(),
                access_count=1,
                dependencies=set()
            )
            # Compress low-priority context to save space
if priority in [ContextPriority.LOW, ContextPriority.BACKGROUND]:
context_item.content = self.compressor.compress(
context_item.content,
language
)
context_item.compressed = True
            # Cache the context
self.cache.put(context_item)
return context_item.content
except Exception as e:
logger.error(f"获取模块上下文失败 {file_path}: {str(e)}")
return None
def _get_language(self, ext: str) -> str:
"""获取文件语言"""
language_map = {
'.py': 'python',
'.js': 'javascript',
'.ts': 'typescript',
'.jsx': 'javascript',
'.tsx': 'typescript',
'.vue': 'vue',
'.php': 'php',
'.rs': 'rust'
}
return language_map.get(ext, 'text')
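
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the library API). Because of
# the relative imports above, it must be run as part of the package (e.g.
# python -m <package>.<module>); "example/app.py" is a hypothetical path.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    manager = ContextManager()  # cache defaults to ~/.mcp_cache/context
    # Function-level context around a line; HIGH priority stays uncompressed.
    context = manager.get_context(
        "example/app.py",  # hypothetical file
        line_number=42,
        context_type=ContextType.FUNCTION,
        priority=ContextPriority.HIGH,
    )
    if context:
        print(context)
    # LOW-priority module context is compressed before caching.
    module_context = manager.get_module_context(
        "example/app.py",  # hypothetical file
        priority=ContextPriority.LOW,
    )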