# mvp_memory.py (42.1 kB)
#!/usr/bin/env python3
"""
MemOS MVP记忆管理模块
基于专家指导的"够用就先上"方案,先使用现有SimpleMemOS,为官方API预留接口
"""
import os
import json
import time
from pathlib import Path
from typing import Dict, Any, List, Optional
from datetime import datetime
import time
# 优先使用增强版MemOS(Qwen模型),降级到基础版SimpleMemOS
try:
from enhanced_simple_memos import EnhancedSimpleMemOS
ENHANCED_AVAILABLE = True
print("✅ 增强版MemOS可用 (Qwen3-Embedding-0.6B + Qwen3-Reranker-0.6B)")
except ImportError as e:
ENHANCED_AVAILABLE = False
print(f"⚠️ 增强版MemOS不可用,将使用基础版本: {e}")
# 基础版作为后备
from usage_examples import SimpleMemOS
# 主题漂移检测器
from topic_drift_detector import TopicDriftDetector
class MVPMemoryManager:
"""MVP记忆管理器,基于现有SimpleMemOS,为官方API预留接口"""
def __init__(self, data_dir: str = "./memos_data", use_enhanced: bool = True):
    """Initialise the MVP memory manager and its optional components.

    Args:
        data_dir: Directory that holds configuration files and memory data.
            It is created, including missing parent directories, if absent.
        use_enhanced: Request the enhanced backend. The request is only
            honoured when the module-level ``ENHANCED_AVAILABLE`` flag is true.

    Raises:
        Exception: Whatever ``_create_memory_instance`` re-raises when no
            memory backend can be created.
    """
    self.data_dir = Path(data_dir)
    # parents=True so that a nested data_dir with missing parents works too.
    self.data_dir.mkdir(parents=True, exist_ok=True)

    # Keep the requested mode separate from the mode actually running, so a
    # later fallback can be reported without losing the original request.
    self.desired_mode = use_enhanced and ENHANCED_AVAILABLE
    self.current_mode = self.desired_mode
    self.fallback_reason = None

    # Plain configuration and the "official best practice" configuration.
    self.config = self._load_config()
    self.official_config = self._load_official_config()

    # Capacity manager (None when it cannot be initialised).
    self.capacity_manager = self._initialize_capacity_manager()

    # Create the memory backend BEFORE the compression pipeline: the pipeline
    # receives this manager object and may start its scheduler right away, so
    # `self.memory` should already exist at that point.
    self.memory = self._create_memory_instance()

    # Compression pipeline and topic drift detector (each may be None).
    self.compression_pipeline = self._initialize_compression_pipeline()
    self.topic_drift_detector = self._initialize_topic_drift_detector()

    # Report the final state, including the fallback reason if any.
    if self.current_mode:
        model_info = "增强版 (Qwen3-Embedding-0.6B + Qwen3-Reranker-0.6B)"
    else:
        model_info = f"基础版 (哈希向量){' - ' + self.fallback_reason if self.fallback_reason else ''}"
    print(f"✅ MVP记忆管理器初始化成功 ({model_info})")
@property
def use_enhanced(self):
    """Compatibility alias: expose the mode that is actually running."""
    running_mode = self.current_mode
    return running_mode
def get_status_info(self):
    """Build a status dictionary for the manager and its components.

    Returns:
        A dict that always contains ``mode``, ``model``, ``reranker``,
        ``status``, ``official_config``, ``capacity_management`` and
        ``compression_pipeline``. When an official configuration is loaded,
        ``config_type``, ``memory_backend``, ``capacity_planning`` and
        ``optimizations`` are added as well. A component that is missing or
        raises while being queried is reported as ``{"enabled": False}``,
        with an ``error`` message in the latter case.
    """
    enhanced = self.current_mode
    if enhanced:
        status_text = "正常运行"
    else:
        suffix = ' - ' + self.fallback_reason if self.fallback_reason else ''
        status_text = f"基础模式{suffix}"

    info = {
        "mode": "enhanced" if enhanced else "basic",
        "model": "Qwen3-Embedding-0.6B" if enhanced else "哈希向量",
        "reranker": "Qwen3-Reranker-0.6B" if enhanced else "无",
        "status": status_text,
    }

    # Official configuration summary.
    official = getattr(self, 'official_config', None)
    if official:
        info["official_config"] = True
        info["config_type"] = official.get("config_type", "unknown")
        info["memory_backend"] = official.get("memory", {}).get("backend", "unknown")
        scheduler = official.get("scheduler", {})
        info["capacity_planning"] = scheduler.get("memory_capacities", {})
        info["optimizations"] = {
            "batch_processing": scheduler.get("batch_processing", {}).get("enabled", False),
            "memory_compression": scheduler.get("performance_optimization", {}).get("enable_memory_compression", False),
        }
    else:
        info["official_config"] = False

    # Capacity manager summary.
    manager = getattr(self, 'capacity_manager', None)
    if not manager:
        info["capacity_management"] = {"enabled": False}
    else:
        try:
            report = manager.get_capacity_report()
            metrics = report.get("current_metrics", {})
            info["capacity_management"] = {
                "enabled": True,
                "monitoring_status": report.get("monitoring_status", "unknown"),
                "system_health": report.get("system_health", "unknown"),
                "current_metrics": {
                    "total_memories": metrics.get("memory_total_count", 0),
                    "add_avg_time": metrics.get("memory_add_avg_time", 0),
                    "search_avg_time": metrics.get("memory_search_avg_time", 0),
                },
                "memory_usage": report.get("memory_stats", {}),
            }
        except Exception as e:
            info["capacity_management"] = {"enabled": False, "error": str(e)}

    # Compression pipeline summary.
    pipeline = getattr(self, 'compression_pipeline', None)
    if not pipeline:
        info["compression_pipeline"] = {"enabled": False}
    else:
        try:
            stats = pipeline.get_compression_stats()
            pipeline_config = pipeline.config
            info["compression_pipeline"] = {
                "enabled": True,
                "auto_schedule": pipeline_config.enable_auto_schedule,
                "total_compressed": stats.get("total_compressed", 0),
                "total_tokens_saved": stats.get("total_tokens_saved", 0),
                "last_compression": stats.get("last_compression_time"),
                "token_threshold": pipeline_config.token_threshold,
                "compression_ratio": pipeline_config.compression_ratio,
            }
        except Exception as e:
            info["compression_pipeline"] = {"enabled": False, "error": str(e)}

    return info
