Skip to main content
Glama

Employee Management MCP Server

by y735832496
client_example.py4.83 kB
#!/usr/bin/env python3 """ MCP客户端示例 演示如何连接和使用员工管理系统MCP服务器 """ import asyncio import json import sys import os # 添加src目录到Python路径 sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src')) from mcp.client import ClientSession from mcp.types import TextContent class MCPClientExample: """MCP客户端示例""" def __init__(self, host: str = "localhost", port: int = 8000): self.host = host self.port = port self.session = None async def connect(self): """连接到MCP服务器""" try: print(f"🔗 正在连接到MCP服务器: {self.host}:{self.port}") self.session = await ClientSession.connect(f"http://{self.host}:{self.port}") print("✅ 连接成功!") return True except Exception as e: print(f"❌ 连接失败: {e}") return False async def list_tools(self): """列出所有可用工具""" try: print("\n📋 获取可用工具列表...") tools = await self.session.list_tools() print(f"✅ 找到 {len(tools.tools)} 个工具:") for tool in tools.tools: print(f" 🔧 {tool.name}") print(f" 描述: {tool.description}") print(f" 参数: {json.dumps(tool.inputSchema, indent=6, ensure_ascii=False)}") print() return tools.tools except Exception as e: print(f"❌ 获取工具列表失败: {e}") return [] async def call_tool(self, name: str, arguments: dict): """调用工具""" try: print(f"\n🔄 调用工具: {name}") print(f" 参数: {json.dumps(arguments, indent=2, ensure_ascii=False)}") result = await self.session.call_tool(name, arguments) print("✅ 工具调用成功!") for content in result.content: if isinstance(content, TextContent): print(f" 结果: {content.text}") return result except Exception as e: print(f"❌ 工具调用失败: {e}") return None async def demo_employee_operations(self): """演示员工管理操作""" print("\n🎯 开始员工管理操作演示") print("=" * 50) # 1. 查询所有员工 print("\n1️⃣ 查询所有员工") await self.call_tool("get_all_employees", {}) # 2. 新增员工 print("\n2️⃣ 新增员工") new_employee = { "firstName": "小明", "lastName": "李", "salary": 7500.0, "currency": "CNY", "birthdate": "1992-05-15", "isActive": True, "level": "2" } await self.call_tool("add_employee", new_employee) # 3. 搜索员工 print("\n3️⃣ 搜索姓李的员工") await self.call_tool("search_employees", {"lastName": "李"}) # 4. 查询在职员工 print("\n4️⃣ 查询在职员工") await self.call_tool("get_active_employees", {}) # 5. 根据级别查询员工 print("\n5️⃣ 查询级别为2的员工") await self.call_tool("get_employees_by_level", {"level": "2"}) print("\n✅ 演示完成!") async def close(self): """关闭连接""" if self.session: await self.session.close() print("🔌 连接已关闭") async def main(): """主函数""" print("🚀 MCP客户端示例") print("=" * 50) # 创建客户端 client = MCPClientExample() try: # 连接到服务器 if not await client.connect(): return # 列出工具 tools = await client.list_tools() if not tools: return # 演示员工管理操作 await client.demo_employee_operations() except KeyboardInterrupt: print("\n🛑 用户中断") except Exception as e: print(f"❌ 程序异常: {e}") finally: await client.close() if __name__ == "__main__": print("📝 使用说明:") print("1. 确保MCP服务器已启动 (运行 python start_server.py)") print("2. 确保后端API服务已启动 (http://localhost:10086)") print("3. 运行此脚本进行测试") print("=" * 50) try: asyncio.run(main()) except KeyboardInterrupt: print("\n👋 程序已退出") except Exception as e: print(f"❌ 程序异常退出: {e}") sys.exit(1)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/y735832496/mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server