advanced_modules_check.py•17.9 kB
#!/usr/bin/env python3
"""
MemOS高级功能模块状态检查脚本
检查智能容量管理器、自动摘要压缩管线、反馈机制、时间感知排序和主题漂移检测器的工作状态
"""
import json
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, List
def check_capacity_manager(manager) -> Dict[str, Any]:
    """Report the state of the intelligent capacity manager.

    Reads ``manager.capacity_manager`` and, when it is present and truthy,
    collects its capacity report, performance metrics, monitoring flag and
    configured capacities/thresholds, printing a short summary on the way.

    Args:
        manager: Object expected to expose a ``capacity_manager`` attribute.

    Returns:
        A status dict. ``enabled`` is ``False`` (with an ``error`` entry)
        when the component is missing/falsy or when any step raises;
        otherwise the dict carries the collected configuration and metrics.
    """
    print("\n🔍 检查智能容量管理器...")

    try:
        capacity_mgr = getattr(manager, 'capacity_manager', None)
        if not capacity_mgr:
            return {
                "enabled": False,
                "status": "未启用",
                "error": "容量管理器未初始化",
            }

        # Gather the raw data first: report, metrics, monitoring flag.
        report = capacity_mgr.get_capacity_report()
        metrics = capacity_mgr.get_performance_metrics()
        monitoring_status = "active" if capacity_mgr.is_monitoring else "inactive"

        health = metrics.system_health

        cfg = capacity_mgr.config
        capacity_config = {
            "working_memory": cfg.working_memory_capacity,
            "user_memory": cfg.user_memory_capacity,
            "long_term_memory": cfg.long_term_memory_capacity,
            "warning_threshold": cfg.warning_threshold,
            "critical_threshold": cfg.critical_threshold,
        }

        total_memories = metrics.memory_total_count
        add_avg = f"{metrics.memory_add_avg_time:.3f}s"
        search_avg = f"{metrics.memory_search_avg_time:.3f}s"
        current_metrics = {
            "total_memories": total_memories,
            "add_avg_time": add_avg,
            "search_avg_time": search_avg,
            "cpu_usage": f"{metrics.cpu_usage:.1f}%",
            "memory_usage": f"{metrics.memory_usage_mb:.1f}MB",
            "disk_usage": f"{metrics.disk_usage_mb:.1f}MB",
        }

        result = {
            "enabled": True,
            "status": "正常运行",
            "monitoring_status": monitoring_status,
            "system_health": health,
            "capacity_config": capacity_config,
            "current_metrics": current_metrics,
            "memory_stats": report.get("memory_stats", {}),
            "performance_history_count": report.get("performance_history_count", 0),
        }

        print(f" ✅ 容量管理器状态: {result['status']}")
        print(f" 📊 监控状态: {monitoring_status}")
        print(f" 🏥 系统健康: {metrics.system_health}")
        print(f" 📈 总记忆数: {metrics.memory_total_count}")
        print(f" ⏱️ 平均添加时间: {metrics.memory_add_avg_time:.3f}s")
        print(f" 🔍 平均检索时间: {metrics.memory_search_avg_time:.3f}s")

        return result

    except Exception as exc:
        # Any failure while probing is reported as data, never raised.
        return {
            "enabled": False,
            "status": "检查失败",
            "error": str(exc),
        }
