Skip to main content
Glama
aigo666

MCP Development Framework

maxkb

Query MaxKB API to process messages and manage conversation flow within the MCP Development Framework.

Instructions

请求MaxKB API并返回处理后的结果

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
messageYes要发送的消息内容
re_chatNo是否重新开始对话
streamNo是否使用流式响应

Implementation Reference

  • The main asynchronous handler function that executes the MaxKB API request using httpx, handles streaming SSE responses, extracts content and reasoning_content, concatenates unique parts, and returns TextContent or error messages.
    async def execute(self, arguments: dict) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        """执行API请求并处理返回结果"""
        try:
            # 等待服务器初始化完成
            logger.debug("等待服务器初始化...")
            await asyncio.sleep(2)
            
            logger.debug(f"收到请求参数: {arguments}")
            
            if "message" not in arguments:
                return [types.TextContent(
                    type="text",
                    text="错误: 缺少必要参数 'message'"
                )]
                
            # 检查环境变量
            env_vars = self._check_env_variables()
            
            # 准备请求参数
            url = f"{env_vars['MAXKB_HOST']}/api/application/chat_message/{env_vars['MAXKB_CHAT_ID']}"
            headers = {
                "accept": "application/json",
                "AUTHORIZATION": env_vars['MAXKB_AUTHORIZATION'],
                "Content-Type": "application/json"
            }
            data = {
                "message": arguments["message"],
                "re_chat": arguments.get("re_chat", False),
                "stream": arguments.get("stream", True)
            }
            
            logger.debug(f"准备发送请求到: {url}")
            logger.debug(f"请求头: {headers}")
            logger.debug(f"请求数据: {data}")
            
            try:
                # 发送请求
                logger.debug("开始创建HTTP客户端,超时设置为60秒...")
                limits = httpx.Limits(max_keepalive_connections=5, max_connections=10, keepalive_expiry=60.0)
                timeout = httpx.Timeout(
                    timeout=60.0,
                    connect=60.0,
                    read=60.0,
                    write=60.0,
                    pool=60.0
                )
                async with httpx.AsyncClient(
                    timeout=timeout,
                    limits=limits,
                    transport=httpx.AsyncHTTPTransport(
                        retries=1,
                        verify=False,
                        http1=True,
                        http2=False,
                        socket_options=[(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)]
                    )
                ) as client:
                    logger.debug("开始发送POST请求...")
                    try:
                        response = await client.post(url, headers=headers, json=data)
                        logger.debug(f"收到响应状态码: {response.status_code}")
                        logger.debug(f"响应头: {response.headers}")
                        
                        # 处理响应内容
                        content_parts = []
                        last_content = ""
                        received_data = []
                        
                        async for line in response.aiter_lines():
                            logger.debug(f"收到行: {line}")
                            if line.startswith('data: '):
                                try:
                                    # 解析JSON数据
                                    data = json.loads(line[6:])  # 去掉'data: '前缀
                                    logger.debug(f"解析到的数据: {data}")
                                    received_data.append(data)
                                    
                                    # 检查是否有非空的content
                                    if isinstance(data, dict):
                                        current_content = data.get("content", "")
                                        if current_content and current_content != last_content:
                                            last_content = current_content
                                            content_parts.append(current_content)
                                            logger.debug(f"添加新内容: {current_content}")
                                        
                                        # 检查reasoning_content
                                        reasoning_content = data.get("reasoning_content", "")
                                        if reasoning_content and reasoning_content != last_content:
                                            last_content = reasoning_content
                                            content_parts.append(reasoning_content)
                                            logger.debug(f"添加推理内容: {reasoning_content}")
                                            
                                except json.JSONDecodeError as e:
                                    logger.error(f"JSON解析错误: {e}, 行内容: {line}")
                                    continue  # 忽略无法解析的行
                        
                        # 拼接所有内容
                        result = ''.join(content_parts) if content_parts else ""
                        logger.debug(f"最终结果: {result}")
                        
                        if not result:
                            logger.warning("未获取到有效内容")
                            error_details = f"收到 {len(received_data)} 条数据"
                            if received_data:
                                error_details += "\n最后一条数据:\n" + json.dumps(received_data[-1], ensure_ascii=False, indent=2)
                            return [types.TextContent(
                                type="text",
                                text=f"请求错误: {error_details}"
                            )]
                        
                        return [types.TextContent(
                            type="text",
                            text=result
                        )]
                        
                    except httpx.TimeoutException as e:
                        logger.error(f"请求超时: {str(e)}")
                        logger.error(f"超时配置: {client.timeout}")
                        return [types.TextContent(
                            type="text",
                            text=f"请求超时(60秒): {str(e)}"
                        )]
                    except httpx.ConnectError as e:
                        logger.error(f"连接错误: {str(e)}")
                        logger.error(f"目标URL: {url}")
                        return [types.TextContent(
                            type="text",
                            text=f"连接错误({url}): {str(e)}"
                        )]
                    
                    try:
                        response.raise_for_status()
                    except httpx.HTTPStatusError as e:
                        logger.error(f"HTTP状态错误: {str(e)}")
                        logger.error(f"响应状态码: {e.response.status_code}")
                        logger.error(f"响应内容: {e.response.text}")
                        return [types.TextContent(
                            type="text",
                            text=f"HTTP状态错误: {str(e)}"
                        )]
                        
            except httpx.HTTPError as e:
                error_msg = f"HTTP请求错误: {str(e)}\n状态码: {getattr(e.response, 'status_code', 'N/A')}\n响应内容: {getattr(e.response, 'text', 'N/A')}"
                logger.error(error_msg)
                return [types.TextContent(
                    type="text",
                    text=error_msg
                )]
                
        except ValueError as e:
            error_msg = f"配置错误: {str(e)}"
            logger.error(error_msg)
            return [types.TextContent(
                type="text",
                text=error_msg
            )]
        except Exception as e:
            error_msg = f"处理错误: {str(e)}\n{traceback.format_exc()}"
            logger.error(error_msg)
            return [types.TextContent(
                type="text",
                text=error_msg
            )] 
  • Input schema defining the expected arguments: 'message' (required string), 're_chat' (optional boolean, default False), 'stream' (optional boolean, default True).
    input_schema = {
        "type": "object",
        "required": ["message"],
        "properties": {
            "message": {
                "type": "string",
                "description": "要发送的消息内容",
            },
            "re_chat": {
                "type": "boolean",
                "description": "是否重新开始对话",
                "default": False
            },
            "stream": {
                "type": "boolean",
                "description": "是否使用流式响应",
                "default": True
            }
        },
    }
  • Tool registration via @ToolRegistry.register decorator on MaxKbTool class, with name='maxkb' and description.
    @ToolRegistry.register
    class MaxKbTool(BaseTool):
        """MaxKB API请求工具"""
        name = "maxkb"
        description = "请求MaxKB API并返回处理后的结果"
  • Helper method to validate and retrieve required environment variables (MAXKB_HOST, MAXKB_CHAT_ID, MAXKB_APPLICATION_ID, MAXKB_AUTHORIZATION). Called within execute.
    def _check_env_variables(self):
        """检查必要的环境变量是否存在"""
        required_vars = [
            'MAXKB_HOST',
            'MAXKB_CHAT_ID',
            'MAXKB_APPLICATION_ID',
            'MAXKB_AUTHORIZATION'
        ]
        env_values = {}
        missing_vars = []
        for var in required_vars:
            value = os.getenv(var)
            if not value:
                missing_vars.append(var)
            else:
                env_values[var] = value
                
        if missing_vars:
            raise ValueError(f"缺少必要的环境变量: {', '.join(missing_vars)}")
            
        logger.debug(f"环境变量检查通过: {env_values}")
        return env_values
