Skip to main content
Glama

MemOS-MCP

by qinshu1109
Apache 2.0
3
  • Linux
  • Apple
advanced_modules_check.py (17.9 kB)
#!/usr/bin/env python3
"""
MemOS advanced feature module status check script.

Checks the working state of the smart capacity manager, the automatic
summary-compression pipeline, the feedback mechanism, time-aware ranking
and the topic drift detector, then prints a summary and saves a JSON report.
"""

import json
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

# Status value every check stores in its result when the module works.
_STATUS_OK = "正常运行"


def _disabled_result(error: str) -> Dict[str, Any]:
    """Build the result for a module that is absent or not initialised."""
    return {"enabled": False, "status": "未启用", "error": error}


def _failed_result(exc: Exception) -> Dict[str, Any]:
    """Build the result for a check that raised *exc* while running."""
    return {"enabled": False, "status": "检查失败", "error": str(exc)}


def check_capacity_manager(manager) -> Dict[str, Any]:
    """Check the state of the smart capacity manager.

    Args:
        manager: Memory manager object; its ``capacity_manager`` attribute
            is inspected (``main`` passes the object returned by
            ``create_mvp_memory_manager``).

    Returns:
        A dict with ``enabled`` and ``status`` keys. On success it also
        carries the monitoring state, capacity configuration and current
        performance metrics; otherwise an ``error`` message.
    """
    print("\n🔍 检查智能容量管理器...")

    try:
        capacity_manager = getattr(manager, 'capacity_manager', None)
        if not capacity_manager:
            return _disabled_result("容量管理器未初始化")

        # Capacity report and performance metrics come from the manager itself.
        capacity_report = capacity_manager.get_capacity_report()
        performance_metrics = capacity_manager.get_performance_metrics()

        monitoring_status = "active" if capacity_manager.is_monitoring else "inactive"
        config = capacity_manager.config

        result = {
            "enabled": True,
            "status": _STATUS_OK,
            "monitoring_status": monitoring_status,
            "system_health": performance_metrics.system_health,
            "capacity_config": {
                "working_memory": config.working_memory_capacity,
                "user_memory": config.user_memory_capacity,
                "long_term_memory": config.long_term_memory_capacity,
                "warning_threshold": config.warning_threshold,
                "critical_threshold": config.critical_threshold,
            },
            "current_metrics": {
                "total_memories": performance_metrics.memory_total_count,
                "add_avg_time": f"{performance_metrics.memory_add_avg_time:.3f}s",
                "search_avg_time": f"{performance_metrics.memory_search_avg_time:.3f}s",
                "cpu_usage": f"{performance_metrics.cpu_usage:.1f}%",
                "memory_usage": f"{performance_metrics.memory_usage_mb:.1f}MB",
                "disk_usage": f"{performance_metrics.disk_usage_mb:.1f}MB",
            },
            "memory_stats": capacity_report.get("memory_stats", {}),
            "performance_history_count": capacity_report.get("performance_history_count", 0),
        }

        print(f" ✅ 容量管理器状态: {result['status']}")
        print(f" 📊 监控状态: {monitoring_status}")
        print(f" 🏥 系统健康: {performance_metrics.system_health}")
        print(f" 📈 总记忆数: {performance_metrics.memory_total_count}")
        print(f" ⏱️ 平均添加时间: {performance_metrics.memory_add_avg_time:.3f}s")
        print(f" 🔍 平均检索时间: {performance_metrics.memory_search_avg_time:.3f}s")

        return result

    except Exception as e:
        # Best-effort health check: any failure is reported, never raised.
        return _failed_result(e)


