# simple_mcp_server.py (10.7 kB)
#!/usr/bin/env python3
"""
简化的MemOS MCP服务器
修复协议兼容性问题
"""
import json
import sys
import asyncio
from pathlib import Path
from typing import Any, Dict
# 添加当前目录到路径
sys.path.insert(0, str(Path(__file__).parent))
from advanced_examples import AdvancedMemOS, load_env_file
class SimpleMCPServer:
    """Simplified MCP server that exposes a MemOS store as two tools.

    Requests and responses are JSON-RPC 2.0 dictionaries. The supported
    methods are ``initialize``, ``tools/list`` and ``tools/call``; the
    callable tools are ``query_memos_context`` and ``add_memos_memory``.

    Tool-level failures (empty input, backend exceptions) are returned as
    ordinary tool results whose text describes the problem and which carry
    ``"isError": True`` so that clients can tell them apart from successes.
    """

    def __init__(self):
        """Load the environment file, create the MemOS backend and seed it."""
        # Load environment variables.
        load_env_file()
        # Initialise MemOS.
        self.memos = AdvancedMemOS("./mcp_memos_data")
        # Add a few sample memories.
        self._init_sample_data()

    def _init_sample_data(self):
        """Add a handful of sample memories to the store (best effort).

        Any failure is reported on stderr and otherwise ignored, so a seeding
        problem does not prevent the server object from being created.
        """
        sample_memories = [
            ("我正在使用MemOS进行智能记忆管理", ["MemOS", "项目"]),
            ("Claude可以通过MCP调用MemOS获取记忆上下文", ["Claude", "MCP", "技术"]),
            ("当前项目目标是实现AI记忆增强对话", ["项目目标", "AI"]),
            ("使用DeepSeek-V3作为LLM模型", ["技术栈", "LLM"]),
            ("SiliconFlow提供API服务", ["API", "服务商"]),
        ]
        try:
            for content, tags in sample_memories:
                self.memos.add_memory(content, tags=tags)
        except Exception as e:
            # stderr only: the file's main loop prints protocol messages on stdout.
            print(f"初始化示例数据失败: {e}", file=sys.stderr)

    @staticmethod
    def _result_response(request_id: Any, result: Dict[str, Any]) -> Dict[str, Any]:
        """Wrap *result* in a JSON-RPC 2.0 success envelope."""
        return {"jsonrpc": "2.0", "id": request_id, "result": result}

    @staticmethod
    def _error_response(request_id: Any, code: int, message: str) -> Dict[str, Any]:
        """Build a JSON-RPC 2.0 error envelope with the given code and message."""
        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "error": {"code": code, "message": message},
        }

    @classmethod
    def _text_result(cls, request_id: Any, text: str, is_error: bool = False) -> Dict[str, Any]:
        """Build a tool result holding a single text content item.

        When *is_error* is true the result is additionally flagged with
        ``"isError": True``.
        """
        result: Dict[str, Any] = {"content": [{"type": "text", "text": text}]}
        if is_error:
            result["isError"] = True
        return cls._result_response(request_id, result)

    @staticmethod
    def _tool_definitions() -> list:
        """Return the tool descriptors advertised by ``tools/list``.

        A fresh list is built on every call so callers cannot mutate shared
        state.
        """
        return [
            {
                "name": "query_memos_context",
                "description": "查询MemOS记忆数据库并获取LLM组织的上下文",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "要查询的问题或关键词",
                        },
                        "max_memories": {
                            "type": "integer",
                            "description": "最大返回记忆数量",
                            "default": 5,
                        },
                    },
                    "required": ["query"],
                },
            },
            {
                "name": "add_memos_memory",
                "description": "向MemOS添加新的记忆",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "content": {
                            "type": "string",
                            "description": "记忆内容",
                        },
                        "tags": {
                            "type": "array",
                            "items": {"type": "string"},
                            "description": "记忆标签",
                            "default": [],
                        },
                    },
                    "required": ["content"],
                },
            },
        ]

    @staticmethod
    def _format_memory(index: int, memory: Dict[str, Any]) -> str:
        """Render one memory entry as a numbered, newline-terminated line."""
        tags_str = ", ".join(memory['tags']) if memory['tags'] else "无标签"
        return f"{index}. {memory['content']} (相关度: {memory['score']:.2f}, 标签: {tags_str})\n"

    async def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Dispatch one decoded JSON-RPC request and return the response dict.

        Args:
            request: The decoded message; expected to be a JSON object with
                ``method`` and optionally ``params`` and ``id``.

        Returns:
            A JSON-RPC 2.0 response dictionary. Unknown methods and unknown
            tool names yield error ``-32601``, a request that is not a JSON
            object yields ``-32600``, and any unexpected exception during
            dispatch yields ``-32603``.
        """
        if not isinstance(request, dict):
            # Without an object there is no id to echo back.
            return self._error_response(
                None, -32600, "Invalid Request: expected a JSON object"
            )

        request_id = request.get("id")
        try:
            method = request.get("method")
            # "params": null is treated like an absent params member.
            params = request.get("params") or {}

            if method == "initialize":
                return self._result_response(
                    request_id,
                    {
                        "protocolVersion": "2024-11-05",
                        "capabilities": {"tools": {}},
                        "serverInfo": {"name": "memos", "version": "1.0.0"},
                    },
                )

            if method == "tools/list":
                return self._result_response(
                    request_id, {"tools": self._tool_definitions()}
                )

            if method == "tools/call":
                tool_name = params.get("name")
                # "arguments": null is treated like an absent arguments member.
                arguments = params.get("arguments") or {}
                if tool_name == "query_memos_context":
                    return await self.query_memos_context(arguments, request_id)
                if tool_name == "add_memos_memory":
                    return await self.add_memos_memory(arguments, request_id)
                return self._error_response(
                    request_id, -32601, f"Tool not found: {tool_name}"
                )

            return self._error_response(
                request_id, -32601, f"Method not found: {method}"
            )
        except Exception as e:
            return self._error_response(
                request_id, -32603, f"Internal error: {str(e)}"
            )

    async def query_memos_context(self, args: Dict[str, Any], request_id: Any) -> Dict[str, Any]:
        """Run the ``query_memos_context`` tool.

        Args:
            args: Tool arguments; reads ``query`` (required, non-empty) and
                ``max_memories`` (defaults to 5 when missing or null).
            request_id: The JSON-RPC id to echo in the response.

        Returns:
            A tool result whose single text item contains the query, the
            backend's ``query_summary`` and ``llm_context`` values and one
            line per entry of ``relevant_memories``. An empty query or a
            backend failure yields a result flagged with ``isError``.
        """
        try:
            query = args.get("query", "")
            max_memories = args.get("max_memories", 5)
            if max_memories is None:
                max_memories = 5

            if not query:
                return self._text_result(
                    request_id, "错误: 查询内容不能为空", is_error=True
                )

            # Ask MemOS for the relevant memories and the organised context.
            result = self.memos.intelligent_query(query, max_memories)

            # Format the result as plain text.
            header = (
                "MemOS查询结果:\n"
                f"查询: {query}\n"
                f"{result['query_summary']}\n"
                "LLM组织的上下文:\n"
                f"{result['llm_context']}\n"
                "相关记忆详情:\n"
            )
            details = "".join(
                self._format_memory(i, memory)
                for i, memory in enumerate(result['relevant_memories'], 1)
            )
            return self._text_result(request_id, header + details)
        except Exception as e:
            return self._text_result(
                request_id, f"查询失败: {str(e)}", is_error=True
            )

    async def add_memos_memory(self, args: Dict[str, Any], request_id: Any) -> Dict[str, Any]:
        """Run the ``add_memos_memory`` tool.

        Args:
            args: Tool arguments; reads ``content`` (required, non-empty) and
                ``tags`` (defaults to an empty list when missing or null).
            request_id: The JSON-RPC id to echo in the response.

        Returns:
            A tool result whose text confirms the stored memory together with
            the value returned by the backend's ``add_memory``. Empty content
            or a backend failure yields a result flagged with ``isError``.
        """
        try:
            content = args.get("content", "")
            tags = args.get("tags") or []

            if not content:
                return self._text_result(
                    request_id, "错误: 记忆内容不能为空", is_error=True
                )

            # Store the memory.
            memory_id = self.memos.add_memory(content, tags=tags)
            return self._text_result(
                request_id, f"成功添加记忆 #{memory_id}: {content}"
            )
        except Exception as e:
            return self._text_result(
                request_id, f"添加记忆失败: {str(e)}", is_error=True
            )
async def main():
    """Serve MCP requests over stdio, one JSON-RPC message per line.

    Each line read from stdin is decoded as JSON and passed to
    ``SimpleMCPServer.handle_request``; the response is written to stdout as a
    single JSON line. The loop ends when stdin reaches end-of-file.

    Messages without an ``id`` member are JSON-RPC notifications: they are
    still dispatched, but no response is written for them. Blank lines are
    skipped, and malformed JSON is reported on stderr and skipped so that
    stdout only ever carries protocol messages.
    """
    server = SimpleMCPServer()
    loop = asyncio.get_running_loop()

    # Read JSON-RPC requests from stdin.
    while True:
        request_id = None
        is_notification = False
        try:
            # readline blocks, so run it in the default executor.
            line = await loop.run_in_executor(None, sys.stdin.readline)
            if not line:
                break  # EOF: the client closed the pipe.

            payload = line.strip()
            if not payload:
                continue

            request = json.loads(payload)
            if isinstance(request, dict):
                request_id = request.get("id")
                is_notification = "id" not in request

            response = await server.handle_request(request)

            if is_notification:
                # A notification must never be answered.
                continue

            # Emit the response.
            print(json.dumps(response), flush=True)
        except json.JSONDecodeError as e:
            print(f"Ignoring malformed JSON input: {e}", file=sys.stderr)
            continue
        except Exception as e:
            if is_notification:
                print(f"Error while handling notification: {e}", file=sys.stderr)
                continue
            # Responses must carry an id; it is null when it could not be determined.
            error_response = {
                "jsonrpc": "2.0",
                "id": request_id,
                "error": {
                    "code": -32603,
                    "message": f"Server error: {str(e)}"
                }
            }
            print(json.dumps(error_response), flush=True)


if __name__ == "__main__":
    asyncio.run(main())