"""
缓存接口模块
提供多层缓存、智能失效和性能优化功能。
"""
import os
import logging
logger = logging.getLogger(__name__)
import json
import time
import asyncio
from typing import Any, Optional, Dict, List, Union, Callable, TypeVar, Generic
from datetime import datetime, timedelta
from pathlib import Path
import hashlib
import threading
from dataclasses import dataclass, asdict
# Generic type variables.
# T parameterises the value type of CacheEntry and MemoryCache below.
T = TypeVar('T')
# K and V are declared but not referenced in the visible part of this module.
K = TypeVar('K')
V = TypeVar('V')
try:
import diskcache
DISKCACHE_AVAILABLE = True
except ImportError:
DISKCACHE_AVAILABLE = False
@dataclass
class CacheEntry(Generic[T]):
    """One cached value plus the bookkeeping needed for expiry and LRU.

    Timestamps are naive ``datetime`` objects taken from ``datetime.now()``.
    """

    key: str
    value: T
    created_at: datetime
    accessed_at: datetime
    access_count: int
    ttl: Optional[int] = None  # time to live in seconds; None means "never expires"

    def is_expired(self) -> bool:
        """Return True once more than ``ttl`` seconds have passed since creation.

        Entries without a TTL never expire.
        """
        if self.ttl is not None:
            age = datetime.now() - self.created_at
            return age.total_seconds() > self.ttl
        return False

    def to_dict(self) -> Dict[str, Any]:
        """Serialise the entry to a plain dict (timestamps as ISO-8601 strings)."""
        serialized: Dict[str, Any] = {'key': self.key, 'value': self.value}
        serialized['created_at'] = self.created_at.isoformat()
        serialized['accessed_at'] = self.accessed_at.isoformat()
        serialized['access_count'] = self.access_count
        serialized['ttl'] = self.ttl
        return serialized

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'CacheEntry':
        """Rebuild an entry from the dict produced by :meth:`to_dict`.

        ``'ttl'`` is optional in ``data``; every other field is required.
        """
        # Fields are read in declaration order so that a malformed dict fails
        # on the same missing key as it always has.
        entry_key = data['key']
        entry_value = data['value']
        created = datetime.fromisoformat(data['created_at'])
        accessed = datetime.fromisoformat(data['accessed_at'])
        hits = data['access_count']
        return cls(entry_key, entry_value, created, accessed, hits, data.get('ttl'))
class MemoryCache(Generic[T]):
    """Size-bounded in-memory cache with optional per-entry TTL.

    All public methods take an internal re-entrant lock. When the cache is
    full and a new key is inserted, expired entries are discarded first; only
    if that frees no room is the least recently accessed entry evicted.
    """

    def __init__(self, max_size: int = 1000):
        """
        Create an empty cache.

        Args:
            max_size: Maximum number of entries held at once.
        """
        self._cache: Dict[str, CacheEntry[T]] = {}
        self._max_size = max_size
        self._lock = threading.RLock()
        self._stats = {
            'hits': 0,
            'misses': 0,
            'evictions': 0
        }

    def get(self, key: str) -> Optional[T]:
        """Return the value stored under ``key``, or None on a miss.

        An expired entry is removed and counted as a miss. A hit refreshes the
        entry's access time and access count.
        """
        with self._lock:
            entry = self._cache.get(key)
            if entry is None:
                self._stats['misses'] += 1
                return None
            if entry.is_expired():
                del self._cache[key]
                self._stats['misses'] += 1
                return None
            entry.accessed_at = datetime.now()
            entry.access_count += 1
            self._stats['hits'] += 1
            return entry.value

    def set(self, key: str, value: T, ttl: Optional[int] = None) -> bool:
        """Store ``value`` under ``key``, replacing any existing entry.

        Args:
            key: Cache key.
            value: Value to store.
            ttl: Time to live in seconds; None means the entry never expires.

        Returns:
            Always True.
        """
        with self._lock:
            # Room is only needed for a brand-new key; overwriting keeps size.
            if key not in self._cache and len(self._cache) >= self._max_size:
                # Dead entries should go before any live one is sacrificed.
                self._purge_expired()
                if len(self._cache) >= self._max_size:
                    self._evict_lru()
            now = datetime.now()
            self._cache[key] = CacheEntry[T](
                key=key,
                value=value,
                created_at=now,
                accessed_at=now,
                access_count=1,
                ttl=ttl
            )
            return True

    def delete(self, key: str) -> bool:
        """Remove ``key``; return True if it was present."""
        with self._lock:
            return self._cache.pop(key, None) is not None

    def clear(self):
        """Remove every entry. Hit/miss/eviction counters are left untouched."""
        with self._lock:
            self._cache.clear()

    def _purge_expired(self) -> int:
        """Drop all expired entries and return how many were removed.

        The caller must hold ``self._lock``. Purged entries are not counted in
        the ``evictions`` statistic, which tracks LRU evictions only.
        """
        expired_keys = [k for k, entry in self._cache.items() if entry.is_expired()]
        for k in expired_keys:
            del self._cache[k]
        return len(expired_keys)

    def _evict_lru(self):
        """Evict the least recently accessed entry.

        Ties on access time are broken by the lower access count. The caller
        must hold ``self._lock``.
        """
        if not self._cache:
            return
        lru_key = min(
            self._cache,
            key=lambda k: (
                self._cache[k].accessed_at,
                self._cache[k].access_count
            )
        )
        del self._cache[lru_key]
        self._stats['evictions'] += 1

    def get_stats(self) -> Dict[str, Any]:
        """Return size, capacity, hit/miss/eviction counters and the hit rate.

        ``hit_rate`` is 0 when no lookups have been made yet.
        """
        with self._lock:
            hits = self._stats['hits']
            misses = self._stats['misses']
            total_requests = hits + misses
            hit_rate = hits / total_requests if total_requests > 0 else 0
            return {
                'size': len(self._cache),
                'max_size': self._max_size,
                'hits': hits,
                'misses': misses,
                'evictions': self._stats['evictions'],
                'hit_rate': hit_rate
            }
