get_pico_info
Retrieve information from Pico development boards to enable AI-driven IoT control through MQTT integration, allowing natural language commands and real-time device management.
Instructions
获取Pico开发板信息
Input Schema
Table | JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| No arguments | | | |
Implementation Reference
- src/mcp2mqtt/server.py:290-292 (handler) Executes the 'get_pico_info' tool by preparing the 'INFO' message to be sent over MQTT to retrieve Pico device information.

  ```python
  elif name == "get_pico_info":
      message = "INFO"
  ```
- src/mcp2mqtt/server.py:215-243 (registration) Registers and lists all available tools, including 'get_pico_info', dynamically from the configuration, defining their schemas.

  ```python
  @server.list_tools()
  async def handle_list_tools() -> List[types.Tool]:
      """List available tools."""
      tools = []
      for tool_name, tool_config in config.tools.items():
          tools.append(
              types.Tool(
                  name=tool_config.name,
                  description=tool_config.description,
                  inputSchema={
                      "type": "object",
                      "properties": {
                          param["name"]: {
                              "type": param["type"],
                              "description": param["description"],
                              **({"enum": param["enum"]} if "enum" in param else {})
                          }
                          for param in tool_config.parameters
                      },
                      "required": [
                          param["name"]
                          for param in tool_config.parameters
                          if param.get("required", False)
                      ]
                  }
              )
          )
      return tools
  ```
- src/mcp2mqtt/server.py:215-243 (schema) Dynamically generates the input schema for 'get_pico_info' (likely empty parameters) from the loaded configuration.

  ```python
  @server.list_tools()
  async def handle_list_tools() -> List[types.Tool]:
      """List available tools."""
      tools = []
      for tool_name, tool_config in config.tools.items():
          tools.append(
              types.Tool(
                  name=tool_config.name,
                  description=tool_config.description,
                  inputSchema={
                      "type": "object",
                      "properties": {
                          param["name"]: {
                              "type": param["type"],
                              "description": param["description"],
                              **({"enum": param["enum"]} if "enum" in param else {})
                          }
                          for param in tool_config.parameters
                      },
                      "required": [
                          param["name"]
                          for param in tool_config.parameters
                          if param.get("required", False)
                      ]
                  }
              )
          )
      return tools
  ```
- src/mcp2mqtt/server.py:244-327 (handler) The main @server.call_tool() handler that dispatches to the specific logic for 'get_pico_info' and sends the MQTT message.

  ```python
  @server.call_tool()
  async def handle_call_tool(name: str, arguments: Dict[str, Any] | None) -> List[types.TextContent | types.ImageContent]:
      """Handle tool execution requests."""
      try:
          logger.info(f"Tool call received - Name: {name}, Arguments: {arguments}")

          # 检查工具是否存在
          if name not in config.tools:
              return [types.TextContent(
                  type="text",
                  text=f"Error: Tool {name} not found"
              )]

          tool_config = config.tools[name]

          # 验证参数
          if arguments is None:
              arguments = {}

          # 检查必需参数
          for param in tool_config.parameters:
              if param.get('required', False) and param['name'] not in arguments:
                  return [types.TextContent(
                      type="text",
                      text=f"Error: Missing required parameter {param['name']}"
                  )]

              # 验证枚举值
              if 'enum' in param and param['name'] in arguments:
                  if arguments[param['name']] not in param['enum']:
                      return [types.TextContent(
                          type="text",
                          text=f"Error: Invalid value for {param['name']}"
                      )]

          # 准备消息
          message = None
          if name == "set_pwm":
              frequency = arguments.get("frequency", 0)
              if not (0 <= frequency <= 100):
                  return [types.TextContent(
                      type="text",
                      text="Error: Frequency must be between 0 and 100"
                  )]
              message = f"PWM {frequency}"
          elif name == "get_pico_info":
              message = "INFO"
          elif name == "led_control":
              state = arguments.get("state", "").lower()
              if state not in ["on", "off"]:
                  return [types.TextContent(
                      type="text",
                      text="Error: State must be 'on' or 'off'"
                  )]
              message = f"LED {state}"
          else:
              return [types.TextContent(
                  type="text",
                  text=f"Error: Unknown tool {name}"
              )]

          # 发送消息并等待响应
          mqtt_connection = MQTTConnection(config)
          response = await mqtt_connection.connect_and_send(
              topic=tool_config.mqtt_topic,
              message=message,
              response_topic=tool_config.response_topic
          )

          return [types.TextContent(
              type="text",
              text=response if response else f"{config.mqtt_response_start_string} {message} OK"
          )]

      except Exception as e:
          logger.error(f"Error handling tool call: {e}")
          return [types.TextContent(
              type="text",
              text=f"Error: {str(e)}"
          )]
  ```