Skip to main content
Glama

MemOS-MCP

by qinshu1109
Apache 2.0
3
  • Linux
  • Apple
memos_mcp_server.py17.2 kB
#!/usr/bin/env python3 """ MemOS MCP服务器 v2.0 基于官方GeneralTextMemory标准重新设计,完全兼容当前MVP管理器 """ import asyncio import json import sys import os from typing import Dict, Any, List, Optional from datetime import datetime import traceback # 添加项目路径 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from mvp_memory import MVPMemoryManager class MemOSMCPServerV2: """MemOS MCP服务器 v2.0 - 基于官方标准重新设计""" def __init__(self): """初始化MCP服务器""" self.server_info = { "name": "memos-v2", "version": "2.0.0", "description": "MemOS智能记忆管理系统 v2.0 - 基于官方GeneralTextMemory标准" } # 初始化MVP管理器 try: self.mvp_manager = MVPMemoryManager() print("✅ MVP管理器初始化成功") except Exception as e: print(f"❌ MVP管理器初始化失败: {e}") raise async def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]: """处理MCP请求""" try: method = request.get("method") params = request.get("params", {}) request_id = request.get("id") print(f"🔄 处理请求: {method}") if method == "initialize": return await self.initialize(params, request_id) elif method == "tools/list": return await self.list_tools(params, request_id) elif method == "tools/call": return await self.call_tool(params, request_id) else: return { "jsonrpc": "2.0", "id": request_id, "error": { "code": -32601, "message": f"未知方法: {method}" } } except Exception as e: print(f"❌ 请求处理失败: {e}") traceback.print_exc() return { "jsonrpc": "2.0", "id": request.get("id"), "error": { "code": -32603, "message": f"内部错误: {str(e)}" } } async def initialize(self, params: Dict[str, Any], request_id: Any) -> Dict[str, Any]: """初始化MCP服务器""" return { "jsonrpc": "2.0", "id": request_id, "result": { "protocolVersion": "2024-11-05", "capabilities": { "tools": {} }, "serverInfo": self.server_info } } async def list_tools(self, params: Dict[str, Any], request_id: Any) -> Dict[str, Any]: """列出可用工具""" tools = [ { "name": "query_memos_context", "description": "查询MemOS记忆数据库并获取LLM组织的上下文", "inputSchema": { "type": "object", "properties": { "query": { "type": "string", "description": "要查询的问题或关键词" }, "max_memories": { "type": "integer", "description": "最大返回记忆数量", "default": 5 }, "use_time_decay": { "type": "boolean", "description": "是否启用时间感知排序", "default": True }, "time_decay_tau": { "type": "number", "description": "时间衰减参数τ(天数)", "default": 30.0 }, "use_topic_drift_detection": { "type": "boolean", "description": "是否启用主题漂移检测", "default": True } }, "required": ["query"] } }, { "name": "add_memos_memory", "description": "向MemOS添加新的记忆", "inputSchema": { "type": "object", "properties": { "content": { "type": "string", "description": "记忆内容" }, "tags": { "type": "array", "items": {"type": "string"}, "description": "记忆标签", "default": [] } }, "required": ["content"] } }, { "name": "provide_memory_feedback", "description": "为记忆提供👍/👎反馈,影响后续检索排序", "inputSchema": { "type": "object", "properties": { "memory_id": { "type": "string", "description": "记忆ID" }, "feedback_type": { "type": "string", "enum": ["thumbs_up", "👍", "thumbs_down", "👎"], "description": "反馈类型: thumbs_up(👍) 或 thumbs_down(👎)" } }, "required": ["memory_id", "feedback_type"] } }, { "name": "get_feedback_stats", "description": "获取记忆反馈统计信息", "inputSchema": { "type": "object", "properties": {}, "required": [] } } ] return { "jsonrpc": "2.0", "id": request_id, "result": { "tools": tools } } async def call_tool(self, params: Dict[str, Any], request_id: Any) -> Dict[str, Any]: """调用工具""" tool_name = params.get("name") arguments = params.get("arguments", {}) print(f"🔧 调用工具: {tool_name}") print(f"📝 参数: {arguments}") print(f"🔍 完整params: {params}") # 添加完整参数日志 try: if tool_name == "query_memos_context": return await self.query_memos_context(arguments, request_id) elif tool_name == "add_memos_memory": return await self.add_memos_memory(arguments, request_id) elif tool_name == "provide_memory_feedback": return await self.provide_memory_feedback(arguments, request_id) elif tool_name == "get_feedback_stats": return await self.get_feedback_stats(arguments, request_id) else: return { "jsonrpc": "2.0", "id": request_id, "error": { "code": -32601, "message": f"未知工具: {tool_name}" } } except Exception as e: print(f"❌ 工具调用失败: {e}") traceback.print_exc() return { "jsonrpc": "2.0", "id": request_id, "error": { "code": -32603, "message": f"工具执行错误: {str(e)}" } } async def query_memos_context(self, args: Dict[str, Any], request_id: Any) -> Dict[str, Any]: """查询MemOS上下文 - v2.0优化版""" query = args.get("query", "") max_memories = args.get("max_memories", 5) use_time_decay = args.get("use_time_decay", True) time_decay_tau = args.get("time_decay_tau", 30.0) use_topic_drift_detection = args.get("use_topic_drift_detection", True) if not query: return { "jsonrpc": "2.0", "id": request_id, "result": { "content": [ { "type": "text", "text": "错误: 查询内容不能为空" } ] } } print(f"🔍 执行查询: {query}") print(f"📊 参数: max_memories={max_memories}, time_decay={use_time_decay}, tau={time_decay_tau}") # 使用MVP管理器进行智能查询 memories = self.mvp_manager.recall( query, top_k=max_memories, use_reranker=True, use_time_decay=use_time_decay, time_decay_tau=time_decay_tau, use_topic_drift_detection=use_topic_drift_detection ) print(f"✅ 查询完成,找到 {len(memories)} 条记忆") # 获取详细状态信息 status_info = self.mvp_manager.get_status_info() model_info = f"{status_info['model']} + {status_info['reranker']}" if status_info['mode'] == 'enhanced' else status_info['model'] # 格式化返回结果 context_text = f"""MemOS查询结果 v2.0: 查询: {query} 结果: 找到{len(memories)}条相关记忆 技术栈: {model_info} 检索模式: {'增强版(含重排器)' if status_info['mode'] == 'enhanced' else '基础版'} 相关记忆详情: """ for i, memory in enumerate(memories, 1): tags_str = ", ".join(memory.get('tags', [])) if memory.get('tags') else "无标签" score_info = f"向量分数: {memory.get('score', 0):.4f}" if 'rerank_score' in memory: score_info += f", 重排分数: {memory['rerank_score']:.4f}" # 添加时间感知信息 if 'time_decay_factor' in memory: time_decay_factor = memory['time_decay_factor'] days_ago = memory.get('days_ago', 0) if days_ago != float('inf'): score_info += f", 时间权重: {time_decay_factor:.3f}({days_ago:.1f}天前)" else: score_info += f", 时间权重: {time_decay_factor:.3f}(无时间戳)" context_text += f"{i}. {memory['content']} ({score_info}, 标签: {tags_str})\n" return { "jsonrpc": "2.0", "id": request_id, "result": { "content": [ { "type": "text", "text": context_text } ] } } async def add_memos_memory(self, args: Dict[str, Any], request_id: Any) -> Dict[str, Any]: """添加新记忆 - v2.0优化版""" content = args.get("content", "") tags = args.get("tags", []) if not content: return { "jsonrpc": "2.0", "id": request_id, "result": { "content": [ { "type": "text", "text": "错误: 记忆内容不能为空" } ] } } print(f"💾 添加记忆: {content[:50]}...") print(f"🏷️ 标签: {tags}") # 使用MVP管理器添加记忆 memory_id = self.mvp_manager.remember(content, tags) if memory_id: result_text = f"""✅ 记忆添加成功 记忆ID: {memory_id} 内容: {content} 标签: {', '.join(tags) if tags else '无标签'} 时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} 该记忆已使用Qwen3-Embedding-0.6B模型生成1024维向量并存储到Qdrant向量数据库中。""" else: result_text = "❌ 记忆添加失败,请检查系统状态" return { "jsonrpc": "2.0", "id": request_id, "result": { "content": [ { "type": "text", "text": result_text } ] } } async def provide_memory_feedback(self, args: Dict[str, Any], request_id: Any) -> Dict[str, Any]: """提供记忆反馈 - v2.0优化版""" memory_id = args.get("memory_id", "") feedback_type = args.get("feedback_type", "") if not memory_id or not feedback_type: return { "jsonrpc": "2.0", "id": request_id, "result": { "content": [ { "type": "text", "text": "错误: 记忆ID和反馈类型不能为空" } ] } } # 标准化反馈类型 if feedback_type in ["👍", "thumbs_up"]: feedback_type = "thumbs_up" elif feedback_type in ["👎", "thumbs_down"]: feedback_type = "thumbs_down" else: return { "jsonrpc": "2.0", "id": request_id, "result": { "content": [ { "type": "text", "text": f"错误: 无效的反馈类型 '{feedback_type}',请使用 'thumbs_up' 或 'thumbs_down'" } ] } } print(f"👍👎 记忆反馈: ID={memory_id}, 类型={feedback_type}") # 使用MVP管理器提供反馈 success = self.mvp_manager.provide_feedback(memory_id, feedback_type) if success: emoji = "👍" if feedback_type == "thumbs_up" else "👎" result_text = f"""✅ 反馈记录成功 记忆ID: {memory_id} 反馈类型: {feedback_type} {emoji} 时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} 该反馈将影响后续检索时的排序权重,帮助系统学习您的偏好。""" else: result_text = f"❌ 反馈记录失败,记忆ID '{memory_id}' 可能不存在" return { "jsonrpc": "2.0", "id": request_id, "result": { "content": [ { "type": "text", "text": result_text } ] } } async def get_feedback_stats(self, args: Dict[str, Any], request_id: Any) -> Dict[str, Any]: """获取反馈统计 - v2.0优化版""" print("📊 获取反馈统计信息") # 使用MVP管理器获取统计信息 stats = self.mvp_manager.get_feedback_stats() result_text = f"""📊 MemOS反馈统计信息 👍 正面反馈: {stats.get('thumbs_up', 0)} 条 👎 负面反馈: {stats.get('thumbs_down', 0)} 条 📝 总记忆数: {stats.get('total_memories', 0)} 条 📈 反馈覆盖率: {stats.get('feedback_coverage', 0):.1f}% 反馈分布: - 高质量记忆 (👍): {stats.get('high_quality_memories', 0)} 条 - 低质量记忆 (👎): {stats.get('low_quality_memories', 0)} 条 - 未评价记忆: {stats.get('unrated_memories', 0)} 条 统计时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}""" return { "jsonrpc": "2.0", "id": request_id, "result": { "content": [ { "type": "text", "text": result_text } ] } } async def main(): """主函数""" print("🚀 启动MemOS MCP服务器 v2.0") server = MemOSMCPServerV2() # 读取stdin并处理请求 while True: try: line = await asyncio.get_event_loop().run_in_executor(None, sys.stdin.readline) if not line: break line = line.strip() if not line: continue request = json.loads(line) response = await server.handle_request(request) print(json.dumps(response), flush=True) except json.JSONDecodeError as e: print(f"❌ JSON解析错误: {e}", file=sys.stderr) except Exception as e: print(f"❌ 处理错误: {e}", file=sys.stderr) traceback.print_exc() if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/qinshu1109/memos-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server