
MemOS-MCP

by qinshu1109
Apache 2.0
storage_optimizer.py (12.1 kB)
#!/usr/bin/env python3
"""
MemOS storage optimization manager.
Integrates access statistics, Parquet archiving, and the existing archive
system to automatically archive data that has not been accessed for 30 days.
"""

import json
import shutil
from pathlib import Path
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta

from access_tracker import AccessTracker
from parquet_archiver import ParquetArchiver


class StorageOptimizer:
    """Storage optimization manager."""

    def __init__(self, data_dir: str = "./memos_data"):
        self.data_dir = Path(data_dir)
        self.access_tracker = AccessTracker(data_dir)
        self.parquet_archiver = ParquetArchiver(data_dir)

        # Existing archive directory
        self.memory_archives_dir = self.data_dir.parent / "memory_archives"

        # Configuration
        self.config = {
            "inactive_days_threshold": 30,
            "min_access_count": 1,
            "auto_archive_enabled": True,
            "parquet_compression": "snappy",
            "cleanup_old_archives_days": 365
        }

        print("🗄️ Storage optimizer initialized")
        print(f"   Inactivity threshold: {self.config['inactive_days_threshold']} days")

    def analyze_storage_usage(self) -> Dict[str, Any]:
        """Analyze current storage usage."""
        analysis = {
            "timestamp": datetime.now().isoformat(),
            "directories": {},
            "total_size_mb": 0,
            "recommendations": []
        }

        # Measure storage usage per directory
        directories_to_check = [
            self.data_dir / "qdrant_storage",
            self.data_dir / "backups",
            self.data_dir / "parquet_archives",
            self.memory_archives_dir
        ]

        for dir_path in directories_to_check:
            if dir_path.exists():
                size_bytes = sum(f.stat().st_size for f in dir_path.rglob('*') if f.is_file())
                size_mb = size_bytes / (1024 * 1024)
                file_count = len(list(dir_path.rglob('*')))

                analysis["directories"][str(dir_path.name)] = {
                    "size_mb": round(size_mb, 2),
                    "size_bytes": size_bytes,
                    "file_count": file_count,
                    "path": str(dir_path)
                }
                analysis["total_size_mb"] += size_mb

        # Generate optimization recommendations
        access_summary = self.access_tracker.get_access_summary()
        inactive_30d = access_summary["inactive_memories"]["30_days"]

        if inactive_30d > 10:
            analysis["recommendations"].append(
                f"Found {inactive_30d} memories not accessed in 30 days; consider archiving them"
            )

        qdrant_size = analysis["directories"].get("qdrant_storage", {}).get("size_mb", 0)
        if qdrant_size > 100:
            analysis["recommendations"].append(
                f"Qdrant storage uses {qdrant_size:.1f} MB; consider regular archiving to shrink the vector database"
            )

        return analysis

    def get_archival_candidates(self, days_threshold: Optional[int] = None) -> List[Dict]:
        """Get memories that are candidates for archiving."""
        if days_threshold is None:
            days_threshold = self.config["inactive_days_threshold"]

        candidates = self.access_tracker.get_archival_candidates(
            days_threshold=days_threshold,
            min_access_count=self.config["min_access_count"]
        )

        print(f"📋 Found {len(candidates)} archival candidates")
        return candidates

    def fetch_memory_data(self, memory_ids: List[str]) -> List[Dict]:
        """Fetch memory data from the MVP manager."""
        try:
            # Import the MVP manager
            from mvp_memory import create_mvp_memory_manager
            mvp = create_mvp_memory_manager()

            memories = []
            for memory_id in memory_ids:
                try:
                    # Fetch the memory record
                    memory_data = mvp.get_memory(int(memory_id))
                    if memory_data:
                        # Attach access statistics
                        access_stats = self.access_tracker.get_memory_stats(memory_id)
                        if access_stats:
                            memory_data["access_stats"] = access_stats
                        memories.append(memory_data)
                except Exception as e:
                    print(f"⚠️ Failed to fetch memory {memory_id}: {e}")
                    continue

            return memories

        except Exception as e:
            print(f"❌ Failed to fetch memory data: {e}")
            return []

    def archive_inactive_memories(self, dry_run: bool = False) -> Dict[str, Any]:
        """Archive memories that have not been accessed recently."""
        result = {
            "success": False,
            "archived_count": 0,
            "archive_name": "",
            "errors": [],
            "dry_run": dry_run
        }

        try:
            # Collect archival candidates
            candidates = self.get_archival_candidates()

            if not candidates:
                print("✅ No memories need archiving")
                result["success"] = True
                return result

            memory_ids = [c["memory_id"] for c in candidates]

            if dry_run:
                print(f"🔍 Dry run: would archive {len(memory_ids)} memories")
                for candidate in candidates[:5]:  # Show the first five
                    print(f"   ID {candidate['memory_id']}: {candidate['days_since_access']} days since last access")
                if len(candidates) > 5:
                    print(f"   ... and {len(candidates) - 5} more")
                result["success"] = True
                result["archived_count"] = len(memory_ids)
                return result

            # Fetch memory data
            print(f"📥 Fetching data for {len(memory_ids)} memories...")
            memories = self.fetch_memory_data(memory_ids)

            if not memories:
                result["errors"].append("Unable to fetch memory data")
                return result

            # Run the Parquet archival
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            archive_name = f"inactive_memories_{timestamp}"

            print(f"📦 Archiving to {archive_name}.parquet...")
            archive_result = self.parquet_archiver.archive_memories(memories, archive_name)

            if archive_result:
                # Mark as archived
                self.access_tracker.mark_as_archived(memory_ids)
                self.access_tracker.save_all()

                # Remove from active storage (requires MVP-manager integration)
                self._remove_from_active_storage(memory_ids)

                result["success"] = True
                result["archived_count"] = len(memories)
                result["archive_name"] = archive_name

                print(f"✅ Archived {len(memories)} memories")
            else:
                result["errors"].append("Parquet archival failed")

        except Exception as e:
            error_msg = f"Error during archival: {e}"
            print(f"❌ {error_msg}")
            result["errors"].append(error_msg)

        return result

    def _remove_from_active_storage(self, memory_ids: List[str]):
        """Remove memories from active storage (placeholder implementation)."""
        # This should integrate with the MVP manager and actually delete the
        # memories from the vector database; for now it only logs the action.
        print(f"🗑️ Marked {len(memory_ids)} memories as archived (removed from active storage)")
        # Actual deletion logic can be added here,
        # e.g. a call to the MVP manager's delete method.

    def search_archived_memories(self, query: str, max_results: int = 10) -> List[Dict]:
        """Search archived memories."""
        print(f"🔍 Searching archives for: {query}")
        results = self.parquet_archiver.search_archived_content(query, max_results)
        print(f"📋 Found {len(results)} archived memories")
        return results

    def restore_memory(self, memory_id: str) -> bool:
        """Restore an archived memory to active storage."""
        try:
            # Restore from the Parquet archive
            memory_data = self.parquet_archiver.restore_memory(memory_id)
            if not memory_data:
                return False

            # The memory should be re-added to the MVP manager here;
            # for now this is only simulated.
            print(f"🔄 Memory {memory_id} restored from archive (MVP-manager integration pending)")

            # Update access statistics
            self.access_tracker.record_access(memory_id, "restore")
            self.access_tracker.save_all()

            return True

        except Exception as e:
            print(f"❌ Failed to restore memory: {e}")
            return False

    def cleanup_old_data(self):
        """Clean up old data."""
        print("🧹 Cleaning up old data...")

        # Remove old daily access statistics
        self.access_tracker.cleanup_old_daily_stats(keep_days=90)

        # Remove old archive files
        self.parquet_archiver.cleanup_old_archives(
            keep_days=self.config["cleanup_old_archives_days"]
        )

        print("✅ Old-data cleanup complete")

    def get_optimization_report(self) -> Dict[str, Any]:
        """Generate a storage optimization report."""
        storage_analysis = self.analyze_storage_usage()
        access_summary = self.access_tracker.get_access_summary()
        archive_stats = self.parquet_archiver.get_archive_stats()

        report = {
            "timestamp": datetime.now().isoformat(),
            "storage_usage": storage_analysis,
            "access_statistics": access_summary,
            "archive_statistics": archive_stats,
            "optimization_opportunities": []
        }

        # Identify optimization opportunities
        inactive_30d = access_summary["inactive_memories"]["30_days"]
        if inactive_30d > 0:
            report["optimization_opportunities"].append({
                "type": "archive_inactive",
                "description": f"{inactive_30d} memories not accessed in 30 days can be archived",
                "potential_savings": "Smaller vector database, faster queries"
            })

        total_size = storage_analysis["total_size_mb"]
        if total_size > 500:
            report["optimization_opportunities"].append({
                "type": "storage_cleanup",
                "description": f"Total storage usage is {total_size:.1f} MB; consider regular cleanup",
                "potential_savings": "Free up disk space"
            })

        return report

    def print_status(self):
        """Print the storage optimization status."""
        print("🗄️ Storage Optimization Status Report")
        print("=" * 40)

        # Storage usage
        storage_analysis = self.analyze_storage_usage()
        print(f"📊 Total storage used: {storage_analysis['total_size_mb']:.2f} MB")
        for name, info in storage_analysis["directories"].items():
            print(f"   {name}: {info['size_mb']:.2f} MB ({info['file_count']} files)")

        # Access statistics
        print("")
        self.access_tracker.print_stats()

        # Archive statistics
        print("")
        self.parquet_archiver.print_stats()

        # Recommendations
        if storage_analysis["recommendations"]:
            print("")
            print("💡 Recommendations:")
            for rec in storage_analysis["recommendations"]:
                print(f"   • {rec}")


if __name__ == "__main__":
    # Smoke-test the storage optimizer
    optimizer = StorageOptimizer()
    optimizer.print_status()
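
For context, here is a minimal usage sketch of the archival flow, assuming the companion modules from this repository (access_tracker.py, parquet_archiver.py, mvp_memory.py) are on the import path: preview candidates with a dry run, then perform the archival and inspect the returned result dictionary.

from storage_optimizer import StorageOptimizer

optimizer = StorageOptimizer(data_dir="./memos_data")

# Preview which memories would be archived; no data is modified
preview = optimizer.archive_inactive_memories(dry_run=True)
print(f"Would archive {preview['archived_count']} memories")

# Perform the actual archival and report the outcome
result = optimizer.archive_inactive_memories(dry_run=False)
if result["success"]:
    print(f"Archived {result['archived_count']} memories to {result['archive_name']}.parquet")
else:
    print("Archival failed:", result["errors"])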
