Skip to main content
Glama
aigo666

MCP Development Framework

maxkb

Send messages to the MaxKB API and receive processed results with optional re-chat and streaming support.

Instructions

请求MaxKB API并返回处理后的结果

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
messageYes要发送的消息内容
re_chatNo是否重新开始对话
streamNo是否使用流式响应

Implementation Reference

  • The MaxKbTool class with the execute() method (lines 72-234) that performs the actual MaxKB API call. It sends POST requests to the MaxKB API endpoint, handles streaming responses, and returns results.
    import os
    import traceback
    import mcp.types as types
    from . import BaseTool, ToolRegistry
    import logging
    import asyncio
    import socket
    
    # 设置日志
    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger(__name__)
    
    # 设置httpcore的超时
    os.environ['HTTPCORE_TIMEOUT'] = '60'
    os.environ['HTTPX_TIMEOUT'] = '60'
    
    @ToolRegistry.register
    class MaxKbTool(BaseTool):
        """MaxKB API请求工具"""
        name = "maxkb"
        description = "请求MaxKB API并返回处理后的结果"
        input_schema = {
            "type": "object",
            "required": ["message"],
            "properties": {
                "message": {
                    "type": "string",
                    "description": "要发送的消息内容",
                },
                "re_chat": {
                    "type": "boolean",
                    "description": "是否重新开始对话",
                    "default": False
                },
                "stream": {
                    "type": "boolean",
                    "description": "是否使用流式响应",
                    "default": True
                }
            },
        }
        
        def _check_env_variables(self):
            """检查必要的环境变量是否存在"""
            required_vars = [
                'MAXKB_HOST',
                'MAXKB_CHAT_ID',
                'MAXKB_APPLICATION_ID',
                'MAXKB_AUTHORIZATION'
            ]
            env_values = {}
            missing_vars = []
            for var in required_vars:
                value = os.getenv(var)
                if not value:
                    missing_vars.append(var)
                else:
                    env_values[var] = value
                    
            if missing_vars:
                raise ValueError(f"缺少必要的环境变量: {', '.join(missing_vars)}")
                
            logger.debug(f"环境变量检查通过: {env_values}")
            return env_values
        
        async def execute(self, arguments: dict) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
            """执行API请求并处理返回结果"""
            try:
                # 等待服务器初始化完成
                logger.debug("等待服务器初始化...")
                await asyncio.sleep(2)
                
                logger.debug(f"收到请求参数: {arguments}")
                
                if "message" not in arguments:
                    return [types.TextContent(
                        type="text",
                        text="错误: 缺少必要参数 'message'"
                    )]
                    
                # 检查环境变量
                env_vars = self._check_env_variables()
                
                # 准备请求参数
                url = f"{env_vars['MAXKB_HOST']}/api/application/chat_message/{env_vars['MAXKB_CHAT_ID']}"
                headers = {
                    "accept": "application/json",
                    "AUTHORIZATION": env_vars['MAXKB_AUTHORIZATION'],
                    "Content-Type": "application/json"
                }
                data = {
                    "message": arguments["message"],
                    "re_chat": arguments.get("re_chat", False),
                    "stream": arguments.get("stream", True)
                }
                
                logger.debug(f"准备发送请求到: {url}")
                logger.debug(f"请求头: {headers}")
                logger.debug(f"请求数据: {data}")
                
                try:
                    # 发送请求
                    logger.debug("开始创建HTTP客户端,超时设置为60秒...")
                    limits = httpx.Limits(max_keepalive_connections=5, max_connections=10, keepalive_expiry=60.0)
                    timeout = httpx.Timeout(
                        timeout=60.0,
                        connect=60.0,
                        read=60.0,
                        write=60.0,
                        pool=60.0
                    )
                    async with httpx.AsyncClient(
                        timeout=timeout,
                        limits=limits,
                        transport=httpx.AsyncHTTPTransport(
                            retries=1,
                            verify=False,
                            http1=True,
                            http2=False,
                            socket_options=[(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)]
                        )
                    ) as client:
                        logger.debug("开始发送POST请求...")
                        try:
                            response = await client.post(url, headers=headers, json=data)
                            logger.debug(f"收到响应状态码: {response.status_code}")
                            logger.debug(f"响应头: {response.headers}")
                            
                            # 处理响应内容
                            content_parts = []
                            last_content = ""
                            received_data = []
                            
                            async for line in response.aiter_lines():
                                logger.debug(f"收到行: {line}")
                                if line.startswith('data: '):
                                    try:
                                        # 解析JSON数据
                                        data = json.loads(line[6:])  # 去掉'data: '前缀
                                        logger.debug(f"解析到的数据: {data}")
                                        received_data.append(data)
                                        
                                        # 检查是否有非空的content
                                        if isinstance(data, dict):
                                            current_content = data.get("content", "")
                                            if current_content and current_content != last_content:
                                                last_content = current_content
                                                content_parts.append(current_content)
                                                logger.debug(f"添加新内容: {current_content}")
                                            
                                            # 检查reasoning_content
                                            reasoning_content = data.get("reasoning_content", "")
                                            if reasoning_content and reasoning_content != last_content:
                                                last_content = reasoning_content
                                                content_parts.append(reasoning_content)
                                                logger.debug(f"添加推理内容: {reasoning_content}")
                                                
                                    except json.JSONDecodeError as e:
                                        logger.error(f"JSON解析错误: {e}, 行内容: {line}")
                                        continue  # 忽略无法解析的行
                            
                            # 拼接所有内容
                            result = ''.join(content_parts) if content_parts else ""
                            logger.debug(f"最终结果: {result}")
                            
                            if not result:
                                logger.warning("未获取到有效内容")
                                error_details = f"收到 {len(received_data)} 条数据"
                                if received_data:
                                    error_details += "\n最后一条数据:\n" + json.dumps(received_data[-1], ensure_ascii=False, indent=2)
                                return [types.TextContent(
                                    type="text",
                                    text=f"请求错误: {error_details}"
                                )]
                            
                            return [types.TextContent(
                                type="text",
                                text=result
                            )]
                            
                        except httpx.TimeoutException as e:
                            logger.error(f"请求超时: {str(e)}")
                            logger.error(f"超时配置: {client.timeout}")
                            return [types.TextContent(
                                type="text",
                                text=f"请求超时(60秒): {str(e)}"
                            )]
                        except httpx.ConnectError as e:
                            logger.error(f"连接错误: {str(e)}")
                            logger.error(f"目标URL: {url}")
                            return [types.TextContent(
                                type="text",
                                text=f"连接错误({url}): {str(e)}"
                            )]
                        
                        try:
                            response.raise_for_status()
                        except httpx.HTTPStatusError as e:
                            logger.error(f"HTTP状态错误: {str(e)}")
                            logger.error(f"响应状态码: {e.response.status_code}")
                            logger.error(f"响应内容: {e.response.text}")
                            return [types.TextContent(
                                type="text",
                                text=f"HTTP状态错误: {str(e)}"
                            )]
                            
                except httpx.HTTPError as e:
                    error_msg = f"HTTP请求错误: {str(e)}\n状态码: {getattr(e.response, 'status_code', 'N/A')}\n响应内容: {getattr(e.response, 'text', 'N/A')}"
                    logger.error(error_msg)
                    return [types.TextContent(
                        type="text",
                        text=error_msg
                    )]
                    
            except ValueError as e:
                error_msg = f"配置错误: {str(e)}"
                logger.error(error_msg)
                return [types.TextContent(
                    type="text",
                    text=error_msg
                )]
            except Exception as e:
                error_msg = f"处理错误: {str(e)}\n{traceback.format_exc()}"
                logger.error(error_msg)
                return [types.TextContent(
                    type="text",
                    text=error_msg
                )] 
  • Input schema for the maxkb tool defining required 'message' (string) and optional 're_chat' (boolean) and 'stream' (boolean) parameters.
    input_schema = {
        "type": "object",
        "required": ["message"],
        "properties": {
            "message": {
                "type": "string",
                "description": "要发送的消息内容",
            },
            "re_chat": {
                "type": "boolean",
                "description": "是否重新开始对话",
                "default": False
            },
            "stream": {
                "type": "boolean",
                "description": "是否使用流式响应",
                "default": True
            }
        },
    }
  • The @ToolRegistry.register decorator registers the MaxKbTool class with name='maxkb' and description='请求MaxKB API并返回处理后的结果'.
    @ToolRegistry.register
    class MaxKbTool(BaseTool):
        """MaxKB API请求工具"""
        name = "maxkb"
        description = "请求MaxKB API并返回处理后的结果"
  • The ToolRegistry class that manages tool registration via the @register decorator, and provides list_tools/get_tool/has_tool methods used by the server.
    class ToolRegistry:
        """工具注册器,用于管理所有可用工具"""
        _tools: Dict[str, Type[BaseTool]] = {}
        
        @classmethod
        def register(cls, tool_class: Type[BaseTool]) -> Type[BaseTool]:
            """注册工具"""
            cls._tools[tool_class.name] = tool_class
            return tool_class
        
        @classmethod
        def get_tool(cls, name: str) -> Type[BaseTool]:
            """获取工具类"""
            if name not in cls._tools:
                raise ValueError(f"Unknown tool: {name}")
            return cls._tools[name]
        
        @classmethod
        def list_tools(cls) -> List[types.Tool]:
            """列出所有可用工具"""
            return [tool_class.get_tool_definition() for tool_class in cls._tools.values()]
        
        @classmethod
        def has_tool(cls, name: str) -> bool:
            """检查工具是否存在"""
            return name in cls._tools 
  • The load_tools() function that auto-discovers and imports all tool modules (including maxkb_tool.py) from the tools directory, triggering their registration.
    def load_tools() -> List[Type[BaseTool]]:
        """
        自动加载tools目录下的所有工具模块
        
        Returns:
            List[Type[BaseTool]]: 已加载的工具类列表
        """
        # 获取当前模块的路径
        package_path = os.path.dirname(__file__)
        
        # 获取所有子模块
        for _, name, is_pkg in pkgutil.iter_modules([package_path]):
            # 跳过__init__.py和loader.py
            if name in ['__init__', 'loader']:
                continue
            
            # 导入模块
            module_name = f"{__package__}.{name}"
            try:
                importlib.import_module(module_name)
            except ImportError as e:
                print(f"Warning: Failed to import module {module_name}: {e}")
        
        # 收集所有已注册的工具类
        tools = list(ToolRegistry._tools.values())
        
        return tools
Behavior2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

No annotations are provided, so the description carries full burden. It only says the tool returns a processed result, but fails to disclose critical behavioral traits such as destructive potential, authentication needs, or effects of parameters like re_chat and stream.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness4/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is a single short sentence with no wasted words. It is concise but lacks structure; however, for a simple tool, brevity is acceptable.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness2/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given three parameters, no output schema, and no annotations, the description should provide more context about the API call behavior, return format, and expected usage. It does not, making the tool underdocumented for an agent to use correctly.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters3/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

Schema description coverage is 100% with all three parameters described in Chinese. The tool description adds no additional meaning beyond the schema, which already explains parameters. Baseline score of 3 is appropriate as schema covers parameter semantics.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose4/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the tool requests the MaxKB API and returns a processed result, providing a specific verb and resource. However, it does not explain what MaxKB is or how it differs from sibling parsing tools, which limits differentiation.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description offers no guidance on when to use this tool versus alternatives or when not to use it. It does not mention prerequisites, context, or related tools, leaving the agent to infer usage from the name alone.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

Install Server

Other Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aigo666/mcp-framework'

If you have feedback or need assistance with the MCP directory API, please join our Discord server