client_example.py•4.83 kB
#!/usr/bin/env python3
"""
MCP客户端示例
演示如何连接和使用员工管理系统MCP服务器
"""
import asyncio
import json
import sys
import os
# 添加src目录到Python路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
from mcp.client import ClientSession
from mcp.types import TextContent
class MCPClientExample:
"""MCP客户端示例"""
def __init__(self, host: str = "localhost", port: int = 8000):
self.host = host
self.port = port
self.session = None
async def connect(self):
"""连接到MCP服务器"""
try:
print(f"🔗 正在连接到MCP服务器: {self.host}:{self.port}")
self.session = await ClientSession.connect(f"http://{self.host}:{self.port}")
print("✅ 连接成功!")
return True
except Exception as e:
print(f"❌ 连接失败: {e}")
return False
async def list_tools(self):
"""列出所有可用工具"""
try:
print("\n📋 获取可用工具列表...")
tools = await self.session.list_tools()
print(f"✅ 找到 {len(tools.tools)} 个工具:")
for tool in tools.tools:
print(f" 🔧 {tool.name}")
print(f" 描述: {tool.description}")
print(f" 参数: {json.dumps(tool.inputSchema, indent=6, ensure_ascii=False)}")
print()
return tools.tools
except Exception as e:
print(f"❌ 获取工具列表失败: {e}")
return []
async def call_tool(self, name: str, arguments: dict):
"""调用工具"""
try:
print(f"\n🔄 调用工具: {name}")
print(f" 参数: {json.dumps(arguments, indent=2, ensure_ascii=False)}")
result = await self.session.call_tool(name, arguments)
print("✅ 工具调用成功!")
for content in result.content:
if isinstance(content, TextContent):
print(f" 结果: {content.text}")
return result
except Exception as e:
print(f"❌ 工具调用失败: {e}")
return None
async def demo_employee_operations(self):
"""演示员工管理操作"""
print("\n🎯 开始员工管理操作演示")
print("=" * 50)
# 1. 查询所有员工
print("\n1️⃣ 查询所有员工")
await self.call_tool("get_all_employees", {})
# 2. 新增员工
print("\n2️⃣ 新增员工")
new_employee = {
"firstName": "小明",
"lastName": "李",
"salary": 7500.0,
"currency": "CNY",
"birthdate": "1992-05-15",
"isActive": True,
"level": "2"
}
await self.call_tool("add_employee", new_employee)
# 3. 搜索员工
print("\n3️⃣ 搜索姓李的员工")
await self.call_tool("search_employees", {"lastName": "李"})
# 4. 查询在职员工
print("\n4️⃣ 查询在职员工")
await self.call_tool("get_active_employees", {})
# 5. 根据级别查询员工
print("\n5️⃣ 查询级别为2的员工")
await self.call_tool("get_employees_by_level", {"level": "2"})
print("\n✅ 演示完成!")
async def close(self):
"""关闭连接"""
if self.session:
await self.session.close()
print("🔌 连接已关闭")
async def main():
"""主函数"""
print("🚀 MCP客户端示例")
print("=" * 50)
# 创建客户端
client = MCPClientExample()
try:
# 连接到服务器
if not await client.connect():
return
# 列出工具
tools = await client.list_tools()
if not tools:
return
# 演示员工管理操作
await client.demo_employee_operations()
except KeyboardInterrupt:
print("\n🛑 用户中断")
except Exception as e:
print(f"❌ 程序异常: {e}")
finally:
await client.close()
if __name__ == "__main__":
print("📝 使用说明:")
print("1. 确保MCP服务器已启动 (运行 python start_server.py)")
print("2. 确保后端API服务已启动 (http://localhost:10086)")
print("3. 运行此脚本进行测试")
print("=" * 50)
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n👋 程序已退出")
except Exception as e:
print(f"❌ 程序异常退出: {e}")
sys.exit(1)