Skip to main content
Glama

Meeting Room MCP Server

by ma1238906
readme.md21.7 kB
# 简介 该项目实现了一个入门的MCP Client和MCP Server交互的功能。 MCP Client中使用了React Agent模式来实现工具的调用。 MCP Server实现了两种启动方式即(stdio和sse),该例子中使用了sse的方式。 ## 创建Agent 使用ReAct模式创建一个Agent,也可以使用langchain等框架实现,我这里没有使用AI框架。 在项目目录下创建‘react_agent.py’文件 ```python import asyncio import json import os import datetime from openai import AsyncOpenAI from mcp_client import MCPClient from pprint import pprint class ReActAgent: def __init__(self, mcp_client: MCPClient): # 从环境变量获取API密钥,如果不存在则使用默认值 # 修改base_url来使用不同的大模型供应商 # React模式一次使用一个工具 api_key = os.environ.get("API_KEY", "your llm api key") self.llm = AsyncOpenAI( api_key=api_key, base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", ) self.mcp_client = mcp_client self.messages = [] self.system_prompt = """ 你是一个专业会议室预定AI助手,必须严格遵循以下规则运行: 【核心机制】 采用Thought→Action→Observation循环工作流: 1. 解析用户需求时,必须明确识别以下要素: - 精确到分钟的时间段(自动补全日期格式) - 预定目标(查询/预定) 2. 工具调用规范: 只能使用下列工具且每次只能调用1个: - list_idle_meeting_rooms(必填参数:start_time, end_time) - book_meeting_room(必填参数:room_id, start_time, end_time) 参数要求: • 时间参数必须转为"YYYY-MM-DD HH:mm:ss"格式 • room_id必须从list_idle_meeting_rooms返回结果中选取 3. 执行流程强制要求: (1) 预定操作前必须调用list_idle_meeting_rooms验证时段可用性 【输出规则】 1. 未完成时输出json格式: { "thought": "推理过程(必须包含时间转换逻辑)", "action": {"name": "工具名", "args": {"参数1":"值1"}}, "observation": "工具返回原始结果" } 2. 未完成标准: - 未调用工具 - 未返回最终答案 - 明确提示预定失败 3. 完成标准:当且仅当满足以下条件时输出最终答案: - book_meeting_room返回预定成功提示 - 包含有效的会议室ID 4. 最终答案json格式: { "final_answer": "预定成功:{room_id} ({start_time}至{end_time})" } 【校验机制】 1. 时间参数三重验证: (1) 格式正则校验:^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$ (2) 时间逻辑:start_time < end_time (3) 时段冲突检测(通过list_idle_meeting_rooms实现) 2. 错误处理: - 参数缺失时精确指出缺失项,示例: {"action":{"name":"request_params","args":{"missing_params":["start_time"]}}} - 工具调用失败时返回原始错误信息 【终止条件】 1. 成功终止:输出含预定成功信息的final_answer 2. 失败终止:连续3次参数请求未获有效输入时返回: {"final_answer": "ERR_MAX_RETRY: 超过最大重试次数"} 【错误处理与结果验证】 1. 必须严格检查工具返回结果中是否包含"失败"、"不存在"、"错误"等关键词 2. 当observation中包含任何失败信息时: - 禁止输出final_answer - 必须重新进行工具调用解决问题 3. 验证规则: - 必须逐字分析observation返回内容 - 当且仅当observation明确包含"{room_id}预定成功"字样时才可输出最终答案 - 任何失败情况必须重试或告知用户失败原因 【列表解析规则】 1. list_idle_meeting_rooms返回结果必须按原格式提取,禁止自行编造会议室ID 2. 必须严格匹配返回的会议室ID格式,不得修改或推测 3. 如返回格式为["会议室1", "会议室2", "会议室3"],则room_id必须是完全一致的"会议室1"、"会议室2"或"会议室3" 【强制验证机制】 1. 每次工具调用后必须执行: if "失败" in observation or "不存在" in observation or "错误" in observation: # 必须处理错误,不允许输出final_answer # 必须尝试替代方案或告知用户 2. 禁止性规则: - 严禁在任何失败场景下输出含"预定成功"的消息 - 严禁忽略工具返回的任何错误或警告信息 【强制纠错与重试机制】 1. 当工具返回错误时,必须执行以下步骤: - 分析错误原因(例如:"会议室不存在") - 重新查询可用会议室列表 - 从返回的实际可用会议室中选择 - 使用正确ID重试预定 2. 只有在确认预定成功后才能生成final_answer 【严格的上下文连贯性要求】 1. 每一步动作必须直接基于上一步observation结果,禁止忽略或歪曲 2. 必须逐字分析observation内容,并根据实际返回结果(而非臆想)执行下一步 3. 会议室ID必须严格从list_idle_meeting_rooms的结果中原样提取,禁止臆造或修改格式 4. 当observation包含"失败"、"不存在"等关键词时,下一步必须是纠正行动,而非宣告成功 【强制确认机制】 1. 在每次工具调用后,必须首先重复observation结果以证明理解 2. 思考过程必须包含对observation的明确分析,例如:"根据观察到的'预定失败,R101不存在',我需要重新选择正确的会议室ID" 3. 禁止在最终答案中使用与observation矛盾的信息 【结果验证关键规则】 1. 预定成功的唯一有效判断标准:book_meeting_room返回必须包含明确的"预定成功"字样 2. 会议室ID的有效来源:必须直接使用list_idle_meeting_rooms返回的会议室ID列表中的选项,不得添加任何前缀或后缀 3. 严禁从失败的预定结果中提取会议室ID作为最终答案 4. 当工具返回包含"失败"时,你必须尝试使用不同的会议室ID重新预定 """ async def run(self, query): try: self.messages.append({"role": "system", "content": self.system_prompt}) self.messages.append({"role": "system", "content": f"当前时间:{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}"}) self.messages.append({"role": "user", "content": query}) await self.mcp_client.connect_to_sse_server("http://127.0.0.1:8001/sse") observation_history = [] for i in range(10): # 最大循环次数限制 print(f"第{i+1}次循环") try: response = await self.llm.chat.completions.create( model="qwen-plus", messages=self.messages, response_format={"type": "json_object"}, ) message = response.choices[0].message # 检查是否是最终答案 if message.content and "final_answer" in message.content: try: final_answer = json.loads(message.content)["final_answer"] # 检查上一步observation是否包含失败信息,处理模型判断失误的情况 last_observation = observation_history[-1]["result"] if ( "预定失败" in last_observation or "不存在" in last_observation or "错误" in last_observation ): # 发现错误 - 强制继续对话 error_msg = { "role": "user", "content": f"警告!你错误的宣告了成功,工具返回的信息为:「{last_observation}」。请正确处理这个错误并重试,不要声称成功。", } self.messages.append(error_msg) continue return final_answer except (json.JSONDecodeError, KeyError) as e: print(f"解析final_answer时出错: {e}") # 继续执行,尝试下一轮对话 # 解析并执行工具调用 # print("message.content", message.content) action = json.loads(message.content)["action"] tool_name = action["name"] tool_args = action["args"] # 调用工具 observation = await self.mcp_client.session.call_tool( tool_name, tool_args ) observation_text = str(observation.content[0].text) print( f"\n调用工具{tool_name},参数{tool_args},返回结果{observation_text}" ) thought = json.loads(message.content)["thought"] action["observation"] = observation_text print(f"\n当前步骤{i+1}的reAct详情如下:") print(f"thought: {thought}") print(f"action: {tool_name},参数{tool_args}") print(f"observation: {observation_text}") # 将思考步骤添加到对话上下文 self.messages.append( { "role": "assistant", "content": json.dumps( { "thought": thought, "action": action, "observation": observation_text, } ), } ) # 在工具调用后 observation_history.append( { "tool": tool_name, "args": tool_args, "result": observation_text, } ) except Exception as e: print(f"API调用异常: {str(e)}") continue return "超过最大尝试次数,任务终止" except Exception as e: print(f"执行过程中发生异常: {str(e)}") return f"执行失败: {str(e)}" if __name__ == "__main__": meetting_booking_mcp_client = MCPClient() agent = ReActAgent(meetting_booking_mcp_client) loop = asyncio.get_event_loop() result = loop.run_until_complete( agent.run( "我想预定一个会议室,时间是明天上午9点到10点半,如果有多个空闲会议室,请随便选一个" ) ) print(f"最终结果: {result}") ``` ## 创建一个MCP Server 在项目目录下创建一个新的文件‘meeting_room.py’,粘贴下面的内容。 MCP Server使用sse方式的情况,需要在MCP Client启动前启动,所以可以使用‘python meeting_room.py sse 8001’启动服务。 ```python import random from mcp.server.sse import SseServerTransport from mcp import types from mcp.server.lowlevel import Server import logging from starlette.applications import Starlette from starlette.routing import Mount, Route import uvicorn app = Server("meeting_room") async def list_idle_meeting_rooms(start_time: str, end_time: str) -> list[types.TextContent]: return ["会议室1", "会议室2", "会议室3"] async def book_meeting_room(room_id: str, start_time: str, end_time: str) -> list[types.TextContent]: # 随机返回预定成功或失败 print(f"预定会议室{room_id},开始时间{start_time},结束时间{end_time}") if room_id in ["会议室1", "会议室2", "会议室3"]: return f"{room_id}预定成功" if random.random() < 0.5 else f"{room_id}预定失败,请重新选择会议室" else: return f"预定失败,{room_id}不存在,请重新选择会议室" @app.list_tools() async def list_tools() -> list[types.Tool]: return [ types.Tool( name="list_idle_meeting_rooms", description="list idle meeting rooms", inputSchema={ "type": "object", "properties": { "start_time": { "type": "string", "description": "开始时间" }, "end_time": { "type": "string", "description": "结束时间" } }, "required": ["start_time", "end_time"] } ), types.Tool( name="book_meeting_room", description="book meeting room", inputSchema={ "type": "object", "properties": { "room_id": { "type": "string", "description": "会议室ID" }, "start_time": { "type": "string", "description": "开始时间" }, "end_time": { "type": "string", "description": "结束时间" } }, "required": ["room_id", "start_time", "end_time"] } ) ] @app.call_tool() async def call_tool( name: str, arguments: dict ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: if name == "list_idle_meeting_rooms": start_time = arguments["start_time"] end_time = arguments["end_time"] result = await list_idle_meeting_rooms(start_time, end_time) return [types.TextContent(type="text", text=str(result))] elif name == "book_meeting_room": print(arguments) room_id = arguments["room_id"] start_time = arguments["start_time"] end_time = arguments["end_time"] result = await book_meeting_room(room_id, start_time, end_time) return [types.TextContent(type="text", text=str(result))] raise ValueError(f"Tool not found: {name}") if __name__ == "__main__": # Run the FastAPI app with uvicorn import sys # 默认使用标准输入输出传输 transport = "stdio" port = 8000 # 检查命令行参数 if len(sys.argv) > 1: if sys.argv[1] == "sse": transport = "sse" if len(sys.argv) > 2: try: port = int(sys.argv[2]) except ValueError: pass if transport == "sse": # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger("weather-mcp") # 配置 SSE 传输,增加超时和重试参数 sse = SseServerTransport( "/message/", ) async def handle_sse(request): try: logger.info(f"建立新的SSE连接: {request.client}") async with sse.connect_sse(request.scope, request.receive, request._send) as streams: try: await app.run(streams[0], streams[1], app.create_initialization_options()) except Exception as e: logger.error(f"处理SSE连接时出错: {str(e)}") import traceback logger.error(traceback.format_exc()) except Exception as e: logger.error(f"建立SSE连接时出错: {str(e)}") import traceback logger.error(traceback.format_exc()) return starlette_app = Starlette( debug=True, routes=[ Route("/sse", endpoint=handle_sse), Mount("/message/", app=sse.handle_post_message), ] ) print(f"在端口{port}上启动MCP服务器,使用SSE传输") print(f"连接URL: http://127.0.0.1:{port}/sse") uvicorn.run(starlette_app, host="0.0.0.0", port=port, log_level="info") else: from mcp.server.stdio import stdio_server import anyio async def run_stdio(): async with stdio_server() as streams: await app.run(streams[0], streams[1], app.create_initialization_options()) print("使用标准输入输出传输启动MCP服务器") anyio.run(run_stdio) ``` ## 创建一个MCP Server管理类 在项目目录创建‘mcp_client.py’文件 ```python from typing import Optional from contextlib import AsyncExitStack from mcp.client.sse import sse_client from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client class MCPClient: def __init__(self): self.session: Optional[ClientSession] = None self.exit_stack = AsyncExitStack() async def connect_to_server(self, server_script_path: str, server_interpreter_path: str): """Connect to an MCP server Args: server_script_path: Path to the server script (.py or .js) """ is_python = server_script_path.endswith(".py") is_js = server_script_path.endswith(".js") if not (is_python or is_js): raise ValueError("Server script must be a .py or .js file") command = "python" if is_python else "node" command = server_interpreter_path server_params = StdioServerParameters( command=command, args=[server_script_path], env=None ) stdio_transport = await self.exit_stack.enter_async_context( stdio_client(server_params) ) self.stdio, self.write = stdio_transport self.session = await self.exit_stack.enter_async_context( ClientSession(self.stdio, self.write) ) await self.session.initialize() # 列出可用的工具 response = await self.session.list_tools() tools = response.tools print("\nConnected to server with tools:", [tool.name for tool in tools]) async def connect_to_sse_server(self, server_url: str): """Connect to an MCP server Args: server_script_path: Path to the server script (.py or .js) """ self._streams_context = sse_client(url=server_url) streams = await self._streams_context.__aenter__() self._session_context = ClientSession(*streams) self.session = await self._session_context.__aenter__() await self.session.initialize() # 列出可用的工具 response = await self.session.list_tools() tools = response.tools print("\nConnected to server with tools:", [tool.name for tool in tools]) ``` ## 启动MCP Client 使用命令启动agent,并从控制台观察输出。 ``` python react_agent.py ``` 控制台输出 ``` 第1次循环 调用工具list_idle_meeting_rooms,参数{'start_time': '2025-04-10 09:00:00', 'end_time': '2025-04-10 10:30:00'},返回结果['会议室1', '会议室2', '会议室3'] 当前步骤1的reAct详情如下: thought: 用户想预定明天上午9点到10点半的会议室。首先需要将时间转换为正确的格式:start_time='2025-04-10 09:00:00',end_time='2025-04-10 10:30:00'。然后调用list_idle_meeting_rooms 来查询这个时间段内空闲的会议室。 action: list_idle_meeting_rooms,参数{'start_time': '2025-04-10 09:00:00', 'end_time': '2025-04-10 10:30:00'} observation: ['会议室1', '会议室2', '会议室3'] 第2次循环 调用工具book_meeting_room,参数{'room_id': '会议室1', 'start_time': '2025-04-10 09:00:00', 'end_time': '2025-04-10 10:30:00'},返回结果会议室1预定成功 当前步骤2的reAct详情如下: thought: 查询结果显示有三个会议室可用:['会议室1', '会议室2', '会议室3']。用户要求随意选择一个,我将选择'会议室1'进行预定。现在需要调用book_meeting_room工具来完成预定操作。 action: book_meeting_room,参数{'room_id': '会议室1', 'start_time': '2025-04-10 09:00:00', 'end_time': '2025-04-10 10:30:00'} observation: 会议室1预定成功 第3次循环 最终结果: 预定成功:会议室1 (2025-04-10 09:00:00至2025-04-10 10:30:00) ```

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ma1238906/MCP_Demo'

If you have feedback or need assistance with the MCP directory API, please join our Discord server