# access_tracker.py
#!/usr/bin/env python3
"""
MemOS访问统计追踪器
追踪记忆的访问频率和最后访问时间,为30天未访问归档提供数据支持
"""
import json
import time
from pathlib import Path
from typing import Dict, List, Optional, Set
from datetime import datetime, timedelta
from collections import defaultdict
class AccessTracker:
    """Track how often and how recently each memory is accessed.

    Two JSON files are kept under ``data_dir``:

    * ``access_stats.json`` - one record per memory id with ``first_access``,
      ``last_access`` (ISO-8601 strings), ``access_count`` and a per-type
      counter ``access_types``.
    * ``daily_access_stats.json`` - one record per ``YYYY-MM-DD`` day with
      ``total_accesses``, ``unique_memories`` and ``access_types``.

    The statistics are used to find memories that have not been accessed for
    a number of days (30 by default) so that they can be archived.
    Recording an access only updates in-memory state; call ``save_all`` to
    persist it.
    """

    def __init__(self, data_dir: str = "./memos_data"):
        """Load any existing statistics found under ``data_dir``.

        Args:
            data_dir: Directory holding the two statistics files. It does not
                have to exist yet; it is created on the first save.
        """
        self.data_dir = Path(data_dir)
        self.access_log_file = self.data_dir / "access_stats.json"
        self.daily_stats_file = self.data_dir / "daily_access_stats.json"

        # In-memory statistics; filled from disk by the loaders below.
        self.access_stats: Dict[str, Dict] = {}
        self.daily_stats: Dict[str, Dict] = {}

        self._load_access_stats()
        self._load_daily_stats()

        print("📊 访问追踪器初始化完成")
        print(f" 已追踪记忆数量: {len(self.access_stats)}")

    @staticmethod
    def _read_json_object(path: Path) -> Dict[str, Dict]:
        """Return the top-level JSON object stored in ``path``.

        Raises:
            OSError: The file cannot be opened or read.
            ValueError: The content is not valid JSON, or is valid JSON whose
                top-level value is not an object.
        """
        with open(path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        if not isinstance(data, dict):
            # Every other method assumes a dict; reject lists/scalars here so
            # the caller falls back to empty statistics.
            raise ValueError(
                f"expected a JSON object, got {type(data).__name__}"
            )
        return data

    @staticmethod
    def _bump_type_count(record: Dict, access_type: str) -> None:
        """Increment ``record["access_types"][access_type]`` by one.

        Works for both plain dicts (what ``json.load`` returns) and
        ``defaultdict`` instances, and creates the counter mapping when the
        record lacks one.
        """
        counts = record.get("access_types")
        if not isinstance(counts, dict):
            counts = {}
            record["access_types"] = counts
        counts[access_type] = counts.get(access_type, 0) + 1

    @staticmethod
    def _last_access_time(stats: Dict) -> Optional[datetime]:
        """Parse ``stats["last_access"]``; return None if missing or invalid."""
        try:
            return datetime.fromisoformat(stats["last_access"])
        except (KeyError, TypeError, ValueError):
            return None

    def _load_access_stats(self):
        """Load per-memory statistics from disk; fall back to empty on error."""
        self.access_stats = {}
        if not self.access_log_file.exists():
            return
        try:
            self.access_stats = self._read_json_object(self.access_log_file)
        except Exception as e:
            # Deliberately best-effort: a corrupt file must not prevent the
            # tracker from starting.
            print(f"⚠️ 加载访问统计失败: {e}")

    def _load_daily_stats(self):
        """Load per-day statistics from disk; fall back to empty on error."""
        self.daily_stats = {}
        if not self.daily_stats_file.exists():
            return
        try:
            self.daily_stats = self._read_json_object(self.daily_stats_file)
        except Exception as e:
            # Deliberately best-effort, same as _load_access_stats.
            print(f"⚠️ 加载每日统计失败: {e}")

    def _save_access_stats(self):
        """Write per-memory statistics to disk, reporting (not raising) errors."""
        try:
            # The data directory may not exist on a first run.
            self.data_dir.mkdir(parents=True, exist_ok=True)
            with open(self.access_log_file, 'w', encoding='utf-8') as f:
                json.dump(self.access_stats, f, ensure_ascii=False, indent=2)
        except Exception as e:
            print(f"❌ 保存访问统计失败: {e}")

    def _save_daily_stats(self):
        """Write per-day statistics to disk, reporting (not raising) errors."""
        try:
            # JSON has no set type: emit ``unique_memories`` as a list without
            # touching the in-memory set.
            serializable_stats = {}
            for date, stats in self.daily_stats.items():
                entry = stats.copy()
                if isinstance(entry.get("unique_memories"), set):
                    entry["unique_memories"] = list(entry["unique_memories"])
                serializable_stats[date] = entry

            # The data directory may not exist on a first run.
            self.data_dir.mkdir(parents=True, exist_ok=True)
            with open(self.daily_stats_file, 'w', encoding='utf-8') as f:
                json.dump(serializable_stats, f, ensure_ascii=False, indent=2)
        except Exception as e:
            print(f"❌ 保存每日统计失败: {e}")

    def record_access(self, memory_id: str, access_type: str = "search"):
        """Record one access to ``memory_id`` in memory (nothing is saved).

        Args:
            memory_id: Identifier of the accessed memory.
            access_type: Label counted under ``access_types``.
        """
        # Take the clock once so the timestamp and the day bucket always
        # refer to the same instant, even across midnight.
        now = datetime.now()
        current_time = now.isoformat()
        today = now.strftime("%Y-%m-%d")

        # Per-memory statistics.
        stats = self.access_stats.get(memory_id)
        if stats is None:
            stats = {
                "first_access": current_time,
                "last_access": current_time,
                "access_count": 0,
                "access_types": {},
            }
            self.access_stats[memory_id] = stats
        stats["last_access"] = current_time
        stats["access_count"] = stats.get("access_count", 0) + 1
        self._bump_type_count(stats, access_type)

        # Per-day statistics.
        daily = self.daily_stats.get(today)
        if daily is None:
            daily = {
                "total_accesses": 0,
                "unique_memories": set(),
                "access_types": {},
            }
            self.daily_stats[today] = daily
        daily["total_accesses"] = daily.get("total_accesses", 0) + 1

        # Records loaded from JSON carry a list here; convert back to a set.
        unique = daily.get("unique_memories")
        if not isinstance(unique, set):
            unique = set(unique or ())
            daily["unique_memories"] = unique
        unique.add(memory_id)
        self._bump_type_count(daily, access_type)

    def record_batch_access(self, memory_ids: List[str], access_type: str = "search"):
        """Record one access of type ``access_type`` for each id in ``memory_ids``."""
        for memory_id in memory_ids:
            self.record_access(memory_id, access_type)

    def get_inactive_memories(self, days_threshold: int = 30) -> List[str]:
        """Return ids whose last access is older than ``days_threshold`` days.

        Records whose ``last_access`` is missing or unparsable are skipped:
        they cannot be dated, so they are never reported as inactive.
        """
        cutoff_date = datetime.now() - timedelta(days=days_threshold)
        inactive_memories = []
        for memory_id, stats in self.access_stats.items():
            last_access = self._last_access_time(stats)
            if last_access is not None and last_access < cutoff_date:
                inactive_memories.append(memory_id)
        return inactive_memories

    def get_memory_stats(self, memory_id: str) -> Optional[Dict]:
        """Return the statistics record for ``memory_id``, or None if untracked."""
        return self.access_stats.get(memory_id)

    def get_access_summary(self) -> Dict:
        """Build an aggregate summary over all tracked memories.

        Returns:
            A dict with ``total_memories_tracked``, ``total_accesses``,
            ``average_accesses_per_memory`` (0 when nothing is tracked),
            ``inactive_memories`` (counts for 7/30/90 days) and
            ``most_accessed`` (up to five records, highest count first).
        """
        total_memories = len(self.access_stats)
        total_accesses = sum(
            stats["access_count"] for stats in self.access_stats.values()
        )

        # Number of memories not accessed within each window.
        inactive_7d = len(self.get_inactive_memories(7))
        inactive_30d = len(self.get_inactive_memories(30))
        inactive_90d = len(self.get_inactive_memories(90))

        # Five most frequently accessed memories.
        most_accessed = sorted(
            self.access_stats.items(),
            key=lambda item: item[1]["access_count"],
            reverse=True,
        )[:5]

        return {
            "total_memories_tracked": total_memories,
            "total_accesses": total_accesses,
            "average_accesses_per_memory": (
                total_accesses / total_memories if total_memories > 0 else 0
            ),
            "inactive_memories": {
                "7_days": inactive_7d,
                "30_days": inactive_30d,
                "90_days": inactive_90d,
            },
            "most_accessed": [
                {
                    "memory_id": mid,
                    "access_count": stats["access_count"],
                    "last_access": stats["last_access"],
                }
                for mid, stats in most_accessed
            ],
        }

    def cleanup_old_daily_stats(self, keep_days: int = 90):
        """Drop per-day records older than ``keep_days`` days and save if any were removed."""
        cutoff_date = datetime.now() - timedelta(days=keep_days)
        # Day keys are zero-padded YYYY-MM-DD, so string order is date order.
        cutoff_str = cutoff_date.strftime("%Y-%m-%d")

        old_dates = [date for date in self.daily_stats if date < cutoff_str]
        for date in old_dates:
            del self.daily_stats[date]

        if old_dates:
            print(f"🧹 清理了 {len(old_dates)} 天的旧统计数据")
            self._save_daily_stats()

    def mark_as_archived(self, memory_ids: List[str]):
        """Flag each tracked id in ``memory_ids`` as archived (in memory only).

        Unknown ids are ignored. Sets ``archived`` to True and ``archived_at``
        to the current ISO timestamp.
        """
        archived_at = datetime.now().isoformat()
        for memory_id in memory_ids:
            stats = self.access_stats.get(memory_id)
            if stats is not None:
                stats["archived"] = True
                stats["archived_at"] = archived_at

    def get_archival_candidates(self, days_threshold: int = 30, min_access_count: int = 0) -> List[Dict]:
        """Return detailed records for memories eligible for archiving.

        Args:
            days_threshold: Minimum number of days since the last access.
            min_access_count: Memories with fewer accesses than this are
                skipped.

        Returns:
            One dict per candidate with ``memory_id``, ``last_access``,
            ``days_since_access``, ``access_count`` and ``first_access``,
            ordered by ``last_access`` ascending (longest-unused first).
            Already-archived memories are excluded.
        """
        now = datetime.now()
        candidates = []
        for memory_id in self.get_inactive_memories(days_threshold):
            stats = self.access_stats[memory_id]

            # Skip memories accessed fewer than ``min_access_count`` times.
            if stats["access_count"] < min_access_count:
                continue

            # Skip memories that were already archived.
            if stats.get("archived", False):
                continue

            last_access = datetime.fromisoformat(stats["last_access"])
            candidates.append({
                "memory_id": memory_id,
                "last_access": stats["last_access"],
                "days_since_access": (now - last_access).days,
                "access_count": stats["access_count"],
                "first_access": stats["first_access"],
            })

        # Longest-unused first.
        candidates.sort(key=lambda candidate: candidate["last_access"])
        return candidates

    def save_all(self):
        """Persist both the per-memory and the per-day statistics."""
        self._save_access_stats()
        self._save_daily_stats()
        print("💾 访问统计数据已保存")

    def print_stats(self):
        """Print the summary from ``get_access_summary`` to stdout."""
        summary = self.get_access_summary()
        print("📊 访问统计摘要:")
        print(f" 追踪记忆总数: {summary['total_memories_tracked']}")
        print(f" 总访问次数: {summary['total_accesses']}")
        print(f" 平均访问次数: {summary['average_accesses_per_memory']:.2f}")
        print("")
        print("⏰ 未访问记忆统计:")
        print(f" 7天未访问: {summary['inactive_memories']['7_days']}")
        print(f" 30天未访问: {summary['inactive_memories']['30_days']}")
        print(f" 90天未访问: {summary['inactive_memories']['90_days']}")
        if summary['most_accessed']:
            print("")
            print("🔥 最活跃记忆:")
            for item in summary['most_accessed']:
                print(f" ID {item['memory_id']}: {item['access_count']}次访问")
# Module-level shared tracker instance used by the convenience functions below.
# NOTE: it is constructed at import time, so importing this module loads the
# statistics files under the default "./memos_data" directory and prints the
# initialization messages.
access_tracker = AccessTracker()
def track_memory_access(memory_id: str, access_type: str = "search"):
    """Record a single access on the module-level ``access_tracker``.

    Args:
        memory_id: Identifier of the memory that was accessed.
        access_type: Label for the kind of access; defaults to ``"search"``.
    """
    access_tracker.record_access(memory_id=memory_id, access_type=access_type)
def track_batch_access(memory_ids: List[str], access_type: str = "search"):
    """Record one access per id on the module-level ``access_tracker``.

    Args:
        memory_ids: Identifiers of the memories that were accessed.
        access_type: Label applied to every access; defaults to ``"search"``.
    """
    access_tracker.record_batch_access(
        memory_ids=memory_ids,
        access_type=access_type,
    )
if __name__ == "__main__":
    # Manual smoke test: build a tracker with the default data directory and
    # print its summary.
    demo_tracker = AccessTracker()
    demo_tracker.print_stats()