Skip to main content
Glama

MemOS-MCP

by qinshu1109
Apache 2.0
3
  • Linux
  • Apple
memos_cli.py31 kB
#!/usr/bin/env python3 """ MemOS 命令行启动器 提供简单的菜单来使用MemOS的各种功能 """ import os import sys from pathlib import Path from concurrent_manager import MemOSConcurrentManager def load_env_file(): """加载环境变量""" env_file = Path(".env") if env_file.exists(): with open(env_file) as f: for line in f: if line.strip() and not line.startswith('#'): key, value = line.strip().split('=', 1) os.environ[key] = value def show_menu(): """显示主菜单""" print("\n" + "="*50) print("🧠 MemOS 智能记忆管理系统") print("="*50) print("请选择功能:") print("1. 🧪 运行系统测试") print("2. 📝 基础功能演示") print("3. 🚀 高级功能演示") print("4. 💬 交互式AI对话") print("5. 🤖 智能助手模式") print("6. 📊 查看使用指南") print("7. 📝 一键记忆") print("8. 🔍 智能检索") print("9. 📅 每日回顾") print("F. 📊 反馈统计") print("C. 📈 容量管理") print("P. 🗜️ 压缩管线") print("Q. 🔧 测试Qwen模型集成") print("0. 退出") print("="*50) def run_system_test(): """运行系统测试""" print("🧪 运行MemOS系统测试...") os.system("poetry run python simple_test.py") def run_basic_demo(): """运行基础演示""" print("📝 运行基础功能演示...") os.system("poetry run python usage_examples.py") def run_advanced_demo(): """运行高级演示""" print("🚀 运行高级功能演示...") os.system("poetry run python advanced_examples.py") def run_interactive_chat(): """运行交互式对话""" print("💬 启动交互式AI对话...") print("提示: 输入 'add:内容' 添加记忆,输入 'quit' 退出") try: from usage_examples import demo_interactive_chat demo_interactive_chat() except Exception as e: print(f"❌ 启动失败: {e}") def run_smart_assistant(): """运行智能助手""" print("🤖 启动智能助手模式...") print("提示: 支持多种命令,输入 'quit' 退出") try: from advanced_examples import demo_interactive_assistant demo_interactive_assistant() except Exception as e: print(f"❌ 启动失败: {e}") def show_usage_guide(): """显示使用指南""" print("📊 MemOS 使用指南") print("="*50) print("📖 详细使用指南请查看: USAGE_GUIDE.md") print("\n🎯 快速开始:") print("1. 确保API密钥已配置在 .env 文件中") print("2. 运行系统测试验证环境") print("3. 尝试基础演示了解功能") print("4. 使用交互式模式进行实际操作") print("\n💡 主要功能:") print("- 智能记忆存储和检索") print("- AI对话增强") print("- 学习笔记管理") print("- 工作任务跟踪") print("- 个人助手服务") print("- 一键记忆和智能检索") print("- 每日回顾和备份") def remember_memory(): """一键记忆功能""" print("\n📝 一键记忆") print("-" * 30) try: # 导入MVP记忆管理器 from mvp_memory import MVPMemoryManager # 初始化记忆管理器 mvp_manager = MVPMemoryManager() # 获取用户输入 print("请输入要记忆的内容:") content = input("> ").strip() if not content: print("❌ 内容不能为空") return # 可选标签 print("\n请输入标签 (可选,多个标签用逗号分隔):") tags_input = input("> ").strip() tags = [tag.strip() for tag in tags_input.split(",") if tag.strip()] if tags_input else None # 保存记忆 print("\n💾 正在保存记忆...") success = mvp_manager.remember(content, tags=tags) if success: print("✅ 记忆保存成功!") if tags: print(f"🏷️ 标签: {', '.join(tags)}") else: print("❌ 记忆保存失败") except Exception as e: print(f"❌ 一键记忆功能出错: {e}") def recall_memories(): """智能检索功能""" print("\n🔍 智能检索") print("-" * 30) try: # 导入MVP记忆管理器 from mvp_memory import MVPMemoryManager # 初始化记忆管理器 mvp_manager = MVPMemoryManager() # 获取查询内容 print("请输入要检索的内容:") query = input("> ").strip() if not query: print("❌ 查询内容不能为空") return # 获取返回数量 print("\n请输入返回结果数量 (默认5):") top_k_input = input("> ").strip() try: top_k = int(top_k_input) if top_k_input else 5 top_k = max(1, min(top_k, 20)) # 限制在1-20之间 except ValueError: top_k = 5 # 询问是否启用时间感知排序 print("\n是否启用时间感知排序? (y/N, 默认启用):") time_aware_input = input("> ").strip().lower() use_time_decay = time_aware_input in ['y', 'yes', '是', '启用', ''] or time_aware_input == '' # 如果启用时间感知,询问时间衰减参数 time_decay_tau = 30.0 # 默认30天 if use_time_decay: print(f"\n时间衰减参数τ (天数, 默认{time_decay_tau}):") tau_input = input("> ").strip() try: if tau_input: time_decay_tau = float(tau_input) time_decay_tau = max(1.0, min(time_decay_tau, 365.0)) # 限制在1-365天之间 except ValueError: time_decay_tau = 30.0 # 询问是否启用主题漂移检测 print("\n是否启用主题漂移检测? (Y/n, 默认启用):") drift_input = input("> ").strip().lower() use_topic_drift = drift_input not in ['n', 'no', '否', '禁用'] # 执行检索 time_info = f", τ={time_decay_tau}天" if use_time_decay else ", 无时间感知" drift_info = ", 主题漂移检测" if use_topic_drift else ", 无主题检测" print(f"\n🔍 正在检索相关记忆 (top {top_k}{time_info}{drift_info})...") results = mvp_manager.recall( query, top_k=top_k, use_time_decay=use_time_decay, time_decay_tau=time_decay_tau, use_topic_drift_detection=use_topic_drift ) if results: print(f"\n✅ 找到 {len(results)} 条相关记忆:") print("=" * 50) for i, result in enumerate(results, 1): print(f"\n📄 记忆 {i}:") print(f"ID: {result.get('id', 'N/A')}") print(f"内容: {result.get('content', 'N/A')}") # 显示反馈信息 metadata = result.get('metadata', {}) usage_score = metadata.get('usage_score', 0) feedback_count = metadata.get('feedback_count', 0) if usage_score != 0 or feedback_count > 0: print(f"反馈分数: {usage_score} (共{feedback_count}次反馈)") if 'tags' in metadata and metadata['tags']: print(f"标签: {', '.join(metadata['tags'])}") if 'timestamp' in metadata: print(f"时间: {metadata['timestamp']}") # 显示相似度分数和时间信息 if 'score' in result: print(f"相似度: {result['score']:.3f}") # 显示时间感知信息 if 'time_decay_factor' in result: time_decay_factor = result['time_decay_factor'] days_ago = result.get('days_ago', 0) if days_ago != float('inf'): print(f"时间权重: {time_decay_factor:.3f} ({days_ago:.1f}天前)") else: print(f"时间权重: {time_decay_factor:.3f} (无时间戳)") print("-" * 30) # 提供反馈选项 print("\n💬 您可以对检索结果提供反馈:") print("输入格式: <记忆序号> <反馈类型>") print("反馈类型: 👍 (或 +) 表示有用, 👎 (或 -) 表示无用") print("示例: 1 👍 或 2 - 或 直接按回车跳过") feedback_input = input("\n> ").strip() if feedback_input: try: # 解析反馈输入 parts = feedback_input.split() if len(parts) >= 2: memory_index = int(parts[0]) - 1 # 转换为0基索引 feedback_type = parts[1] if 0 <= memory_index < len(results): memory_id = results[memory_index].get('id') # 标准化反馈类型 if feedback_type in ['+', '👍', 'up', 'good']: feedback_type = 'thumbs_up' elif feedback_type in ['-', '👎', 'down', 'bad']: feedback_type = 'thumbs_down' # 提供反馈 success = mvp_manager.provide_feedback(str(memory_id), feedback_type) if success: print("✅ 反馈已保存,将影响后续检索排序") else: print("❌ 反馈保存失败") else: print("❌ 无效的记忆序号") else: print("❌ 反馈格式错误") except ValueError: print("❌ 反馈格式错误,请输入数字序号") except Exception as e: print(f"❌ 反馈处理失败: {e}") else: print("❌ 未找到相关记忆") except Exception as e: print(f"❌ 智能检索功能出错: {e}") def daily_review(): """每日回顾功能""" print("\n📅 每日回顾") print("-" * 30) try: # 导入每日回顾生成器 from daily_review import DailyReviewGenerator # 初始化回顾生成器 generator = DailyReviewGenerator() # 显示菜单 print("请选择回顾类型:") print("1. 📅 今日回顾") print("2. 📊 周总结") print("3. 💾 备份记忆") print("4. 📝 自定义日期回顾") choice = input("\n请选择 (1-4): ").strip() if choice == "1": # 今日回顾 review_file = generator.generate_daily_review() print(f"\n✅ 今日回顾已生成: {review_file}") # 询问是否查看内容 view_choice = input("\n是否查看回顾内容? (y/n): ").strip().lower() if view_choice == 'y': with open(review_file, 'r', encoding='utf-8') as f: content = f.read() print("\n" + "="*50) print(content) print("="*50) elif choice == "2": # 周总结 summary_file = generator.generate_weekly_summary() print(f"\n✅ 周总结已生成: {summary_file}") # 询问是否查看内容 view_choice = input("\n是否查看总结内容? (y/n): ").strip().lower() if view_choice == 'y': with open(summary_file, 'r', encoding='utf-8') as f: content = f.read() print("\n" + "="*50) print(content) print("="*50) elif choice == "3": # 备份记忆 backup_name = input("\n请输入备份名称 (留空使用默认名称): ").strip() backup_name = backup_name if backup_name else None backup_path = generator.backup_memories(backup_name) if backup_path: print(f"\n✅ 记忆备份完成: {backup_path}") else: print("\n❌ 记忆备份失败") elif choice == "4": # 自定义日期回顾 date_input = input("\n请输入日期 (YYYY-MM-DD格式,留空为今日): ").strip() if date_input: # 验证日期格式 try: from datetime import datetime datetime.strptime(date_input, "%Y-%m-%d") review_file = generator.generate_daily_review(date_input) print(f"\n✅ {date_input} 回顾已生成: {review_file}") # 询问是否查看内容 view_choice = input("\n是否查看回顾内容? (y/n): ").strip().lower() if view_choice == 'y': with open(review_file, 'r', encoding='utf-8') as f: content = f.read() print("\n" + "="*50) print(content) print("="*50) except ValueError: print("❌ 日期格式错误,请使用 YYYY-MM-DD 格式") else: # 默认今日 review_file = generator.generate_daily_review() print(f"\n✅ 今日回顾已生成: {review_file}") else: print("❌ 无效选项") except Exception as e: print(f"❌ 每日回顾功能出错: {e}") import traceback traceback.print_exc() def show_feedback_stats(): """显示反馈统计信息""" print("\n📊 反馈统计") print("-" * 30) try: # 导入MVP记忆管理器 from mvp_memory import MVPMemoryManager # 初始化记忆管理器 mvp_manager = MVPMemoryManager() # 获取反馈统计 stats = mvp_manager.get_feedback_stats() if stats: print(f"\n📈 反馈统计概览:") print(f" 总记忆数: {stats.get('total_memories', 0)}") print(f" 有反馈的记忆: {stats.get('memories_with_feedback', 0)}") print(f" 正面反馈: {stats.get('positive_feedback', 0)} 👍") print(f" 负面反馈: {stats.get('negative_feedback', 0)} 👎") print(f" 平均分数: {stats.get('average_score', 0)}") print(f" 需要重写: {stats.get('needs_rewrite', 0)}") # 计算反馈覆盖率 total = stats.get('total_memories', 0) with_feedback = stats.get('memories_with_feedback', 0) if total > 0: coverage = (with_feedback / total) * 100 print(f" 反馈覆盖率: {coverage:.1f}%") print("\n💡 提示:") print("- 在智能检索(选项8)中可以对结果提供👍/👎反馈") print("- 正面反馈会提升记忆在检索中的排序") print("- 负面反馈会降低记忆的排序权重") print("- 分数过低(-3以下)的记忆会被标记为需要重写") else: print("❌ 无法获取反馈统计信息") except Exception as e: print(f"❌ 反馈统计功能出错: {e}") def show_capacity_management(): """显示容量管理功能""" print("\n📈 智能容量管理") print("-" * 30) try: # 导入MVP记忆管理器 from mvp_memory import MVPMemoryManager # 初始化记忆管理器 mvp_manager = MVPMemoryManager() while True: print("\n📊 容量管理选项:") print("1. 查看容量报告") print("2. 查看性能指标") print("3. 手动触发优化") print("4. 查看系统状态") print("0. 返回主菜单") choice = input("\n请选择功能 (0-4): ").strip() if choice == "0": break elif choice == "1": # 查看容量报告 print("\n📋 容量管理报告:") print("=" * 40) report = mvp_manager.get_capacity_report() if "error" not in report: # 显示容量配置 config = report.get("capacity_config", {}) print(f"📊 容量配置:") print(f" 工作内存: {config.get('working_memory_capacity', 0)}") print(f" 用户内存: {config.get('user_memory_capacity', 0)}") print(f" 长期内存: {config.get('long_term_memory_capacity', 0)}") # 显示当前使用情况 memory_stats = report.get("memory_stats", {}) if memory_stats: print(f"\n📈 内存使用情况:") for mem_type, stats in memory_stats.items(): usage = stats.get("usage_percentage", 0) current = stats.get("current_count", 0) capacity = stats.get("capacity", 0) status = "🔴" if usage >= 90 else "🟡" if usage >= 80 else "🟢" print(f" {status} {mem_type}: {current}/{capacity} ({usage:.1f}%)") # 显示系统健康状态 health = report.get("system_health", "unknown") health_icon = "🟢" if health == "healthy" else "🟡" if health == "warning" else "🔴" print(f"\n{health_icon} 系统健康状态: {health}") else: print(f"❌ {report.get('error', '未知错误')}") elif choice == "2": # 查看性能指标 print("\n⚡ 性能指标:") print("=" * 40) metrics = mvp_manager.get_performance_metrics() if "error" not in metrics: print(f"📊 操作性能:") print(f" 记忆添加平均时间: {metrics.get('memory_add_avg_time', 0):.3f}s") print(f" 记忆检索平均时间: {metrics.get('memory_search_avg_time', 0):.3f}s") print(f" 总记忆数量: {metrics.get('memory_total_count', 0)}") print(f"\n💻 系统资源:") print(f" CPU使用率: {metrics.get('cpu_usage', 0):.1f}%") print(f" 内存使用: {metrics.get('memory_usage_mb', 0):.1f}MB") print(f" 磁盘使用: {metrics.get('disk_usage_mb', 0):.1f}MB") health = metrics.get('system_health', 'unknown') health_icon = "🟢" if health == "healthy" else "🟡" if health == "warning" else "🔴" print(f"\n{health_icon} 系统健康: {health}") else: print(f"❌ {metrics.get('error', '未知错误')}") elif choice == "3": # 手动触发优化 print("\n🔧 手动触发系统优化...") success = mvp_manager.trigger_manual_optimization() if success: print("✅ 系统优化完成") else: print("❌ 系统优化失败") elif choice == "4": # 查看系统状态 print("\n🔍 系统状态:") print("=" * 40) status = mvp_manager.get_status_info() # 基本信息 print(f"📊 基本信息:") print(f" 运行模式: {status.get('mode', 'unknown')}") print(f" 嵌入模型: {status.get('model', 'unknown')}") print(f" 重排器: {status.get('reranker', 'unknown')}") print(f" 运行状态: {status.get('status', 'unknown')}") # 官方配置信息 if status.get('official_config'): print(f"\n✅ 官方配置已启用:") print(f" 配置类型: {status.get('config_type', 'unknown')}") print(f" 内存后端: {status.get('memory_backend', 'unknown')}") # 容量管理信息 capacity_mgmt = status.get('capacity_management', {}) if capacity_mgmt.get('enabled'): print(f"\n📈 容量管理:") print(f" 监控状态: {capacity_mgmt.get('monitoring_status', 'unknown')}") print(f" 系统健康: {capacity_mgmt.get('system_health', 'unknown')}") current_metrics = capacity_mgmt.get('current_metrics', {}) print(f" 总记忆数: {current_metrics.get('total_memories', 0)}") print(f" 添加平均时间: {current_metrics.get('add_avg_time', 0):.3f}s") print(f" 检索平均时间: {current_metrics.get('search_avg_time', 0):.3f}s") else: print(f"\n❌ 容量管理未启用") else: print("❌ 无效选项,请重新选择") except Exception as e: print(f"❌ 容量管理功能出错: {e}") def show_compression_pipeline(): """显示压缩管线功能""" print("\n🗜️ 自动摘要压缩管线") print("-" * 30) try: # 导入MVP记忆管理器 from mvp_memory import MVPMemoryManager # 初始化记忆管理器 mvp_manager = MVPMemoryManager() while True: print("\n🗜️ 压缩管线选项:") print("1. 查看压缩统计") print("2. 手动触发压缩") print("3. 测试模式压缩") print("4. 清理旧归档") print("5. 查看压缩配置") print("0. 返回主菜单") choice = input("\n请选择功能 (0-5): ").strip() if choice == "0": break elif choice == "1": # 查看压缩统计 print("\n📊 压缩统计信息:") print("=" * 40) stats = mvp_manager.get_compression_stats() if "error" not in stats: print(f"📈 压缩统计:") print(f" 总压缩数: {stats.get('total_compressed', 0)}") print(f" 总节省tokens: {stats.get('total_tokens_saved', 0)}") print(f" 最后压缩时间: {stats.get('last_compression_time', '从未')}") history = stats.get('compression_history', []) if history: print(f"\n📋 最近压缩记录:") for i, record in enumerate(history[-5:], 1): # 显示最近5条 print(f" {i}. ID:{record.original_id} " f"节省:{record.token_saved}tokens " f"比例:{record.compression_ratio:.1%}") else: print(f"❌ {stats.get('error', '未知错误')}") elif choice == "2": # 手动触发压缩 print("\n🔧 手动触发压缩...") max_memories = input("请输入最大处理数量 (默认50): ").strip() try: max_memories = int(max_memories) if max_memories else 50 except ValueError: max_memories = 50 result = mvp_manager.trigger_compression(max_memories=max_memories) if "error" not in result: print(f"✅ 压缩完成:") print(f" 处理记忆数: {result.get('processed', 0)}") print(f" 成功压缩: {result.get('compressed', 0)}") print(f" 节省tokens: {result.get('tokens_saved', 0)}") print(f" 处理时间: {result.get('duration', 0):.2f}s") else: print(f"❌ 压缩失败: {result.get('error', '未知错误')}") elif choice == "3": # 测试模式压缩 print("\n🧪 测试模式压缩(忽略时间限制)...") max_memories = input("请输入最大处理数量 (默认10): ").strip() try: max_memories = int(max_memories) if max_memories else 10 except ValueError: max_memories = 10 result = mvp_manager.trigger_compression(max_memories=max_memories, test_mode=True) if "error" not in result: print(f"✅ 测试压缩完成:") print(f" 处理记忆数: {result.get('processed', 0)}") print(f" 成功压缩: {result.get('compressed', 0)}") print(f" 节省tokens: {result.get('tokens_saved', 0)}") print(f" 处理时间: {result.get('duration', 0):.2f}s") # 显示详细结果 compression_results = result.get('compression_results', []) if compression_results: print(f"\n📋 压缩详情:") for i, comp_result in enumerate(compression_results, 1): print(f" {i}. ID:{comp_result['id']} " f"节省:{comp_result['tokens_saved']}tokens " f"比例:{comp_result['compression_ratio']:.1%}") else: print(f"❌ 测试压缩失败: {result.get('error', '未知错误')}") elif choice == "4": # 清理旧归档 print("\n🧹 清理旧归档文件...") days = input("请输入保留天数 (默认30天): ").strip() try: days = int(days) if days else 30 except ValueError: days = 30 success = mvp_manager.cleanup_old_archives(days) if success: print(f"✅ 清理完成,删除了超过{days}天的归档文件") else: print(f"❌ 清理失败") elif choice == "5": # 查看压缩配置 print("\n⚙️ 压缩配置信息:") print("=" * 40) status = mvp_manager.get_status_info() compression_info = status.get("compression_pipeline", {}) if compression_info.get("enabled"): print(f"📊 压缩配置:") print(f" 自动调度: {compression_info.get('auto_schedule', False)}") print(f" Token阈值: {compression_info.get('token_threshold', 0)}") print(f" 压缩比例: {compression_info.get('compression_ratio', 0):.1%}") print(f" 总压缩数: {compression_info.get('total_compressed', 0)}") print(f" 总节省tokens: {compression_info.get('total_tokens_saved', 0)}") print(f" 最后压缩: {compression_info.get('last_compression', '从未')}") else: print(f"❌ 压缩管线未启用") if "error" in compression_info: print(f" 错误: {compression_info['error']}") else: print("❌ 无效选项,请重新选择") except Exception as e: print(f"❌ 压缩管线功能出错: {e}") def check_environment(): """检查环境配置""" print("🔍 检查环境配置...") api_key = os.getenv("SILICONFLOW_API_KEY") if not api_key: print("⚠️ 未找到 SILICONFLOW_API_KEY") print("💡 请在 .env 文件中配置API密钥") return False try: import openai import qdrant_client print("✅ 核心依赖检查通过") return True except ImportError as e: print(f"❌ 依赖检查失败: {e}") print("💡 请运行: poetry install") return False def main(): """主函数""" # 初始化并发管理器 data_dir = "./memos_data" print(f"[DEBUG] CLI using data_dir = {data_dir}") concurrent_manager = MemOSConcurrentManager(data_dir) # 注册CLI进程 if not concurrent_manager.register_process("cli"): print("❌ 无法启动MemOS CLI,可能已达到并发限制") sys.exit(1) try: load_env_file() if not check_environment(): print("\n❌ 环境检查失败,请先配置环境") return while True: show_menu() try: choice = input("\n请输入选项 (0-9): ").strip() if choice == "0": print("👋 感谢使用MemOS!") break elif choice == "1": run_system_test() elif choice == "2": run_basic_demo() elif choice == "3": run_advanced_demo() elif choice == "4": run_interactive_chat() elif choice == "5": run_smart_assistant() elif choice == "6": show_usage_guide() elif choice == "7": remember_memory() elif choice == "8": recall_memories() elif choice == "9": daily_review() elif choice.upper() == "F": show_feedback_stats() elif choice.upper() == "C": show_capacity_management() elif choice.upper() == "P": show_compression_pipeline() elif choice.upper() == "Q": # 保留原有的Qwen测试功能 print("🔧 Qwen模型集成测试功能暂未实现") else: print("❌ 无效选项,请重新选择") except KeyboardInterrupt: print("\n\n👋 用户中断,退出程序") break except Exception as e: print(f"❌ 发生错误: {e}") input("\n按回车键继续...") finally: # 确保清理资源 concurrent_manager.cleanup() if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/qinshu1109/memos-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server