def check_compression_pipeline(manager) -> Dict[str, Any]:
    """Check the state of the automatic summary-compression pipeline.

    Args:
        manager: Memory manager object; its ``compression_pipeline``
            attribute is inspected.

    Returns:
        A dict with ``enabled`` and ``status`` keys. On success it also
        carries the scheduler state, pipeline configuration, compression
        statistics and the archive directory; otherwise an ``error`` message.
    """
    print("\n🗜️ 检查自动摘要压缩管线...")

    try:
        pipeline = getattr(manager, 'compression_pipeline', None)
        if not pipeline:
            return _disabled_result("压缩管线未初始化")

        compression_stats = pipeline.get_compression_stats()
        scheduler_status = "running" if pipeline.is_running else "stopped"
        config = pipeline.config

        result = {
            "enabled": True,
            "status": _STATUS_OK,
            "scheduler_status": scheduler_status,
            "auto_schedule": config.enable_auto_schedule,
            "schedule_time": config.schedule_time,
            "config": {
                "token_threshold": config.token_threshold,
                "compression_ratio": config.compression_ratio,
                "min_content_length": config.min_content_length,
                "max_content_length": config.max_content_length,
                "keep_original_days": config.keep_original_days,
            },
            "statistics": {
                "total_compressed": compression_stats.get("total_compressed", 0),
                "total_tokens_saved": compression_stats.get("total_tokens_saved", 0),
                "last_compression_time": compression_stats.get("last_compression_time"),
                "compression_history_count": len(compression_stats.get("compression_history", [])),
            },
            # Stored as a string so the report stays JSON-serializable.
            "archive_dir": str(pipeline.archive_dir),
        }

        print(f" ✅ 压缩管线状态: {result['status']}")
        print(f" 📅 调度器状态: {scheduler_status}")
        print(f" 🔄 自动调度: {'启用' if config.enable_auto_schedule else '禁用'}")
        print(f" 🗜️ 已压缩记忆: {compression_stats.get('total_compressed', 0)} 条")
        print(f" 💾 节省tokens: {compression_stats.get('total_tokens_saved', 0)}")
        print(f" 📂 归档目录: {pipeline.archive_dir}")

        return result

    except Exception as e:
        # Best-effort health check: any failure is reported, never raised.
        return _failed_result(e)


def check_feedback_mechanism(manager) -> Dict[str, Any]:
    """Check the feedback mechanism by exercising it end to end.

    NOTE: this check writes to the store. It adds a test memory through
    ``manager.remember`` and, when that memory can be recalled, sends a
    ``thumbs_up`` for it through ``manager.provide_feedback``.

    Args:
        manager: Memory manager object providing ``remember``, ``recall``,
            ``get_feedback_stats`` and ``provide_feedback``.

    Returns:
        A dict with ``enabled`` and ``status`` keys. On success it also
        carries the test memory id, the feedback statistics and, when the
        round-trip ran, ``test_feedback_success``; otherwise an ``error``.
    """
    print("\n👍 检查反馈机制...")

    try:
        test_memory_id = None

        # Add a test memory first; the timestamp keeps its content unique.
        test_content = f"反馈机制测试记忆 - {time.time()}"
        success = manager.remember(test_content, tags=["反馈测试"])

        if success:
            # Look up the id of the memory that was just added.
            recent_memories = manager.recall("反馈机制测试", top_k=1)
            if recent_memories:
                test_memory_id = recent_memories[0].get('id')

        feedback_stats = manager.get_feedback_stats()

        result = {
            "enabled": True,
            "status": _STATUS_OK,
            "test_memory_created": success,
            "test_memory_id": test_memory_id,
            "feedback_stats": feedback_stats,
            "features": {
                "thumbs_up_down": True,
                "usage_score_tracking": True,
                "feedback_boost_ranking": True,
                "score_decay": True,
                "needs_rewrite_marking": True,
            },
        }

        # Compare against None rather than truthiness so that a memory whose
        # id is 0 still gets its feedback round-trip tested.
        if test_memory_id is not None:
            feedback_success = manager.provide_feedback(str(test_memory_id), 'thumbs_up')
            result["test_feedback_success"] = feedback_success

            if feedback_success:
                print(f" ✅ 反馈功能测试成功 (记忆#{test_memory_id})")
            else:
                print(" ⚠️ 反馈功能测试失败")

        print(f" ✅ 反馈机制状态: {result['status']}")
        print(f" 📊 总记忆数: {feedback_stats.get('total_memories', 0)}")
        print(f" 👍 有反馈记忆: {feedback_stats.get('memories_with_feedback', 0)}")
        print(f" 📈 正面反馈: {feedback_stats.get('positive_feedback', 0)}")
        print(f" 📉 负面反馈: {feedback_stats.get('negative_feedback', 0)}")
        print(f" ⭐ 平均分数: {feedback_stats.get('average_score', 0)}")

        return result

    except Exception as e:
        # Best-effort health check: any failure is reported, never raised.
        return _failed_result(e)


