# locomo_evaluation.py (18.6 kB) — stray file-listing artifact; kept as a comment so the module parses
#!/usr/bin/env python3
"""
LoCoMo本地评测脚本
基于LoCoMo评测框架,为MemOS提供本地记忆系统评测功能
支持多种评测指标:准确率、ROUGE分数、BERTScore等
"""
import os
import sys
import json
import time
import random
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime
# 添加项目根目录到Python路径
sys.path.insert(0, str(Path(__file__).parent))
try:
from memory_ops_sdk import MemoryOps
from mvp_memory import MVPMemoryManager
except ImportError as e:
print(f"❌ 无法导入MemOS模块: {e}")
sys.exit(1)
class LoCoMoEvaluator:
    """LoCoMo local evaluator for the MemOS memory system.

    Based on the LoCoMo evaluation framework.  It generates a synthetic QA
    dataset from a small built-in knowledge base, loads the facts into the
    memory system, answers each question via memory retrieval, and scores
    the answers with several metrics: keyword accuracy, ROUGE, BERTScore,
    response time and Recall@K.
    """

    def __init__(self, data_dir: str = "./locomo_eval_data",
                 use_enhanced: bool = True, verbose: bool = True):
        """
        Initialize the evaluator.

        Args:
            data_dir: Directory for evaluation data and saved reports.
            use_enhanced: Whether to use the enhanced MemOS backend.
            verbose: Whether to print detailed progress logs.
        """
        self.data_dir = data_dir
        self.use_enhanced = use_enhanced
        self.verbose = verbose
        # Underlying MemOS instance that performs add/query operations.
        self.memory_ops = MemoryOps(
            data_dir=data_dir,
            use_enhanced=use_enhanced,
            verbose=verbose
        )
        # Per-sample metric series.  Cleared at the start of each
        # run_evaluation() so repeated runs on the same evaluator do not
        # mix their statistics (bug fix: values used to accumulate).
        self.metrics: Dict[str, List[float]] = {
            'accuracy': [],
            'rouge_1': [],
            'rouge_2': [],
            'rouge_l': [],
            'bert_score_f1': [],
            'response_time': [],
            'recall_at_k': []
        }
        print("🧪 LoCoMo评测器初始化完成")
        print(f" 数据目录: {data_dir}")
        print(f" 增强模式: {use_enhanced}")

    def create_test_dataset(self, size: int = 50) -> List[Dict[str, Any]]:
        """
        Create a synthetic test dataset.

        Each sample pairs a templated question about a topic with one
        randomly chosen fact from that topic as the reference answer.

        Args:
            size: Number of samples to generate.

        Returns:
            List[Dict]: The generated test samples.
        """
        print(f"📝 创建测试数据集 (大小: {size})")
        # Built-in knowledge base: seeds the memory system and supplies
        # the reference answers.
        knowledge_base = [
            {
                "topic": "Python编程",
                "facts": [
                    "Python是一种高级编程语言,由Guido van Rossum创建",
                    "Python支持面向对象、函数式和过程式编程范式",
                    "Python的语法简洁明了,易于学习和使用",
                    "Python有丰富的标准库和第三方包生态系统",
                    "Python广泛应用于数据科学、机器学习、Web开发等领域"
                ]
            },
            {
                "topic": "机器学习",
                "facts": [
                    "机器学习是人工智能的一个重要分支",
                    "监督学习需要标注数据来训练模型",
                    "无监督学习从未标注数据中发现模式",
                    "深度学习使用神经网络进行复杂模式识别",
                    "特征工程是机器学习项目的关键步骤"
                ]
            },
            {
                "topic": "数据结构",
                "facts": [
                    "数组是最基本的数据结构之一",
                    "链表允许动态内存分配",
                    "栈遵循后进先出(LIFO)原则",
                    "队列遵循先进先出(FIFO)原则",
                    "树是层次化的数据结构"
                ]
            },
            {
                "topic": "算法",
                "facts": [
                    "排序算法用于将数据按特定顺序排列",
                    "搜索算法用于在数据结构中查找特定元素",
                    "动态规划通过分解子问题来解决复杂问题",
                    "贪心算法在每步选择局部最优解",
                    "分治算法将问题分解为更小的子问题"
                ]
            },
            {
                "topic": "数据库",
                "facts": [
                    "关系数据库使用表格存储数据",
                    "SQL是结构化查询语言",
                    "索引可以提高查询性能",
                    "事务确保数据的一致性和完整性",
                    "NoSQL数据库适合处理非结构化数据"
                ]
            }
        ]
        dataset = []
        for i in range(size):
            # Pick a random topic, then one of its facts as the gold answer.
            topic_data = random.choice(knowledge_base)
            topic = topic_data["topic"]
            facts = topic_data["facts"]
            correct_fact = random.choice(facts)
            # Build the question from a handful of templates.
            question_templates = [
                f"什么是{topic}?",
                f"请介绍{topic}的特点",
                f"关于{topic},你知道什么?",
                f"{topic}有什么重要特征?",
                f"能解释一下{topic}吗?"
            ]
            question = random.choice(question_templates)
            sample = {
                "id": f"test_{i:03d}",
                "topic": topic,
                "question": question,
                "correct_answer": correct_fact,
                "context_facts": facts,
                "timestamp": datetime.now().isoformat()
            }
            dataset.append(sample)
        print(f"✅ 测试数据集创建完成,共 {len(dataset)} 个样本")
        return dataset

    def populate_memory(self, dataset: List[Dict[str, Any]]) -> bool:
        """
        Load the dataset's context facts into the memory system.

        Args:
            dataset: Test dataset produced by create_test_dataset().

        Returns:
            bool: True if at least one fact was stored successfully.
        """
        print("📚 向记忆系统填充知识...")
        try:
            # Deduplicate (fact, topic) pairs across samples before insertion.
            all_facts = set()
            for sample in dataset:
                for fact in sample["context_facts"]:
                    all_facts.add((fact, sample["topic"]))
            success_count = 0
            for fact, topic in all_facts:
                success = self.memory_ops.add(
                    fact,
                    tags=[topic, "知识库"],
                    metadata={
                        "topic": topic,
                        "type": "fact",
                        "source": "locomo_evaluation"
                    }
                )
                if success:
                    success_count += 1
            print(f"✅ 成功添加 {success_count}/{len(all_facts)} 条知识")
            return success_count > 0
        except Exception as e:
            # Best-effort: report the error and signal failure instead of raising.
            print(f"❌ 填充知识失败: {e}")
            return False

    def evaluate_sample(self, sample: Dict[str, Any]) -> Dict[str, Any]:
        """
        Evaluate a single sample against the memory system.

        Args:
            sample: One test sample.

        Returns:
            Dict: Per-sample evaluation result with all metric values.
        """
        question = sample["question"]
        correct_answer = sample["correct_answer"]
        start_time = time.time()
        # Query the memory system; guard against a None return so the
        # metric code below can always rely on a list.
        results = self.memory_ops.query(question, limit=5) or []
        response_time = time.time() - start_time
        # The top-ranked result is treated as the predicted answer.
        predicted_answer = ""
        if results:
            predicted_answer = results[0].get('content', '')
        evaluation_result = {
            "sample_id": sample["id"],
            "question": question,
            "correct_answer": correct_answer,
            "predicted_answer": predicted_answer,
            "response_time": response_time,
            "num_results": len(results),
            "results": results[:3]  # keep only the top 3 results in the report
        }
        # Keyword-overlap accuracy.
        evaluation_result["accuracy"] = self._calculate_accuracy(correct_answer, predicted_answer)
        # ROUGE scores (zeros when rouge-score is unavailable).
        evaluation_result.update(self._calculate_rouge_scores(correct_answer, predicted_answer))
        # BERTScore F1 (0.0 when bert-score is unavailable).
        evaluation_result["bert_score_f1"] = self._calculate_bert_score(correct_answer, predicted_answer)
        # Recall@K over the retrieved results.
        evaluation_result["recall_at_k"] = self._calculate_recall_at_k(correct_answer, results)
        return evaluation_result

    def _calculate_accuracy(self, correct: str, predicted: str) -> float:
        """Keyword-overlap accuracy: fraction of reference words also predicted.

        NOTE(review): words are produced by whitespace splitting, so for
        unsegmented Chinese text a whole sentence is one "word" — confirm
        this granularity is intended.
        """
        if not correct or not predicted:
            return 0.0
        correct_words = set(correct.lower().split())
        predicted_words = set(predicted.lower().split())
        # A whitespace-only reference yields an empty word set.
        if len(correct_words) == 0:
            return 0.0
        intersection = correct_words.intersection(predicted_words)
        return len(intersection) / len(correct_words)

    def _calculate_rouge_scores(self, correct: str, predicted: str) -> Dict[str, float]:
        """Compute ROUGE-1/2/L F-measures; zeros when unavailable or on failure."""
        try:
            # Imported lazily so the evaluator works without rouge-score installed.
            from rouge_score import rouge_scorer
            scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)
            scores = scorer.score(correct, predicted)
            return {
                "rouge_1": scores['rouge1'].fmeasure,
                "rouge_2": scores['rouge2'].fmeasure,
                "rouge_l": scores['rougeL'].fmeasure
            }
        except ImportError:
            print("⚠️ rouge-score未安装,跳过ROUGE评测")
            return {"rouge_1": 0.0, "rouge_2": 0.0, "rouge_l": 0.0}
        except Exception as e:
            print(f"⚠️ ROUGE计算失败: {e}")
            return {"rouge_1": 0.0, "rouge_2": 0.0, "rouge_l": 0.0}

    def _calculate_bert_score(self, correct: str, predicted: str) -> float:
        """Compute BERTScore F1 for one pair; 0.0 when unavailable or on failure."""
        try:
            # Imported lazily so the evaluator works without bert-score installed.
            from bert_score import score
            P, R, F1 = score([predicted], [correct], lang='zh', verbose=False)
            return F1.item()
        except ImportError:
            print("⚠️ bert-score未安装,跳过BERTScore评测")
            return 0.0
        except Exception as e:
            print(f"⚠️ BERTScore计算失败: {e}")
            return 0.0

    def _calculate_recall_at_k(self, correct: str, results: List[Dict], k: int = 5) -> float:
        """Recall@K: 1.0 if any of the top-K results covers at least 50% of
        the reference's words, else 0.0."""
        if not results:
            return 0.0
        correct_words = set(correct.lower().split())
        for result in results[:k]:
            content = result.get('content', '').lower()
            content_words = set(content.split())
            intersection = correct_words.intersection(content_words)
            # A 50% word-overlap threshold counts as a successful recall.
            if len(intersection) >= len(correct_words) * 0.5:
                return 1.0
        return 0.0

    def run_evaluation(self, dataset_size: int = 50) -> Dict[str, Any]:
        """
        Run the full evaluation pipeline.

        Args:
            dataset_size: Number of test samples to generate.

        Returns:
            Dict: Full evaluation report, or {} if populating memory failed.
        """
        print("🚀 开始LoCoMo评测")
        print("=" * 50)
        # Bug fix: clear metric series so repeated calls do not accumulate.
        for series in self.metrics.values():
            series.clear()
        dataset = self.create_test_dataset(dataset_size)
        if not self.populate_memory(dataset):
            print("❌ 记忆系统填充失败")
            return {}
        # Give the memory backend a moment to settle before querying.
        print("⏳ 等待系统稳定...")
        time.sleep(2)
        print(f"🔍 开始评测 {len(dataset)} 个样本...")
        results = []
        for i, sample in enumerate(dataset):
            if self.verbose and (i + 1) % 10 == 0:
                print(f" 进度: {i + 1}/{len(dataset)}")
            result = self.evaluate_sample(sample)
            results.append(result)
            # Per-sample result keys match the metric-series keys exactly.
            for metric_name in self.metrics:
                self.metrics[metric_name].append(result[metric_name])
        summary = self._calculate_summary_stats()
        evaluation_report = {
            "evaluation_info": {
                "dataset_size": dataset_size,
                "timestamp": datetime.now().isoformat(),
                "memory_system": "MemOS",
                "enhanced_mode": self.use_enhanced,
                "data_dir": self.data_dir
            },
            "summary_metrics": summary,
            "detailed_results": results,
            "dataset": dataset
        }
        print("✅ 评测完成")
        return evaluation_report

    def _calculate_summary_stats(self) -> Dict[str, float]:
        """Aggregate each non-empty metric series into mean/std/min/max entries."""
        import statistics
        summary = {}
        for metric_name, values in self.metrics.items():
            if values:
                summary[f"{metric_name}_mean"] = statistics.mean(values)
                # stdev needs at least two data points.
                summary[f"{metric_name}_std"] = statistics.stdev(values) if len(values) > 1 else 0.0
                summary[f"{metric_name}_min"] = min(values)
                summary[f"{metric_name}_max"] = max(values)
        return summary

    def save_report(self, report: Dict[str, Any], filename: Optional[str] = None) -> str:
        """
        Save the evaluation report as pretty-printed UTF-8 JSON.

        Args:
            report: Evaluation report dict.
            filename: Target file name; a timestamped default is used when omitted.

        Returns:
            str: Path of the saved report file.
        """
        if filename is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"locomo_evaluation_report_{timestamp}.json"
        filepath = Path(self.data_dir) / filename
        filepath.parent.mkdir(parents=True, exist_ok=True)
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(report, f, ensure_ascii=False, indent=2)
        print(f"📄 评测报告已保存: {filepath}")
        return str(filepath)

    def print_summary(self, report: Dict[str, Any]):
        """Print a human-readable summary of the evaluation report."""
        summary = report.get("summary_metrics", {})
        print("\n📊 LoCoMo评测结果摘要")
        print("=" * 50)
        # (label, summary key) pairs for the core metrics.
        core_metrics = [
            ("准确率", "accuracy_mean"),
            ("ROUGE-1", "rouge_1_mean"),
            ("ROUGE-2", "rouge_2_mean"),
            ("ROUGE-L", "rouge_l_mean"),
            ("BERTScore F1", "bert_score_f1_mean"),
            ("Recall@5", "recall_at_k_mean"),
            ("平均响应时间", "response_time_mean")
        ]
        for metric_name, key in core_metrics:
            if key in summary:
                value = summary[key]
                # Timing metrics get a seconds suffix.
                if "time" in key:
                    print(f"{metric_name:15}: {value:.3f}秒")
                else:
                    print(f"{metric_name:15}: {value:.3f}")
        info = report.get("evaluation_info", {})
        print("\n📋 评测信息:")
        print(f"数据集大小: {info.get('dataset_size', 'N/A')}")
        print(f"增强模式: {info.get('enhanced_mode', 'N/A')}")
        print(f"评测时间: {info.get('timestamp', 'N/A')}")

    def close(self):
        """Release the underlying memory system."""
        if self.memory_ops:
            self.memory_ops.close()
        print("🔒 LoCoMo评测器已关闭")
