Meeting Room MCP Server

简介

该项目实现了一个入门的MCP Client和MCP Server交互的功能。 MCP Client中使用了React Agent模式来实现工具的调用。 MCP Server实现了两种启动方式即(stdio和sse),该例子中使用了sse的方式。

创建Agent

使用ReAct模式创建一个Agent,也可以使用langchain等框架实现,我这里没有使用AI框架。 在项目目录下创建‘react_agent.py’文件

import asyncio import json import os import datetime from openai import AsyncOpenAI from mcp_client import MCPClient from pprint import pprint class ReActAgent: def __init__(self, mcp_client: MCPClient): # 从环境变量获取API密钥,如果不存在则使用默认值 # 修改base_url来使用不同的大模型供应商 # React模式一次使用一个工具 api_key = os.environ.get("API_KEY", "your llm api key") self.llm = AsyncOpenAI( api_key=api_key, base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", ) self.mcp_client = mcp_client self.messages = [] self.system_prompt = """ 你是一个专业会议室预定AI助手,必须严格遵循以下规则运行: 【核心机制】 采用Thought→Action→Observation循环工作流: 1. 解析用户需求时,必须明确识别以下要素: - 精确到分钟的时间段(自动补全日期格式) - 预定目标(查询/预定) 2. 工具调用规范: 只能使用下列工具且每次只能调用1个: - list_idle_meeting_rooms(必填参数:start_time, end_time) - book_meeting_room(必填参数:room_id, start_time, end_time) 参数要求: • 时间参数必须转为"YYYY-MM-DD HH:mm:ss"格式 • room_id必须从list_idle_meeting_rooms返回结果中选取 3. 执行流程强制要求: (1) 预定操作前必须调用list_idle_meeting_rooms验证时段可用性 【输出规则】 1. 未完成时输出json格式: { "thought": "推理过程(必须包含时间转换逻辑)", "action": {"name": "工具名", "args": {"参数1":"值1"}}, "observation": "工具返回原始结果" } 2. 未完成标准: - 未调用工具 - 未返回最终答案 - 明确提示预定失败 3. 完成标准:当且仅当满足以下条件时输出最终答案: - book_meeting_room返回预定成功提示 - 包含有效的会议室ID 4. 最终答案json格式: { "final_answer": "预定成功:{room_id} ({start_time}至{end_time})" } 【校验机制】 1. 时间参数三重验证: (1) 格式正则校验:^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$ (2) 时间逻辑:start_time < end_time (3) 时段冲突检测(通过list_idle_meeting_rooms实现) 2. 错误处理: - 参数缺失时精确指出缺失项,示例: {"action":{"name":"request_params","args":{"missing_params":["start_time"]}}} - 工具调用失败时返回原始错误信息 【终止条件】 1. 成功终止:输出含预定成功信息的final_answer 2. 失败终止:连续3次参数请求未获有效输入时返回: {"final_answer": "ERR_MAX_RETRY: 超过最大重试次数"} 【错误处理与结果验证】 1. 必须严格检查工具返回结果中是否包含"失败"、"不存在"、"错误"等关键词 2. 当observation中包含任何失败信息时: - 禁止输出final_answer - 必须重新进行工具调用解决问题 3. 验证规则: - 必须逐字分析observation返回内容 - 当且仅当observation明确包含"{room_id}预定成功"字样时才可输出最终答案 - 任何失败情况必须重试或告知用户失败原因 【列表解析规则】 1. list_idle_meeting_rooms返回结果必须按原格式提取,禁止自行编造会议室ID 2. 必须严格匹配返回的会议室ID格式,不得修改或推测 3. 如返回格式为["会议室1", "会议室2", "会议室3"],则room_id必须是完全一致的"会议室1"、"会议室2"或"会议室3" 【强制验证机制】 1. 每次工具调用后必须执行: if "失败" in observation or "不存在" in observation or "错误" in observation: # 必须处理错误,不允许输出final_answer # 必须尝试替代方案或告知用户 2. 禁止性规则: - 严禁在任何失败场景下输出含"预定成功"的消息 - 严禁忽略工具返回的任何错误或警告信息 【强制纠错与重试机制】 1. 当工具返回错误时,必须执行以下步骤: - 分析错误原因(例如:"会议室不存在") - 重新查询可用会议室列表 - 从返回的实际可用会议室中选择 - 使用正确ID重试预定 2. 只有在确认预定成功后才能生成final_answer 【严格的上下文连贯性要求】 1. 每一步动作必须直接基于上一步observation结果,禁止忽略或歪曲 2. 必须逐字分析observation内容,并根据实际返回结果(而非臆想)执行下一步 3. 会议室ID必须严格从list_idle_meeting_rooms的结果中原样提取,禁止臆造或修改格式 4. 当observation包含"失败"、"不存在"等关键词时,下一步必须是纠正行动,而非宣告成功 【强制确认机制】 1. 在每次工具调用后,必须首先重复observation结果以证明理解 2. 思考过程必须包含对observation的明确分析,例如:"根据观察到的'预定失败,R101不存在',我需要重新选择正确的会议室ID" 3. 禁止在最终答案中使用与observation矛盾的信息 【结果验证关键规则】 1. 预定成功的唯一有效判断标准:book_meeting_room返回必须包含明确的"预定成功"字样 2. 会议室ID的有效来源:必须直接使用list_idle_meeting_rooms返回的会议室ID列表中的选项,不得添加任何前缀或后缀 3. 严禁从失败的预定结果中提取会议室ID作为最终答案 4. 当工具返回包含"失败"时,你必须尝试使用不同的会议室ID重新预定 """ async def run(self, query): try: self.messages.append({"role": "system", "content": self.system_prompt}) self.messages.append({"role": "system", "content": f"当前时间:{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}"}) self.messages.append({"role": "user", "content": query}) await self.mcp_client.connect_to_sse_server("http://127.0.0.1:8001/sse") observation_history = [] for i in range(10): # 最大循环次数限制 print(f"第{i+1}次循环") try: response = await self.llm.chat.completions.create( model="qwen-plus", messages=self.messages, response_format={"type": "json_object"}, ) message = response.choices[0].message # 检查是否是最终答案 if message.content and "final_answer" in message.content: try: final_answer = json.loads(message.content)["final_answer"] # 检查上一步observation是否包含失败信息,处理模型判断失误的情况 last_observation = observation_history[-1]["result"] if ( "预定失败" in last_observation or "不存在" in last_observation or "错误" in last_observation ): # 发现错误 - 强制继续对话 error_msg = { "role": "user", "content": f"警告!你错误的宣告了成功,工具返回的信息为:「{last_observation}」。请正确处理这个错误并重试,不要声称成功。", } self.messages.append(error_msg) continue return final_answer except (json.JSONDecodeError, KeyError) as e: print(f"解析final_answer时出错: {e}") # 继续执行,尝试下一轮对话 # 解析并执行工具调用 # print("message.content", message.content) action = json.loads(message.content)["action"] tool_name = action["name"] tool_args = action["args"] # 调用工具 observation = await self.mcp_client.session.call_tool( tool_name, tool_args ) observation_text = str(observation.content[0].text) print( f"\n调用工具{tool_name},参数{tool_args},返回结果{observation_text}" ) thought = json.loads(message.content)["thought"] action["observation"] = observation_text print(f"\n当前步骤{i+1}的reAct详情如下:") print(f"thought: {thought}") print(f"action: {tool_name},参数{tool_args}") print(f"observation: {observation_text}") # 将思考步骤添加到对话上下文 self.messages.append( { "role": "assistant", "content": json.dumps( { "thought": thought, "action": action, "observation": observation_text, } ), } ) # 在工具调用后 observation_history.append( { "tool": tool_name, "args": tool_args, "result": observation_text, } ) except Exception as e: print(f"API调用异常: {str(e)}") continue return "超过最大尝试次数,任务终止" except Exception as e: print(f"执行过程中发生异常: {str(e)}") return f"执行失败: {str(e)}" if __name__ == "__main__": meetting_booking_mcp_client = MCPClient() agent = ReActAgent(meetting_booking_mcp_client) loop = asyncio.get_event_loop() result = loop.run_until_complete( agent.run( "我想预定一个会议室,时间是明天上午9点到10点半,如果有多个空闲会议室,请随便选一个" ) ) print(f"最终结果: {result}")

