Skip to main content
Glama

MCP Sheet Parser

by yuqie6
MIT License
3
  • Apple
cache_manager.py14.1 kB
""" 主缓存管理器,结合 LRU 与磁盘缓存。 本模块提供统一的缓存操作接口, 将内存中的 LRU 缓存与可选的磁盘持久化结合。 """ import hashlib import time import logging import threading from typing import Any from pathlib import Path import os # 缓存管理常量 SMALL_FILE_THRESHOLD_BYTES = 1024 * 1024 # 1MB,小文件阈值,用于内容哈希 HASH_PREFIX_LENGTH = 16 # 内容哈希前缀长度 CACHE_KEY_LENGTH = 32 # 缓存键长度 from ..unified_config import get_cache_config from .lru_cache import LRURowBlockCache from .disk_cache import DiskCache logger = logging.getLogger(__name__) class CacheManager: """ 统一缓存管理器,结合内存 LRU 与磁盘持久化。 按文件哈希+范围键为行块提供智能缓存,适用于重复工具调用。 """ def __init__(self, config: Any = None): """ 初始化缓存管理器。 参数: config: 可选缓存配置。为 None 时使用统一全局配置。 """ # 优先使用统一配置系统 self.config = config or get_cache_config() self.config.validate() # Initialize caches based on configuration self.memory_cache = None self.disk_cache = None if self.config.memory_cache_enabled: self.memory_cache = LRURowBlockCache(max_entries=self.config.max_entries) logger.info(f"Initialized LRU cache with {self.config.max_entries} entries") if self.config.disk_cache_enabled: cache_dir = self.config.cache_dir if not cache_dir or cache_dir is None: if os.name == 'nt': cache_base = os.environ.get('LOCALAPPDATA', os.path.expanduser('~')) else: cache_base = os.environ.get('XDG_CACHE_HOME', os.path.expanduser('~/.cache')) cache_dir = str(Path(cache_base) / 'mcp-sheet-parser') # 这里确保 cache_dir 永远不是 None if cache_dir is None: cache_dir = str(Path(os.path.expanduser('~')) / 'mcp-sheet-parser') self.disk_cache = DiskCache( cache_dir=cache_dir, max_cache_size_mb=self.config.max_disk_cache_size_mb ) logger.info(f"Initialized disk cache at {cache_dir}") def _generate_cache_key(self, file_path: str, range_string: str | None = None, sheet_name: str | None = None) -> str: """ 根据文件哈希和范围参数生成缓存键。 参数: file_path: 文件路径 range_string: 可选范围字符串(如 "A1:D10") sheet_name: 可选表名 返回: 缓存键字符串 """ # Calculate file hash file_hash = self._calculate_file_hash(file_path) # Build cache key components key_parts = [file_hash] if range_string: key_parts.append(f"range:{range_string}") if sheet_name: key_parts.append(f"sheet:{sheet_name}") cache_key = "|".join(key_parts) return cache_key def _calculate_file_hash(self, file_path: str) -> str: """ 计算文件的 SHA256 哈希值,用于生成缓存键。 包含文件大小、修改时间,小文件还包含内容哈希,确保缓存一致性。 参数: file_path: 文件路径 返回: 文件的 SHA256 哈希值,或计算失败时的错误字符串 """ try: path = Path(file_path) if not path.exists(): return f"missing:{file_path}" # 包含文件大小和修改时间以加速比较 stat = path.stat() # 使用高精度修改时间以提升缓存失效准确性 file_signature = f"{path.name}:{stat.st_size}:{stat.st_mtime_ns}" # 对于小文件,包含内容哈希以确保最大准确性 if stat.st_size < SMALL_FILE_THRESHOLD_BYTES: try: with open(path, 'rb') as f: content = f.read() content_hash = hashlib.sha256(content).hexdigest()[:HASH_PREFIX_LENGTH] file_signature += f":{content_hash}" except (OSError, MemoryError) as e: logger.warning(f"Failed to read file content for hash: {e}") # 仅回退到文件元数据 return hashlib.sha256(file_signature.encode()).hexdigest()[:CACHE_KEY_LENGTH] except Exception as e: logger.warning(f"Failed to calculate file hash for {file_path}: {e}") # 返回唯一错误标识以避免缓存冲突 return f"error:{abs(hash(file_path))}:{int(time.time())}" def get(self, file_path: str, range_string: str | None = None, sheet_name: str | None = None) -> Any | None: """ 获取缓存数据。 参数: file_path: 文件路径 range_string: 可选范围字符串 sheet_name: 可选表名 返回: 找到则返回缓存数据,否则返回 None """ if not self.config.is_cache_enabled(): return None cache_key = self._generate_cache_key(file_path, range_string, sheet_name) # 优先尝试内存缓存 if self.memory_cache: cached_data = self.memory_cache.get(cache_key) if cached_data is not None: logger.debug(f"Cache hit (memory): {cache_key}") return cached_data # 再尝试磁盘缓存 if self.disk_cache: cached_data = self.disk_cache.get(cache_key) if cached_data is not None: # 检查缓存条目是否仍然有效 if self._is_cache_valid(cached_data): logger.debug(f"Cache hit (disk): {cache_key}") # 提升到内存缓存 if self.memory_cache: self.memory_cache.set(cache_key, cached_data) return cached_data else: logger.debug(f"Cache expired (disk): {cache_key}") logger.debug(f"Cache miss: {cache_key}") return None def set(self, file_path: str, data: Any, range_string: str | None = None, sheet_name: str | None = None) -> None: """ 缓存数据。 参数: file_path: 文件路径 data: 要缓存的数据 range_string: 可选范围字符串 sheet_name: 可选表名 """ if not self.config.is_cache_enabled(): return cache_key = self._generate_cache_key(file_path, range_string, sheet_name) # 添加时间戳用于过期判断 cache_entry = { 'data': data, 'timestamp': time.time(), 'file_path': file_path, 'range_string': range_string, 'sheet_name': sheet_name } # 存入内存缓存 if self.memory_cache: self.memory_cache.set(cache_key, cache_entry) logger.debug(f"Cached in memory: {cache_key}") # 存入磁盘缓存 if self.disk_cache: try: self.disk_cache.set(cache_key, cache_entry) logger.debug(f"Cached on disk: {cache_key}") except Exception as e: logger.warning(f"Failed to cache on disk: {e}") def _is_cache_valid(self, cache_entry: dict[str, Any]) -> bool: """ 检查缓存条目是否有效。 参数: cache_entry: 待验证的缓存条目 返回: 有效返回 True,否则返回 False """ if not isinstance(cache_entry, dict) or 'timestamp' not in cache_entry: return False current_time = time.time() entry_time = cache_entry['timestamp'] # 检查过期 if current_time - entry_time > self.config.cache_expiry_seconds: return False # 检查文件是否仍存在且未被修改 file_path = cache_entry.get('file_path') if file_path: try: path = Path(file_path) if not path.exists(): return False # 当前依赖缓存键中的文件哈希 # 可在此处添加更复杂的校验 return True except Exception: return False return True def clear(self) -> None: """清除所有缓存。""" if self.memory_cache: self.memory_cache.clear() logger.info("Cleared memory cache") if self.disk_cache: self.disk_cache.clear() logger.info("Cleared disk cache") def get_stats(self) -> dict[str, Any]: """ 获取缓存统计信息。 返回: 包含缓存统计信息的字典 """ stats = { 'config': { 'cache_enabled': self.config.cache_enabled, 'memory_cache_enabled': self.config.memory_cache_enabled, 'disk_cache_enabled': self.config.disk_cache_enabled, 'max_entries': self.config.max_entries, 'max_disk_cache_size_mb': self.config.max_disk_cache_size_mb, 'cache_expiry_seconds': self.config.cache_expiry_seconds, 'cache_dir': self.config.cache_dir }, 'memory_cache': None, 'disk_cache': None } # 内存缓存统计 if self.memory_cache: stats['memory_cache'] = { 'current_size': len(self.memory_cache.cache), 'max_size': self.memory_cache.cache.maxsize, 'hits': getattr(self.memory_cache.cache, 'hits', 0), 'misses': getattr(self.memory_cache.cache, 'misses', 0) } # 磁盘缓存统计 if self.disk_cache: try: cache_files = list(self.disk_cache.cache_dir.glob('*.cache')) total_size = sum(f.stat().st_size for f in cache_files) stats['disk_cache'] = { 'cache_files': len(cache_files), 'total_size_mb': total_size / (1024 * 1024), 'cache_dir': str(self.disk_cache.cache_dir) } except Exception as e: stats['disk_cache'] = {'error': str(e)} return stats def optimize_cache(self) -> dict[str, Any]: """ 优化缓存,清理过期条目。 返回: 包含优化结果的字典 """ import pickle # 移到函数顶部,避免重复导入 results = { 'memory_cache_cleaned': 0, 'disk_cache_cleaned': 0, 'errors': [] } # 清理内存缓存(LRU自动处理) if self.memory_cache: # LRU缓存无法轻易清理过期项,除非遍历所有条目 pass # 清理磁盘缓存 if self.disk_cache: try: cache_files = list(self.disk_cache.cache_dir.glob('*.cache')) for cache_file in cache_files: try: with open(cache_file, 'rb') as f: cache_entry = pickle.load(f) if not self._is_cache_valid(cache_entry): cache_file.unlink() results['disk_cache_cleaned'] += 1 except (pickle.PickleError, EOFError, OSError) as e: # 移除损坏的缓存文件 try: cache_file.unlink() results['disk_cache_cleaned'] += 1 results['errors'].append(f"Removed corrupted cache file {cache_file.name}: {e}") except OSError as unlink_error: results['errors'].append(f"Failed to remove corrupted cache file {cache_file.name}: {unlink_error}") except Exception as e: # 记录未预期错误但不移除文件 results['errors'].append(f"Unexpected error processing cache file {cache_file.name}: {e}") except Exception as e: results['errors'].append(f"Failed to optimize disk cache: {e}") return results # 全局缓存管理器实例(线程安全) _global_cache_manager = None _cache_manager_lock = threading.Lock() def get_cache_manager() -> CacheManager: """获取全局缓存管理器实例(线程安全)。""" global _global_cache_manager if _global_cache_manager is None: with _cache_manager_lock: # 双重检查锁定模式 if _global_cache_manager is None: _global_cache_manager = CacheManager() return _global_cache_manager def reset_cache_manager() -> None: """重置全局缓存管理器实例(线程安全)。""" global _global_cache_manager with _cache_manager_lock: if _global_cache_manager is not None: _global_cache_manager.clear() _global_cache_manager = None

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/yuqie6/MCP-Sheet-Parser-cot'

If you have feedback or need assistance with the MCP directory API, please join our Discord server