async def health_check(self, timeout: int = 10):
    """Run a store-then-search round trip and report whether it worked.

    A probe memory tagged "健康检查" is saved through ``remember`` and then
    looked up through ``recall``. Note that each call therefore stores one
    additional memory.

    Args:
        timeout: Accepted for interface compatibility. NOTE(review): the
            value is not used anywhere in this method, so no time limit is
            actually enforced.

    Returns:
        A dict with ``healthy`` (bool), ``mode`` ("enhanced", "basic", or
        "unknown" when an exception occurred) and ``message``.
    """
    try:
        probe = f"健康检查测试 - {time.time()}"
        stored = self.remember(probe, tags=["健康检查"])
        # Only search when the probe could be stored.
        found = self.recall("健康检查", top_k=1) if stored else None
        healthy = bool(found)
        return {
            "healthy": healthy,
            "mode": "enhanced" if self.current_mode else "basic",
            "message": "系统运行正常" if healthy else "功能测试失败",
        }
    except Exception as e:
        return {
            "healthy": False,
            "mode": "unknown",
            "message": f"健康检查失败: {e}",
        }
def _load_config(self) -> Dict[str, Any]:
    """Load the JSON configuration, falling back to built-in defaults.

    The path comes from the ``MEMOS_CONFIG`` environment variable, or
    ``<data_dir>/concurrent_config.json`` when the variable is not set.

    Returns:
        The parsed configuration, or ``{"qdrant_port": 6335,
        "qdrant_mode": "server"}`` when reading or parsing fails.
    """
    fallback_path = str(self.data_dir / "concurrent_config.json")
    config_path = os.environ.get('MEMOS_CONFIG', fallback_path)
    try:
        with open(config_path, 'r', encoding='utf-8') as handle:
            loaded = json.load(handle)
        print(f"[MVP] 加载配置文件: {config_path}")
        port = loaded.get('qdrant_port', 6335)
        print(f"[MVP] Qdrant端口: {port}")
        return loaded
    except Exception as e:
        print(f"⚠️ 配置文件加载失败: {e}")
        # Defaults used when no usable configuration file is available.
        defaults = {
            "qdrant_port": 6335,
            "qdrant_mode": "server",
        }
        return defaults
def _load_official_config(self):
    """Load ``<data_dir>/official_config.json``, generating it if missing.

    When the file does not exist, the ``official_config_manager`` module is
    asked to generate and save a configuration.

    Returns:
        The configuration object, or ``None`` when loading or generating
        fails for any reason.
    """
    official_config_path = self.data_dir / "official_config.json"
    try:
        if not official_config_path.exists():
            # No file yet: try to generate one through the config manager.
            try:
                from official_config_manager import create_official_config_manager
                manager = create_official_config_manager(str(self.data_dir))
                generated = manager.generate_official_config()
                manager.save_config()
                print(f"[MVP] 官方配置已生成并加载")
                return generated
            except Exception as e:
                print(f"[MVP] 官方配置生成失败: {e}")
                return None

        with open(official_config_path, 'r', encoding='utf-8') as handle:
            loaded = json.load(handle)
        print(f"[MVP] 官方配置已加载: {official_config_path}")
        return loaded
    except Exception as e:
        print(f"[MVP] 官方配置加载失败: {e}")
        return None
def _initialize_capacity_manager(self):
    """Create the capacity manager and start its monitoring.

    Capacity and batch settings are read from the ``scheduler`` section of
    the official configuration when present; otherwise ``CapacityConfig``
    defaults are used.

    Returns:
        The capacity manager, or ``None`` when anything fails (including
        the ``capacity_manager`` module being unavailable).
    """
    try:
        from capacity_manager import create_capacity_manager, CapacityConfig

        official = self.official_config
        if official and "scheduler" in official:
            scheduler = official["scheduler"]
            capacities = scheduler.get("memory_capacities", {})
            batch_config = scheduler.get("batch_processing", {})
            settings = CapacityConfig(
                working_memory_capacity=capacities.get("working_memory_capacity", 20),
                user_memory_capacity=capacities.get("user_memory_capacity", 500),
                long_term_memory_capacity=capacities.get("long_term_memory_capacity", 2000),
                batch_size=batch_config.get("batch_size", 32),
                max_concurrent_requests=batch_config.get("max_concurrent_requests", 4),
            )
        else:
            settings = CapacityConfig()

        manager = create_capacity_manager(str(self.data_dir), settings)
        manager.start_monitoring()
        print(f"[MVP] 智能容量管理器已初始化")
        return manager
    except Exception as e:
        print(f"[MVP] 容量管理器初始化失败: {e}")
        return None
