memory_manager.py•11.1 kB
#!/usr/bin/env python3
"""
MemOS 统一记忆管理抽象层
支持SimpleMemOS和GeneralTextMemory两种后端切换,为后续功能扩展预留接口
"""
import os
import json
import time
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, Any, List, Optional, Union
from datetime import datetime
from enum import Enum
class MemoryBackend(Enum):
"""记忆后端类型"""
SIMPLE = "simple"
OFFICIAL = "official"
class MemoryManager(ABC):
"""记忆管理器抽象基类"""
@abstractmethod
def remember(self, content: str, tags: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None) -> bool:
"""添加记忆"""
pass
@abstractmethod
def recall(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
"""检索记忆"""
pass
@abstractmethod
def extract_from_messages(self, messages: List[Dict[str, str]]) -> List[Dict[str, Any]]:
"""从对话中提取记忆"""
pass
@abstractmethod
def get_daily_memories(self, date: Optional[str] = None) -> List[Dict[str, Any]]:
"""获取每日记忆"""
pass
@abstractmethod
def dump_memories(self, backup_dir: str) -> bool:
"""备份记忆"""
pass
@abstractmethod
def get_stats(self) -> Dict[str, Any]:
"""获取统计信息"""
pass
class UnifiedMemoryManager:
"""统一记忆管理器 - 支持多种后端"""
def __init__(self, data_dir: str = "./memos_data", backend: str = "simple"):
"""初始化统一记忆管理器"""
self.data_dir = Path(data_dir)
self.data_dir.mkdir(exist_ok=True)
# 加载配置
self.config = self._load_config()
# 确定后端类型
self.backend_type = MemoryBackend(backend)
# 初始化后端
self.backend = self._create_backend()
# 性能监控
self.stats = {
"operations": 0,
"start_time": datetime.now(),
"last_operation": None
}
print(f"✅ 统一记忆管理器初始化成功 (后端: {self.backend_type.value})")
def _load_config(self) -> Dict[str, Any]:
"""加载配置文件"""
config_path = os.environ.get('MEMOS_CONFIG', str(self.data_dir / "concurrent_config.json"))
try:
with open(config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
# 添加backend配置选项
if 'backend' not in config:
config['backend'] = 'simple'
print(f"[UNIFIED] 加载配置文件: {config_path}")
print(f"[UNIFIED] 后端类型: {config.get('backend', 'simple')}")
return config
except Exception as e:
print(f"⚠️ 配置文件加载失败: {e}")
return {
"backend": "simple",
"qdrant_port": 6335,
"features": {
"auto_extract": True,
"daily_review": True,
"backup": True
}
}
def _create_backend(self) -> MemoryManager:
"""创建后端实例"""
if self.backend_type == MemoryBackend.SIMPLE:
return SimpleMemoryBackend(self.data_dir, self.config)
elif self.backend_type == MemoryBackend.OFFICIAL:
return OfficialMemoryBackend(self.data_dir, self.config)
else:
raise ValueError(f"不支持的后端类型: {self.backend_type}")
def _update_stats(self, operation: str):
"""更新性能统计"""
self.stats["operations"] += 1
self.stats["last_operation"] = {
"type": operation,
"timestamp": datetime.now()
}
def remember(self, content: str, tags: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None) -> bool:
"""统一的记忆添加接口"""
self._update_stats("remember")
try:
result = self.backend.remember(content, tags, metadata)
print(f"[UNIFIED] 记忆添加{'成功' if result else '失败'}")
return result
except Exception as e:
print(f"❌ 统一记忆添加失败: {e}")
return False
def recall(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
"""统一的记忆检索接口"""
self._update_stats("recall")
try:
results = self.backend.recall(query, top_k)
print(f"[UNIFIED] 检索到 {len(results)} 条记忆")
return results
except Exception as e:
print(f"❌ 统一记忆检索失败: {e}")
return []
def extract_from_messages(self, messages: List[Dict[str, str]]) -> List[Dict[str, Any]]:
"""统一的记忆提取接口"""
self._update_stats("extract")
try:
results = self.backend.extract_from_messages(messages)
print(f"[UNIFIED] 提取到 {len(results)} 条记忆")
return results
except Exception as e:
print(f"❌ 统一记忆提取失败: {e}")
return []
def get_daily_memories(self, date: Optional[str] = None) -> List[Dict[str, Any]]:
"""统一的每日记忆获取接口"""
self._update_stats("get_daily")
try:
results = self.backend.get_daily_memories(date)
print(f"[UNIFIED] 获取到 {len(results)} 条每日记忆")
return results
except Exception as e:
print(f"❌ 统一每日记忆获取失败: {e}")
return []
def dump_memories(self, backup_dir: str) -> bool:
"""统一的记忆备份接口"""
self._update_stats("dump")
try:
result = self.backend.dump_memories(backup_dir)
print(f"[UNIFIED] 记忆备份{'成功' if result else '失败'}")
return result
except Exception as e:
print(f"❌ 统一记忆备份失败: {e}")
return False
def get_stats(self) -> Dict[str, Any]:
"""获取统一的统计信息"""
try:
backend_stats = self.backend.get_stats()
unified_stats = {
"backend_type": self.backend_type.value,
"unified_stats": self.stats,
"backend_stats": backend_stats,
"uptime": (datetime.now() - self.stats["start_time"]).total_seconds(),
"config": self.config
}
return unified_stats
except Exception as e:
print(f"❌ 获取统计信息失败: {e}")
return {"error": str(e)}
def switch_backend(self, new_backend: str) -> bool:
"""切换后端"""
try:
old_backend = self.backend_type.value
self.backend_type = MemoryBackend(new_backend)
self.backend = self._create_backend()
print(f"✅ 后端切换成功: {old_backend} -> {new_backend}")
return True
except Exception as e:
print(f"❌ 后端切换失败: {e}")
return False
class SimpleMemoryBackend(MemoryManager):
"""SimpleMemOS后端实现"""
def __init__(self, data_dir: Path, config: Dict[str, Any]):
self.data_dir = data_dir
self.config = config
# 导入并初始化MVP记忆管理器
from mvp_memory import MVPMemoryManager
self.mvp_manager = MVPMemoryManager(str(data_dir))
def remember(self, content: str, tags: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None) -> bool:
return self.mvp_manager.remember(content, tags, metadata)
def recall(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
return self.mvp_manager.recall(query, top_k)
def extract_from_messages(self, messages: List[Dict[str, str]]) -> List[Dict[str, Any]]:
return self.mvp_manager.extract_from_messages(messages)
def get_daily_memories(self, date: Optional[str] = None) -> List[Dict[str, Any]]:
return self.mvp_manager.get_daily_memories(date)
def dump_memories(self, backup_dir: str) -> bool:
return self.mvp_manager.dump_memories(backup_dir)
def get_stats(self) -> Dict[str, Any]:
return {
"backend": "simple",
"implementation": "MVPMemoryManager",
"features": ["remember", "recall", "extract", "daily_review", "backup"]
}
class OfficialMemoryBackend(MemoryManager):
"""官方GeneralTextMemory后端实现(预留)"""
def __init__(self, data_dir: Path, config: Dict[str, Any]):
self.data_dir = data_dir
self.config = config
print("⚠️ 官方后端暂未实现,使用Simple后端")
# 暂时使用Simple后端
from mvp_memory import MVPMemoryManager
self.mvp_manager = MVPMemoryManager(str(data_dir))
def remember(self, content: str, tags: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None) -> bool:
# TODO: 实现官方GeneralTextMemory.add()
return self.mvp_manager.remember(content, tags, metadata)
def recall(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
# TODO: 实现官方GeneralTextMemory.search()
return self.mvp_manager.recall(query, top_k)
def extract_from_messages(self, messages: List[Dict[str, str]]) -> List[Dict[str, Any]]:
# TODO: 实现官方GeneralTextMemory.extract()
return self.mvp_manager.extract_from_messages(messages)
def get_daily_memories(self, date: Optional[str] = None) -> List[Dict[str, Any]]:
# TODO: 实现官方GeneralTextMemory.get_all()过滤
return self.mvp_manager.get_daily_memories(date)
def dump_memories(self, backup_dir: str) -> bool:
# TODO: 实现官方GeneralTextMemory.dump()
return self.mvp_manager.dump_memories(backup_dir)
def get_stats(self) -> Dict[str, Any]:
return {
"backend": "official",
"implementation": "GeneralTextMemory (预留)",
"status": "未实现,使用Simple后端",
"features": ["remember", "recall", "extract", "daily_review", "backup"]
}
def main():
"""主函数 - 用于测试"""
# 测试Simple后端
manager = UnifiedMemoryManager(backend="simple")
# 测试基本功能
success = manager.remember("测试统一记忆管理器", tags=["测试", "统一"])
print(f"记忆添加: {success}")
results = manager.recall("统一记忆", top_k=3)
print(f"检索结果: {len(results)} 条")
stats = manager.get_stats()
print(f"统计信息: {stats}")
if __name__ == "__main__":
main()