
MemOS-MCP

by qinshu1109
Apache 2.0
parquet_archiver.py (12.3 kB)
#!/usr/bin/env python3
"""
MemOS Parquet archiver.

Archives memory records that have not been accessed for 30 days into Parquet
format to optimize storage space and query performance.
"""

import json
import pandas as pd
from pathlib import Path
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
import pyarrow as pa
import pyarrow.parquet as pq


class ParquetArchiver:
    """Parquet-format archive handler."""

    def __init__(self, data_dir: str = "./memos_data"):
        self.data_dir = Path(data_dir)
        self.archive_dir = self.data_dir / "parquet_archives"
        self.archive_dir.mkdir(parents=True, exist_ok=True)

        # Archive index file
        self.index_file = self.archive_dir / "archive_index.json"
        self.archive_index = self._load_archive_index()

        print("📦 Parquet archiver initialized")
        print(f"   Archive directory: {self.archive_dir}")

    def _load_archive_index(self) -> Dict:
        """Load the archive index."""
        if self.index_file.exists():
            try:
                with open(self.index_file, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except Exception as e:
                print(f"⚠️ Failed to load archive index: {e}")
                return {"archives": [], "memory_locations": {}}
        else:
            return {"archives": [], "memory_locations": {}}

    def _save_archive_index(self):
        """Save the archive index."""
        try:
            with open(self.index_file, 'w', encoding='utf-8') as f:
                json.dump(self.archive_index, f, ensure_ascii=False, indent=2)
        except Exception as e:
            print(f"❌ Failed to save archive index: {e}")

    def _prepare_memory_data(self, memories: List[Dict]) -> pd.DataFrame:
        """Prepare memory records as a DataFrame."""
        records = []
        for memory in memories:
            # Flatten the memory record
            record = {
                "memory_id": str(memory.get("id", "")),
                "content": memory.get("content", ""),
                "tags": json.dumps(memory.get("tags", []), ensure_ascii=False),
                "created_at": memory.get("metadata", {}).get("timestamp", ""),
                "memory_type": memory.get("metadata", {}).get("type", ""),
                "source": memory.get("metadata", {}).get("source", ""),
                "archived_at": datetime.now().isoformat(),
                "archive_reason": memory.get("archive_reason", "30_days_inactive")
            }

            # Include access statistics if present
            if "access_stats" in memory:
                stats = memory["access_stats"]
                record.update({
                    "last_access": stats.get("last_access", ""),
                    "access_count": stats.get("access_count", 0),
                    "first_access": stats.get("first_access", "")
                })

            records.append(record)

        return pd.DataFrame(records)

    def archive_memories(self, memories: List[Dict], archive_name: Optional[str] = None) -> str:
        """Archive memories into a Parquet file."""
        if not memories:
            print("⚠️ No memories to archive")
            return ""

        # Generate an archive file name
        if archive_name is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            archive_name = f"memories_archive_{timestamp}"

        archive_file = self.archive_dir / f"{archive_name}.parquet"

        try:
            # Prepare the data
            df = self._prepare_memory_data(memories)

            # Write the Parquet file
            df.to_parquet(archive_file, engine='pyarrow', compression='snappy')

            # Update the index
            archive_info = {
                "archive_name": archive_name,
                "file_path": str(archive_file),
                "created_at": datetime.now().isoformat(),
                "memory_count": len(memories),
                "file_size": archive_file.stat().st_size,
                "compression": "snappy"
            }
            self.archive_index["archives"].append(archive_info)

            # Update the memory-location index
            for memory in memories:
                memory_id = str(memory.get("id", ""))
                self.archive_index["memory_locations"][memory_id] = {
                    "archive_name": archive_name,
                    "archived_at": datetime.now().isoformat()
                }

            self._save_archive_index()

            print(f"✅ Archived {len(memories)} memories to {archive_file}")
            print(f"   File size: {archive_file.stat().st_size / 1024:.2f} KB")

            return archive_name

        except Exception as e:
            print(f"❌ Archiving failed: {e}")
            return ""

    def query_archived_memories(self, archive_name: str, filters: Optional[Dict] = None) -> pd.DataFrame:
        """Query archived memories."""
        archive_file = self.archive_dir / f"{archive_name}.parquet"

        if not archive_file.exists():
            print(f"❌ Archive file does not exist: {archive_file}")
            return pd.DataFrame()

        try:
            df = pd.read_parquet(archive_file)

            # Apply filters
            if filters:
                for column, value in filters.items():
                    if column in df.columns:
                        if isinstance(value, str):
                            df = df[df[column].str.contains(value, na=False)]
                        else:
                            df = df[df[column] == value]

            return df

        except Exception as e:
            print(f"❌ Failed to query archive: {e}")
            return pd.DataFrame()

    def search_archived_content(self, query: str, max_results: int = 10) -> List[Dict]:
        """Search content across all archives."""
        results = []

        for archive_info in self.archive_index["archives"]:
            archive_name = archive_info["archive_name"]

            try:
                df = self.query_archived_memories(archive_name)

                # Simple text search on the content column
                matches = df[df["content"].str.contains(query, case=False, na=False)]

                for _, row in matches.iterrows():
                    results.append({
                        "memory_id": row["memory_id"],
                        "content": row["content"][:200] + "..." if len(row["content"]) > 200 else row["content"],
                        "tags": json.loads(row["tags"]) if row["tags"] else [],
                        "archive_name": archive_name,
                        "archived_at": row["archived_at"],
                        "last_access": row.get("last_access", ""),
                        "access_count": row.get("access_count", 0)
                    })

                    if len(results) >= max_results:
                        break

            except Exception as e:
                print(f"⚠️ Failed to search archive {archive_name}: {e}")
                continue

        # Sort by access count
        results.sort(key=lambda x: x["access_count"], reverse=True)

        return results[:max_results]

    def get_archive_stats(self) -> Dict:
        """Get archive statistics."""
        total_archives = len(self.archive_index["archives"])
        total_memories = len(self.archive_index["memory_locations"])

        total_size = 0
        for archive_info in self.archive_index["archives"]:
            total_size += archive_info.get("file_size", 0)

        # Newest and oldest archives
        archives_by_date = sorted(
            self.archive_index["archives"],
            key=lambda x: x["created_at"]
        )

        return {
            "total_archives": total_archives,
            "total_archived_memories": total_memories,
            "total_size_bytes": total_size,
            "total_size_mb": total_size / (1024 * 1024),
            "oldest_archive": archives_by_date[0]["created_at"] if archives_by_date else None,
            "newest_archive": archives_by_date[-1]["created_at"] if archives_by_date else None,
            "archives": [
                {
                    "name": info["archive_name"],
                    "memory_count": info["memory_count"],
                    "size_kb": info["file_size"] / 1024,
                    "created_at": info["created_at"]
                }
                for info in self.archive_index["archives"]
            ]
        }

    def restore_memory(self, memory_id: str) -> Optional[Dict]:
        """Restore a specific memory from an archive."""
        if memory_id not in self.archive_index["memory_locations"]:
            print(f"❌ Memory {memory_id} not found in any archive")
            return None

        location = self.archive_index["memory_locations"][memory_id]
        archive_name = location["archive_name"]

        try:
            df = self.query_archived_memories(archive_name, {"memory_id": memory_id})

            if df.empty:
                print(f"❌ Memory {memory_id} not found in archive {archive_name}")
                return None

            row = df.iloc[0]

            # Rebuild the memory object
            memory = {
                "id": int(row["memory_id"]),
                "content": row["content"],
                "tags": json.loads(row["tags"]) if row["tags"] else [],
                "metadata": {
                    "type": row["memory_type"],
                    "source": row["source"],
                    "timestamp": row["created_at"]
                },
                "restored_from_archive": {
                    "archive_name": archive_name,
                    "restored_at": datetime.now().isoformat()
                }
            }

            print(f"✅ Restored memory {memory_id} from archive")
            return memory

        except Exception as e:
            print(f"❌ Failed to restore memory: {e}")
            return None

    def cleanup_old_archives(self, keep_days: int = 365):
        """Clean up old archive files."""
        cutoff_date = datetime.now() - timedelta(days=keep_days)
        archives_to_remove = []

        for archive_info in self.archive_index["archives"]:
            created_at = datetime.fromisoformat(archive_info["created_at"])
            if created_at < cutoff_date:
                archives_to_remove.append(archive_info)

        for archive_info in archives_to_remove:
            archive_file = Path(archive_info["file_path"])
            if archive_file.exists():
                archive_file.unlink()

            # Remove from the archive index
            self.archive_index["archives"].remove(archive_info)

            # Remove memory-location entries pointing at this archive
            memory_ids_to_remove = [
                mid for mid, loc in self.archive_index["memory_locations"].items()
                if loc["archive_name"] == archive_info["archive_name"]
            ]
            for mid in memory_ids_to_remove:
                del self.archive_index["memory_locations"][mid]

        if archives_to_remove:
            self._save_archive_index()
            print(f"🧹 Removed {len(archives_to_remove)} old archive files")

    def print_stats(self):
        """Print archive statistics."""
        stats = self.get_archive_stats()

        print("📦 Parquet archive statistics:")
        print(f"   Archive files: {stats['total_archives']}")
        print(f"   Archived memories: {stats['total_archived_memories']}")
        print(f"   Total storage size: {stats['total_size_mb']:.2f} MB")

        if stats['oldest_archive']:
            print(f"   Oldest archive: {stats['oldest_archive']}")
        if stats['newest_archive']:
            print(f"   Newest archive: {stats['newest_archive']}")


if __name__ == "__main__":
    # Smoke-test the Parquet archiver
    archiver = ParquetArchiver()
    archiver.print_stats()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/qinshu1109/memos-MCP'
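The same request from Python, as a minimal sketch: it only fetches the endpoint and pretty-prints the JSON response without assuming any particular response schema.

import json
import urllib.request

url = "https://glama.ai/api/mcp/v1/servers/qinshu1109/memos-MCP"
with urllib.request.urlopen(url) as resp:
    data = json.load(resp)

# Print whatever the API returns; field names are not assumed here
print(json.dumps(data, indent=2))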

If you have feedback or need assistance with the MCP directory API, please join our Discord server.