创建一个MCP Server

在项目目录下创建一个新的文件‘meeting_room.py’,粘贴下面的内容。 MCP Server使用sse方式的情况,需要在MCP Client启动前启动,所以可以使用‘python meeting_room.py sse 8001’启动服务。

import random from mcp.server.sse import SseServerTransport from mcp import types from mcp.server.lowlevel import Server import logging from starlette.applications import Starlette from starlette.routing import Mount, Route import uvicorn app = Server("meeting_room") async def list_idle_meeting_rooms(start_time: str, end_time: str) -> list[types.TextContent]: return ["会议室1", "会议室2", "会议室3"] async def book_meeting_room(room_id: str, start_time: str, end_time: str) -> list[types.TextContent]: # 随机返回预定成功或失败 print(f"预定会议室{room_id},开始时间{start_time},结束时间{end_time}") if room_id in ["会议室1", "会议室2", "会议室3"]: return f"{room_id}预定成功" if random.random() < 0.5 else f"{room_id}预定失败,请重新选择会议室" else: return f"预定失败,{room_id}不存在,请重新选择会议室" @app.list_tools() async def list_tools() -> list[types.Tool]: return [ types.Tool( name="list_idle_meeting_rooms", description="list idle meeting rooms", inputSchema={ "type": "object", "properties": { "start_time": { "type": "string", "description": "开始时间" }, "end_time": { "type": "string", "description": "结束时间" } }, "required": ["start_time", "end_time"] } ), types.Tool( name="book_meeting_room", description="book meeting room", inputSchema={ "type": "object", "properties": { "room_id": { "type": "string", "description": "会议室ID" }, "start_time": { "type": "string", "description": "开始时间" }, "end_time": { "type": "string", "description": "结束时间" } }, "required": ["room_id", "start_time", "end_time"] } ) ] @app.call_tool() async def call_tool( name: str, arguments: dict ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: if name == "list_idle_meeting_rooms": start_time = arguments["start_time"] end_time = arguments["end_time"] result = await list_idle_meeting_rooms(start_time, end_time) return [types.TextContent(type="text", text=str(result))] elif name == "book_meeting_room": print(arguments) room_id = arguments["room_id"] start_time = arguments["start_time"] end_time = arguments["end_time"] result = await book_meeting_room(room_id, start_time, end_time) return [types.TextContent(type="text", text=str(result))] raise ValueError(f"Tool not found: {name}") if __name__ == "__main__": # Run the FastAPI app with uvicorn import sys # 默认使用标准输入输出传输 transport = "stdio" port = 8000 # 检查命令行参数 if len(sys.argv) > 1: if sys.argv[1] == "sse": transport = "sse" if len(sys.argv) > 2: try: port = int(sys.argv[2]) except ValueError: pass if transport == "sse": # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger("weather-mcp") # 配置 SSE 传输,增加超时和重试参数 sse = SseServerTransport( "/message/", ) async def handle_sse(request): try: logger.info(f"建立新的SSE连接: {request.client}") async with sse.connect_sse(request.scope, request.receive, request._send) as streams: try: await app.run(streams[0], streams[1], app.create_initialization_options()) except Exception as e: logger.error(f"处理SSE连接时出错: {str(e)}") import traceback logger.error(traceback.format_exc()) except Exception as e: logger.error(f"建立SSE连接时出错: {str(e)}") import traceback logger.error(traceback.format_exc()) return starlette_app = Starlette( debug=True, routes=[ Route("/sse", endpoint=handle_sse), Mount("/message/", app=sse.handle_post_message), ] ) print(f"在端口{port}上启动MCP服务器,使用SSE传输") print(f"连接URL: http://127.0.0.1:{port}/sse") uvicorn.run(starlette_app, host="0.0.0.0", port=port, log_level="info") else: from mcp.server.stdio import stdio_server import anyio async def run_stdio(): async with stdio_server() as streams: await app.run(streams[0], streams[1], app.create_initialization_options()) print("使用标准输入输出传输启动MCP服务器") anyio.run(run_stdio)

