maxkb
Send messages to the MaxKB API and receive processed results with optional re-chat and streaming support.
Instructions
请求MaxKB API并返回处理后的结果
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| message | Yes | 要发送的消息内容 | |
| re_chat | No | 是否重新开始对话 | |
| stream | No | 是否使用流式响应 |
Implementation Reference
- mcp_tool/tools/maxkb_tool.py:7-234 (handler)The MaxKbTool class with the execute() method (lines 72-234) that performs the actual MaxKB API call. It sends POST requests to the MaxKB API endpoint, handles streaming responses, and returns results.
import os import traceback import mcp.types as types from . import BaseTool, ToolRegistry import logging import asyncio import socket # 设置日志 logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) # 设置httpcore的超时 os.environ['HTTPCORE_TIMEOUT'] = '60' os.environ['HTTPX_TIMEOUT'] = '60' @ToolRegistry.register class MaxKbTool(BaseTool): """MaxKB API请求工具""" name = "maxkb" description = "请求MaxKB API并返回处理后的结果" input_schema = { "type": "object", "required": ["message"], "properties": { "message": { "type": "string", "description": "要发送的消息内容", }, "re_chat": { "type": "boolean", "description": "是否重新开始对话", "default": False }, "stream": { "type": "boolean", "description": "是否使用流式响应", "default": True } }, } def _check_env_variables(self): """检查必要的环境变量是否存在""" required_vars = [ 'MAXKB_HOST', 'MAXKB_CHAT_ID', 'MAXKB_APPLICATION_ID', 'MAXKB_AUTHORIZATION' ] env_values = {} missing_vars = [] for var in required_vars: value = os.getenv(var) if not value: missing_vars.append(var) else: env_values[var] = value if missing_vars: raise ValueError(f"缺少必要的环境变量: {', '.join(missing_vars)}") logger.debug(f"环境变量检查通过: {env_values}") return env_values async def execute(self, arguments: dict) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: """执行API请求并处理返回结果""" try: # 等待服务器初始化完成 logger.debug("等待服务器初始化...") await asyncio.sleep(2) logger.debug(f"收到请求参数: {arguments}") if "message" not in arguments: return [types.TextContent( type="text", text="错误: 缺少必要参数 'message'" )] # 检查环境变量 env_vars = self._check_env_variables() # 准备请求参数 url = f"{env_vars['MAXKB_HOST']}/api/application/chat_message/{env_vars['MAXKB_CHAT_ID']}" headers = { "accept": "application/json", "AUTHORIZATION": env_vars['MAXKB_AUTHORIZATION'], "Content-Type": "application/json" } data = { "message": arguments["message"], "re_chat": arguments.get("re_chat", False), "stream": arguments.get("stream", True) } logger.debug(f"准备发送请求到: {url}") logger.debug(f"请求头: {headers}") logger.debug(f"请求数据: {data}") try: # 发送请求 logger.debug("开始创建HTTP客户端,超时设置为60秒...") limits = httpx.Limits(max_keepalive_connections=5, max_connections=10, keepalive_expiry=60.0) timeout = httpx.Timeout( timeout=60.0, connect=60.0, read=60.0, write=60.0, pool=60.0 ) async with httpx.AsyncClient( timeout=timeout, limits=limits, transport=httpx.AsyncHTTPTransport( retries=1, verify=False, http1=True, http2=False, socket_options=[(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)] ) ) as client: logger.debug("开始发送POST请求...") try: response = await client.post(url, headers=headers, json=data) logger.debug(f"收到响应状态码: {response.status_code}") logger.debug(f"响应头: {response.headers}") # 处理响应内容 content_parts = [] last_content = "" received_data = [] async for line in response.aiter_lines(): logger.debug(f"收到行: {line}") if line.startswith('data: '): try: # 解析JSON数据 data = json.loads(line[6:]) # 去掉'data: '前缀 logger.debug(f"解析到的数据: {data}") received_data.append(data) # 检查是否有非空的content if isinstance(data, dict): current_content = data.get("content", "") if current_content and current_content != last_content: last_content = current_content content_parts.append(current_content) logger.debug(f"添加新内容: {current_content}") # 检查reasoning_content reasoning_content = data.get("reasoning_content", "") if reasoning_content and reasoning_content != last_content: last_content = reasoning_content content_parts.append(reasoning_content) logger.debug(f"添加推理内容: {reasoning_content}") except json.JSONDecodeError as e: logger.error(f"JSON解析错误: {e}, 行内容: {line}") continue # 忽略无法解析的行 # 拼接所有内容 result = ''.join(content_parts) if content_parts else "" logger.debug(f"最终结果: {result}") if not result: logger.warning("未获取到有效内容") error_details = f"收到 {len(received_data)} 条数据" if received_data: error_details += "\n最后一条数据:\n" + json.dumps(received_data[-1], ensure_ascii=False, indent=2) return [types.TextContent( type="text", text=f"请求错误: {error_details}" )] return [types.TextContent( type="text", text=result )] except httpx.TimeoutException as e: logger.error(f"请求超时: {str(e)}") logger.error(f"超时配置: {client.timeout}") return [types.TextContent( type="text", text=f"请求超时(60秒): {str(e)}" )] except httpx.ConnectError as e: logger.error(f"连接错误: {str(e)}") logger.error(f"目标URL: {url}") return [types.TextContent( type="text", text=f"连接错误({url}): {str(e)}" )] try: response.raise_for_status() except httpx.HTTPStatusError as e: logger.error(f"HTTP状态错误: {str(e)}") logger.error(f"响应状态码: {e.response.status_code}") logger.error(f"响应内容: {e.response.text}") return [types.TextContent( type="text", text=f"HTTP状态错误: {str(e)}" )] except httpx.HTTPError as e: error_msg = f"HTTP请求错误: {str(e)}\n状态码: {getattr(e.response, 'status_code', 'N/A')}\n响应内容: {getattr(e.response, 'text', 'N/A')}" logger.error(error_msg) return [types.TextContent( type="text", text=error_msg )] except ValueError as e: error_msg = f"配置错误: {str(e)}" logger.error(error_msg) return [types.TextContent( type="text", text=error_msg )] except Exception as e: error_msg = f"处理错误: {str(e)}\n{traceback.format_exc()}" logger.error(error_msg) return [types.TextContent( type="text", text=error_msg )] - mcp_tool/tools/maxkb_tool.py:28-47 (schema)Input schema for the maxkb tool defining required 'message' (string) and optional 're_chat' (boolean) and 'stream' (boolean) parameters.
input_schema = { "type": "object", "required": ["message"], "properties": { "message": { "type": "string", "description": "要发送的消息内容", }, "re_chat": { "type": "boolean", "description": "是否重新开始对话", "default": False }, "stream": { "type": "boolean", "description": "是否使用流式响应", "default": True } }, } - mcp_tool/tools/maxkb_tool.py:23-27 (registration)The @ToolRegistry.register decorator registers the MaxKbTool class with name='maxkb' and description='请求MaxKB API并返回处理后的结果'.
@ToolRegistry.register class MaxKbTool(BaseTool): """MaxKB API请求工具""" name = "maxkb" description = "请求MaxKB API并返回处理后的结果" - mcp_tool/tools/__init__.py:42-67 (helper)The ToolRegistry class that manages tool registration via the @register decorator, and provides list_tools/get_tool/has_tool methods used by the server.
class ToolRegistry: """工具注册器,用于管理所有可用工具""" _tools: Dict[str, Type[BaseTool]] = {} @classmethod def register(cls, tool_class: Type[BaseTool]) -> Type[BaseTool]: """注册工具""" cls._tools[tool_class.name] = tool_class return tool_class @classmethod def get_tool(cls, name: str) -> Type[BaseTool]: """获取工具类""" if name not in cls._tools: raise ValueError(f"Unknown tool: {name}") return cls._tools[name] @classmethod def list_tools(cls) -> List[types.Tool]: """列出所有可用工具""" return [tool_class.get_tool_definition() for tool_class in cls._tools.values()] @classmethod def has_tool(cls, name: str) -> bool: """检查工具是否存在""" return name in cls._tools - mcp_tool/tools/loader.py:12-38 (helper)The load_tools() function that auto-discovers and imports all tool modules (including maxkb_tool.py) from the tools directory, triggering their registration.
def load_tools() -> List[Type[BaseTool]]: """ 自动加载tools目录下的所有工具模块 Returns: List[Type[BaseTool]]: 已加载的工具类列表 """ # 获取当前模块的路径 package_path = os.path.dirname(__file__) # 获取所有子模块 for _, name, is_pkg in pkgutil.iter_modules([package_path]): # 跳过__init__.py和loader.py if name in ['__init__', 'loader']: continue # 导入模块 module_name = f"{__package__}.{name}" try: importlib.import_module(module_name) except ImportError as e: print(f"Warning: Failed to import module {module_name}: {e}") # 收集所有已注册的工具类 tools = list(ToolRegistry._tools.values()) return tools