def check_compression_pipeline(manager) -> Dict[str, Any]:
    """Report the state of the automatic summary/compression pipeline.

    Reads ``manager.compression_pipeline`` and, when it is present and
    truthy, collects its compression statistics, scheduler flag,
    configuration values and archive directory, printing a short summary.

    Args:
        manager: Object expected to expose a ``compression_pipeline``
            attribute.

    Returns:
        A status dict. ``enabled`` is ``False`` (with an ``error`` entry)
        when the pipeline is missing/falsy or when any step raises.
    """
    print("\n🗜️ 检查自动摘要压缩管线...")

    try:
        pipeline = getattr(manager, 'compression_pipeline', None)
        if not pipeline:
            return {
                "enabled": False,
                "status": "未启用",
                "error": "压缩管线未初始化",
            }

        stats = pipeline.get_compression_stats()
        scheduler_status = "running" if pipeline.is_running else "stopped"
        cfg = pipeline.config

        # The reported config keys share their names with the attributes
        # they are read from, so they can be copied by name.
        config_fields = (
            "token_threshold",
            "compression_ratio",
            "min_content_length",
            "max_content_length",
            "keep_original_days",
        )

        result = {
            "enabled": True,
            "status": "正常运行",
            "scheduler_status": scheduler_status,
            "auto_schedule": cfg.enable_auto_schedule,
            "schedule_time": cfg.schedule_time,
            "config": {field: getattr(cfg, field) for field in config_fields},
            "statistics": {
                "total_compressed": stats.get("total_compressed", 0),
                "total_tokens_saved": stats.get("total_tokens_saved", 0),
                "last_compression_time": stats.get("last_compression_time"),
                "compression_history_count": len(stats.get("compression_history", [])),
            },
            "archive_dir": str(pipeline.archive_dir),
        }

        auto_label = '启用' if cfg.enable_auto_schedule else '禁用'
        print(f" ✅ 压缩管线状态: {result['status']}")
        print(f" 📅 调度器状态: {scheduler_status}")
        print(f" 🔄 自动调度: {auto_label}")
        print(f" 🗜️ 已压缩记忆: {stats.get('total_compressed', 0)} 条")
        print(f" 💾 节省tokens: {stats.get('total_tokens_saved', 0)}")
        print(f" 📂 归档目录: {pipeline.archive_dir}")

        return result

    except Exception as exc:
        # Any failure while probing is reported as data, never raised.
        return {
            "enabled": False,
            "status": "检查失败",
            "error": str(exc),
        }
def check_feedback_mechanism(manager) -> Dict[str, Any]:
    """Exercise the feedback mechanism and report its state.

    The check stores a throw-away test memory through ``manager.remember``,
    looks it up again with ``manager.recall`` to obtain an id, reads
    ``manager.get_feedback_stats()`` and finally submits a ``thumbs_up``
    through ``manager.provide_feedback`` for that id.

    Note that this check writes to the memory store: the test memory it
    creates is not removed afterwards.

    Args:
        manager: Object exposing ``remember``, ``recall``,
            ``get_feedback_stats`` and ``provide_feedback``.

    Returns:
        A status dict. On success it contains the feedback statistics, the
        id of the test memory (or ``None``) and, when an id was found,
        ``test_feedback_success``. If any step raises, ``enabled`` is
        ``False`` and ``error`` holds the exception text.
    """
    print("\n👍 检查反馈机制...")

    try:
        test_memory_id = None

        # Store a uniquely-named memory so there is something to rate.
        test_content = f"反馈机制测试记忆 - {time.time()}"
        success = manager.remember(test_content, tags=["反馈测试"])

        if success:
            # NOTE(review): the top hit is assumed to be the memory that was
            # just stored; nothing here verifies that - confirm against recall().
            recent_memories = manager.recall("反馈机制测试", top_k=1)
            if recent_memories:
                test_memory_id = recent_memories[0].get('id')

        # Statistics are sampled before the test feedback below is submitted.
        feedback_stats = manager.get_feedback_stats()

        result = {
            "enabled": True,
            "status": "正常运行",
            "test_memory_created": success,
            "test_memory_id": test_memory_id,
            "feedback_stats": feedback_stats,
            "features": {
                "thumbs_up_down": True,
                "usage_score_tracking": True,
                "feedback_boost_ranking": True,
                "score_decay": True,
                "needs_rewrite_marking": True
            }
        }

        # Compare against None rather than truthiness: an id of 0 (or an
        # empty-string id) is falsy but still identifies a memory, and the
        # feedback round-trip must not be skipped for it.
        if test_memory_id is not None:
            feedback_success = manager.provide_feedback(str(test_memory_id), 'thumbs_up')
            result["test_feedback_success"] = feedback_success

            if feedback_success:
                print(f" ✅ 反馈功能测试成功 (记忆#{test_memory_id})")
            else:
                print(" ⚠️ 反馈功能测试失败")

        print(f" ✅ 反馈机制状态: {result['status']}")
        print(f" 📊 总记忆数: {feedback_stats.get('total_memories', 0)}")
        print(f" 👍 有反馈记忆: {feedback_stats.get('memories_with_feedback', 0)}")
        print(f" 📈 正面反馈: {feedback_stats.get('positive_feedback', 0)}")
        print(f" 📉 负面反馈: {feedback_stats.get('negative_feedback', 0)}")
        print(f" ⭐ 平均分数: {feedback_stats.get('average_score', 0)}")

        return result

    except Exception as e:
        # Any failure while probing is reported as data, never raised.
        return {
            "enabled": False,
            "status": "检查失败",
            "error": str(e)
        }
