memos_cli.py•31 kB
#!/usr/bin/env python3
"""
MemOS 命令行启动器
提供简单的菜单来使用MemOS的各种功能
"""
import os
import sys
from pathlib import Path
from concurrent_manager import MemOSConcurrentManager
def load_env_file():
"""加载环境变量"""
env_file = Path(".env")
if env_file.exists():
with open(env_file) as f:
for line in f:
if line.strip() and not line.startswith('#'):
key, value = line.strip().split('=', 1)
os.environ[key] = value
def show_menu():
"""显示主菜单"""
print("\n" + "="*50)
print("🧠 MemOS 智能记忆管理系统")
print("="*50)
print("请选择功能:")
print("1. 🧪 运行系统测试")
print("2. 📝 基础功能演示")
print("3. 🚀 高级功能演示")
print("4. 💬 交互式AI对话")
print("5. 🤖 智能助手模式")
print("6. 📊 查看使用指南")
print("7. 📝 一键记忆")
print("8. 🔍 智能检索")
print("9. 📅 每日回顾")
print("F. 📊 反馈统计")
print("C. 📈 容量管理")
print("P. 🗜️ 压缩管线")
print("Q. 🔧 测试Qwen模型集成")
print("0. 退出")
print("="*50)
def run_system_test():
"""运行系统测试"""
print("🧪 运行MemOS系统测试...")
os.system("poetry run python simple_test.py")
def run_basic_demo():
"""运行基础演示"""
print("📝 运行基础功能演示...")
os.system("poetry run python usage_examples.py")
def run_advanced_demo():
"""运行高级演示"""
print("🚀 运行高级功能演示...")
os.system("poetry run python advanced_examples.py")
def run_interactive_chat():
"""运行交互式对话"""
print("💬 启动交互式AI对话...")
print("提示: 输入 'add:内容' 添加记忆,输入 'quit' 退出")
try:
from usage_examples import demo_interactive_chat
demo_interactive_chat()
except Exception as e:
print(f"❌ 启动失败: {e}")
def run_smart_assistant():
"""运行智能助手"""
print("🤖 启动智能助手模式...")
print("提示: 支持多种命令,输入 'quit' 退出")
try:
from advanced_examples import demo_interactive_assistant
demo_interactive_assistant()
except Exception as e:
print(f"❌ 启动失败: {e}")
def show_usage_guide():
"""显示使用指南"""
print("📊 MemOS 使用指南")
print("="*50)
print("📖 详细使用指南请查看: USAGE_GUIDE.md")
print("\n🎯 快速开始:")
print("1. 确保API密钥已配置在 .env 文件中")
print("2. 运行系统测试验证环境")
print("3. 尝试基础演示了解功能")
print("4. 使用交互式模式进行实际操作")
print("\n💡 主要功能:")
print("- 智能记忆存储和检索")
print("- AI对话增强")
print("- 学习笔记管理")
print("- 工作任务跟踪")
print("- 个人助手服务")
print("- 一键记忆和智能检索")
print("- 每日回顾和备份")
def remember_memory():
"""一键记忆功能"""
print("\n📝 一键记忆")
print("-" * 30)
try:
# 导入MVP记忆管理器
from mvp_memory import MVPMemoryManager
# 初始化记忆管理器
mvp_manager = MVPMemoryManager()
# 获取用户输入
print("请输入要记忆的内容:")
content = input("> ").strip()
if not content:
print("❌ 内容不能为空")
return
# 可选标签
print("\n请输入标签 (可选,多个标签用逗号分隔):")
tags_input = input("> ").strip()
tags = [tag.strip() for tag in tags_input.split(",") if tag.strip()] if tags_input else None
# 保存记忆
print("\n💾 正在保存记忆...")
success = mvp_manager.remember(content, tags=tags)
if success:
print("✅ 记忆保存成功!")
if tags:
print(f"🏷️ 标签: {', '.join(tags)}")
else:
print("❌ 记忆保存失败")
except Exception as e:
print(f"❌ 一键记忆功能出错: {e}")
def recall_memories():
"""智能检索功能"""
print("\n🔍 智能检索")
print("-" * 30)
try:
# 导入MVP记忆管理器
from mvp_memory import MVPMemoryManager
# 初始化记忆管理器
mvp_manager = MVPMemoryManager()
# 获取查询内容
print("请输入要检索的内容:")
query = input("> ").strip()
if not query:
print("❌ 查询内容不能为空")
return
# 获取返回数量
print("\n请输入返回结果数量 (默认5):")
top_k_input = input("> ").strip()
try:
top_k = int(top_k_input) if top_k_input else 5
top_k = max(1, min(top_k, 20)) # 限制在1-20之间
except ValueError:
top_k = 5
# 询问是否启用时间感知排序
print("\n是否启用时间感知排序? (y/N, 默认启用):")
time_aware_input = input("> ").strip().lower()
use_time_decay = time_aware_input in ['y', 'yes', '是', '启用', ''] or time_aware_input == ''
# 如果启用时间感知,询问时间衰减参数
time_decay_tau = 30.0 # 默认30天
if use_time_decay:
print(f"\n时间衰减参数τ (天数, 默认{time_decay_tau}):")
tau_input = input("> ").strip()
try:
if tau_input:
time_decay_tau = float(tau_input)
time_decay_tau = max(1.0, min(time_decay_tau, 365.0)) # 限制在1-365天之间
except ValueError:
time_decay_tau = 30.0
# 询问是否启用主题漂移检测
print("\n是否启用主题漂移检测? (Y/n, 默认启用):")
drift_input = input("> ").strip().lower()
use_topic_drift = drift_input not in ['n', 'no', '否', '禁用']
# 执行检索
time_info = f", τ={time_decay_tau}天" if use_time_decay else ", 无时间感知"
drift_info = ", 主题漂移检测" if use_topic_drift else ", 无主题检测"
print(f"\n🔍 正在检索相关记忆 (top {top_k}{time_info}{drift_info})...")
results = mvp_manager.recall(
query,
top_k=top_k,
use_time_decay=use_time_decay,
time_decay_tau=time_decay_tau,
use_topic_drift_detection=use_topic_drift
)
if results:
print(f"\n✅ 找到 {len(results)} 条相关记忆:")
print("=" * 50)
for i, result in enumerate(results, 1):
print(f"\n📄 记忆 {i}:")
print(f"ID: {result.get('id', 'N/A')}")
print(f"内容: {result.get('content', 'N/A')}")
# 显示反馈信息
metadata = result.get('metadata', {})
usage_score = metadata.get('usage_score', 0)
feedback_count = metadata.get('feedback_count', 0)
if usage_score != 0 or feedback_count > 0:
print(f"反馈分数: {usage_score} (共{feedback_count}次反馈)")
if 'tags' in metadata and metadata['tags']:
print(f"标签: {', '.join(metadata['tags'])}")
if 'timestamp' in metadata:
print(f"时间: {metadata['timestamp']}")
# 显示相似度分数和时间信息
if 'score' in result:
print(f"相似度: {result['score']:.3f}")
# 显示时间感知信息
if 'time_decay_factor' in result:
time_decay_factor = result['time_decay_factor']
days_ago = result.get('days_ago', 0)
if days_ago != float('inf'):
print(f"时间权重: {time_decay_factor:.3f} ({days_ago:.1f}天前)")
else:
print(f"时间权重: {time_decay_factor:.3f} (无时间戳)")
print("-" * 30)
# 提供反馈选项
print("\n💬 您可以对检索结果提供反馈:")
print("输入格式: <记忆序号> <反馈类型>")
print("反馈类型: 👍 (或 +) 表示有用, 👎 (或 -) 表示无用")
print("示例: 1 👍 或 2 - 或 直接按回车跳过")
feedback_input = input("\n> ").strip()
if feedback_input:
try:
# 解析反馈输入
parts = feedback_input.split()
if len(parts) >= 2:
memory_index = int(parts[0]) - 1 # 转换为0基索引
feedback_type = parts[1]
if 0 <= memory_index < len(results):
memory_id = results[memory_index].get('id')
# 标准化反馈类型
if feedback_type in ['+', '👍', 'up', 'good']:
feedback_type = 'thumbs_up'
elif feedback_type in ['-', '👎', 'down', 'bad']:
feedback_type = 'thumbs_down'
# 提供反馈
success = mvp_manager.provide_feedback(str(memory_id), feedback_type)
if success:
print("✅ 反馈已保存,将影响后续检索排序")
else:
print("❌ 反馈保存失败")
else:
print("❌ 无效的记忆序号")
else:
print("❌ 反馈格式错误")
except ValueError:
print("❌ 反馈格式错误,请输入数字序号")
except Exception as e:
print(f"❌ 反馈处理失败: {e}")
else:
print("❌ 未找到相关记忆")
except Exception as e:
print(f"❌ 智能检索功能出错: {e}")
def daily_review():
"""每日回顾功能"""
print("\n📅 每日回顾")
print("-" * 30)
try:
# 导入每日回顾生成器
from daily_review import DailyReviewGenerator
# 初始化回顾生成器
generator = DailyReviewGenerator()
# 显示菜单
print("请选择回顾类型:")
print("1. 📅 今日回顾")
print("2. 📊 周总结")
print("3. 💾 备份记忆")
print("4. 📝 自定义日期回顾")
choice = input("\n请选择 (1-4): ").strip()
if choice == "1":
# 今日回顾
review_file = generator.generate_daily_review()
print(f"\n✅ 今日回顾已生成: {review_file}")
# 询问是否查看内容
view_choice = input("\n是否查看回顾内容? (y/n): ").strip().lower()
if view_choice == 'y':
with open(review_file, 'r', encoding='utf-8') as f:
content = f.read()
print("\n" + "="*50)
print(content)
print("="*50)
elif choice == "2":
# 周总结
summary_file = generator.generate_weekly_summary()
print(f"\n✅ 周总结已生成: {summary_file}")
# 询问是否查看内容
view_choice = input("\n是否查看总结内容? (y/n): ").strip().lower()
if view_choice == 'y':
with open(summary_file, 'r', encoding='utf-8') as f:
content = f.read()
print("\n" + "="*50)
print(content)
print("="*50)
elif choice == "3":
# 备份记忆
backup_name = input("\n请输入备份名称 (留空使用默认名称): ").strip()
backup_name = backup_name if backup_name else None
backup_path = generator.backup_memories(backup_name)
if backup_path:
print(f"\n✅ 记忆备份完成: {backup_path}")
else:
print("\n❌ 记忆备份失败")
elif choice == "4":
# 自定义日期回顾
date_input = input("\n请输入日期 (YYYY-MM-DD格式,留空为今日): ").strip()
if date_input:
# 验证日期格式
try:
from datetime import datetime
datetime.strptime(date_input, "%Y-%m-%d")
review_file = generator.generate_daily_review(date_input)
print(f"\n✅ {date_input} 回顾已生成: {review_file}")
# 询问是否查看内容
view_choice = input("\n是否查看回顾内容? (y/n): ").strip().lower()
if view_choice == 'y':
with open(review_file, 'r', encoding='utf-8') as f:
content = f.read()
print("\n" + "="*50)
print(content)
print("="*50)
except ValueError:
print("❌ 日期格式错误,请使用 YYYY-MM-DD 格式")
else:
# 默认今日
review_file = generator.generate_daily_review()
print(f"\n✅ 今日回顾已生成: {review_file}")
else:
print("❌ 无效选项")
except Exception as e:
print(f"❌ 每日回顾功能出错: {e}")
import traceback
traceback.print_exc()
def show_feedback_stats():
"""显示反馈统计信息"""
print("\n📊 反馈统计")
print("-" * 30)
try:
# 导入MVP记忆管理器
from mvp_memory import MVPMemoryManager
# 初始化记忆管理器
mvp_manager = MVPMemoryManager()
# 获取反馈统计
stats = mvp_manager.get_feedback_stats()
if stats:
print(f"\n📈 反馈统计概览:")
print(f" 总记忆数: {stats.get('total_memories', 0)}")
print(f" 有反馈的记忆: {stats.get('memories_with_feedback', 0)}")
print(f" 正面反馈: {stats.get('positive_feedback', 0)} 👍")
print(f" 负面反馈: {stats.get('negative_feedback', 0)} 👎")
print(f" 平均分数: {stats.get('average_score', 0)}")
print(f" 需要重写: {stats.get('needs_rewrite', 0)}")
# 计算反馈覆盖率
total = stats.get('total_memories', 0)
with_feedback = stats.get('memories_with_feedback', 0)
if total > 0:
coverage = (with_feedback / total) * 100
print(f" 反馈覆盖率: {coverage:.1f}%")
print("\n💡 提示:")
print("- 在智能检索(选项8)中可以对结果提供👍/👎反馈")
print("- 正面反馈会提升记忆在检索中的排序")
print("- 负面反馈会降低记忆的排序权重")
print("- 分数过低(-3以下)的记忆会被标记为需要重写")
else:
print("❌ 无法获取反馈统计信息")
except Exception as e:
print(f"❌ 反馈统计功能出错: {e}")
def show_capacity_management():
"""显示容量管理功能"""
print("\n📈 智能容量管理")
print("-" * 30)
try:
# 导入MVP记忆管理器
from mvp_memory import MVPMemoryManager
# 初始化记忆管理器
mvp_manager = MVPMemoryManager()
while True:
print("\n📊 容量管理选项:")
print("1. 查看容量报告")
print("2. 查看性能指标")
print("3. 手动触发优化")
print("4. 查看系统状态")
print("0. 返回主菜单")
choice = input("\n请选择功能 (0-4): ").strip()
if choice == "0":
break
elif choice == "1":
# 查看容量报告
print("\n📋 容量管理报告:")
print("=" * 40)
report = mvp_manager.get_capacity_report()
if "error" not in report:
# 显示容量配置
config = report.get("capacity_config", {})
print(f"📊 容量配置:")
print(f" 工作内存: {config.get('working_memory_capacity', 0)}")
print(f" 用户内存: {config.get('user_memory_capacity', 0)}")
print(f" 长期内存: {config.get('long_term_memory_capacity', 0)}")
# 显示当前使用情况
memory_stats = report.get("memory_stats", {})
if memory_stats:
print(f"\n📈 内存使用情况:")
for mem_type, stats in memory_stats.items():
usage = stats.get("usage_percentage", 0)
current = stats.get("current_count", 0)
capacity = stats.get("capacity", 0)
status = "🔴" if usage >= 90 else "🟡" if usage >= 80 else "🟢"
print(f" {status} {mem_type}: {current}/{capacity} ({usage:.1f}%)")
# 显示系统健康状态
health = report.get("system_health", "unknown")
health_icon = "🟢" if health == "healthy" else "🟡" if health == "warning" else "🔴"
print(f"\n{health_icon} 系统健康状态: {health}")
else:
print(f"❌ {report.get('error', '未知错误')}")
elif choice == "2":
# 查看性能指标
print("\n⚡ 性能指标:")
print("=" * 40)
metrics = mvp_manager.get_performance_metrics()
if "error" not in metrics:
print(f"📊 操作性能:")
print(f" 记忆添加平均时间: {metrics.get('memory_add_avg_time', 0):.3f}s")
print(f" 记忆检索平均时间: {metrics.get('memory_search_avg_time', 0):.3f}s")
print(f" 总记忆数量: {metrics.get('memory_total_count', 0)}")
print(f"\n💻 系统资源:")
print(f" CPU使用率: {metrics.get('cpu_usage', 0):.1f}%")
print(f" 内存使用: {metrics.get('memory_usage_mb', 0):.1f}MB")
print(f" 磁盘使用: {metrics.get('disk_usage_mb', 0):.1f}MB")
health = metrics.get('system_health', 'unknown')
health_icon = "🟢" if health == "healthy" else "🟡" if health == "warning" else "🔴"
print(f"\n{health_icon} 系统健康: {health}")
else:
print(f"❌ {metrics.get('error', '未知错误')}")
elif choice == "3":
# 手动触发优化
print("\n🔧 手动触发系统优化...")
success = mvp_manager.trigger_manual_optimization()
if success:
print("✅ 系统优化完成")
else:
print("❌ 系统优化失败")
elif choice == "4":
# 查看系统状态
print("\n🔍 系统状态:")
print("=" * 40)
status = mvp_manager.get_status_info()
# 基本信息
print(f"📊 基本信息:")
print(f" 运行模式: {status.get('mode', 'unknown')}")
print(f" 嵌入模型: {status.get('model', 'unknown')}")
print(f" 重排器: {status.get('reranker', 'unknown')}")
print(f" 运行状态: {status.get('status', 'unknown')}")
# 官方配置信息
if status.get('official_config'):
print(f"\n✅ 官方配置已启用:")
print(f" 配置类型: {status.get('config_type', 'unknown')}")
print(f" 内存后端: {status.get('memory_backend', 'unknown')}")
# 容量管理信息
capacity_mgmt = status.get('capacity_management', {})
if capacity_mgmt.get('enabled'):
print(f"\n📈 容量管理:")
print(f" 监控状态: {capacity_mgmt.get('monitoring_status', 'unknown')}")
print(f" 系统健康: {capacity_mgmt.get('system_health', 'unknown')}")
current_metrics = capacity_mgmt.get('current_metrics', {})
print(f" 总记忆数: {current_metrics.get('total_memories', 0)}")
print(f" 添加平均时间: {current_metrics.get('add_avg_time', 0):.3f}s")
print(f" 检索平均时间: {current_metrics.get('search_avg_time', 0):.3f}s")
else:
print(f"\n❌ 容量管理未启用")
else:
print("❌ 无效选项,请重新选择")
except Exception as e:
print(f"❌ 容量管理功能出错: {e}")
def show_compression_pipeline():
"""显示压缩管线功能"""
print("\n🗜️ 自动摘要压缩管线")
print("-" * 30)
try:
# 导入MVP记忆管理器
from mvp_memory import MVPMemoryManager
# 初始化记忆管理器
mvp_manager = MVPMemoryManager()
while True:
print("\n🗜️ 压缩管线选项:")
print("1. 查看压缩统计")
print("2. 手动触发压缩")
print("3. 测试模式压缩")
print("4. 清理旧归档")
print("5. 查看压缩配置")
print("0. 返回主菜单")
choice = input("\n请选择功能 (0-5): ").strip()
if choice == "0":
break
elif choice == "1":
# 查看压缩统计
print("\n📊 压缩统计信息:")
print("=" * 40)
stats = mvp_manager.get_compression_stats()
if "error" not in stats:
print(f"📈 压缩统计:")
print(f" 总压缩数: {stats.get('total_compressed', 0)}")
print(f" 总节省tokens: {stats.get('total_tokens_saved', 0)}")
print(f" 最后压缩时间: {stats.get('last_compression_time', '从未')}")
history = stats.get('compression_history', [])
if history:
print(f"\n📋 最近压缩记录:")
for i, record in enumerate(history[-5:], 1): # 显示最近5条
print(f" {i}. ID:{record.original_id} "
f"节省:{record.token_saved}tokens "
f"比例:{record.compression_ratio:.1%}")
else:
print(f"❌ {stats.get('error', '未知错误')}")
elif choice == "2":
# 手动触发压缩
print("\n🔧 手动触发压缩...")
max_memories = input("请输入最大处理数量 (默认50): ").strip()
try:
max_memories = int(max_memories) if max_memories else 50
except ValueError:
max_memories = 50
result = mvp_manager.trigger_compression(max_memories=max_memories)
if "error" not in result:
print(f"✅ 压缩完成:")
print(f" 处理记忆数: {result.get('processed', 0)}")
print(f" 成功压缩: {result.get('compressed', 0)}")
print(f" 节省tokens: {result.get('tokens_saved', 0)}")
print(f" 处理时间: {result.get('duration', 0):.2f}s")
else:
print(f"❌ 压缩失败: {result.get('error', '未知错误')}")
elif choice == "3":
# 测试模式压缩
print("\n🧪 测试模式压缩(忽略时间限制)...")
max_memories = input("请输入最大处理数量 (默认10): ").strip()
try:
max_memories = int(max_memories) if max_memories else 10
except ValueError:
max_memories = 10
result = mvp_manager.trigger_compression(max_memories=max_memories, test_mode=True)
if "error" not in result:
print(f"✅ 测试压缩完成:")
print(f" 处理记忆数: {result.get('processed', 0)}")
print(f" 成功压缩: {result.get('compressed', 0)}")
print(f" 节省tokens: {result.get('tokens_saved', 0)}")
print(f" 处理时间: {result.get('duration', 0):.2f}s")
# 显示详细结果
compression_results = result.get('compression_results', [])
if compression_results:
print(f"\n📋 压缩详情:")
for i, comp_result in enumerate(compression_results, 1):
print(f" {i}. ID:{comp_result['id']} "
f"节省:{comp_result['tokens_saved']}tokens "
f"比例:{comp_result['compression_ratio']:.1%}")
else:
print(f"❌ 测试压缩失败: {result.get('error', '未知错误')}")
elif choice == "4":
# 清理旧归档
print("\n🧹 清理旧归档文件...")
days = input("请输入保留天数 (默认30天): ").strip()
try:
days = int(days) if days else 30
except ValueError:
days = 30
success = mvp_manager.cleanup_old_archives(days)
if success:
print(f"✅ 清理完成,删除了超过{days}天的归档文件")
else:
print(f"❌ 清理失败")
elif choice == "5":
# 查看压缩配置
print("\n⚙️ 压缩配置信息:")
print("=" * 40)
status = mvp_manager.get_status_info()
compression_info = status.get("compression_pipeline", {})
if compression_info.get("enabled"):
print(f"📊 压缩配置:")
print(f" 自动调度: {compression_info.get('auto_schedule', False)}")
print(f" Token阈值: {compression_info.get('token_threshold', 0)}")
print(f" 压缩比例: {compression_info.get('compression_ratio', 0):.1%}")
print(f" 总压缩数: {compression_info.get('total_compressed', 0)}")
print(f" 总节省tokens: {compression_info.get('total_tokens_saved', 0)}")
print(f" 最后压缩: {compression_info.get('last_compression', '从未')}")
else:
print(f"❌ 压缩管线未启用")
if "error" in compression_info:
print(f" 错误: {compression_info['error']}")
else:
print("❌ 无效选项,请重新选择")
except Exception as e:
print(f"❌ 压缩管线功能出错: {e}")
def check_environment():
"""检查环境配置"""
print("🔍 检查环境配置...")
api_key = os.getenv("SILICONFLOW_API_KEY")
if not api_key:
print("⚠️ 未找到 SILICONFLOW_API_KEY")
print("💡 请在 .env 文件中配置API密钥")
return False
try:
import openai
import qdrant_client
print("✅ 核心依赖检查通过")
return True
except ImportError as e:
print(f"❌ 依赖检查失败: {e}")
print("💡 请运行: poetry install")
return False
def main():
"""主函数"""
# 初始化并发管理器
data_dir = "./memos_data"
print(f"[DEBUG] CLI using data_dir = {data_dir}")
concurrent_manager = MemOSConcurrentManager(data_dir)
# 注册CLI进程
if not concurrent_manager.register_process("cli"):
print("❌ 无法启动MemOS CLI,可能已达到并发限制")
sys.exit(1)
try:
load_env_file()
if not check_environment():
print("\n❌ 环境检查失败,请先配置环境")
return
while True:
show_menu()
try:
choice = input("\n请输入选项 (0-9): ").strip()
if choice == "0":
print("👋 感谢使用MemOS!")
break
elif choice == "1":
run_system_test()
elif choice == "2":
run_basic_demo()
elif choice == "3":
run_advanced_demo()
elif choice == "4":
run_interactive_chat()
elif choice == "5":
run_smart_assistant()
elif choice == "6":
show_usage_guide()
elif choice == "7":
remember_memory()
elif choice == "8":
recall_memories()
elif choice == "9":
daily_review()
elif choice.upper() == "F":
show_feedback_stats()
elif choice.upper() == "C":
show_capacity_management()
elif choice.upper() == "P":
show_compression_pipeline()
elif choice.upper() == "Q":
# 保留原有的Qwen测试功能
print("🔧 Qwen模型集成测试功能暂未实现")
else:
print("❌ 无效选项,请重新选择")
except KeyboardInterrupt:
print("\n\n👋 用户中断,退出程序")
break
except Exception as e:
print(f"❌ 发生错误: {e}")
input("\n按回车键继续...")
finally:
# 确保清理资源
concurrent_manager.cleanup()
if __name__ == "__main__":
main()