#!/usr/bin/env python3
"""
MemOS存储优化管理器
整合访问统计、Parquet归档和现有归档系统,实现30天未访问数据的自动归档
"""
import json
import shutil
from pathlib import Path
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
from access_tracker import AccessTracker
from parquet_archiver import ParquetArchiver
class StorageOptimizer:
    """Storage optimization manager.

    Ties together the access tracker, the Parquet archiver and the legacy
    ``memory_archives`` directory so that memories left unaccessed for a
    configurable number of days (default 30) can be archived automatically.
    """

    def __init__(self, data_dir: str = "./memos_data"):
        """Initialize the optimizer.

        Args:
            data_dir: Base directory holding the MemOS data stores.
        """
        self.data_dir = Path(data_dir)
        self.access_tracker = AccessTracker(data_dir)
        self.parquet_archiver = ParquetArchiver(data_dir)
        # The legacy archive directory lives next to (not inside) data_dir.
        self.memory_archives_dir = self.data_dir.parent / "memory_archives"
        # Tunable policy knobs.
        self.config = {
            "inactive_days_threshold": 30,   # days without access before archiving
            "min_access_count": 1,           # minimum accesses for a candidate
            "auto_archive_enabled": True,
            "parquet_compression": "snappy",
            "cleanup_old_archives_days": 365,
        }
        print(f"🗄️ 存储优化器初始化完成")
        print(f" 未访问阈值: {self.config['inactive_days_threshold']} 天")

    def analyze_storage_usage(self) -> Dict[str, Any]:
        """Analyze current storage usage across the known data directories.

        Returns:
            A dict with a timestamp, per-directory size/file-count details,
            the total size in MB and a list of textual recommendations.
        """
        analysis = {
            "timestamp": datetime.now().isoformat(),
            "directories": {},
            "total_size_mb": 0,
            "recommendations": []
        }
        # Directories whose footprint we track.
        directories_to_check = [
            self.data_dir / "qdrant_storage",
            self.data_dir / "backups",
            self.data_dir / "parquet_archives",
            self.memory_archives_dir
        ]
        for dir_path in directories_to_check:
            if dir_path.exists():
                # Single pass over the tree: sum sizes and count regular
                # files only (the original count also included directories,
                # which contradicted the "file_count" label).
                size_bytes = 0
                file_count = 0
                for entry in dir_path.rglob('*'):
                    if entry.is_file():
                        size_bytes += entry.stat().st_size
                        file_count += 1
                size_mb = size_bytes / (1024 * 1024)
                analysis["directories"][dir_path.name] = {
                    "size_mb": round(size_mb, 2),
                    "size_bytes": size_bytes,
                    "file_count": file_count,
                    "path": str(dir_path)
                }
                analysis["total_size_mb"] += size_mb
        # Derive optimization recommendations from access statistics.
        access_summary = self.access_tracker.get_access_summary()
        inactive_30d = access_summary["inactive_memories"]["30_days"]
        if inactive_30d > 10:
            analysis["recommendations"].append(
                f"发现 {inactive_30d} 条30天未访问记忆,建议执行归档操作"
            )
        qdrant_size = analysis["directories"].get("qdrant_storage", {}).get("size_mb", 0)
        if qdrant_size > 100:
            analysis["recommendations"].append(
                f"Qdrant存储占用 {qdrant_size:.1f}MB,建议定期归档减少向量数据库大小"
            )
        return analysis

    def get_archival_candidates(self, days_threshold: Optional[int] = None) -> List[Dict]:
        """Return memories eligible for archiving.

        Args:
            days_threshold: Inactivity threshold in days; falls back to the
                configured ``inactive_days_threshold`` when None.

        Returns:
            Candidate records as produced by the access tracker.
        """
        if days_threshold is None:
            days_threshold = self.config["inactive_days_threshold"]
        candidates = self.access_tracker.get_archival_candidates(
            days_threshold=days_threshold,
            min_access_count=self.config["min_access_count"]
        )
        print(f"📋 找到 {len(candidates)} 个归档候选记忆")
        return candidates

    def fetch_memory_data(self, memory_ids: List[str]) -> List[Dict]:
        """Fetch full memory records from the MVP manager.

        Each record is augmented with its access statistics when available.
        Failures for individual memories are logged and skipped; a failure
        to reach the MVP manager at all yields an empty list.
        """
        try:
            # Imported lazily so this module loads without the MVP manager.
            from mvp_memory import create_mvp_memory_manager
            mvp = create_mvp_memory_manager()
            memories = []
            for memory_id in memory_ids:
                try:
                    # MVP manager keys memories by integer id.
                    memory_data = mvp.get_memory(int(memory_id))
                    if memory_data:
                        # Attach access statistics for the archive record.
                        access_stats = self.access_tracker.get_memory_stats(memory_id)
                        if access_stats:
                            memory_data["access_stats"] = access_stats
                        memories.append(memory_data)
                except Exception as e:
                    print(f"⚠️ 获取记忆 {memory_id} 失败: {e}")
                    continue
            return memories
        except Exception as e:
            print(f"❌ 获取记忆数据失败: {e}")
            return []

    def archive_inactive_memories(self, dry_run: bool = False) -> Dict[str, Any]:
        """Archive memories that have been inactive past the threshold.

        Args:
            dry_run: When True, only report what would be archived.

        Returns:
            Result dict with ``success``, ``archived_count``,
            ``archive_name``, ``errors`` and ``dry_run`` keys.
        """
        result = {
            "success": False,
            "archived_count": 0,
            "archive_name": "",
            "errors": [],
            "dry_run": dry_run
        }
        try:
            candidates = self.get_archival_candidates()
            if not candidates:
                print("✅ 没有需要归档的记忆")
                result["success"] = True
                return result
            memory_ids = [c["memory_id"] for c in candidates]
            if dry_run:
                print(f"🔍 模拟运行:将归档 {len(memory_ids)} 条记忆")
                for candidate in candidates[:5]:  # preview the first five
                    print(f" ID {candidate['memory_id']}: {candidate['days_since_access']}天未访问")
                if len(candidates) > 5:
                    print(f" ... 还有 {len(candidates) - 5} 条记忆")
                result["success"] = True
                result["archived_count"] = len(memory_ids)
                return result
            # Pull the full records before archiving.
            print(f"📥 获取 {len(memory_ids)} 条记忆数据...")
            memories = self.fetch_memory_data(memory_ids)
            if not memories:
                result["errors"].append("无法获取记忆数据")
                return result
            # Archive to a timestamped Parquet file.
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            archive_name = f"inactive_memories_{timestamp}"
            print(f"📦 开始归档到 {archive_name}.parquet...")
            archive_result = self.parquet_archiver.archive_memories(memories, archive_name)
            if archive_result:
                # Persist the archived flag in the tracker.
                self.access_tracker.mark_as_archived(memory_ids)
                self.access_tracker.save_all()
                # Remove from active storage (MVP-manager integration pending).
                self._remove_from_active_storage(memory_ids)
                result["success"] = True
                result["archived_count"] = len(memories)
                result["archive_name"] = archive_name
                print(f"✅ 成功归档 {len(memories)} 条记忆")
            else:
                result["errors"].append("Parquet归档失败")
        except Exception as e:
            error_msg = f"归档过程出错: {e}"
            print(f"❌ {error_msg}")
            result["errors"].append(error_msg)
        return result

    def _remove_from_active_storage(self, memory_ids: List[str]):
        """Remove memories from active storage (placeholder implementation).

        This should integrate with the MVP manager to actually delete the
        vectors from the database; for now it only logs the operation.
        """
        print(f"🗑️ 标记 {len(memory_ids)} 条记忆为已归档(从活跃存储移除)")
        # TODO: call the MVP manager's delete method here.

    def search_archived_memories(self, query: str, max_results: int = 10) -> List[Dict]:
        """Search archived memories via the Parquet archiver."""
        print(f"🔍 在归档中搜索: {query}")
        results = self.parquet_archiver.search_archived_content(query, max_results)
        print(f"📋 找到 {len(results)} 条归档记忆")
        return results

    def restore_memory(self, memory_id: str) -> bool:
        """Restore an archived memory back into active storage.

        Returns:
            True when the memory was found in the archive and the restore
            access was recorded, False otherwise.
        """
        try:
            memory_data = self.parquet_archiver.restore_memory(memory_id)
            if not memory_data:
                return False
            # Re-adding to the MVP manager is not wired up yet; log only.
            print(f"🔄 记忆 {memory_id} 已从归档恢复(需要集成到MVP管理器)")
            # Record the restore as an access so it is no longer "inactive".
            self.access_tracker.record_access(memory_id, "restore")
            self.access_tracker.save_all()
            return True
        except Exception as e:
            print(f"❌ 恢复记忆失败: {e}")
            return False

    def cleanup_old_data(self):
        """Delete stale daily statistics and expired archive files."""
        print("🧹 开始清理旧数据...")
        # Keep 90 days of per-day access statistics.
        self.access_tracker.cleanup_old_daily_stats(keep_days=90)
        # Drop archives older than the configured retention window.
        self.parquet_archiver.cleanup_old_archives(
            keep_days=self.config["cleanup_old_archives_days"]
        )
        print("✅ 旧数据清理完成")

    def get_optimization_report(self) -> Dict[str, Any]:
        """Build a combined storage/access/archive optimization report."""
        storage_analysis = self.analyze_storage_usage()
        access_summary = self.access_tracker.get_access_summary()
        archive_stats = self.parquet_archiver.get_archive_stats()
        report = {
            "timestamp": datetime.now().isoformat(),
            "storage_usage": storage_analysis,
            "access_statistics": access_summary,
            "archive_statistics": archive_stats,
            "optimization_opportunities": []
        }
        # Flag archivable inactive memories.
        inactive_30d = access_summary["inactive_memories"]["30_days"]
        if inactive_30d > 0:
            report["optimization_opportunities"].append({
                "type": "archive_inactive",
                "description": f"可归档 {inactive_30d} 条30天未访问记忆",
                "potential_savings": "减少向量数据库大小,提升查询性能"
            })
        # Flag heavy overall disk usage.
        total_size = storage_analysis["total_size_mb"]
        if total_size > 500:
            report["optimization_opportunities"].append({
                "type": "storage_cleanup",
                "description": f"总存储使用 {total_size:.1f}MB,建议定期清理",
                "potential_savings": "释放磁盘空间"
            })
        return report

    def print_status(self):
        """Print a human-readable storage optimization status report."""
        print("🗄️ 存储优化状态报告")
        print("=" * 40)
        # Storage usage section.
        storage_analysis = self.analyze_storage_usage()
        print(f"📊 总存储使用: {storage_analysis['total_size_mb']:.2f} MB")
        for name, info in storage_analysis["directories"].items():
            print(f" {name}: {info['size_mb']:.2f} MB ({info['file_count']} 文件)")
        # Access statistics section.
        print("")
        self.access_tracker.print_stats()
        # Archive statistics section.
        print("")
        self.parquet_archiver.print_stats()
        # Recommendations section.
        if storage_analysis["recommendations"]:
            print("")
            print("💡 优化建议:")
            for rec in storage_analysis["recommendations"]:
                print(f" • {rec}")
if __name__ == "__main__":
# 测试存储优化器
optimizer = StorageOptimizer()
optimizer.print_status()