def check_time_aware_ranking(manager) -> Dict[str, Any]:
    """Exercise time-aware ranking and report what came back.

    Runs the same fixed query through ``manager.recall`` twice - once with
    ``use_time_decay=False`` and once with ``use_time_decay=True`` and
    ``time_decay_tau=30.0`` - and records how many hits each call returned.
    When the time-aware call returns hits, a per-hit breakdown (rank, id,
    score, decay factor, age in days) is added under
    ``time_aware_analysis`` and the first three entries are printed.

    Args:
        manager: Object exposing ``recall``.

    Returns:
        A status dict; ``enabled`` is ``False`` with an ``error`` entry if
        any step raises.
    """
    print("\n⏰ 检查时间感知排序功能...")

    try:
        test_query = "时间感知测试"
        tau = 30.0

        # Plain retrieval first, then the time-decayed variant.
        plain_hits = manager.recall(test_query, top_k=3, use_time_decay=False)
        decayed_hits = manager.recall(
            test_query, top_k=3, use_time_decay=True, time_decay_tau=tau
        )

        result = {
            "enabled": True,
            "status": "正常运行",
            "test_query": test_query,
            "normal_results_count": len(plain_hits),
            "time_aware_results_count": len(decayed_hits),
            "time_decay_tau": tau,
            "features": {
                "time_decay_formula": "exp(-Δt/τ)",
                "configurable_tau": True,
                "automatic_timestamp_parsing": True,
                "score_adjustment": True,
            },
        }

        # Nothing to analyse: warn and hand back the bare result.
        if not decayed_hits:
            print(" ⚠️ 时间感知排序测试:无检索结果")
            return result

        analysis = [
            {
                "rank": rank,
                "id": hit.get('id'),
                "score": hit.get('score', 0),
                "time_decay_factor": hit.get('time_decay_factor', 1.0),
                "days_ago": hit.get('days_ago', 0),
            }
            for rank, hit in enumerate(decayed_hits, start=1)
        ]
        result["time_aware_analysis"] = analysis

        print(f" ✅ 时间感知排序状态: {result['status']}")
        print(f" 🔍 测试查询: {test_query}")
        print(f" 📊 普通检索结果: {len(plain_hits)} 条")
        print(f" ⏰ 时间感知结果: {len(decayed_hits)} 条")
        print(f" 📈 时间衰减参数τ: {result['time_decay_tau']} 天")

        # Show the time-aware details of at most the first three hits.
        for entry in analysis[:3]:
            print(f" #{entry['rank']}: 记忆#{entry['id']}, "
                  f"分数={entry['score']:.3f}, "
                  f"衰减={entry['time_decay_factor']:.3f}, "
                  f"{entry['days_ago']:.1f}天前")

        return result

    except Exception as exc:
        # Any failure while probing is reported as data, never raised.
        return {
            "enabled": False,
            "status": "检查失败",
            "error": str(exc),
        }