Behavior2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

No annotations are provided, so the description carries the full burden. It vaguely mentions 'processed result' but doesn't disclose behavioral traits such as authentication needs, rate limits, error handling, or what 'processed' entails. The description lacks details on whether this is a read or write operation, making it inadequate for a tool with no annotation coverage.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness4/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is a single, concise sentence in Chinese, which is efficient and front-loaded. However, it's overly vague and under-specified, bordering on unhelpful, which slightly reduces its effectiveness despite the brevity.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness2/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the complexity (3 parameters, no annotations, no output schema), the description is incomplete. It doesn't explain what the MaxKB API is, what it processes, or what the return values look like. Without annotations or an output schema, more context is needed to understand the tool's behavior and results.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters3/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

Schema description coverage is 100%, so the schema already documents all parameters (message, re_chat, stream) with clear descriptions. The description adds no additional meaning beyond what the schema provides, such as explaining how parameters interact or their impact on the API call. Baseline score of 3 is appropriate since the schema does the heavy lifting.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose2/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description '请求MaxKB API并返回处理后的结果' translates to 'Request MaxKB API and return processed result', which is a tautology that essentially restates the tool name 'maxkb' (MaxKB API). It doesn't specify what the API does, what kind of processing occurs, or what resources are involved. No distinction from sibling tools is provided.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

No guidance is given on when to use this tool versus alternatives. The description doesn't mention any context, prerequisites, or exclusions. Sibling tools like parse_csv or url suggest this might be for data processing, but no explicit usage instructions are provided.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

Install Server

Other Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aigo666/mcp-framework'

If you have feedback or need assistance with the MCP directory API, please join our Discord server