test_standalone_mcp.py•3.23 kB
#!/usr/bin/env python3
"""
测试独立MCP服务器
"""
import json
import subprocess
import sys
def test_mcp_flow():
"""测试完整的MCP流程"""
print("🧪 测试独立MCP服务器")
print("=" * 50)
# 测试请求序列
requests = [
{
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {}
},
{
"jsonrpc": "2.0",
"id": 2,
"method": "tools/list",
"params": {}
},
{
"jsonrpc": "2.0",
"id": 3,
"method": "tools/call",
"params": {
"name": "query_memos_context",
"arguments": {
"query": "项目技术",
"max_memories": 3
}
}
},
{
"jsonrpc": "2.0",
"id": 4,
"method": "tools/call",
"params": {
"name": "add_memos_memory",
"arguments": {
"content": "独立MCP服务器测试成功",
"tags": ["测试", "MCP", "成功"]
}
}
}
]
for i, request in enumerate(requests, 1):
print(f"\n{i}. 测试 {request['method']}...")
try:
# 创建输入
input_data = json.dumps(request) + "\n"
# 运行服务器
process = subprocess.Popen(
["python3", "standalone_mcp_server.py"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
cwd="/home/qqinshu/视频/MemOS"
)
# 发送请求
stdout, stderr = process.communicate(input=input_data, timeout=10)
if stderr:
print(f"❌ 错误: {stderr}")
continue
if stdout:
try:
response = json.loads(stdout.strip())
print(f"✅ 成功")
if request['method'] == "tools/list":
tools = response.get("result", {}).get("tools", [])
print(f" 发现 {len(tools)} 个工具:")
for tool in tools:
print(f" - {tool['name']}")
elif request['method'] == "tools/call":
content = response.get("result", {}).get("content", [])
if content:
text = content[0].get("text", "")
print(f" 返回内容: {text[:100]}...")
except json.JSONDecodeError:
print(f"❌ 响应格式错误: {stdout}")
else:
print("❌ 无响应")
except subprocess.TimeoutExpired:
print("❌ 超时")
process.kill()
except Exception as e:
print(f"❌ 异常: {e}")
if __name__ == "__main__":
test_mcp_flow()