def check_topic_drift_detector(manager) -> Dict[str, Any]:
    """Report the state of the topic drift detector and probe it.

    Reads ``manager.topic_drift_detector`` and, when it is present and
    truthy, collects its thresholds and statistics and then feeds four
    fixed probe queries to ``should_clear_candidates``, recording the
    decision and reason returned for each.

    Args:
        manager: Object expected to expose a ``topic_drift_detector``
            attribute.

    Returns:
        A status dict. ``enabled`` is ``False`` (with an ``error`` entry)
        when the detector is missing/falsy or when any step raises.
    """
    print("\n🔄 检查主题漂移检测器...")

    try:
        detector = getattr(manager, 'topic_drift_detector', None)
        if not detector:
            return {
                "enabled": False,
                "status": "未启用",
                "error": "主题漂移检测器未初始化"
            }

        config = {
            "window_size": detector.window_size,
            "drift_threshold": detector.drift_threshold,
            "min_similarity": detector.min_similarity
        }

        # Statistics are sampled before the probe queries below are run.
        stats = detector.get_statistics()

        # NOTE(review): the probes are sent to the live detector; presumably
        # this updates its window/statistics - confirm whether that matters.
        test_queries = [
            "Python编程基础",
            "Python数据结构",
            "机器学习算法",  # topic starts to drift
            "今天天气如何"  # completely unrelated
        ]

        drift_test_results = []
        for query in test_queries:
            should_clear, reason = detector.should_clear_candidates(query)
            drift_test_results.append({
                "query": query,
                "should_clear": should_clear,
                "reason": reason
            })

        result = {
            "enabled": True,
            "status": "正常运行",
            "config": config,
            "statistics": stats,
            "test_results": drift_test_results,
            "features": {
                "sliding_window": True,
                "tf_idf_analysis": True,
                "cosine_similarity": True,
                "automatic_candidate_clearing": True,
                "configurable_thresholds": True
            }
        }

        print(f" ✅ 主题漂移检测器状态: {result['status']}")
        print(f" 📊 窗口大小: {config['window_size']}")
        print(f" 🎯 漂移阈值: {config['drift_threshold']}")
        print(f" 📉 最小相似度: {config['min_similarity']}")
        # Use .get() with a default, like the other checks do for their
        # statistics: a missing counter should not turn an otherwise
        # successful probe into a reported check failure.
        print(f" 📈 总查询数: {stats.get('total_queries', 0)}")
        print(f" 🔄 漂移次数: {stats.get('drift_count', 0)}")
        print(f" 📊 漂移率: {stats.get('drift_rate', 0)}")

        print(" 🧪 漂移检测测试:")
        for test in drift_test_results:
            status = "🚨 清空候选集" if test['should_clear'] else "✅ 保持候选集"
            print(f" {status}: {test['query']} - {test['reason']}")

        return result

    except Exception as e:
        # Any failure while probing is reported as data, never raised.
        return {
            "enabled": False,
            "status": "检查失败",
            "error": str(e)
        }
def generate_advanced_modules_report(manager) -> Dict[str, Any]:
    """Run every module check and fold the results into one report.

    The five checks run in a fixed order (capacity manager, compression
    pipeline, feedback mechanism, time-aware ranking, topic drift
    detector). A module counts as enabled when its result has a truthy
    ``enabled``; an enabled module counts as healthy when its ``status``
    is "正常运行"; every module that is not enabled counts as failed.

    ``overall_status`` is "critical" for more than two failed modules,
    "warning" for one or two, and "healthy" otherwise.

    Args:
        manager: Object handed unchanged to each individual check.

    Returns:
        The report dict with ``timestamp``, ``check_duration``,
        ``overall_status``, per-module results and summary counters.
    """
    print("🚀 开始MemOS高级功能模块状态检查")
    print("=" * 60)

    started_at = time.time()

    # Dict literal values are evaluated top to bottom, which keeps the
    # checks running in their fixed order.
    modules = {
        "capacity_manager": check_capacity_manager(manager),
        "compression_pipeline": check_compression_pipeline(manager),
        "feedback_mechanism": check_feedback_mechanism(manager),
        "time_aware_ranking": check_time_aware_ranking(manager),
        "topic_drift_detector": check_topic_drift_detector(manager),
    }

    timestamp = datetime.now().isoformat()
    check_duration = f"{time.time() - started_at:.2f}s"

    # Tally module states.
    enabled_results = [
        outcome for outcome in modules.values() if outcome.get("enabled", False)
    ]
    healthy_count = sum(
        1 for outcome in enabled_results if outcome.get("status") == "正常运行"
    )
    failed_count = len(modules) - len(enabled_results)

    # Derive the overall verdict from the number of failed modules.
    if failed_count > 2:
        overall_status = "critical"
    elif failed_count > 0:
        overall_status = "warning"
    else:
        overall_status = "healthy"

    return {
        "timestamp": timestamp,
        "check_duration": check_duration,
        "overall_status": overall_status,
        "modules": modules,
        "summary": {
            "total_modules": 5,
            "enabled_modules": len(enabled_results),
            "healthy_modules": healthy_count,
            "failed_modules": failed_count,
        },
    }
