Skip to main content
Glama

MemOS-MCP

by qinshu1109
Apache 2.0
3
  • Linux
  • Apple
mvp_memory.py42.1 kB
#!/usr/bin/env python3
"""
MemOS MVP memory-management module.

Following the expert-recommended "good enough, ship first" approach: build on
the existing SimpleMemOS for now, while reserving hooks for the official
MemOS API.
"""

import os
import json
import time
from pathlib import Path
from typing import Dict, Any, List, Optional
from datetime import datetime

# Prefer the enhanced MemOS (Qwen models); degrade to the basic SimpleMemOS.
try:
    from enhanced_simple_memos import EnhancedSimpleMemOS
    ENHANCED_AVAILABLE = True
    print("✅ 增强版MemOS可用 (Qwen3-Embedding-0.6B + Qwen3-Reranker-0.6B)")
except ImportError as e:
    ENHANCED_AVAILABLE = False
    print(f"⚠️ 增强版MemOS不可用,将使用基础版本: {e}")

# Basic implementation used as the fallback backend.
from usage_examples import SimpleMemOS

# Topic drift detector.
from topic_drift_detector import TopicDriftDetector


class MVPMemoryManager:
    """MVP memory manager built on SimpleMemOS, with hooks reserved for the official API."""

    def __init__(self, data_dir: str = "./memos_data", use_enhanced: bool = True):
        """Initialize the MVP memory manager.

        Args:
            data_dir: Directory for persisted memory data and config files.
            use_enhanced: Request the enhanced (Qwen-based) backend when available.
        """
        self.data_dir = Path(data_dir)
        self.data_dir.mkdir(exist_ok=True)

        # Keep the requested mode separate from the mode actually running,
        # so a runtime fallback does not erase the user's preference.
        self.desired_mode = use_enhanced and ENHANCED_AVAILABLE  # what the user asked for
        self.current_mode = self.desired_mode                    # what is actually running
        self.fallback_reason = None                              # why we degraded, if we did

        # Load runtime configuration.
        self.config = self._load_config()

        # Load official best-practice configuration (optional).
        self.official_config = self._load_official_config()

        # Initialize the smart capacity manager.
        self.capacity_manager = self._initialize_capacity_manager()

        # Initialize the automatic summary-compression pipeline.
        self.compression_pipeline = self._initialize_compression_pipeline()

        # Initialize the topic drift detector.
        self.topic_drift_detector = self._initialize_topic_drift_detector()

        # Create the underlying MemOS instance (may trigger fallback).
        self.memory = self._create_memory_instance()

        # Report the final effective state.
        if self.current_mode:
            model_info = "增强版 (Qwen3-Embedding-0.6B + Qwen3-Reranker-0.6B)"
        else:
            model_info = f"基础版 (哈希向量){' - ' + self.fallback_reason if self.fallback_reason else ''}"
        print(f"✅ MVP记忆管理器初始化成功 ({model_info})")

    @property
    def use_enhanced(self):
        """Compatibility property: the mode actually running right now."""
        return self.current_mode

    def get_status_info(self):
        """Return a detailed status report covering mode, config, capacity and compression."""
        base_info = {
            "mode": "enhanced" if self.current_mode else "basic",
            "model": "Qwen3-Embedding-0.6B" if self.current_mode else "哈希向量",
            "reranker": "Qwen3-Reranker-0.6B" if self.current_mode else "无",
            "status": "正常运行" if self.current_mode
            else f"基础模式{' - ' + self.fallback_reason if self.fallback_reason else ''}"
        }

        # Official best-practice configuration details, when loaded.
        if hasattr(self, 'official_config') and self.official_config:
            base_info.update({
                "official_config": True,
                "config_type": self.official_config.get("config_type", "unknown"),
                "memory_backend": self.official_config.get("memory", {}).get("backend", "unknown"),
                "capacity_planning": self.official_config.get("scheduler", {}).get("memory_capacities", {}),
                "optimizations": {
                    "batch_processing": self.official_config.get("scheduler", {}).get(
                        "batch_processing", {}).get("enabled", False),
                    "memory_compression": self.official_config.get("scheduler", {}).get(
                        "performance_optimization", {}).get("enable_memory_compression", False)
                }
            })
        else:
            base_info["official_config"] = False

        # Capacity-management details, when the manager is running.
        if hasattr(self, 'capacity_manager') and self.capacity_manager:
            try:
                capacity_report = self.capacity_manager.get_capacity_report()
                base_info.update({
                    "capacity_management": {
                        "enabled": True,
                        "monitoring_status": capacity_report.get("monitoring_status", "unknown"),
                        "system_health": capacity_report.get("system_health", "unknown"),
                        "current_metrics": {
                            "total_memories": capacity_report.get("current_metrics", {}).get(
                                "memory_total_count", 0),
                            "add_avg_time": capacity_report.get("current_metrics", {}).get(
                                "memory_add_avg_time", 0),
                            "search_avg_time": capacity_report.get("current_metrics", {}).get(
                                "memory_search_avg_time", 0)
                        },
                        "memory_usage": capacity_report.get("memory_stats", {})
                    }
                })
            except Exception as e:
                base_info["capacity_management"] = {"enabled": False, "error": str(e)}
        else:
            base_info["capacity_management"] = {"enabled": False}

        # Compression-pipeline details, when the pipeline is running.
        if hasattr(self, 'compression_pipeline') and self.compression_pipeline:
            try:
                compression_stats = self.compression_pipeline.get_compression_stats()
                base_info.update({
                    "compression_pipeline": {
                        "enabled": True,
                        "auto_schedule": self.compression_pipeline.config.enable_auto_schedule,
                        "total_compressed": compression_stats.get("total_compressed", 0),
                        "total_tokens_saved": compression_stats.get("total_tokens_saved", 0),
                        "last_compression": compression_stats.get("last_compression_time"),
                        "token_threshold": self.compression_pipeline.config.token_threshold,
                        "compression_ratio": self.compression_pipeline.config.compression_ratio
                    }
                })
            except Exception as e:
                base_info["compression_pipeline"] = {"enabled": False, "error": str(e)}
        else:
            base_info["compression_pipeline"] = {"enabled": False}

        return base_info

    async def health_check(self, timeout: int = 10):
        """Health check: verify the add/recall round trip works.

        NOTE(review): declared async for API symmetry but currently awaits
        nothing and ignores ``timeout`` — confirm whether callers rely on it
        being a coroutine.
        """
        try:
            # Exercise memory add.
            test_content = f"健康检查测试 - {time.time()}"
            success = self.remember(test_content, tags=["健康检查"])

            if success:
                # Exercise memory retrieval.
                results = self.recall("健康检查", top_k=1)
                if results:
                    return {
                        "healthy": True,
                        "mode": "enhanced" if self.current_mode else "basic",
                        "message": "系统运行正常"
                    }

            return {
                "healthy": False,
                "mode": "enhanced" if self.current_mode else "basic",
                "message": "功能测试失败"
            }

        except Exception as e:
            return {
                "healthy": False,
                "mode": "unknown",
                "message": f"健康检查失败: {e}"
            }

    def _load_config(self) -> Dict[str, Any]:
        """Load runtime settings from concurrent_config.json (env-overridable path)."""
        config_path = os.environ.get('MEMOS_CONFIG', str(self.data_dir / "concurrent_config.json"))
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                concurrent_config = json.load(f)
            print(f"[MVP] 加载配置文件: {config_path}")
            print(f"[MVP] Qdrant端口: {concurrent_config.get('qdrant_port', 6335)}")
            return concurrent_config
        except Exception as e:
            print(f"⚠️ 配置文件加载失败: {e}")
            # Fall back to defaults so the manager still starts.
            return {
                "qdrant_port": 6335,
                "qdrant_mode": "server"
            }

    def _load_official_config(self):
        """Load the official best-practice config, generating it on first run if possible."""
        official_config_path = self.data_dir / "official_config.json"
        try:
            if official_config_path.exists():
                with open(official_config_path, 'r', encoding='utf-8') as f:
                    config = json.load(f)
                print(f"[MVP] 官方配置已加载: {official_config_path}")
                return config
            else:
                # Try to generate the official config on the fly.
                try:
                    from official_config_manager import create_official_config_manager
                    config_manager = create_official_config_manager(str(self.data_dir))
                    config = config_manager.generate_official_config()
                    config_manager.save_config()
                    print(f"[MVP] 官方配置已生成并加载")
                    return config
                except Exception as e:
                    print(f"[MVP] 官方配置生成失败: {e}")
                    return None
        except Exception as e:
            print(f"[MVP] 官方配置加载失败: {e}")
            return None

    def _initialize_capacity_manager(self):
        """Initialize the smart capacity manager; returns None when unavailable."""
        try:
            from capacity_manager import create_capacity_manager, CapacityConfig

            # Pull capacity settings from the official config when present.
            if self.official_config and "scheduler" in self.official_config:
                capacities = self.official_config["scheduler"].get("memory_capacities", {})
                batch_config = self.official_config["scheduler"].get("batch_processing", {})

                config = CapacityConfig(
                    working_memory_capacity=capacities.get("working_memory_capacity", 20),
                    user_memory_capacity=capacities.get("user_memory_capacity", 500),
                    long_term_memory_capacity=capacities.get("long_term_memory_capacity", 2000),
                    batch_size=batch_config.get("batch_size", 32),
                    max_concurrent_requests=batch_config.get("max_concurrent_requests", 4)
                )
            else:
                config = CapacityConfig()  # library defaults

            capacity_manager = create_capacity_manager(str(self.data_dir), config)

            # Start performance monitoring immediately.
            capacity_manager.start_monitoring()

            print(f"[MVP] 智能容量管理器已初始化")
            return capacity_manager

        except Exception as e:
            print(f"[MVP] 容量管理器初始化失败: {e}")
            return None

    def _initialize_compression_pipeline(self):
        """Initialize the automatic summary-compression pipeline; returns None when unavailable."""
        try:
            from compression_pipeline import create_compression_pipeline, CompressionConfig

            # Pull compression settings from the official config when present.
            if self.official_config and "scheduler" in self.official_config:
                performance_config = self.official_config["scheduler"].get("performance_optimization", {})

                config = CompressionConfig(
                    token_threshold=performance_config.get("compression_threshold", 1000),
                    enable_auto_schedule=performance_config.get("enable_memory_compression", True),
                    compression_ratio=performance_config.get("compression_ratio", 0.3)
                )
            else:
                config = CompressionConfig()  # library defaults

            compression_pipeline = create_compression_pipeline(self, config)

            # Start the periodic scheduler when auto-compression is on.
            if config.enable_auto_schedule:
                compression_pipeline.start_scheduler()

            print(f"[MVP] 自动摘要压缩管线已初始化")
            return compression_pipeline

        except Exception as e:
            print(f"[MVP] 压缩管线初始化失败: {e}")
            return None

    def _initialize_topic_drift_detector(self):
        """Initialize the topic drift detector; returns None when unavailable."""
        try:
            # Pull drift-detection parameters from the official config when present.
            if self.official_config and 'topic_drift' in self.official_config:
                drift_config = self.official_config['topic_drift']
                window_size = drift_config.get('window_size', 5)
                drift_threshold = drift_config.get('drift_threshold', 0.5)
                min_similarity = drift_config.get('min_similarity', 0.3)
            else:
                # Default parameters.
                window_size = 5
                drift_threshold = 0.5
                min_similarity = 0.3

            detector = TopicDriftDetector(
                window_size=window_size,
                drift_threshold=drift_threshold,
                min_similarity=min_similarity
            )

            print(f"[MVP] 主题漂移检测器已初始化")
            return detector

        except Exception as e:
            print(f"[MVP] 主题漂移检测器初始化失败: {e}")
            # Return a placeholder; drift detection is optional.
            return None

    def _create_memory_instance(self):
        """Create the MemOS backend, degrading enhanced → basic on failure."""
        try:
            if self.current_mode:
                # Try the enhanced MemOS first.
                memory = EnhancedSimpleMemOS(str(self.data_dir))
                print("✅ 增强版MemOS实例创建成功 (Qwen3-Embedding-0.6B + Qwen3-Reranker-0.6B)")
            else:
                # Use the basic SimpleMemOS.
                memory = SimpleMemOS(str(self.data_dir))
                print("✅ 基础版SimpleMemOS实例创建成功 (哈希向量)")
            return memory
        except Exception as e:
            print(f"❌ MemOS实例创建失败: {e}")
            # If the enhanced backend failed, automatically degrade to basic.
            if self.current_mode:
                print("🔄 增强版初始化失败,自动降级到基础版...")
                self.fallback_reason = f"初始化失败: {str(e)[:50]}..."
                try:
                    memory = SimpleMemOS(str(self.data_dir))
                    print("✅ 自动降级到基础版成功 (哈希向量)")
                    self.current_mode = False  # change only the running mode, keep desired_mode
                    return memory
                except Exception as e2:
                    print(f"❌ 基础版也失败: {e2}")
                    self.fallback_reason = f"全部失败: {str(e2)[:50]}..."
            raise

    def test_connection(self) -> bool:
        """Smoke-test add/search/list against the live backend."""
        try:
            print("🔍 测试MVP记忆管理器连接...")

            # Test adding a memory.
            test_content = "这是一个MVP测试记忆"
            memory_id = self.memory.add_memory(test_content, tags=["test", "mvp"])

            if memory_id:
                print("✅ 记忆添加测试成功")
            else:
                print("⚠️ 记忆添加测试失败")
                return False

            # Test searching memories.
            results = self.memory.search_memories("MVP测试", limit=1)
            if results:
                print(f"✅ 记忆搜索测试成功,找到 {len(results)} 条记忆")
            else:
                print("⚠️ 记忆搜索测试:未找到结果")

            # Test listing all memories.
            all_memories = self.memory.get_all_memories()
            print(f"✅ 获取所有记忆测试成功,共 {len(all_memories)} 条记忆")

            return True

        except Exception as e:
            print(f"❌ 连接测试失败: {e}")
            return False

    def remember(self, content: str, tags: Optional[List[str]] = None,
                 metadata: Optional[Dict[str, Any]] = None) -> bool:
        """One-shot remember, integrated with the capacity manager.

        Returns True on success, False on failure (never raises).
        """
        start_time = time.time()

        try:
            # Merge caller metadata on top of the standard envelope.
            full_metadata = {
                "type": "manual",
                "source": "manual",
                "timestamp": datetime.now().isoformat(),
                **(metadata or {})
            }

            memory_id = self.memory.add_memory(content, tags=tags, metadata=full_metadata)

            if memory_id:
                # Record the operation latency.
                duration = time.time() - start_time
                if self.capacity_manager:
                    self.capacity_manager.record_operation_time("memory_add", duration)

                    # Update capacity stats (simplified; should classify by content type).
                    from capacity_manager import MemoryType
                    total_memories = (len(self.memory.get_all_memories())
                                      if hasattr(self.memory, 'get_all_memories') else 1)
                    self.capacity_manager.update_memory_stats(MemoryType.USER, total_memories)

                print(f"✅ 记忆已保存 #{memory_id}: {content[:50]}... (耗时: {duration:.3f}s)")
                return True
            else:
                print(f"❌ 记忆保存失败: {content[:50]}...")
                return False

        except Exception as e:
            print(f"❌ 记忆保存失败: {e}")
            return False

    def recall(self, query: str, top_k: int = 5, use_reranker: Optional[bool] = None,
               use_feedback_boost: bool = True, use_time_decay: bool = True,
               time_decay_tau: float = 30.0,
               use_topic_drift_detection: bool = True) -> List[Dict[str, Any]]:
        """Smart retrieval: Qwen reranking by default, plus feedback weighting,
        time-aware ranking and topic drift detection.

        Args:
            query: Search text.
            top_k: Number of results to return.
            use_reranker: Force reranker on/off; None = auto (on for enhanced mode).
            use_feedback_boost: Weight results by accumulated 👍/👎 scores.
            use_time_decay: Apply exponential time decay to scores.
            time_decay_tau: Decay time constant in days.
            use_topic_drift_detection: Warn when the query drifts off-topic.
        """
        start_time = time.time()

        # Lazy import of the access tracker to avoid a circular dependency.
        try:
            from access_tracker import track_batch_access
            access_tracking_enabled = True
        except ImportError:
            access_tracking_enabled = False

        try:
            # Topic drift detection (advisory only — this manager is a
            # stateless retrieval service; clearing candidates is up to the caller).
            if use_topic_drift_detection and self.topic_drift_detector:
                should_clear, reason = self.topic_drift_detector.should_clear_candidates(query)
                if should_clear:
                    print(f"🔄 主题漂移检测: {reason}")
                    print(f"   建议清空候选集,重新开始检索")

            # Enhanced backend: reranker on by default unless explicitly disabled.
            if self.use_enhanced and hasattr(self.memory, 'search_memories'):
                if use_reranker is None:
                    use_reranker = True

                # Over-fetch when feedback weighting will re-rank the list.
                search_limit = top_k * 2 if use_feedback_boost else top_k
                results = self.memory.search_memories(query, limit=search_limit,
                                                      use_reranker=use_reranker)

                rerank_info = "含重排优化" if use_reranker else "仅向量搜索"
                print(f"🔍 使用Qwen模型检索到 {len(results)} 条相关记忆 ({rerank_info})")
            else:
                # Basic SimpleMemOS path.
                search_limit = top_k * 2 if use_feedback_boost else top_k
                results = self.memory.search_memories(query, limit=search_limit)
                print(f"🔍 使用基础版检索到 {len(results)} 条相关记忆 (哈希向量)")

            # Apply feedback-score weighting.
            if use_feedback_boost and results:
                results = self._apply_feedback_boost(results)
                print(f"📊 应用反馈分数加权,重新排序结果")

            # Apply time-aware ranking.
            if use_time_decay and results:
                results = self._apply_time_decay_ranking(results, time_decay_tau)
                print(f"⏰ 应用时间感知排序,τ={time_decay_tau}天")

            # Record the operation latency.
            duration = time.time() - start_time
            if self.capacity_manager:
                self.capacity_manager.record_operation_time("memory_search", duration)

            # Trim to the requested size.
            final_results = results[:top_k]

            # Record access statistics for returned memories.
            if access_tracking_enabled and final_results:
                memory_ids = [str(result.get('id', '')) for result in final_results
                              if result.get('id')]
                if memory_ids:
                    track_batch_access(memory_ids, "search")

            print(f"🔍 检索完成,返回 {len(final_results)} 条结果 (耗时: {duration:.3f}s)")
            return final_results

        except Exception as e:
            print(f"❌ 记忆检索失败: {e}")
            return []

    def _apply_feedback_boost(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Re-rank results by multiplying scores with a usage-score-derived boost."""
        try:
            for result in results:
                metadata = result.get('metadata', {})
                usage_score = metadata.get('usage_score', 0)

                # Original similarity score.
                original_score = result.get('score', 0.5)

                # Each point of usage_score shifts the multiplier by 10%;
                # positive feedback promotes, negative demotes.
                feedback_multiplier = 1.0 + (usage_score * 0.1)
                feedback_multiplier = max(0.5, min(2.0, feedback_multiplier))  # clamp to [0.5, 2.0]

                boosted_score = original_score * feedback_multiplier
                result['score'] = boosted_score
                result['feedback_boost'] = feedback_multiplier

                # Debug trace for memories that actually carry feedback.
                if usage_score != 0:
                    print(f"  📈 记忆#{result.get('id')}: usage_score={usage_score}, "
                          f"boost={feedback_multiplier:.2f}, score={original_score:.3f}→{boosted_score:.3f}")

            # Re-sort by the boosted scores.
            results.sort(key=lambda x: x.get('score', 0), reverse=True)
            return results

        except Exception as e:
            print(f"❌ 反馈加权失败: {e}")
            return results

    def _apply_time_decay_ranking(self, results: List[Dict[str, Any]],
                                  tau_days: float = 30.0) -> List[Dict[str, Any]]:
        """Apply exponential time decay (rank = score × exp(-Δt/τ)) and re-sort."""
        try:
            import math

            current_time = datetime.now()

            for result in results:
                metadata = result.get('metadata', {})
                timestamp_str = metadata.get('timestamp')

                if timestamp_str:
                    try:
                        # Parse the timestamp; fromisoformat handles both plain
                        # and offset-carrying ISO strings, only 'Z' needs rewriting.
                        if timestamp_str.endswith('Z'):
                            memory_time = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
                        else:
                            memory_time = datetime.fromisoformat(timestamp_str)

                        # Age in days (tz info dropped to compare against naive now()).
                        time_delta = (current_time -
                                      memory_time.replace(tzinfo=None)).total_seconds() / (24 * 3600)

                        # Decay factor: exp(-Δt/τ).
                        time_decay_factor = math.exp(-time_delta / tau_days)

                        # Current score (possibly already feedback-boosted).
                        current_score = result.get('score', 0.5)

                        time_aware_score = current_score * time_decay_factor
                        result['score'] = time_aware_score
                        result['time_decay_factor'] = time_decay_factor
                        result['days_ago'] = time_delta

                        # Debug trace only for relatively fresh memories.
                        if time_delta < tau_days:
                            print(f"  ⏰ 记忆#{result.get('id')}: {time_delta:.1f}天前, "
                                  f"衰减={time_decay_factor:.3f}, score={current_score:.3f}→{time_aware_score:.3f}")

                    except (ValueError, TypeError) as e:
                        # Unparseable timestamp: keep the original score.
                        print(f"  ⚠️ 记忆#{result.get('id')} 时间戳解析失败: {timestamp_str}, 错误: {e}")
                        result['time_decay_factor'] = 1.0
                        result['days_ago'] = float('inf')
                else:
                    # No timestamp at all: heavily down-weight.
                    result['time_decay_factor'] = 0.1
                    result['days_ago'] = float('inf')
                    print(f"  ⚠️ 记忆#{result.get('id')} 缺少时间戳,使用默认衰减因子0.1")

            # Re-sort by the time-aware scores.
            results.sort(key=lambda x: x.get('score', 0), reverse=True)
            return results

        except Exception as e:
            print(f"❌ 时间感知排序失败: {e}")
            return results

    def extract_from_messages(self, messages: List[Dict[str, str]]) -> List[Dict[str, Any]]:
        """Extract memories from chat messages (simple heuristic placeholder)."""
        try:
            # Simple heuristic extraction; the official extract API can replace this later.
            extracted_memories = []

            for message in messages:
                if message.get("role") == "user":
                    content = message.get("content", "")

                    # Keyword heuristic for personal / important statements.
                    if any(keyword in content for keyword in ["我", "喜欢", "不喜欢", "记住", "重要"]):
                        memory_data = {
                            "content": content,
                            "metadata": {
                                "type": "extracted",
                                "source": "conversation",
                                "timestamp": datetime.now().isoformat()
                            }
                        }
                        extracted_memories.append(memory_data)

            print(f"🧠 从对话中提取到 {len(extracted_memories)} 条记忆")
            return extracted_memories

        except Exception as e:
            print(f"❌ 记忆提取失败: {e}")
            return []

    def get_daily_memories(self, date: Optional[str] = None) -> List[Dict[str, Any]]:
        """Return memories whose metadata timestamp starts with the given date (default: today)."""
        try:
            all_memories = self.memory.get_all_memories()

            # Default to today when no date is given.
            if not date:
                date = datetime.now().strftime("%Y-%m-%d")

            # Filter by the timestamp prefix in metadata.
            daily_memories = []
            for memory in all_memories:
                metadata = memory.get("metadata", {})
                timestamp = metadata.get("timestamp", "")
                if timestamp.startswith(date):
                    daily_memories.append(memory)

            print(f"📅 找到 {date} 的 {len(daily_memories)} 条记忆")
            return daily_memories

        except Exception as e:
            print(f"❌ 获取每日记忆失败: {e}")
            return []

    def dump_memories(self, backup_dir: str) -> bool:
        """Dump all memories as a timestamped JSON file under backup_dir."""
        try:
            backup_path = Path(backup_dir)
            backup_path.mkdir(parents=True, exist_ok=True)

            all_memories = self.memory.get_all_memories()
            backup_file = backup_path / f"memories_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"

            with open(backup_file, 'w', encoding='utf-8') as f:
                json.dump(all_memories, f, ensure_ascii=False, indent=2)

            print(f"💾 记忆已备份到: {backup_file}")
            return True

        except Exception as e:
            print(f"❌ 记忆备份失败: {e}")
            return False

    def provide_feedback(self, memory_id: str, feedback_type: str) -> bool:
        """Record 👍/👎 feedback for a memory, influencing later retrieval ranking."""
        try:
            all_memories = self.memory.get_all_memories()

            # Locate the target memory by id.
            target_memory = None
            for memory in all_memories:
                if str(memory.get('id', '')) == str(memory_id):
                    target_memory = memory
                    break

            if not target_memory:
                print(f"❌ 未找到记忆ID: {memory_id}")
                return False

            metadata = target_memory.get('metadata', {})
            current_score = metadata.get('usage_score', 0)

            # Adjust the score by feedback type.
            if feedback_type == 'thumbs_up' or feedback_type == '👍':
                new_score = current_score + 1
                feedback_emoji = '👍'
                print(f"👍 正面反馈: usage_score {current_score} → {new_score}")
            elif feedback_type == 'thumbs_down' or feedback_type == '👎':
                new_score = current_score - 1
                feedback_emoji = '👎'
                print(f"👎 负面反馈: usage_score {current_score} → {new_score}")
            else:
                print(f"❌ 无效的反馈类型: {feedback_type}")
                return False

            # Update the feedback bookkeeping fields.
            metadata['usage_score'] = new_score
            metadata['last_feedback'] = datetime.now().isoformat()
            metadata['feedback_count'] = metadata.get('feedback_count', 0) + 1

            # Very low scores flag the memory for rewriting.
            if new_score <= -3:
                metadata['needs_rewrite'] = True
                print(f"⚠️ 记忆分数过低({new_score}),已标记为需要重写")

            content = target_memory.get('content', '')

            try:
                # Persist via the backend's metadata-update path.
                success = self._update_memory_metadata(memory_id, metadata)
                if success:
                    print(f"✅ 反馈已保存 {feedback_emoji} 记忆#{memory_id}: {content[:30]}...")
                    return True
                else:
                    print(f"❌ 记忆更新失败")
                    return False
            except Exception as e:
                print(f"❌ 反馈保存失败: {e}")
                return False

        except Exception as e:
            print(f"❌ 反馈处理失败: {e}")
            return False

    def _update_memory_metadata(self, memory_id: str, new_metadata: Dict[str, Any]) -> bool:
        """Update a memory's metadata via whichever backend API is available (internal)."""
        try:
            # Qdrant point ids are integers.
            memory_id_int = int(memory_id)

            if hasattr(self.memory, 'update_memory_metadata'):
                # Enhanced backend exposes a direct update method.
                return self.memory.update_memory_metadata(memory_id_int, new_metadata)
            elif hasattr(self.memory, 'vector_db'):
                # Enhanced backend variant: talk to Qdrant via vector_db.
                self.memory.vector_db.set_payload(
                    collection_name=self.memory.collection_name,
                    payload={"metadata": new_metadata},
                    points=[memory_id_int]
                )
                print(f"✅ 记忆#{memory_id}的metadata已更新")
                return True
            elif hasattr(self.memory, 'qdrant_client'):
                # Other backends expose the raw qdrant_client.
                self.memory.qdrant_client.set_payload(
                    collection_name=self.memory.collection_name,
                    payload={"metadata": new_metadata},
                    points=[memory_id_int]
                )
                print(f"✅ 记忆#{memory_id}的metadata已更新")
                return True
            else:
                # No supported update path on this backend.
                print("⚠️ 当前存储后端不支持直接更新")
                return False

        except ValueError:
            print(f"❌ 无效的记忆ID格式: {memory_id}")
            return False
        except Exception as e:
            print(f"❌ metadata更新失败: {e}")
            return False

    def get_feedback_stats(self) -> Dict[str, Any]:
        """Aggregate feedback statistics across all memories."""
        try:
            all_memories = self.memory.get_all_memories()

            stats = {
                'total_memories': len(all_memories),
                'memories_with_feedback': 0,
                'positive_feedback': 0,
                'negative_feedback': 0,
                'average_score': 0,
                'needs_rewrite': 0
            }

            total_score = 0
            scored_memories = 0

            for memory in all_memories:
                metadata = memory.get('metadata', {})
                usage_score = metadata.get('usage_score')

                if usage_score is not None:
                    stats['memories_with_feedback'] += 1
                    total_score += usage_score
                    scored_memories += 1

                    if usage_score > 0:
                        stats['positive_feedback'] += 1
                    elif usage_score < 0:
                        stats['negative_feedback'] += 1

                if metadata.get('needs_rewrite'):
                    stats['needs_rewrite'] += 1

            if scored_memories > 0:
                stats['average_score'] = round(total_score / scored_memories, 2)

            return stats

        except Exception as e:
            print(f"❌ 获取反馈统计失败: {e}")
            return {}

    def apply_score_decay(self, decay_factor: float = 0.98) -> bool:
        """Decay all usage scores toward zero to prevent ranking ossification."""
        try:
            all_memories = self.memory.get_all_memories()
            updated_count = 0

            for memory in all_memories:
                memory_id = memory.get('id')
                metadata = memory.get('metadata', {})
                usage_score = metadata.get('usage_score')

                if usage_score is not None and usage_score != 0:
                    new_score = usage_score * decay_factor

                    # Snap near-zero scores to exactly zero.
                    if abs(new_score) < 0.1:
                        new_score = 0

                    metadata['usage_score'] = round(new_score, 2)

                    if self._update_memory_metadata(str(memory_id), metadata):
                        updated_count += 1

            print(f"✅ 分数衰减完成,更新了 {updated_count} 条记忆")
            return True

        except Exception as e:
            print(f"❌ 分数衰减失败: {e}")
            return False

    def get_capacity_report(self) -> Dict[str, Any]:
        """Return the capacity manager's full report, or an error dict when disabled."""
        if hasattr(self, 'capacity_manager') and self.capacity_manager:
            return self.capacity_manager.get_capacity_report()
        else:
            return {"error": "容量管理器未启用"}

    def get_performance_metrics(self) -> Dict[str, Any]:
        """Return current performance metrics, or an error dict when disabled."""
        if hasattr(self, 'capacity_manager') and self.capacity_manager:
            metrics = self.capacity_manager.get_performance_metrics()
            return {
                "timestamp": metrics.timestamp,
                "memory_add_avg_time": metrics.memory_add_avg_time,
                "memory_search_avg_time": metrics.memory_search_avg_time,
                "memory_total_count": metrics.memory_total_count,
                "system_health": metrics.system_health,
                "cpu_usage": metrics.cpu_usage,
                "memory_usage_mb": metrics.memory_usage_mb,
                "disk_usage_mb": metrics.disk_usage_mb
            }
        else:
            return {"error": "容量管理器未启用"}

    def trigger_manual_optimization(self) -> bool:
        """Inspect current metrics and report which optimizations would apply."""
        if hasattr(self, 'capacity_manager') and self.capacity_manager:
            try:
                metrics = self.capacity_manager.get_performance_metrics()

                optimizations_applied = []

                # Memory-pressure checks per memory type.
                for memory_type, stats in metrics.memory_usage_by_type.items():
                    if stats.is_critical():
                        print(f"🗜️ 触发{memory_type}内存优化")
                        optimizations_applied.append(f"{memory_type}_compression")

                # Latency checks.
                if metrics.memory_add_avg_time > 1.0:
                    print(f"⚡ 优化记忆添加性能")
                    optimizations_applied.append("add_performance")

                if metrics.memory_search_avg_time > 0.5:
                    print(f"⚡ 优化记忆检索性能")
                    optimizations_applied.append("search_performance")

                if optimizations_applied:
                    print(f"✅ 应用了 {len(optimizations_applied)} 项优化: {', '.join(optimizations_applied)}")
                    return True
                else:
                    print(f"ℹ️ 系统运行良好,无需优化")
                    return True

            except Exception as e:
                print(f"❌ 手动优化失败: {e}")
                return False
        else:
            print(f"❌ 容量管理器未启用")
            return False

    def trigger_compression(self, max_memories: int = 50, test_mode: bool = False) -> Dict[str, Any]:
        """Manually run one compression batch; returns the pipeline's result dict."""
        if hasattr(self, 'compression_pipeline') and self.compression_pipeline:
            try:
                result = self.compression_pipeline.run_compression_batch(
                    max_memories=max_memories,
                    test_mode=test_mode
                )
                return result
            except Exception as e:
                print(f"❌ 压缩失败: {e}")
                return {"error": str(e)}
        else:
            print(f"❌ 压缩管线未启用")
            return {"error": "压缩管线未启用"}

    def get_compression_stats(self) -> Dict[str, Any]:
        """Return compression-pipeline statistics, or an error dict when disabled."""
        if hasattr(self, 'compression_pipeline') and self.compression_pipeline:
            return self.compression_pipeline.get_compression_stats()
        else:
            return {"error": "压缩管线未启用"}

    def cleanup_old_archives(self, days: Optional[int] = None) -> bool:
        """Delete old compression archives (pipeline default age when days is None)."""
        if hasattr(self, 'compression_pipeline') and self.compression_pipeline:
            try:
                self.compression_pipeline.cleanup_old_archives(days)
                return True
            except Exception as e:
                print(f"❌ 清理归档失败: {e}")
                return False
        else:
            print(f"❌ 压缩管线未启用")
            return False


def create_mvp_memory_manager(data_dir: str = "./memos_data",
                              use_official_config: bool = True) -> MVPMemoryManager:
    """Factory for MVPMemoryManager, optionally applying the official best-practice config."""
    if use_official_config:
        try:
            from official_config_manager import create_official_config_manager
            config_manager = create_official_config_manager(data_dir)
            config_manager.save_config()
            print("✅ 应用官方最佳实践配置")
        except Exception as e:
            print(f"⚠️ 官方配置应用失败,使用默认配置: {e}")

    return MVPMemoryManager(data_dir)


if __name__ == "__main__":
    # Self-test for the MVP memory manager.
    print("🚀 测试MVP记忆管理器")
    print("=" * 50)

    try:
        manager = create_mvp_memory_manager()

        if manager.test_connection():
            print("✅ MVP记忆管理器测试通过")
        else:
            print("❌ MVP记忆管理器测试失败")

    except Exception as e:
        print(f"❌ 测试过程中发生错误: {e}")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/qinshu1109/memos-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server