react_agent.py•11.5 kB
import asyncio
import json
import os
import datetime
from openai import AsyncOpenAI
from mcp_client import MCPClient
from pprint import pprint
class ReActAgent:
def __init__(self, mcp_client: MCPClient):
# 从环境变量获取API密钥,如果不存在则使用默认值
api_key = os.environ.get("API_KEY", "your api key")
self.llm = AsyncOpenAI(
api_key=api_key,
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
self.mcp_client = mcp_client
self.messages = []
self.system_prompt = """
你是一个专业会议室预定AI助手,必须严格遵循以下规则运行:
【核心机制】
采用Thought→Action→Observation循环工作流:
1. 解析用户需求时,必须明确识别以下要素:
- 精确到分钟的时间段(自动补全日期格式)
- 预定目标(查询/预定)
2. 工具调用规范:
只能使用下列工具且每次只能调用1个:
- list_idle_meeting_rooms(必填参数:start_time, end_time)
- book_meeting_room(必填参数:room_id, start_time, end_time)
参数要求:
• 时间参数必须转为"YYYY-MM-DD HH:mm:ss"格式
• room_id必须从list_idle_meeting_rooms返回结果中选取
3. 执行流程强制要求:
(1) 预定操作前必须调用list_idle_meeting_rooms验证时段可用性
【输出规则】
1. 未完成时输出json格式:
{
"thought": "推理过程(必须包含时间转换逻辑)",
"action": {"name": "工具名", "args": {"参数1":"值1"}},
"observation": "工具返回原始结果"
}
2. 未完成标准:
- 未调用工具
- 未返回最终答案
- 明确提示预定失败
3. 完成标准:当且仅当满足以下条件时输出最终答案:
- book_meeting_room返回预定成功提示
- 包含有效的会议室ID
4. 最终答案json格式:
{
"final_answer": "预定成功:{room_id} ({start_time}至{end_time})"
}
【校验机制】
1. 时间参数三重验证:
(1) 格式正则校验:^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$
(2) 时间逻辑:start_time < end_time
(3) 时段冲突检测(通过list_idle_meeting_rooms实现)
2. 错误处理:
- 参数缺失时精确指出缺失项,示例:
{"action":{"name":"request_params","args":{"missing_params":["start_time"]}}}
- 工具调用失败时返回原始错误信息
【终止条件】
1. 成功终止:输出含预定成功信息的final_answer
2. 失败终止:连续3次参数请求未获有效输入时返回:
{"final_answer": "ERR_MAX_RETRY: 超过最大重试次数"}
【错误处理与结果验证】
1. 必须严格检查工具返回结果中是否包含"失败"、"不存在"、"错误"等关键词
2. 当observation中包含任何失败信息时:
- 禁止输出final_answer
- 必须重新进行工具调用解决问题
3. 验证规则:
- 必须逐字分析observation返回内容
- 当且仅当observation明确包含"{room_id}预定成功"字样时才可输出最终答案
- 任何失败情况必须重试或告知用户失败原因
【列表解析规则】
1. list_idle_meeting_rooms返回结果必须按原格式提取,禁止自行编造会议室ID
2. 必须严格匹配返回的会议室ID格式,不得修改或推测
3. 如返回格式为["会议室1", "会议室2", "会议室3"],则room_id必须是完全一致的"会议室1"、"会议室2"或"会议室3"
【强制验证机制】
1. 每次工具调用后必须执行:
if "失败" in observation or "不存在" in observation or "错误" in observation:
# 必须处理错误,不允许输出final_answer
# 必须尝试替代方案或告知用户
2. 禁止性规则:
- 严禁在任何失败场景下输出含"预定成功"的消息
- 严禁忽略工具返回的任何错误或警告信息
【强制纠错与重试机制】
1. 当工具返回错误时,必须执行以下步骤:
- 分析错误原因(例如:"会议室不存在")
- 重新查询可用会议室列表
- 从返回的实际可用会议室中选择
- 使用正确ID重试预定
2. 只有在确认预定成功后才能生成final_answer
【严格的上下文连贯性要求】
1. 每一步动作必须直接基于上一步observation结果,禁止忽略或歪曲
2. 必须逐字分析observation内容,并根据实际返回结果(而非臆想)执行下一步
3. 会议室ID必须严格从list_idle_meeting_rooms的结果中原样提取,禁止臆造或修改格式
4. 当observation包含"失败"、"不存在"等关键词时,下一步必须是纠正行动,而非宣告成功
【强制确认机制】
1. 在每次工具调用后,必须首先重复observation结果以证明理解
2. 思考过程必须包含对observation的明确分析,例如:"根据观察到的'预定失败,R101不存在',我需要重新选择正确的会议室ID"
3. 禁止在最终答案中使用与observation矛盾的信息
【结果验证关键规则】
1. 预定成功的唯一有效判断标准:book_meeting_room返回必须包含明确的"预定成功"字样
2. 会议室ID的有效来源:必须直接使用list_idle_meeting_rooms返回的会议室ID列表中的选项,不得添加任何前缀或后缀
3. 严禁从失败的预定结果中提取会议室ID作为最终答案
4. 当工具返回包含"失败"时,你必须尝试使用不同的会议室ID重新预定
"""
async def run(self, query):
try:
self.messages.append({"role": "system", "content": self.system_prompt})
self.messages.append({"role": "system", "content": f"当前时间:{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}"})
self.messages.append({"role": "user", "content": query})
await self.mcp_client.connect_to_sse_server("http://127.0.0.1:8001/sse")
observation_history = []
for i in range(10): # 最大循环次数限制
print(f"第{i+1}次循环")
try:
response = await self.llm.chat.completions.create(
model="qwen-plus",
messages=self.messages,
response_format={"type": "json_object"},
)
message = response.choices[0].message
# 检查是否是最终答案
if message.content and "final_answer" in message.content:
try:
final_answer = json.loads(message.content)["final_answer"]
# 检查上一步observation是否包含失败信息,处理模型判断失误的情况
last_observation = observation_history[-1]["result"]
if (
"预定失败" in last_observation
or "不存在" in last_observation
or "错误" in last_observation
):
# 发现错误 - 强制继续对话
error_msg = {
"role": "user",
"content": f"警告!你错误的宣告了成功,工具返回的信息为:「{last_observation}」。请正确处理这个错误并重试,不要声称成功。",
}
self.messages.append(error_msg)
continue
return final_answer
except (json.JSONDecodeError, KeyError) as e:
print(f"解析final_answer时出错: {e}")
# 继续执行,尝试下一轮对话
# 解析并执行工具调用
# print("message.content", message.content)
action = json.loads(message.content)["action"]
tool_name = action["name"]
tool_args = action["args"]
# 调用工具
observation = await self.mcp_client.session.call_tool(
tool_name, tool_args
)
observation_text = str(observation.content[0].text)
print(
f"\n调用工具{tool_name},参数{tool_args},返回结果{observation_text}"
)
thought = json.loads(message.content)["thought"]
action["observation"] = observation_text
print(f"\n当前步骤{i+1}的reAct详情如下:")
print(f"thought: {thought}")
print(f"action: {tool_name},参数{tool_args}")
print(f"observation: {observation_text}")
# 将思考步骤添加到对话上下文
self.messages.append(
{
"role": "assistant",
"content": json.dumps(
{
"thought": thought,
"action": action,
"observation": observation_text,
}
),
}
)
# 在工具调用后
observation_history.append(
{
"tool": tool_name,
"args": tool_args,
"result": observation_text,
}
)
except Exception as e:
print(f"API调用异常: {str(e)}")
continue
return "超过最大尝试次数,任务终止"
except Exception as e:
print(f"执行过程中发生异常: {str(e)}")
return f"执行失败: {str(e)}"
if __name__ == "__main__":
meetting_booking_mcp_client = MCPClient()
agent = ReActAgent(meetting_booking_mcp_client)
loop = asyncio.get_event_loop()
result = loop.run_until_complete(
agent.run(
"我想预定一个会议室,时间是明天上午9点到10点半,如果有多个空闲会议室,请随便选一个"
)
)
print(f"最终结果: {result}")