def check_time_aware_ranking(manager) -> Dict[str, Any]:
    """Check time-aware ranking by running the same query with and without decay.

    Args:
        manager: Memory manager object whose ``recall`` accepts the
            ``use_time_decay`` and ``time_decay_tau`` keyword arguments.

    Returns:
        A dict with ``enabled`` and ``status`` keys. On success it also
        carries the result counts of both recalls and, when the time-aware
        recall returned anything, a per-result ``time_aware_analysis`` list;
        otherwise an ``error`` message.
    """
    print("\n⏰ 检查时间感知排序功能...")

    try:
        test_query = "时间感知测试"

        # Plain recall first, then the time-aware variant of the same query.
        normal_results = manager.recall(test_query, top_k=3, use_time_decay=False)
        time_aware_results = manager.recall(
            test_query, top_k=3, use_time_decay=True, time_decay_tau=30.0
        )

        result = {
            "enabled": True,
            "status": _STATUS_OK,
            "test_query": test_query,
            "normal_results_count": len(normal_results),
            "time_aware_results_count": len(time_aware_results),
            "time_decay_tau": 30.0,
            "features": {
                "time_decay_formula": "exp(-Δt/τ)",
                "configurable_tau": True,
                "automatic_timestamp_parsing": True,
                "score_adjustment": True,
            },
        }

        if time_aware_results:
            time_aware_info = [
                {
                    "rank": rank,
                    "id": item.get('id'),
                    "score": item.get('score', 0),
                    "time_decay_factor": item.get('time_decay_factor', 1.0),
                    "days_ago": item.get('days_ago', 0),
                }
                for rank, item in enumerate(time_aware_results, start=1)
            ]
            result["time_aware_analysis"] = time_aware_info

            print(f" ✅ 时间感知排序状态: {result['status']}")
            print(f" 🔍 测试查询: {test_query}")
            print(f" 📊 普通检索结果: {len(normal_results)} 条")
            print(f" ⏰ 时间感知结果: {len(time_aware_results)} 条")
            print(f" 📈 时间衰减参数τ: {result['time_decay_tau']} 天")

            # Show the time-aware details of the first three results.
            for info in time_aware_info[:3]:
                print(f" #{info['rank']}: 记忆#{info['id']}, "
                      f"分数={info['score']:.3f}, "
                      f"衰减={info['time_decay_factor']:.3f}, "
                      f"{info['days_ago']:.1f}天前")
        else:
            print(" ⚠️ 时间感知排序测试:无检索结果")

        return result

    except Exception as e:
        # Best-effort health check: any failure is reported, never raised.
        return _failed_result(e)


def check_topic_drift_detector(manager) -> Dict[str, Any]:
    """Check the topic drift detector and probe it with sample queries.

    NOTE: the probe calls ``should_clear_candidates`` on the live detector
    for each sample query; statistics are read before the probe runs.

    Args:
        manager: Memory manager object; its ``topic_drift_detector``
            attribute is inspected.

    Returns:
        A dict with ``enabled`` and ``status`` keys. On success it also
        carries the detector configuration, its statistics and the probe
        results; otherwise an ``error`` message.
    """
    print("\n🔄 检查主题漂移检测器...")

    try:
        detector = getattr(manager, 'topic_drift_detector', None)
        if not detector:
            return _disabled_result("主题漂移检测器未初始化")

        config = {
            "window_size": detector.window_size,
            "drift_threshold": detector.drift_threshold,
            "min_similarity": detector.min_similarity,
        }

        stats = detector.get_statistics()

        test_queries = [
            "Python编程基础",
            "Python数据结构",
            "机器学习算法",  # topic starts to drift
            "今天天气如何",  # completely unrelated
        ]

        drift_test_results = []
        for query in test_queries:
            should_clear, reason = detector.should_clear_candidates(query)
            drift_test_results.append({
                "query": query,
                "should_clear": should_clear,
                "reason": reason,
            })

        result = {
            "enabled": True,
            "status": _STATUS_OK,
            "config": config,
            "statistics": stats,
            "test_results": drift_test_results,
            "features": {
                "sliding_window": True,
                "tf_idf_analysis": True,
                "cosine_similarity": True,
                "automatic_candidate_clearing": True,
                "configurable_thresholds": True,
            },
        }

        print(f" ✅ 主题漂移检测器状态: {result['status']}")
        print(f" 📊 窗口大小: {config['window_size']}")
        print(f" 🎯 漂移阈值: {config['drift_threshold']}")
        print(f" 📉 最小相似度: {config['min_similarity']}")
        print(f" 📈 总查询数: {stats['total_queries']}")
        print(f" 🔄 漂移次数: {stats['drift_count']}")
        print(f" 📊 漂移率: {stats['drift_rate']}")

        print(" 🧪 漂移检测测试:")
        for test in drift_test_results:
            status = "🚨 清空候选集" if test['should_clear'] else "✅ 保持候选集"
            print(f" {status}: {test['query']} - {test['reason']}")

        return result

    except Exception as e:
        # Best-effort health check: any failure is reported, never raised.
        return _failed_result(e)