def main():
    """CLI entry point: build an evaluator, run the evaluation, save a report.

    Returns:
        bool: True on success, False on failure or interruption.
    """
    import argparse
    parser = argparse.ArgumentParser(description="MemOS LoCoMo本地评测")
    parser.add_argument("--dataset-size", type=int, default=20, help="测试数据集大小")
    parser.add_argument("--data-dir", type=str, default="./locomo_eval_data", help="评测数据目录")
    # Bug fix: the original declared --enhanced/--verbose with
    # action="store_true" AND default=True, so both were permanently on and
    # impossible to disable.  Keep the positive flags for backward
    # compatibility and add --no-* switches to turn them off.
    parser.add_argument("--enhanced", dest="enhanced", action="store_true",
                        default=True, help="使用增强版MemOS")
    parser.add_argument("--no-enhanced", dest="enhanced", action="store_false",
                        help="不使用增强版MemOS")
    parser.add_argument("--output", type=str, help="输出报告文件名")
    parser.add_argument("--verbose", dest="verbose", action="store_true",
                        default=True, help="显示详细日志")
    parser.add_argument("--no-verbose", dest="verbose", action="store_false",
                        help="不显示详细日志")
    args = parser.parse_args()
    print("🧠 MemOS LoCoMo本地评测")
    print("=" * 50)
    print(f"数据集大小: {args.dataset_size}")
    print(f"数据目录: {args.data_dir}")
    print(f"增强模式: {args.enhanced}")
    evaluator = None  # sentinel so the finally block can test it safely
    try:
        evaluator = LoCoMoEvaluator(
            data_dir=args.data_dir,
            use_enhanced=args.enhanced,
            verbose=args.verbose
        )
        report = evaluator.run_evaluation(dataset_size=args.dataset_size)
        if report:
            evaluator.print_summary(report)
            report_file = evaluator.save_report(report, args.output)
            print(f"\n🎉 评测完成!报告已保存到: {report_file}")
        else:
            print("❌ 评测失败")
            return False
        return True
    except KeyboardInterrupt:
        print("\n🛑 评测被中断")
        return False
    except Exception as e:
        print(f"❌ 评测异常: {e}")
        import traceback
        traceback.print_exc()
        return False
    finally:
        if evaluator is not None:
            evaluator.close()
if __name__ == "__main__":
    # `sys` is already imported at the top of the module; the previous
    # redundant re-import has been removed.
    success = main()
    sys.exit(0 if success else 1)