class DiskCache:
    """Persistent cache tier backed by the optional ``diskcache`` package.

    Values are wrapped in a ``CacheEntry`` and stored as its dict form. Backend
    failures are logged and reported as a miss / ``False`` rather than raised,
    so a broken disk cache degrades the caller instead of crashing it.
    """

    def __init__(self, cache_dir: str = "cache", max_size: int = 100 * 1024 * 1024):  # 100MB
        """
        Open (creating if necessary) the on-disk cache.

        Args:
            cache_dir: Directory holding the cache files.
            max_size: Maximum on-disk size in bytes, forwarded to the backend
                as its ``size_limit`` setting.

        Raises:
            ImportError: If the ``diskcache`` package is not installed.
        """
        # Imported locally: only needed to name the recoverable error types.
        import pickle
        import sqlite3

        if not DISKCACHE_AVAILABLE:
            raise ImportError("diskcache is not available. Install with: pip install diskcache")
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self._max_size = max_size
        # Without size_limit the backend would apply its own default and the
        # max_size argument would have no effect.
        self._cache = diskcache.Cache(str(self.cache_dir), size_limit=max_size)
        # Errors that mean "the cache is unusable right now", not "the program
        # is wrong": database/filesystem trouble, lock timeouts, values that
        # cannot be (de)serialised, and malformed stored entries.
        self._recoverable_errors = (
            diskcache.Timeout,
            sqlite3.Error,
            pickle.PickleError,
            OSError,
            RuntimeError,
            TypeError,
            ValueError,
            KeyError,
            IndexError,
            AttributeError,
        )

    def get(self, key: str) -> Optional[Any]:
        """Return the value stored under ``key``, or None.

        None is returned for a missing key, an expired entry (which is also
        deleted) and for any recoverable backend error.
        """
        try:
            entry_data = self._cache.get(key)
            if entry_data is None:
                return None
            entry = CacheEntry.from_dict(entry_data)
            if entry.is_expired():
                self.delete(key)
                return None
            now = datetime.now()
            entry.accessed_at = now
            entry.access_count += 1
            # Write the refreshed metadata back with the *remaining* lifetime;
            # omitting expire here would strip the backend-level expiry.
            remaining = None
            if entry.ttl is not None:
                elapsed = (now - entry.created_at).total_seconds()
                remaining = max(entry.ttl - elapsed, 0)
            self._cache.set(key, entry.to_dict(), expire=remaining)
            return entry.value
        except self._recoverable_errors as exc:
            logger.warning("Disk cache read failed for key %r: %s", key, exc)
            return None

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """Store ``value`` under ``key``.

        Args:
            key: Cache key.
            value: Value to store; must be serialisable by the backend.
            ttl: Time to live in seconds; None means no expiry.

        Returns:
            True on success, False if the backend reported a recoverable error.
        """
        try:
            now = datetime.now()
            entry = CacheEntry(
                key=key,
                value=value,
                created_at=now,
                accessed_at=now,
                access_count=1,
                ttl=ttl
            )
            self._cache.set(key, entry.to_dict(), expire=ttl)
            return True
        except self._recoverable_errors as exc:
            logger.warning("Disk cache write failed for key %r: %s", key, exc)
            return False

    def delete(self, key: str) -> bool:
        """Remove ``key``; return the backend's result, or False on error."""
        try:
            return self._cache.delete(key)
        except self._recoverable_errors as exc:
            logger.warning("Disk cache delete failed for key %r: %s", key, exc)
            return False

    def clear(self):
        """Remove every entry; recoverable backend errors are logged and ignored."""
        try:
            self._cache.clear()
        except self._recoverable_errors as exc:
            logger.warning("Disk cache clear failed: %s", exc)

    def get_stats(self) -> Dict[str, Any]:
        """Return entry count, backend-reported volume and the configured limit.

        On a recoverable backend error, ``size`` and ``volume`` are reported
        as 0.
        """
        try:
            return {
                'size': len(self._cache),
                'volume': self._cache.volume(),
                'max_size': self._max_size
            }
        except self._recoverable_errors as exc:
            logger.warning("Disk cache stats unavailable: %s", exc)
            return {'size': 0, 'volume': 0, 'max_size': self._max_size}