def _initialize_compression_pipeline(self):
    """Create the summary-compression pipeline for this manager.

    Settings come from ``scheduler.performance_optimization`` in the
    official configuration when present; otherwise ``CompressionConfig``
    defaults are used. The pipeline's scheduler is started when the
    resulting configuration has ``enable_auto_schedule`` set.

    Returns:
        The pipeline, or ``None`` when anything fails (including the
        ``compression_pipeline`` module being unavailable).
    """
    try:
        from compression_pipeline import create_compression_pipeline, CompressionConfig

        official = self.official_config
        if official and "scheduler" in official:
            performance = official["scheduler"].get("performance_optimization", {})
            settings = CompressionConfig(
                token_threshold=performance.get("compression_threshold", 1000),
                enable_auto_schedule=performance.get("enable_memory_compression", True),
                compression_ratio=performance.get("compression_ratio", 0.3),
            )
        else:
            settings = CompressionConfig()

        # The pipeline is given this manager object itself.
        pipeline = create_compression_pipeline(self, settings)
        if settings.enable_auto_schedule:
            pipeline.start_scheduler()
        print(f"[MVP] 自动摘要压缩管线已初始化")
        return pipeline
    except Exception as e:
        print(f"[MVP] 压缩管线初始化失败: {e}")
        return None
def _initialize_topic_drift_detector(self):
    """Create the topic drift detector.

    ``window_size``, ``drift_threshold`` and ``min_similarity`` are read
    from the ``topic_drift`` section of the official configuration when it
    exists, with defaults of 5, 0.5 and 0.3 respectively.

    Returns:
        A ``TopicDriftDetector``, or ``None`` when construction fails so
        that the rest of the manager keeps working.
    """
    try:
        official = self.official_config
        has_section = official and 'topic_drift' in official
        drift_config = official['topic_drift'] if has_section else {}

        detector = TopicDriftDetector(
            window_size=drift_config.get('window_size', 5),
            drift_threshold=drift_config.get('drift_threshold', 0.5),
            min_similarity=drift_config.get('min_similarity', 0.3),
        )
        print(f"[MVP] 主题漂移检测器已初始化")
        return detector
    except Exception as e:
        print(f"[MVP] 主题漂移检测器初始化失败: {e}")
        # Drift detection is optional; callers check for None.
        return None
def _create_memory_instance(self):
    """Create the memory backend, falling back from enhanced to basic.

    When ``self.current_mode`` is true the enhanced backend is tried first.
    If that fails, the basic ``SimpleMemOS`` backend is tried; on success
    ``self.current_mode`` is set to False and ``self.fallback_reason``
    records why. ``self.desired_mode`` is never modified here.

    Returns:
        The created backend instance. This method never returns ``None``.

    Raises:
        Exception: The original error when the basic backend was requested
            and failed, or the fallback's error when both backends failed.
    """
    try:
        if self.current_mode:
            memory = EnhancedSimpleMemOS(str(self.data_dir))
            print("✅ 增强版MemOS实例创建成功 (Qwen3-Embedding-0.6B + Qwen3-Reranker-0.6B)")
        else:
            memory = SimpleMemOS(str(self.data_dir))
            print("✅ 基础版SimpleMemOS实例创建成功 (哈希向量)")
        return memory
    except Exception as e:
        print(f"❌ MemOS实例创建失败: {e}")
        if self.current_mode:
            # The enhanced backend failed: degrade to the basic one.
            print("🔄 增强版初始化失败,自动降级到基础版...")
            self.fallback_reason = f"初始化失败: {str(e)[:50]}..."
            try:
                memory = SimpleMemOS(str(self.data_dir))
            except Exception as e2:
                print(f"❌ 基础版也失败: {e2}")
                self.fallback_reason = f"全部失败: {str(e2)[:50]}..."
                # Nothing left to try; propagate the fallback failure.
                raise
            print("✅ 自动降级到基础版成功 (哈希向量)")
            # Only the running mode changes; the desired mode is kept.
            self.current_mode = False
            return memory
        # The basic backend was requested and failed: propagate explicitly
        # instead of falling off the end and returning None.
        raise