def generate_advanced_modules_report(manager) -> Dict[str, Any]:
    """Run every module check and assemble the combined status report.

    Args:
        manager: Memory manager object handed to each ``check_*`` function.

    Returns:
        A dict with ``timestamp``, ``check_duration``, ``overall_status``
        (``healthy``, ``warning`` or ``critical``), the per-module results
        under ``modules`` and the counters under ``summary``.
    """
    print("🚀 开始MemOS高级功能模块状态检查")
    print("=" * 60)

    start_time = time.time()

    modules = {
        "capacity_manager": check_capacity_manager(manager),
        "compression_pipeline": check_compression_pipeline(manager),
        "feedback_mechanism": check_feedback_mechanism(manager),
        "time_aware_ranking": check_time_aware_ranking(manager),
        "topic_drift_detector": check_topic_drift_detector(manager),
    }

    summary = {
        "total_modules": len(modules),
        "enabled_modules": 0,
        "healthy_modules": 0,
        "failed_modules": 0,
    }

    report = {
        "timestamp": datetime.now().isoformat(),
        "check_duration": f"{time.time() - start_time:.2f}s",
        "overall_status": "healthy",
        "modules": modules,
        "summary": summary,
    }

    # A module that is not enabled (absent, or its check raised) counts as
    # failed; an enabled module counts as healthy when its status is OK.
    for module_result in modules.values():
        if module_result.get("enabled", False):
            summary["enabled_modules"] += 1
            if module_result.get("status") == _STATUS_OK:
                summary["healthy_modules"] += 1
        else:
            summary["failed_modules"] += 1

    # More than two failures is critical, any failure at all is a warning.
    if summary["failed_modules"] > 2:
        report["overall_status"] = "critical"
    elif summary["failed_modules"] > 0:
        report["overall_status"] = "warning"
    else:
        report["overall_status"] = "healthy"

    return report


def print_summary_report(report: Dict[str, Any]):
    """Print a human-readable summary of *report* to stdout.

    Args:
        report: Report dict as produced by ``generate_advanced_modules_report``.
    """
    print("\n" + "=" * 60)
    print("📋 MemOS高级功能模块状态检查报告")
    print("=" * 60)

    print(f"🕐 检查时间: {report['timestamp']}")
    print(f"⏱️ 检查耗时: {report['check_duration']}")
    print(f"🏥 整体状态: {report['overall_status'].upper()}")

    summary = report['summary']
    print("\n📊 模块统计:")
    print(f" 总模块数: {summary['total_modules']}")
    print(f" 已启用: {summary['enabled_modules']}")
    print(f" 运行正常: {summary['healthy_modules']}")
    print(f" 检查失败: {summary['failed_modules']}")

    print("\n🔍 各模块状态:")
    for module_name, module_result in report['modules'].items():
        is_healthy = module_result.get('enabled') and module_result.get('status') == _STATUS_OK
        status_icon = "✅" if is_healthy else "❌"
        status_text = module_result.get('status', '未知')
        print(f" {status_icon} {module_name}: {status_text}")

        if not module_result.get('enabled') and 'error' in module_result:
            print(f" 错误: {module_result['error']}")


def save_detailed_report(report: Dict[str, Any], output_file: Optional[str] = None) -> Optional[str]:
    """Save *report* as pretty-printed UTF-8 JSON.

    Args:
        report: Report dict to serialize.
        output_file: Destination path. When omitted (or empty) a timestamped
            file under ``memos_data/`` is used.

    Returns:
        The path written, as a string, or ``None`` when saving failed (the
        failure is printed, not raised).
    """
    if not output_file:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_file = f"memos_data/advanced_modules_report_{timestamp}.json"

    try:
        # Serialize before touching the filesystem: if the report holds a
        # value json cannot encode, no truncated file is left behind.
        payload = json.dumps(report, indent=2, ensure_ascii=False)

        output_path = Path(output_file)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(payload, encoding='utf-8')

        print(f"\n💾 详细报告已保存: {output_path}")
        return str(output_path)

    except Exception as e:
        # Saving is best-effort; callers only see None on failure.
        print(f"❌ 保存报告失败: {e}")
        return None


def main():
    """Create the MVP memory manager, run all checks, print and save the report.

    Returns:
        The report dict, or ``None`` when any step raised (the error and its
        traceback are printed).
    """
    try:
        # Imported here so a missing module is reported like any other failure.
        from mvp_memory import create_mvp_memory_manager

        print("🔧 初始化MVP记忆管理器...")
        manager = create_mvp_memory_manager()

        report = generate_advanced_modules_report(manager)
        print_summary_report(report)
        save_detailed_report(report)

        print("\n✅ MemOS高级功能模块状态检查完成!")

        return report

    except Exception as e:
        print(f"❌ 检查过程中发生错误: {e}")
        import traceback
        traceback.print_exc()
        return None


if __name__ == "__main__":
    main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/qinshu1109/memos-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.