class CacheInterface:
    """Two-tier cache facade: an in-memory tier plus an optional disk tier.

    Reads consult memory first and fall back to disk; writes go to memory
    synchronously and to disk in the background. The class also provides
    helpers that build cache keys for common lookups.
    """

    def __init__(
        self,
        memory_size: int = 1000,
        disk_cache_dir: Optional[str] = None,
        disk_max_size: int = 100 * 1024 * 1024,
        enable_disk_cache: bool = True
    ):
        """
        Build the cache tiers.

        Args:
            memory_size: Maximum number of entries in the memory tier.
            disk_cache_dir: Directory for the disk tier; the disk tier is only
                created when this is set.
            disk_max_size: Maximum size of the disk tier, in bytes.
            enable_disk_cache: Set to False to disable the disk tier.
        """
        self.memory_cache = MemoryCache(memory_size)
        self.disk_cache = None
        if enable_disk_cache and disk_cache_dir:
            if DISKCACHE_AVAILABLE:
                self.disk_cache = DiskCache(disk_cache_dir, disk_max_size)
            else:
                # Make the silent downgrade to memory-only visible.
                logger.warning(
                    "Disk cache requested at %r but diskcache is not installed; "
                    "using the memory cache only",
                    disk_cache_dir,
                )

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value for ``key`` (memory first, then disk), or None."""
        value = self.memory_cache.get(key)
        if value is not None:
            return value
        if self.disk_cache:
            value = self.disk_cache.get(key)
            if value is not None:
                # NOTE(review): the promoted copy is stored without a TTL, so
                # it can outlive the disk entry's expiry; the disk tier's
                # get() does not expose the remaining lifetime.
                self.memory_cache.set(key, value)
                return value
        return None

    def set(self, key: str, value: Any, ttl: Optional[int] = None, use_disk: bool = True) -> bool:
        """Store ``value`` under ``key``.

        The memory tier is written synchronously. The disk tier, when enabled
        and ``use_disk`` is true, is written in the background so the caller
        never blocks on disk I/O.

        Args:
            key: Cache key.
            value: Value to store.
            ttl: Time to live in seconds; None means no expiry.
            use_disk: Whether to also write to the disk tier.

        Returns:
            The result of the memory-tier write.
        """
        success = self.memory_cache.set(key, value, ttl)
        if use_disk and self.disk_cache:
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                # No running event loop: use a fire-and-forget daemon thread.
                worker = threading.Thread(
                    target=self._sync_disk_set,
                    args=(key, value, ttl),
                    daemon=True
                )
                worker.start()
            else:
                # Inside an event loop: hand the blocking write to the default
                # executor so it does not stall the loop.
                loop.run_in_executor(None, self._sync_disk_set, key, value, ttl)
        return success

    def _sync_disk_set(self, key: str, value: Any, ttl: Optional[int]):
        """Write to the disk tier, logging instead of raising on failure.

        Runs on background threads, so nothing must escape from here.
        """
        if not self.disk_cache:
            return
        try:
            self.disk_cache.set(key, value, ttl)
        except Exception:  # background-thread boundary: best effort only
            logger.warning("Background disk cache write failed for key %r", key, exc_info=True)

    async def _async_disk_set(self, key: str, value: Any, ttl: Optional[int]):
        """Write to the disk tier from a coroutine without blocking the loop."""
        if self.disk_cache:
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, self._sync_disk_set, key, value, ttl)

    def delete(self, key: str) -> bool:
        """Remove ``key`` from every tier; return True if any tier held it."""
        memory_removed = self.memory_cache.delete(key)
        # Both tiers are always asked; with no disk tier there is nothing to
        # remove there, so it must not report success.
        disk_removed = self.disk_cache.delete(key) if self.disk_cache else False
        return memory_removed or disk_removed

    def clear(self):
        """Empty every tier."""
        self.memory_cache.clear()
        if self.disk_cache:
            self.disk_cache.clear()

    def invalidate_pattern(self, pattern: str) -> int:
        """Delete every entry whose key contains ``pattern`` as a substring.

        Both tiers are scanned, so an invalidated key cannot be served again
        from disk. An empty pattern matches every key.

        Returns:
            The number of distinct keys removed from at least one tier.
        """
        import sqlite3  # local: only needed to name the backend's error type

        with self.memory_cache._lock:
            candidates = set(self.memory_cache._cache)
        if self.disk_cache:
            try:
                # The backend has no pattern lookup, so list its keys instead.
                candidates.update(
                    k for k in self.disk_cache._cache if isinstance(k, str)
                )
            except (sqlite3.Error, OSError) as exc:
                logger.warning("Could not list disk cache keys: %s", exc)
        matching = [k for k in candidates if pattern in k]
        return sum(1 for k in matching if self.delete(k))

    def get_stats(self) -> Dict[str, Any]:
        """Return per-tier statistics under ``'memory_cache'`` / ``'disk_cache'``."""
        stats = {
            'memory_cache': self.memory_cache.get_stats()
        }
        if self.disk_cache:
            stats['disk_cache'] = self.disk_cache.get_stats()
        return stats

    def _generate_key(self, prefix: str, *args, **kwargs) -> str:
        """Build a colon-separated cache key.

        Positional str/int/float/bool arguments are used verbatim; any other
        positional argument contributes the first 8 hex digits of the MD5 of
        its ``str()``. Keyword arguments are appended as ``name=value`` in
        sorted name order.
        """
        key_parts = [prefix]
        for arg in args:
            if isinstance(arg, (str, int, float, bool)):
                key_parts.append(str(arg))
            else:
                # MD5 is used only as a short fingerprint, not for security.
                key_parts.append(hashlib.md5(str(arg).encode()).hexdigest()[:8])
        key_parts.extend(f"{k}={v}" for k, v in sorted(kwargs.items()))
        return ":".join(key_parts)

    def cache_folder_structure(self, path: str, exclude_dirs: List[str]) -> str:
        """Return the cache key for a folder-structure listing.

        The excluded directories are sorted first, so the key does not depend
        on the order in which they are passed.
        """
        exclude_str = ",".join(sorted(exclude_dirs or ()))
        return self._generate_key("folder_structure", path, exclude_str)

    def cache_file_content(self, file_path: str) -> str:
        """Return the cache key for a file's content.

        The key includes the file's modification time, so it changes when the
        file does. If the mtime cannot be read, 0 is used and a warning logged.
        """
        mtime = 0
        try:
            mtime = os.path.getmtime(file_path)
        except (IOError, OSError) as e:
            logger.warning(f"操作失败: {e}")
        return self._generate_key("file_content", file_path, mtime)

    def cache_analysis_result(self, path: str, analysis_type: str) -> str:
        """Return the cache key for an analysis result of the given type."""
        return self._generate_key("analysis", path, analysis_type)
# Module-level shared cache instance, created at import time with the default
# arguments (no disk_cache_dir is passed, so no disk tier is created).
cache_interface = CacheInterface()