test_maxkb_integration.py•3.47 kB
#!/usr/bin/env python3
"""
测试MaxKB MCP集成
模拟MaxKB的请求流程来验证服务器响应
"""
import requests
import json
import time
def test_mcp_server():
"""测试MCP服务器"""
base_url = "http://172.20.10.2:8000"
print("🔍 测试MaxKB MCP集成...")
# 1. 健康检查
print("\n1. 健康检查...")
try:
response = requests.get(f"{base_url}/health")
print(f"状态码: {response.status_code}")
print(f"响应: {response.json()}")
except Exception as e:
print(f"❌ 健康检查失败: {e}")
return
# 2. MCP信息检查
print("\n2. MCP信息检查...")
try:
response = requests.get(f"{base_url}/mcp")
print(f"状态码: {response.status_code}")
print(f"响应: {response.json()}")
except Exception as e:
print(f"❌ MCP信息检查失败: {e}")
# 3. 初始化请求
print("\n3. 初始化请求...")
try:
init_request = {
"jsonrpc": "2.0",
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"capabilities": {},
"clientInfo": {
"name": "maxkb",
"version": "1.0.0"
}
},
"id": "init"
}
response = requests.post(f"{base_url}/mcp", json=init_request)
print(f"状态码: {response.status_code}")
print(f"响应: {response.json()}")
except Exception as e:
print(f"❌ 初始化请求失败: {e}")
# 4. 工具列表请求
print("\n4. 工具列表请求...")
try:
tools_request = {
"jsonrpc": "2.0",
"method": "tools/list",
"params": {},
"id": "tools_list"
}
response = requests.post(f"{base_url}/mcp", json=tools_request)
print(f"状态码: {response.status_code}")
result = response.json()
print(f"工具数量: {len(result.get('result', {}).get('tools', []))}")
print(f"前3个工具: {[tool['name'] for tool in result.get('result', {}).get('tools', [])[:3]]}")
except Exception as e:
print(f"❌ 工具列表请求失败: {e}")
# 5. 工具调用测试
print("\n5. 工具调用测试...")
try:
call_request = {
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": "get_all_employees",
"arguments": {}
},
"id": "tool_call"
}
response = requests.post(f"{base_url}/mcp", json=call_request)
print(f"状态码: {response.status_code}")
result = response.json()
print(f"调用结果: {result.get('result', {}).get('content', [{}])[0].get('text', '')[:100]}...")
except Exception as e:
print(f"❌ 工具调用测试失败: {e}")
# 6. Ping测试
print("\n6. Ping测试...")
try:
ping_request = {
"jsonrpc": "2.0",
"method": "ping",
"params": {},
"id": "ping"
}
response = requests.post(f"{base_url}/mcp", json=ping_request)
print(f"状态码: {response.status_code}")
print(f"响应: {response.json()}")
except Exception as e:
print(f"❌ Ping测试失败: {e}")
print("\n✅ 测试完成!")
if __name__ == "__main__":
test_mcp_server()