def print_summary_report(report: Dict[str, Any]):
    """Print a human-readable summary of a module status report.

    Prints the header (timestamp, duration, upper-cased overall status),
    the summary counters, and one line per module. A module is marked with
    a check mark only when it is enabled and its status is "正常运行";
    for a module that is not enabled, its ``error`` text (if any) is
    printed on an extra line.

    Args:
        report: Dict with ``timestamp``, ``check_duration``,
            ``overall_status``, ``summary`` and ``modules`` entries.
    """
    separator = "=" * 60

    print("\n" + separator)
    print("📋 MemOS高级功能模块状态检查报告")
    print(separator)

    print(f"🕐 检查时间: {report['timestamp']}")
    print(f"⏱️ 检查耗时: {report['check_duration']}")
    print(f"🏥 整体状态: {report['overall_status'].upper()}")

    counters = report['summary']
    print("\n📊 模块统计:")
    print(f" 总模块数: {counters['total_modules']}")
    print(f" 已启用: {counters['enabled_modules']}")
    print(f" 运行正常: {counters['healthy_modules']}")
    print(f" 检查失败: {counters['failed_modules']}")

    print("\n🔍 各模块状态:")
    for name, details in report['modules'].items():
        enabled = details.get('enabled')
        if enabled and details.get('status') == '正常运行':
            icon = "✅"
        else:
            icon = "❌"
        label = details.get('status', '未知')
        print(f" {icon} {name}: {label}")

        # Only modules that are not enabled have their error spelled out.
        if not enabled and 'error' in details:
            print(f" 错误: {details['error']}")
def save_detailed_report(report: Dict[str, Any], output_file: str = None):
    """Write the detailed report to a JSON file.

    Args:
        report: The report to serialize. It must be JSON-serializable.
        output_file: Destination path. When falsy, a timestamped file name
            under ``memos_data/`` is used.

    Returns:
        The path that was written, as a string, or ``None`` when
        serialization or writing failed (the failure is printed, not raised).
    """
    if not output_file:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_file = f"memos_data/advanced_modules_report_{timestamp}.json"

    try:
        output_path = Path(output_file)

        # Serialize completely before touching the file system. Streaming
        # with json.dump() into an already-opened file would leave a
        # truncated, invalid JSON file behind (and wipe any earlier report
        # at that path) if a value turns out not to be serializable.
        payload = json.dumps(report, indent=2, ensure_ascii=False)

        output_path.parent.mkdir(parents=True, exist_ok=True)
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(payload)

        print(f"\n💾 详细报告已保存: {output_path}")
        return str(output_path)

    except Exception as e:
        # Best-effort persistence: report the problem and carry on.
        print(f"❌ 保存报告失败: {e}")
        return None
def main():
    """Run the full status check as a script entry point.

    Imports the memory-manager factory, builds a manager, generates the
    module report, prints its summary and saves the detailed JSON file.

    Returns:
        The generated report, or ``None`` when any step (including the
        import of the factory) raises; in that case the error and its
        traceback are printed.
    """
    try:
        # Imported here so a missing module is handled like any other failure.
        from mvp_memory import create_mvp_memory_manager

        print("🔧 初始化MVP记忆管理器...")
        memory_manager = create_mvp_memory_manager()

        status_report = generate_advanced_modules_report(memory_manager)
        print_summary_report(status_report)
        save_detailed_report(status_report)

        print("\n✅ MemOS高级功能模块状态检查完成!")
        return status_report

    except Exception as exc:
        print(f"❌ 检查过程中发生错误: {exc}")
        import traceback
        traceback.print_exc()
        return None
# Run the status check only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()