创建一个MCP Server管理类

在项目目录创建‘mcp_client.py’文件

from typing import Optional from contextlib import AsyncExitStack from mcp.client.sse import sse_client from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client class MCPClient: def __init__(self): self.session: Optional[ClientSession] = None self.exit_stack = AsyncExitStack() async def connect_to_server(self, server_script_path: str, server_interpreter_path: str): """Connect to an MCP server Args: server_script_path: Path to the server script (.py or .js) """ is_python = server_script_path.endswith(".py") is_js = server_script_path.endswith(".js") if not (is_python or is_js): raise ValueError("Server script must be a .py or .js file") command = "python" if is_python else "node" command = server_interpreter_path server_params = StdioServerParameters( command=command, args=[server_script_path], env=None ) stdio_transport = await self.exit_stack.enter_async_context( stdio_client(server_params) ) self.stdio, self.write = stdio_transport self.session = await self.exit_stack.enter_async_context( ClientSession(self.stdio, self.write) ) await self.session.initialize() # 列出可用的工具 response = await self.session.list_tools() tools = response.tools print("\nConnected to server with tools:", [tool.name for tool in tools]) async def connect_to_sse_server(self, server_url: str): """Connect to an MCP server Args: server_script_path: Path to the server script (.py or .js) """ self._streams_context = sse_client(url=server_url) streams = await self._streams_context.__aenter__() self._session_context = ClientSession(*streams) self.session = await self._session_context.__aenter__() await self.session.initialize() # 列出可用的工具 response = await self.session.list_tools() tools = response.tools print("\nConnected to server with tools:", [tool.name for tool in tools])