def test_connection(self) -> bool:
"""测试连接和基础功能"""
try:
print("🔍 测试MVP记忆管理器连接...")
# 测试添加记忆
test_content = "这是一个MVP测试记忆"
memory_id = self.memory.add_memory(test_content, tags=["test", "mvp"])
if memory_id:
print("✅ 记忆添加测试成功")
else:
print("⚠️ 记忆添加测试失败")
return False
# 测试搜索记忆
results = self.memory.search_memories("MVP测试", limit=1)
if results:
print(f"✅ 记忆搜索测试成功,找到 {len(results)} 条记忆")
else:
print("⚠️ 记忆搜索测试:未找到结果")
# 测试获取所有记忆
all_memories = self.memory.get_all_memories()
print(f"✅ 获取所有记忆测试成功,共 {len(all_memories)} 条记忆")
return True
except Exception as e:
print(f"❌ 连接测试失败: {e}")
return False
def remember(self, content: str, tags: Optional[List[str]] = None,
             metadata: Optional[Dict[str, Any]] = None) -> bool:
    """Store one memory and record capacity statistics as a side task.

    Args:
        content: Text of the memory.
        tags: Optional tags forwarded to the backend.
        metadata: Optional extra metadata. It is merged over the defaults
            ``type="manual"``, ``source="manual"`` and a current
            ``timestamp``, so caller-supplied keys win.

    Returns:
        True when the backend returned a truthy memory id, False when it
        did not or when saving raised. A failure while recording capacity
        statistics does NOT turn a successful save into False.
    """
    start_time = time.time()
    try:
        full_metadata = {
            "type": "manual",
            "source": "manual",
            "timestamp": datetime.now().isoformat(),
            **(metadata or {})
        }
        memory_id = self.memory.add_memory(content, tags=tags, metadata=full_metadata)
        if not memory_id:
            print(f"❌ 记忆保存失败: {content[:50]}...")
            return False

        duration = time.time() - start_time
        if self.capacity_manager:
            # Bookkeeping is best effort: the memory is already stored, so an
            # error here (e.g. the capacity_manager module failing to import)
            # must not be reported as a failed save.
            try:
                self.capacity_manager.record_operation_time("memory_add", duration)
                # Simplified statistics: every memory is counted as USER type.
                from capacity_manager import MemoryType
                total_memories = len(self.memory.get_all_memories()) if hasattr(self.memory, 'get_all_memories') else 1
                self.capacity_manager.update_memory_stats(MemoryType.USER, total_memories)
            except Exception as stats_error:
                print(f"⚠️ 容量统计更新失败: {stats_error}")

        print(f"✅ 记忆已保存 #{memory_id}: {content[:50]}... (耗时: {duration:.3f}s)")
        return True
    except Exception as e:
        print(f"❌ 记忆保存失败: {e}")
        return False
def recall(self, query: str, top_k: int = 5, use_reranker: Optional[bool] = None,
           use_feedback_boost: bool = True, use_time_decay: bool = True,
           time_decay_tau: float = 30.0, use_topic_drift_detection: bool = True) -> List[Dict[str, Any]]:
    """Search memories and post-process the ranking.

    Args:
        query: Search text.
        top_k: Maximum number of results returned.
        use_reranker: Only used with the enhanced backend. ``None`` means
            "enabled".
        use_feedback_boost: Re-rank by ``usage_score`` feedback. When set,
            ``top_k * 2`` candidates are fetched so re-ranking has room.
        use_time_decay: Apply the time decay ranking.
        time_decay_tau: Decay constant (days) passed to the time decay step.
        use_topic_drift_detection: Ask the drift detector whether the topic
            changed. A positive answer is only printed as a hint; nothing
            is cleared here.

    Returns:
        At most ``top_k`` result dicts, or ``[]`` when searching or ranking
        raised. Failures while recording timing metrics or access
        statistics are reported but do not discard the results.
    """
    start_time = time.time()

    # Lazy import to avoid a circular dependency; tracking is optional.
    try:
        from access_tracker import track_batch_access
    except ImportError:
        track_batch_access = None

    try:
        if use_topic_drift_detection and self.topic_drift_detector:
            should_clear, reason = self.topic_drift_detector.should_clear_candidates(query)
            if should_clear:
                print(f"🔄 主题漂移检测: {reason}")
                print(f" 建议清空候选集,重新开始检索")

        # Fetch extra candidates when feedback re-ranking will reorder them.
        search_limit = top_k * 2 if use_feedback_boost else top_k

        if self.use_enhanced and hasattr(self.memory, 'search_memories'):
            if use_reranker is None:
                use_reranker = True
            results = self.memory.search_memories(query, limit=search_limit, use_reranker=use_reranker)
            rerank_info = "含重排优化" if use_reranker else "仅向量搜索"
            print(f"🔍 使用Qwen模型检索到 {len(results)} 条相关记忆 ({rerank_info})")
        else:
            results = self.memory.search_memories(query, limit=search_limit)
            print(f"🔍 使用基础版检索到 {len(results)} 条相关记忆 (哈希向量)")

        if use_feedback_boost and results:
            results = self._apply_feedback_boost(results)
            print(f"📊 应用反馈分数加权,重新排序结果")

        if use_time_decay and results:
            results = self._apply_time_decay_ranking(results, time_decay_tau)
            print(f"⏰ 应用时间感知排序,τ={time_decay_tau}天")

        duration = time.time() - start_time
        final_results = results[:top_k]
    except Exception as e:
        print(f"❌ 记忆检索失败: {e}")
        return []

    # Bookkeeping is best effort: the search itself succeeded, so a metrics
    # or tracking error must not throw the results away.
    try:
        if self.capacity_manager:
            self.capacity_manager.record_operation_time("memory_search", duration)
        if track_batch_access is not None and final_results:
            # Compare against None/'' explicitly so that a legitimate id of 0
            # is still tracked.
            memory_ids = [
                str(result.get('id'))
                for result in final_results
                if result.get('id') is not None and result.get('id') != ''
            ]
            if memory_ids:
                track_batch_access(memory_ids, "search")
    except Exception as e:
        print(f"⚠️ 检索统计记录失败: {e}")

    print(f"🔍 检索完成,返回 {len(final_results)} 条结果 (耗时: {duration:.3f}s)")
    return final_results
