Skip to main content
Glama
mcp_client_example.py4.09 kB
#!/usr/bin/env python3 """ MCP 客户端示例 演示如何与微信 MCP 服务器交互。 """ import asyncio import json import subprocess import sys from typing import Dict, Any, Optional class MCPClient: """用于测试微信 MCP 服务器的简单 MCP 客户端。""" def __init__(self, server_path: str): self.server_path = server_path self.process = None self.request_id = 0 async def start_server(self): """启动 MCP 服务器进程。""" self.process = await asyncio.create_subprocess_exec( sys.executable, self.server_path, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) print("MCP Server started") async def send_request(self, method: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """向服务器发送 JSON-RPC 请求。""" self.request_id += 1 request = { "jsonrpc": "2.0", "method": method, "params": params or {}, "id": self.request_id } request_json = json.dumps(request) + "\n" self.process.stdin.write(request_json.encode()) await self.process.stdin.drain() # 读取响应 response_line = await self.process.stdout.readline() response = json.loads(response_line.decode().strip()) return response async def initialize(self): """初始化 MCP 服务器。""" params = { "protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": { "name": "wechat-mcp-client", "version": "1.0.0" } } response = await self.send_request("initialize", params) print(f"Initialize response: {response}") return response async def list_tools(self): """列出可用工具。""" response = await self.send_request("tools/list") print(f"Tools list: {response}") return response async def call_tool(self, tool_name: str, arguments: Dict[str, Any]): """调用特定工具。""" params = { "name": tool_name, "arguments": arguments } response = await self.send_request("tools/call", params) print(f"Tool call response: {response}") return response async def close(self): """关闭客户端并终止服务器。""" if self.process: self.process.terminate() await self.process.wait() async def main(): """主示例函数。""" # MCP 服务器路径 server_path = "../src/mcp_server.py" client = MCPClient(server_path) try: # 启动服务器 await client.start_server() # 初始化连接 await client.initialize() # 列出可用工具 await client.list_tools() # 示例 1:发送简单消息 print("\n=== Example 1: Send Message ===") await client.call_tool("send_wechat_message", { "contact_name": "文件传输助手", "message": "Hello from MCP! This is a test message." }) # 示例 2:安排消息 print("\n=== Example 2: Schedule Message ===") await client.call_tool("schedule_wechat_message", { "contact_name": "文件传输助手", "message": "This is a scheduled message sent 5 seconds later!", "delay_seconds": 5 }) # 等待一段时间查看安排的消息 print("Waiting for scheduled message...") await asyncio.sleep(6) except Exception as e: print(f"Error: {e}") finally: await client.close() if __name__ == "__main__": asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/1052666/WeChat-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server