memos_mcp_server.py•17.2 kB
#!/usr/bin/env python3
"""
MemOS MCP服务器 v2.0
基于官方GeneralTextMemory标准重新设计,完全兼容当前MVP管理器
"""
import asyncio
import json
import sys
import os
from typing import Dict, Any, List, Optional
from datetime import datetime
import traceback
# 添加项目路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from mvp_memory import MVPMemoryManager
class MemOSMCPServerV2:
"""MemOS MCP服务器 v2.0 - 基于官方标准重新设计"""
def __init__(self):
"""初始化MCP服务器"""
self.server_info = {
"name": "memos-v2",
"version": "2.0.0",
"description": "MemOS智能记忆管理系统 v2.0 - 基于官方GeneralTextMemory标准"
}
# 初始化MVP管理器
try:
self.mvp_manager = MVPMemoryManager()
print("✅ MVP管理器初始化成功")
except Exception as e:
print(f"❌ MVP管理器初始化失败: {e}")
raise
async def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""处理MCP请求"""
try:
method = request.get("method")
params = request.get("params", {})
request_id = request.get("id")
print(f"🔄 处理请求: {method}")
if method == "initialize":
return await self.initialize(params, request_id)
elif method == "tools/list":
return await self.list_tools(params, request_id)
elif method == "tools/call":
return await self.call_tool(params, request_id)
else:
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": -32601,
"message": f"未知方法: {method}"
}
}
except Exception as e:
print(f"❌ 请求处理失败: {e}")
traceback.print_exc()
return {
"jsonrpc": "2.0",
"id": request.get("id"),
"error": {
"code": -32603,
"message": f"内部错误: {str(e)}"
}
}
async def initialize(self, params: Dict[str, Any], request_id: Any) -> Dict[str, Any]:
"""初始化MCP服务器"""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"protocolVersion": "2024-11-05",
"capabilities": {
"tools": {}
},
"serverInfo": self.server_info
}
}
async def list_tools(self, params: Dict[str, Any], request_id: Any) -> Dict[str, Any]:
"""列出可用工具"""
tools = [
{
"name": "query_memos_context",
"description": "查询MemOS记忆数据库并获取LLM组织的上下文",
"inputSchema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "要查询的问题或关键词"
},
"max_memories": {
"type": "integer",
"description": "最大返回记忆数量",
"default": 5
},
"use_time_decay": {
"type": "boolean",
"description": "是否启用时间感知排序",
"default": True
},
"time_decay_tau": {
"type": "number",
"description": "时间衰减参数τ(天数)",
"default": 30.0
},
"use_topic_drift_detection": {
"type": "boolean",
"description": "是否启用主题漂移检测",
"default": True
}
},
"required": ["query"]
}
},
{
"name": "add_memos_memory",
"description": "向MemOS添加新的记忆",
"inputSchema": {
"type": "object",
"properties": {
"content": {
"type": "string",
"description": "记忆内容"
},
"tags": {
"type": "array",
"items": {"type": "string"},
"description": "记忆标签",
"default": []
}
},
"required": ["content"]
}
},
{
"name": "provide_memory_feedback",
"description": "为记忆提供👍/👎反馈,影响后续检索排序",
"inputSchema": {
"type": "object",
"properties": {
"memory_id": {
"type": "string",
"description": "记忆ID"
},
"feedback_type": {
"type": "string",
"enum": ["thumbs_up", "👍", "thumbs_down", "👎"],
"description": "反馈类型: thumbs_up(👍) 或 thumbs_down(👎)"
}
},
"required": ["memory_id", "feedback_type"]
}
},
{
"name": "get_feedback_stats",
"description": "获取记忆反馈统计信息",
"inputSchema": {
"type": "object",
"properties": {},
"required": []
}
}
]
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"tools": tools
}
}
async def call_tool(self, params: Dict[str, Any], request_id: Any) -> Dict[str, Any]:
"""调用工具"""
tool_name = params.get("name")
arguments = params.get("arguments", {})
print(f"🔧 调用工具: {tool_name}")
print(f"📝 参数: {arguments}")
print(f"🔍 完整params: {params}") # 添加完整参数日志
try:
if tool_name == "query_memos_context":
return await self.query_memos_context(arguments, request_id)
elif tool_name == "add_memos_memory":
return await self.add_memos_memory(arguments, request_id)
elif tool_name == "provide_memory_feedback":
return await self.provide_memory_feedback(arguments, request_id)
elif tool_name == "get_feedback_stats":
return await self.get_feedback_stats(arguments, request_id)
else:
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": -32601,
"message": f"未知工具: {tool_name}"
}
}
except Exception as e:
print(f"❌ 工具调用失败: {e}")
traceback.print_exc()
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": -32603,
"message": f"工具执行错误: {str(e)}"
}
}
async def query_memos_context(self, args: Dict[str, Any], request_id: Any) -> Dict[str, Any]:
"""查询MemOS上下文 - v2.0优化版"""
query = args.get("query", "")
max_memories = args.get("max_memories", 5)
use_time_decay = args.get("use_time_decay", True)
time_decay_tau = args.get("time_decay_tau", 30.0)
use_topic_drift_detection = args.get("use_topic_drift_detection", True)
if not query:
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"content": [
{
"type": "text",
"text": "错误: 查询内容不能为空"
}
]
}
}
print(f"🔍 执行查询: {query}")
print(f"📊 参数: max_memories={max_memories}, time_decay={use_time_decay}, tau={time_decay_tau}")
# 使用MVP管理器进行智能查询
memories = self.mvp_manager.recall(
query,
top_k=max_memories,
use_reranker=True,
use_time_decay=use_time_decay,
time_decay_tau=time_decay_tau,
use_topic_drift_detection=use_topic_drift_detection
)
print(f"✅ 查询完成,找到 {len(memories)} 条记忆")
# 获取详细状态信息
status_info = self.mvp_manager.get_status_info()
model_info = f"{status_info['model']} + {status_info['reranker']}" if status_info['mode'] == 'enhanced' else status_info['model']
# 格式化返回结果
context_text = f"""MemOS查询结果 v2.0:
查询: {query}
结果: 找到{len(memories)}条相关记忆
技术栈: {model_info}
检索模式: {'增强版(含重排器)' if status_info['mode'] == 'enhanced' else '基础版'}
相关记忆详情:
"""
for i, memory in enumerate(memories, 1):
tags_str = ", ".join(memory.get('tags', [])) if memory.get('tags') else "无标签"
score_info = f"向量分数: {memory.get('score', 0):.4f}"
if 'rerank_score' in memory:
score_info += f", 重排分数: {memory['rerank_score']:.4f}"
# 添加时间感知信息
if 'time_decay_factor' in memory:
time_decay_factor = memory['time_decay_factor']
days_ago = memory.get('days_ago', 0)
if days_ago != float('inf'):
score_info += f", 时间权重: {time_decay_factor:.3f}({days_ago:.1f}天前)"
else:
score_info += f", 时间权重: {time_decay_factor:.3f}(无时间戳)"
context_text += f"{i}. {memory['content']} ({score_info}, 标签: {tags_str})\n"
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"content": [
{
"type": "text",
"text": context_text
}
]
}
}
async def add_memos_memory(self, args: Dict[str, Any], request_id: Any) -> Dict[str, Any]:
"""添加新记忆 - v2.0优化版"""
content = args.get("content", "")
tags = args.get("tags", [])
if not content:
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"content": [
{
"type": "text",
"text": "错误: 记忆内容不能为空"
}
]
}
}
print(f"💾 添加记忆: {content[:50]}...")
print(f"🏷️ 标签: {tags}")
# 使用MVP管理器添加记忆
memory_id = self.mvp_manager.remember(content, tags)
if memory_id:
result_text = f"""✅ 记忆添加成功
记忆ID: {memory_id}
内容: {content}
标签: {', '.join(tags) if tags else '无标签'}
时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
该记忆已使用Qwen3-Embedding-0.6B模型生成1024维向量并存储到Qdrant向量数据库中。"""
else:
result_text = "❌ 记忆添加失败,请检查系统状态"
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"content": [
{
"type": "text",
"text": result_text
}
]
}
}
async def provide_memory_feedback(self, args: Dict[str, Any], request_id: Any) -> Dict[str, Any]:
"""提供记忆反馈 - v2.0优化版"""
memory_id = args.get("memory_id", "")
feedback_type = args.get("feedback_type", "")
if not memory_id or not feedback_type:
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"content": [
{
"type": "text",
"text": "错误: 记忆ID和反馈类型不能为空"
}
]
}
}
# 标准化反馈类型
if feedback_type in ["👍", "thumbs_up"]:
feedback_type = "thumbs_up"
elif feedback_type in ["👎", "thumbs_down"]:
feedback_type = "thumbs_down"
else:
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"content": [
{
"type": "text",
"text": f"错误: 无效的反馈类型 '{feedback_type}',请使用 'thumbs_up' 或 'thumbs_down'"
}
]
}
}
print(f"👍👎 记忆反馈: ID={memory_id}, 类型={feedback_type}")
# 使用MVP管理器提供反馈
success = self.mvp_manager.provide_feedback(memory_id, feedback_type)
if success:
emoji = "👍" if feedback_type == "thumbs_up" else "👎"
result_text = f"""✅ 反馈记录成功
记忆ID: {memory_id}
反馈类型: {feedback_type} {emoji}
时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
该反馈将影响后续检索时的排序权重,帮助系统学习您的偏好。"""
else:
result_text = f"❌ 反馈记录失败,记忆ID '{memory_id}' 可能不存在"
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"content": [
{
"type": "text",
"text": result_text
}
]
}
}
async def get_feedback_stats(self, args: Dict[str, Any], request_id: Any) -> Dict[str, Any]:
"""获取反馈统计 - v2.0优化版"""
print("📊 获取反馈统计信息")
# 使用MVP管理器获取统计信息
stats = self.mvp_manager.get_feedback_stats()
result_text = f"""📊 MemOS反馈统计信息
👍 正面反馈: {stats.get('thumbs_up', 0)} 条
👎 负面反馈: {stats.get('thumbs_down', 0)} 条
📝 总记忆数: {stats.get('total_memories', 0)} 条
📈 反馈覆盖率: {stats.get('feedback_coverage', 0):.1f}%
反馈分布:
- 高质量记忆 (👍): {stats.get('high_quality_memories', 0)} 条
- 低质量记忆 (👎): {stats.get('low_quality_memories', 0)} 条
- 未评价记忆: {stats.get('unrated_memories', 0)} 条
统计时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"content": [
{
"type": "text",
"text": result_text
}
]
}
}
async def main():
"""主函数"""
print("🚀 启动MemOS MCP服务器 v2.0")
server = MemOSMCPServerV2()
# 读取stdin并处理请求
while True:
try:
line = await asyncio.get_event_loop().run_in_executor(None, sys.stdin.readline)
if not line:
break
line = line.strip()
if not line:
continue
request = json.loads(line)
response = await server.handle_request(request)
print(json.dumps(response), flush=True)
except json.JSONDecodeError as e:
print(f"❌ JSON解析错误: {e}", file=sys.stderr)
except Exception as e:
print(f"❌ 处理错误: {e}", file=sys.stderr)
traceback.print_exc()
if __name__ == "__main__":
asyncio.run(main())