启动MCP Client

使用命令启动agent,并从控制台观察输出。

python react_agent.py

控制台输出

第1次循环 调用工具list_idle_meeting_rooms,参数{'start_time': '2025-04-10 09:00:00', 'end_time': '2025-04-10 10:30:00'},返回结果['会议室1', '会议室2', '会议室3'] 当前步骤1的reAct详情如下: thought: 用户想预定明天上午9点到10点半的会议室。首先需要将时间转换为正确的格式:start_time='2025-04-10 09:00:00',end_time='2025-04-10 10:30:00'。然后调用list_idle_meeting_rooms 来查询这个时间段内空闲的会议室。 action: list_idle_meeting_rooms,参数{'start_time': '2025-04-10 09:00:00', 'end_time': '2025-04-10 10:30:00'} observation: ['会议室1', '会议室2', '会议室3'] 第2次循环 调用工具book_meeting_room,参数{'room_id': '会议室1', 'start_time': '2025-04-10 09:00:00', 'end_time': '2025-04-10 10:30:00'},返回结果会议室1预定成功 当前步骤2的reAct详情如下: thought: 查询结果显示有三个会议室可用:['会议室1', '会议室2', '会议室3']。用户要求随意选择一个,我将选择'会议室1'进行预定。现在需要调用book_meeting_room工具来完成预定操作。 action: book_meeting_room,参数{'room_id': '会议室1', 'start_time': '2025-04-10 09:00:00', 'end_time': '2025-04-10 10:30:00'} observation: 会议室1预定成功 第3次循环 最终结果: 预定成功:会议室1 (2025-04-10 09:00:00至2025-04-10 10:30:00)
-
security - not tested
-
license - not tested
-
quality - not tested

A simple MCP server that enables meeting room booking through an AI assistant, supporting room availability checks and booking operations with React Agent pattern for tool calling.

  1. 创建Agent
    1. 创建一个MCP Server
      1. 创建一个MCP Server管理类
        1. 启动MCP Client
          ID: yr91tq08fn