test_client.py•6.51 kB
#!/usr/bin/env python3
"""
MCP客户端测试脚本
用于测试与员工管理MCP服务器的交互
"""
import asyncio
import json
import sys
from typing import Any, Dict, List
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
class EmployeeMCPClient:
"""员工管理MCP客户端"""
def __init__(self):
self.session = None
async def connect(self):
"""连接到MCP服务器"""
print("🔌 正在连接到MCP服务器...")
# 连接到服务器
server_params = StdioServerParameters(
command="python",
args=["src/server.py"]
)
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
self.session = session
# 初始化连接
await session.initialize()
print("✅ 已连接到MCP服务器")
# 获取可用工具
tools = await session.list_tools()
print(f"📋 发现 {len(tools.tools)} 个可用工具:")
for tool in tools.tools:
print(f" - {tool.name}: {tool.description}")
# 运行交互式界面
await self.interactive_mode()
async def interactive_mode(self):
"""交互式模式"""
print("\n🎯 交互式模式已启动!")
print("💡 你可以使用以下命令:")
print(" - 'list' - 查看所有员工")
print(" - 'get <id>' - 获取指定ID的员工信息")
print(" - 'add <name> <surname> <salary> <level>' - 添加新员工")
print(" - 'search <lastname>' - 搜索员工")
print(" - 'active' - 查看在职员工")
print(" - 'level <level>' - 查看指定级别的员工")
print(" - 'sync' - 同步员工数据")
print(" - 'quit' - 退出")
print("\n" + "="*50)
while True:
try:
command = input("\n请输入命令: ").strip()
if command.lower() in ['quit', 'exit', 'q']:
print("👋 再见!")
break
elif command.lower() == 'list':
await self.list_employees()
elif command.startswith('get '):
employee_id = command.split(' ', 1)[1]
await self.get_employee(employee_id)
elif command.startswith('add '):
parts = command.split(' ', 4)
if len(parts) >= 5:
await self.add_employee(parts[1], parts[2], parts[3], parts[4])
else:
print("❌ 格式错误,请使用: add <name> <surname> <salary> <level>")
elif command.startswith('search '):
lastname = command.split(' ', 1)[1]
await self.search_employees(lastname)
elif command.lower() == 'active':
await self.get_active_employees()
elif command.startswith('level '):
level = command.split(' ', 1)[1]
await self.get_employees_by_level(level)
elif command.lower() == 'sync':
await self.sync_employees()
else:
print("❌ 未知命令,请输入 'quit' 查看帮助")
except KeyboardInterrupt:
print("\n👋 再见!")
break
except Exception as e:
print(f"❌ 错误: {e}")
async def list_employees(self):
"""获取所有员工"""
print("📋 正在获取所有员工...")
result = await self.session.call_tool("get_all_employees", {})
self._print_result(result)
async def get_employee(self, employee_id: str):
"""获取指定员工"""
print(f"👤 正在获取员工 ID: {employee_id}")
try:
result = await self.session.call_tool("get_employee_by_id", {"userId": int(employee_id)})
self._print_result(result)
except ValueError:
print("❌ 员工ID必须是数字")
async def add_employee(self, firstname: str, lastname: str, salary: str, level: str):
"""添加新员工"""
print(f"➕ 正在添加员工: {firstname} {lastname}")
try:
result = await self.session.call_tool("add_employee", {
"firstName": firstname,
"lastName": lastname,
"salary": float(salary),
"currency": "CNY",
"birthdate": "1990-01-01", # 默认生日
"isActive": True,
"level": level
})
self._print_result(result)
except ValueError:
print("❌ 工资必须是数字")
async def search_employees(self, lastname: str):
"""搜索员工"""
print(f"🔍 正在搜索姓氏为 '{lastname}' 的员工...")
result = await self.session.call_tool("search_employees", {"lastName": lastname})
self._print_result(result)
async def get_active_employees(self):
"""获取在职员工"""
print("💼 正在获取在职员工...")
result = await self.session.call_tool("get_active_employees", {})
self._print_result(result)
async def get_employees_by_level(self, level: str):
"""根据级别获取员工"""
print(f"📊 正在获取级别为 '{level}' 的员工...")
result = await self.session.call_tool("get_employees_by_level", {"level": level})
self._print_result(result)
async def sync_employees(self):
"""同步员工数据"""
print("🔄 正在同步员工数据...")
result = await self.session.call_tool("sync_employees", {})
self._print_result(result)
def _print_result(self, result):
"""打印结果"""
if result.content:
for content in result.content:
if hasattr(content, 'text'):
print(content.text)
else:
print(content)
async def main():
"""主函数"""
client = EmployeeMCPClient()
await client.connect()
if __name__ == "__main__":
asyncio.run(main())