Skip to main content
Glama
aigo666

MCP Development Framework

maxkb

Query MaxKB API to process messages and manage conversation flow within the MCP Development Framework.

Instructions

请求MaxKB API并返回处理后的结果

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
messageYes要发送的消息内容
re_chatNo是否重新开始对话
streamNo是否使用流式响应

Implementation Reference

  • The main asynchronous handler function that executes the MaxKB API request using httpx, handles streaming SSE responses, extracts content and reasoning_content, concatenates unique parts, and returns TextContent or error messages.
    async def execute(self, arguments: dict) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: """执行API请求并处理返回结果""" try: # 等待服务器初始化完成 logger.debug("等待服务器初始化...") await asyncio.sleep(2) logger.debug(f"收到请求参数: {arguments}") if "message" not in arguments: return [types.TextContent( type="text", text="错误: 缺少必要参数 'message'" )] # 检查环境变量 env_vars = self._check_env_variables() # 准备请求参数 url = f"{env_vars['MAXKB_HOST']}/api/application/chat_message/{env_vars['MAXKB_CHAT_ID']}" headers = { "accept": "application/json", "AUTHORIZATION": env_vars['MAXKB_AUTHORIZATION'], "Content-Type": "application/json" } data = { "message": arguments["message"], "re_chat": arguments.get("re_chat", False), "stream": arguments.get("stream", True) } logger.debug(f"准备发送请求到: {url}") logger.debug(f"请求头: {headers}") logger.debug(f"请求数据: {data}") try: # 发送请求 logger.debug("开始创建HTTP客户端,超时设置为60秒...") limits = httpx.Limits(max_keepalive_connections=5, max_connections=10, keepalive_expiry=60.0) timeout = httpx.Timeout( timeout=60.0, connect=60.0, read=60.0, write=60.0, pool=60.0 ) async with httpx.AsyncClient( timeout=timeout, limits=limits, transport=httpx.AsyncHTTPTransport( retries=1, verify=False, http1=True, http2=False, socket_options=[(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)] ) ) as client: logger.debug("开始发送POST请求...") try: response = await client.post(url, headers=headers, json=data) logger.debug(f"收到响应状态码: {response.status_code}") logger.debug(f"响应头: {response.headers}") # 处理响应内容 content_parts = [] last_content = "" received_data = [] async for line in response.aiter_lines(): logger.debug(f"收到行: {line}") if line.startswith('data: '): try: # 解析JSON数据 data = json.loads(line[6:]) # 去掉'data: '前缀 logger.debug(f"解析到的数据: {data}") received_data.append(data) # 检查是否有非空的content if isinstance(data, dict): current_content = data.get("content", "") if current_content and current_content != last_content: last_content = current_content content_parts.append(current_content) logger.debug(f"添加新内容: {current_content}") # 检查reasoning_content reasoning_content = data.get("reasoning_content", "") if reasoning_content and reasoning_content != last_content: last_content = reasoning_content content_parts.append(reasoning_content) logger.debug(f"添加推理内容: {reasoning_content}") except json.JSONDecodeError as e: logger.error(f"JSON解析错误: {e}, 行内容: {line}") continue # 忽略无法解析的行 # 拼接所有内容 result = ''.join(content_parts) if content_parts else "" logger.debug(f"最终结果: {result}") if not result: logger.warning("未获取到有效内容") error_details = f"收到 {len(received_data)} 条数据" if received_data: error_details += "\n最后一条数据:\n" + json.dumps(received_data[-1], ensure_ascii=False, indent=2) return [types.TextContent( type="text", text=f"请求错误: {error_details}" )] return [types.TextContent( type="text", text=result )] except httpx.TimeoutException as e: logger.error(f"请求超时: {str(e)}") logger.error(f"超时配置: {client.timeout}") return [types.TextContent( type="text", text=f"请求超时(60秒): {str(e)}" )] except httpx.ConnectError as e: logger.error(f"连接错误: {str(e)}") logger.error(f"目标URL: {url}") return [types.TextContent( type="text", text=f"连接错误({url}): {str(e)}" )] try: response.raise_for_status() except httpx.HTTPStatusError as e: logger.error(f"HTTP状态错误: {str(e)}") logger.error(f"响应状态码: {e.response.status_code}") logger.error(f"响应内容: {e.response.text}") return [types.TextContent( type="text", text=f"HTTP状态错误: {str(e)}" )] except httpx.HTTPError as e: error_msg = f"HTTP请求错误: {str(e)}\n状态码: {getattr(e.response, 'status_code', 'N/A')}\n响应内容: {getattr(e.response, 'text', 'N/A')}" logger.error(error_msg) return [types.TextContent( type="text", text=error_msg )] except ValueError as e: error_msg = f"配置错误: {str(e)}" logger.error(error_msg) return [types.TextContent( type="text", text=error_msg )] except Exception as e: error_msg = f"处理错误: {str(e)}\n{traceback.format_exc()}" logger.error(error_msg) return [types.TextContent( type="text", text=error_msg )]
  • Input schema defining the expected arguments: 'message' (required string), 're_chat' (optional boolean, default False), 'stream' (optional boolean, default True).
    input_schema = { "type": "object", "required": ["message"], "properties": { "message": { "type": "string", "description": "要发送的消息内容", }, "re_chat": { "type": "boolean", "description": "是否重新开始对话", "default": False }, "stream": { "type": "boolean", "description": "是否使用流式响应", "default": True } }, }
  • Tool registration via @ToolRegistry.register decorator on MaxKbTool class, with name='maxkb' and description.
    @ToolRegistry.register class MaxKbTool(BaseTool): """MaxKB API请求工具""" name = "maxkb" description = "请求MaxKB API并返回处理后的结果"
  • Helper method to validate and retrieve required environment variables (MAXKB_HOST, MAXKB_CHAT_ID, MAXKB_APPLICATION_ID, MAXKB_AUTHORIZATION). Called within execute.
    def _check_env_variables(self): """检查必要的环境变量是否存在""" required_vars = [ 'MAXKB_HOST', 'MAXKB_CHAT_ID', 'MAXKB_APPLICATION_ID', 'MAXKB_AUTHORIZATION' ] env_values = {} missing_vars = [] for var in required_vars: value = os.getenv(var) if not value: missing_vars.append(var) else: env_values[var] = value if missing_vars: raise ValueError(f"缺少必要的环境变量: {', '.join(missing_vars)}") logger.debug(f"环境变量检查通过: {env_values}") return env_values

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aigo666/mcp-framework'

If you have feedback or need assistance with the MCP directory API, please join our Discord server