def _apply_feedback_boost(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Scale each result's score by its feedback and re-sort, in place.

    The multiplier is ``1.0 + usage_score * 0.1`` (each feedback point is
    worth 10%), clamped to the range 0.5–2.0. Every result gets its
    ``score`` overwritten and a ``feedback_boost`` key added.

    Args:
        results: Result dicts; ``metadata.usage_score`` and ``score`` are
            read. A missing/``None`` metadata or a non-numeric
            ``usage_score`` counts as 0 feedback; a missing/``None`` score
            counts as 0.5.

    Returns:
        The same list object, sorted by boosted score in descending order.
        On an unexpected error the list is returned as it is at that point.
    """
    try:
        for result in results:
            # `or {}` also covers results whose metadata is explicitly None.
            metadata = result.get('metadata') or {}
            usage_score = metadata.get('usage_score', 0)
            if isinstance(usage_score, bool) or not isinstance(usage_score, (int, float)):
                usage_score = 0

            original_score = result.get('score', 0.5)
            if original_score is None:
                original_score = 0.5

            # Positive feedback raises the rank, negative feedback lowers it.
            feedback_multiplier = 1.0 + (usage_score * 0.1)
            feedback_multiplier = max(0.5, min(2.0, feedback_multiplier))

            boosted_score = original_score * feedback_multiplier
            result['score'] = boosted_score
            result['feedback_boost'] = feedback_multiplier

            # Debug output only for results that actually carry feedback.
            if usage_score != 0:
                print(f" 📈 记忆#{result.get('id')}: usage_score={usage_score}, "
                      f"boost={feedback_multiplier:.2f}, score={original_score:.3f}→{boosted_score:.3f}")

        results.sort(key=lambda x: x.get('score', 0), reverse=True)
        return results
    except Exception as e:
        print(f"❌ 反馈加权失败: {e}")
        return results
def _apply_time_decay_ranking(self, results: List[Dict[str, Any]], tau_days: float = 30.0) -> List[Dict[str, Any]]:
    """Multiply each score by ``exp(-age_days / tau_days)`` and re-sort, in place.

    Per result, ``score`` is overwritten and ``time_decay_factor`` and
    ``days_ago`` are added:

    * Parsable ``metadata.timestamp`` (ISO 8601, a trailing ``Z`` is
      accepted): the factor is ``exp(-age/tau)``. Timezone-aware timestamps
      are compared against the current UTC time, naive ones against the
      local clock. Timestamps in the future count as age 0 so they are
      never boosted above their original score.
    * Unparsable timestamp: the score is left unchanged (factor 1.0).
    * Missing timestamp: the score is multiplied by the default factor 0.1.

    Args:
        results: Result dicts to re-rank.
        tau_days: Decay constant in days. Values <= 0 are rejected and the
            list is returned unchanged.

    Returns:
        The same list object, sorted by the new score in descending order.
    """
    import math
    from datetime import datetime, timezone

    if tau_days <= 0:
        # Avoid a division by zero / inverted decay half-way through the loop.
        print(f"⚠️ 无效的时间衰减参数τ={tau_days},跳过时间感知排序")
        return results

    try:
        now_local = datetime.now()
        now_utc = datetime.now(timezone.utc)

        for result in results:
            metadata = result.get('metadata') or {}
            timestamp_str = metadata.get('timestamp')
            current_score = result.get('score', 0.5)

            if not timestamp_str:
                # No timestamp: actually apply the low default weight.
                result['score'] = current_score * 0.1
                result['time_decay_factor'] = 0.1
                result['days_ago'] = float('inf')
                print(f" ⚠️ 记忆#{result.get('id')} 缺少时间戳,使用默认衰减因子0.1")
                continue

            try:
                # fromisoformat() on older Python versions rejects a "Z" suffix.
                if timestamp_str.endswith('Z'):
                    memory_time = datetime.fromisoformat(timestamp_str[:-1] + '+00:00')
                else:
                    memory_time = datetime.fromisoformat(timestamp_str)

                # Never mix naive and aware datetimes: pick the matching "now".
                reference = now_utc if memory_time.tzinfo is not None else now_local
                time_delta = (reference - memory_time).total_seconds() / (24 * 3600)
                time_delta = max(0.0, time_delta)

                time_decay_factor = math.exp(-time_delta / tau_days)
                time_aware_score = current_score * time_decay_factor

                result['score'] = time_aware_score
                result['time_decay_factor'] = time_decay_factor
                result['days_ago'] = time_delta

                # Debug output only for relatively recent memories.
                if time_delta < tau_days:
                    print(f" ⏰ 记忆#{result.get('id')}: {time_delta:.1f}天前, "
                          f"衰减={time_decay_factor:.3f}, score={current_score:.3f}→{time_aware_score:.3f}")
            except (ValueError, TypeError, AttributeError) as e:
                # Unparsable timestamp (or not a string): keep the score.
                print(f" ⚠️ 记忆#{result.get('id')} 时间戳解析失败: {timestamp_str}, 错误: {e}")
                result['time_decay_factor'] = 1.0
                result['days_ago'] = float('inf')

        results.sort(key=lambda x: x.get('score', 0), reverse=True)
        return results
    except Exception as e:
        print(f"❌ 时间感知排序失败: {e}")
        return results
def extract_from_messages(self, messages: List[Dict[str, str]]) -> List[Dict[str, Any]]:
    """Pick memory candidates out of a conversation (simple heuristic).

    Only messages whose ``role`` is ``"user"`` are considered. A message is
    kept when its content contains any of the keywords "我", "喜欢",
    "不喜欢", "记住" or "重要".

    Returns:
        One dict per kept message with ``content`` and a ``metadata`` dict
        (``type="extracted"``, ``source="conversation"``, ``timestamp``).
        ``[]`` is returned when any error occurs.
    """
    keywords = ("我", "喜欢", "不喜欢", "记住", "重要")
    try:
        extracted = []
        user_texts = (
            entry.get("content", "")
            for entry in messages
            if entry.get("role") == "user"
        )
        for text in user_texts:
            if not any(word in text for word in keywords):
                continue
            extracted.append({
                "content": text,
                "metadata": {
                    "type": "extracted",
                    "source": "conversation",
                    "timestamp": datetime.now().isoformat(),
                },
            })
        print(f"🧠 从对话中提取到 {len(extracted)} 条记忆")
        return extracted
    except Exception as e:
        print(f"❌ 记忆提取失败: {e}")
        return []
def get_daily_memories(self, date: Optional[str] = None) -> List[Dict[str, Any]]:
    """Return the memories whose metadata timestamp starts with a date.

    Args:
        date: Date prefix, e.g. ``"2024-01-31"``. A falsy value means
            today's date in ``%Y-%m-%d`` format.

    Returns:
        The matching memories in backend order, or ``[]`` on any error.
    """
    try:
        all_memories = self.memory.get_all_memories()
        day = date or datetime.now().strftime("%Y-%m-%d")
        matches = [
            item
            for item in all_memories
            if item.get("metadata", {}).get("timestamp", "").startswith(day)
        ]
        print(f"📅 找到 {day} 的 {len(matches)} 条记忆")
        return matches
    except Exception as e:
        print(f"❌ 获取每日记忆失败: {e}")
        return []
def dump_memories(self, backup_dir: str) -> bool:
    """Write all memories to a timestamped JSON file in ``backup_dir``.

    The directory is created when missing. The file is named
    ``memories_backup_<YYYYmmdd_HHMMSS>.json`` and written as UTF-8 with
    non-ASCII characters kept as-is.

    Returns:
        True on success, False when any step raises.
    """
    try:
        target_dir = Path(backup_dir)
        target_dir.mkdir(parents=True, exist_ok=True)

        snapshot = self.memory.get_all_memories()
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_file = target_dir / f"memories_backup_{stamp}.json"

        with open(backup_file, 'w', encoding='utf-8') as handle:
            json.dump(snapshot, handle, ensure_ascii=False, indent=2)

        print(f"💾 记忆已备份到: {backup_file}")
        return True
    except Exception as e:
        print(f"❌ 记忆备份失败: {e}")
        return False
def provide_feedback(self, memory_id: str, feedback_type: str) -> bool:
    """Record 👍/👎 feedback on a memory by adjusting its ``usage_score``.

    Args:
        memory_id: Id of the memory; compared as a string against the ids
            returned by the backend.
        feedback_type: ``"thumbs_up"`` or ``"👍"`` adds 1,
            ``"thumbs_down"`` or ``"👎"`` subtracts 1.

    Returns:
        True when the updated metadata was persisted. False when the memory
        is not found, the feedback type is invalid, or persisting fails.

    The metadata written also gets ``last_feedback`` (ISO timestamp), an
    incremented ``feedback_count`` and, once the score reaches -3 or lower,
    ``needs_rewrite = True``. The new metadata is built on a copy, so the
    dict returned by the backend is not modified when the update fails.
    """
    # feedback type -> (score delta, emoji, log label)
    feedback_kinds = {
        'thumbs_up': (1, '👍', '正面反馈'),
        '👍': (1, '👍', '正面反馈'),
        'thumbs_down': (-1, '👎', '负面反馈'),
        '👎': (-1, '👎', '负面反馈'),
    }
    try:
        wanted_id = str(memory_id)
        target_memory = next(
            (m for m in self.memory.get_all_memories() if str(m.get('id', '')) == wanted_id),
            None,
        )
        if not target_memory:
            print(f"❌ 未找到记忆ID: {memory_id}")
            return False

        if feedback_type not in feedback_kinds:
            print(f"❌ 无效的反馈类型: {feedback_type}")
            return False
        delta, feedback_emoji, label = feedback_kinds[feedback_type]

        # Copy, and tolerate memories stored with metadata=None / score=None.
        metadata = dict(target_memory.get('metadata') or {})
        current_score = metadata.get('usage_score') or 0
        new_score = current_score + delta
        print(f"{feedback_emoji} {label}: usage_score {current_score} → {new_score}")

        metadata['usage_score'] = new_score
        metadata['last_feedback'] = datetime.now().isoformat()
        metadata['feedback_count'] = (metadata.get('feedback_count') or 0) + 1

        # A strongly negative score flags the memory for rewriting.
        if new_score <= -3:
            metadata['needs_rewrite'] = True
            print(f"⚠️ 记忆分数过低({new_score}),已标记为需要重写")

        content = target_memory.get('content', '')
        try:
            success = self._update_memory_metadata(memory_id, metadata)
        except Exception as e:
            print(f"❌ 反馈保存失败: {e}")
            return False

        if success:
            print(f"✅ 反馈已保存 {feedback_emoji} 记忆#{memory_id}: {content[:30]}...")
            return True
        print(f"❌ 记忆更新失败")
        return False
    except Exception as e:
        print(f"❌ 反馈处理失败: {e}")
        return False
def _update_memory_metadata(self, memory_id: str, new_metadata: Dict[str, Any]) -> bool:
    """Persist new metadata for one memory (internal helper).

    ``memory_id`` is converted with ``int()`` first. Then the first
    available mechanism on ``self.memory`` is used, in this order:

    1. its own ``update_memory_metadata(id, metadata)`` method, whose
       return value is passed through;
    2. ``vector_db.set_payload(...)``;
    3. ``qdrant_client.set_payload(...)``.

    Returns:
        True after a ``set_payload`` call, the backend's own result for
        option 1, and False when the id is not an integer, no mechanism
        exists, or any call raises.
    """
    try:
        point_id = int(memory_id)
        backend = self.memory

        if hasattr(backend, 'update_memory_metadata'):
            return backend.update_memory_metadata(point_id, new_metadata)

        # Both client attributes expose the same set_payload call.
        for client_attr in ('vector_db', 'qdrant_client'):
            if not hasattr(backend, client_attr):
                continue
            getattr(backend, client_attr).set_payload(
                collection_name=backend.collection_name,
                payload={"metadata": new_metadata},
                points=[point_id]
            )
            print(f"✅ 记忆#{memory_id}的metadata已更新")
            return True

        print("⚠️ 当前存储后端不支持直接更新")
        return False
    except ValueError:
        print(f"❌ 无效的记忆ID格式: {memory_id}")
        return False
    except Exception as e:
        print(f"❌ metadata更新失败: {e}")
        return False
def get_feedback_stats(self) -> Dict[str, Any]:
    """Summarise feedback over all memories.

    Returns:
        A dict with ``total_memories``, ``memories_with_feedback`` (those
        whose metadata has a non-None ``usage_score``),
        ``positive_feedback`` / ``negative_feedback`` (score above / below
        zero), ``average_score`` (rounded to 2 decimals, 0 when nothing is
        scored) and ``needs_rewrite``. ``{}`` is returned on any error.
    """
    try:
        memories = self.memory.get_all_memories()
        metadatas = [entry.get('metadata', {}) for entry in memories]
        scores = [
            meta.get('usage_score')
            for meta in metadatas
            if meta.get('usage_score') is not None
        ]

        average = round(sum(scores) / len(scores), 2) if scores else 0
        return {
            'total_memories': len(memories),
            'memories_with_feedback': len(scores),
            'positive_feedback': sum(1 for value in scores if value > 0),
            'negative_feedback': sum(1 for value in scores if value < 0),
            'average_score': average,
            'needs_rewrite': sum(1 for meta in metadatas if meta.get('needs_rewrite')),
        }
    except Exception as e:
        print(f"❌ 获取反馈统计失败: {e}")
        return {}
def apply_score_decay(self, decay_factor: float = 0.98) -> bool:
    """Shrink every non-zero ``usage_score`` towards zero.

    Each score is multiplied by ``decay_factor``; a result whose absolute
    value is below 0.1 becomes 0. The new value is rounded to 2 decimals
    and written back through ``_update_memory_metadata``.

    Returns:
        True when the pass completed (even if some updates failed), False
        when an exception interrupted it.
    """
    try:
        updated_count = 0
        for entry in self.memory.get_all_memories():
            entry_id = entry.get('id')
            metadata = entry.get('metadata', {})
            score = metadata.get('usage_score')
            # Nothing to decay for unscored or already-neutral memories.
            if score is None or score == 0:
                continue

            decayed = score * decay_factor
            if abs(decayed) < 0.1:
                decayed = 0
            metadata['usage_score'] = round(decayed, 2)

            if self._update_memory_metadata(str(entry_id), metadata):
                updated_count += 1

        print(f"✅ 分数衰减完成,更新了 {updated_count} 条记忆")
        return True
    except Exception as e:
        print(f"❌ 分数衰减失败: {e}")
        return False
def get_capacity_report(self) -> Dict[str, Any]:
    """Return the capacity manager's report, or an error dict without one."""
    manager = getattr(self, 'capacity_manager', None)
    if not manager:
        return {"error": "容量管理器未启用"}
    return manager.get_capacity_report()
def get_performance_metrics(self) -> Dict[str, Any]:
    """Return the capacity manager's performance metrics as a plain dict.

    Returns:
        A dict with the keys ``timestamp``, ``memory_add_avg_time``,
        ``memory_search_avg_time``, ``memory_total_count``,
        ``system_health``, ``cpu_usage``, ``memory_usage_mb`` and
        ``disk_usage_mb`` copied from the metrics object, or an error dict
        when no capacity manager is active.
    """
    manager = getattr(self, 'capacity_manager', None)
    if not manager:
        return {"error": "容量管理器未启用"}

    metrics = manager.get_performance_metrics()
    exported_fields = (
        "timestamp",
        "memory_add_avg_time",
        "memory_search_avg_time",
        "memory_total_count",
        "system_health",
        "cpu_usage",
        "memory_usage_mb",
        "disk_usage_mb",
    )
    return {field: getattr(metrics, field) for field in exported_fields}
def trigger_manual_optimization(self) -> bool:
    """Inspect current metrics and report which optimisations apply.

    The method only inspects metrics and prints: a memory type whose stats
    report ``is_critical()``, an average add time above 1.0 and an average
    search time above 0.5 are each listed as an optimisation.

    Returns:
        True when the inspection completed (whether or not anything was
        listed); False without a capacity manager or when it raises.
    """
    manager = getattr(self, 'capacity_manager', None)
    if not manager:
        print(f"❌ 容量管理器未启用")
        return False

    try:
        metrics = manager.get_performance_metrics()
        applied = []

        # Memory pressure per memory type.
        for memory_type, stats in metrics.memory_usage_by_type.items():
            if stats.is_critical():
                print(f"🗜️ 触发{memory_type}内存优化")
                applied.append(f"{memory_type}_compression")

        # Latency thresholds.
        if metrics.memory_add_avg_time > 1.0:
            print(f"⚡ 优化记忆添加性能")
            applied.append("add_performance")
        if metrics.memory_search_avg_time > 0.5:
            print(f"⚡ 优化记忆检索性能")
            applied.append("search_performance")

        if not applied:
            print(f"ℹ️ 系统运行良好,无需优化")
        else:
            print(f"✅ 应用了 {len(applied)} 项优化: {', '.join(applied)}")
        return True
    except Exception as e:
        print(f"❌ 手动优化失败: {e}")
        return False
def trigger_compression(self, max_memories: int = 50, test_mode: bool = False) -> Dict[str, Any]:
    """Run one compression batch on demand.

    Args:
        max_memories: Forwarded to ``run_compression_batch``.
        test_mode: Forwarded to ``run_compression_batch``.

    Returns:
        The pipeline's batch result, or ``{"error": ...}`` when no pipeline
        is active or the batch raises.
    """
    pipeline = getattr(self, 'compression_pipeline', None)
    if not pipeline:
        print(f"❌ 压缩管线未启用")
        return {"error": "压缩管线未启用"}

    try:
        return pipeline.run_compression_batch(
            max_memories=max_memories,
            test_mode=test_mode,
        )
    except Exception as e:
        print(f"❌ 压缩失败: {e}")
        return {"error": str(e)}
def get_compression_stats(self) -> Dict[str, Any]:
    """Return the pipeline's compression statistics, or an error dict without one."""
    pipeline = getattr(self, 'compression_pipeline', None)
    if not pipeline:
        return {"error": "压缩管线未启用"}
    return pipeline.get_compression_stats()
def cleanup_old_archives(self, days: int = None) -> bool:
    """Ask the compression pipeline to clean up old archives.

    Args:
        days: Passed through unchanged to the pipeline's
            ``cleanup_old_archives`` (``None`` included).

    Returns:
        True when the pipeline call completed, False when no pipeline is
        active or the call raises.
    """
    pipeline = getattr(self, 'compression_pipeline', None)
    if not pipeline:
        print(f"❌ 压缩管线未启用")
        return False

    try:
        pipeline.cleanup_old_archives(days)
    except Exception as e:
        print(f"❌ 清理归档失败: {e}")
        return False
    return True
def create_mvp_memory_manager(data_dir: str = "./memos_data", use_official_config: bool = True) -> MVPMemoryManager:
    """Factory: optionally save the official configuration, then build the manager.

    Args:
        data_dir: Data directory passed to the config manager and to
            ``MVPMemoryManager``.
        use_official_config: When true, ``official_config_manager`` is asked
            to save its configuration first. A failure there is only
            reported; the manager is created regardless.

    Returns:
        A new ``MVPMemoryManager`` for ``data_dir``.
    """
    if not use_official_config:
        return MVPMemoryManager(data_dir)

    try:
        from official_config_manager import create_official_config_manager
        official_manager = create_official_config_manager(data_dir)
        official_manager.save_config()
        print("✅ 应用官方最佳实践配置")
    except Exception as e:
        print(f"⚠️ 官方配置应用失败,使用默认配置: {e}")

    return MVPMemoryManager(data_dir)
if __name__ == "__main__":
    # Manual smoke test: build a manager and run its connection test.
    print("🚀 测试MVP记忆管理器")
    print("=" * 50)
    try:
        manager = create_mvp_memory_manager()
        passed = manager.test_connection()
        if not passed:
            print("❌ MVP记忆管理器测试失败")
        else:
            print("✅ MVP记忆管理器测试通过")
    except Exception as e:
        print(f"❌ 测试过程中发生错误: {e}")