MCP Development Framework
by aigo666
"""
MaxKB工具,用于请求MaxKB API并处理返回结果
"""
import httpx
import json
import os
import traceback
import mcp.types as types
from . import BaseTool, ToolRegistry
import logging
import asyncio
import socket
# 设置日志
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
# 设置httpcore的超时
os.environ['HTTPCORE_TIMEOUT'] = '60'
os.environ['HTTPX_TIMEOUT'] = '60'
@ToolRegistry.register
class MaxKbTool(BaseTool):
"""MaxKB API请求工具"""
name = "maxkb"
description = "请求MaxKB API并返回处理后的结果"
input_schema = {
"type": "object",
"required": ["message"],
"properties": {
"message": {
"type": "string",
"description": "要发送的消息内容",
},
"re_chat": {
"type": "boolean",
"description": "是否重新开始对话",
"default": False
},
"stream": {
"type": "boolean",
"description": "是否使用流式响应",
"default": True
}
},
}
def _check_env_variables(self):
"""检查必要的环境变量是否存在"""
required_vars = [
'MAXKB_HOST',
'MAXKB_CHAT_ID',
'MAXKB_APPLICATION_ID',
'MAXKB_AUTHORIZATION'
]
env_values = {}
missing_vars = []
for var in required_vars:
value = os.getenv(var)
if not value:
missing_vars.append(var)
else:
env_values[var] = value
if missing_vars:
raise ValueError(f"缺少必要的环境变量: {', '.join(missing_vars)}")
logger.debug(f"环境变量检查通过: {env_values}")
return env_values
async def execute(self, arguments: dict) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
"""执行API请求并处理返回结果"""
try:
# 等待服务器初始化完成
logger.debug("等待服务器初始化...")
await asyncio.sleep(2)
logger.debug(f"收到请求参数: {arguments}")
if "message" not in arguments:
return [types.TextContent(
type="text",
text="错误: 缺少必要参数 'message'"
)]
# 检查环境变量
env_vars = self._check_env_variables()
# 准备请求参数
url = f"{env_vars['MAXKB_HOST']}/api/application/chat_message/{env_vars['MAXKB_CHAT_ID']}"
headers = {
"accept": "application/json",
"AUTHORIZATION": env_vars['MAXKB_AUTHORIZATION'],
"Content-Type": "application/json"
}
data = {
"message": arguments["message"],
"re_chat": arguments.get("re_chat", False),
"stream": arguments.get("stream", True)
}
logger.debug(f"准备发送请求到: {url}")
logger.debug(f"请求头: {headers}")
logger.debug(f"请求数据: {data}")
try:
# 发送请求
logger.debug("开始创建HTTP客户端,超时设置为60秒...")
limits = httpx.Limits(max_keepalive_connections=5, max_connections=10, keepalive_expiry=60.0)
timeout = httpx.Timeout(
timeout=60.0,
connect=60.0,
read=60.0,
write=60.0,
pool=60.0
)
async with httpx.AsyncClient(
timeout=timeout,
limits=limits,
transport=httpx.AsyncHTTPTransport(
retries=1,
verify=False,
http1=True,
http2=False,
socket_options=[(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)]
)
) as client:
logger.debug("开始发送POST请求...")
try:
response = await client.post(url, headers=headers, json=data)
logger.debug(f"收到响应状态码: {response.status_code}")
logger.debug(f"响应头: {response.headers}")
# 处理响应内容
content_parts = []
last_content = ""
received_data = []
async for line in response.aiter_lines():
logger.debug(f"收到行: {line}")
if line.startswith('data: '):
try:
# 解析JSON数据
data = json.loads(line[6:]) # 去掉'data: '前缀
logger.debug(f"解析到的数据: {data}")
received_data.append(data)
# 检查是否有非空的content
if isinstance(data, dict):
current_content = data.get("content", "")
if current_content and current_content != last_content:
last_content = current_content
content_parts.append(current_content)
logger.debug(f"添加新内容: {current_content}")
# 检查reasoning_content
reasoning_content = data.get("reasoning_content", "")
if reasoning_content and reasoning_content != last_content:
last_content = reasoning_content
content_parts.append(reasoning_content)
logger.debug(f"添加推理内容: {reasoning_content}")
except json.JSONDecodeError as e:
logger.error(f"JSON解析错误: {e}, 行内容: {line}")
continue # 忽略无法解析的行
# 拼接所有内容
result = ''.join(content_parts) if content_parts else ""
logger.debug(f"最终结果: {result}")
if not result:
logger.warning("未获取到有效内容")
error_details = f"收到 {len(received_data)} 条数据"
if received_data:
error_details += "\n最后一条数据:\n" + json.dumps(received_data[-1], ensure_ascii=False, indent=2)
return [types.TextContent(
type="text",
text=f"请求错误: {error_details}"
)]
return [types.TextContent(
type="text",
text=result
)]
except httpx.TimeoutException as e:
logger.error(f"请求超时: {str(e)}")
logger.error(f"超时配置: {client.timeout}")
return [types.TextContent(
type="text",
text=f"请求超时(60秒): {str(e)}"
)]
except httpx.ConnectError as e:
logger.error(f"连接错误: {str(e)}")
logger.error(f"目标URL: {url}")
return [types.TextContent(
type="text",
text=f"连接错误({url}): {str(e)}"
)]
try:
response.raise_for_status()
except httpx.HTTPStatusError as e:
logger.error(f"HTTP状态错误: {str(e)}")
logger.error(f"响应状态码: {e.response.status_code}")
logger.error(f"响应内容: {e.response.text}")
return [types.TextContent(
type="text",
text=f"HTTP状态错误: {str(e)}"
)]
except httpx.HTTPError as e:
error_msg = f"HTTP请求错误: {str(e)}\n状态码: {getattr(e.response, 'status_code', 'N/A')}\n响应内容: {getattr(e.response, 'text', 'N/A')}"
logger.error(error_msg)
return [types.TextContent(
type="text",
text=error_msg
)]
except ValueError as e:
error_msg = f"配置错误: {str(e)}"
logger.error(error_msg)
return [types.TextContent(
type="text",
text=error_msg
)]
except Exception as e:
error_msg = f"处理错误: {str(e)}\n{traceback.format_exc()}"
logger.error(error_msg)
return [types.TextContent(
type="text",
text=error_msg
)]