Skip to main content
Glama

set_pwm

Control hardware pulse width modulation (PWM) signals by setting the frequency through the MCP2Serial server, enabling precise device operation via natural language commands.

Instructions

Execute set_pwm command

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
frequencyYes

Implementation Reference

  • Dynamically registers all tools from config.commands, including 'set_pwm' as a Tool with name='set_pwm', schema based on '{frequency}' in command template, and prompts from config.
    @server.list_tools() async def handle_list_tools() -> list[types.Tool]: """List available tools for the MCP service.""" logger.info("Listing available tools") tools = [] for cmd_id, command in config.commands.items(): # 从命令字符串中提取参数名 import re param_names = re.findall(r'\{(\w+)\}', command.command) properties = {name: {"type": "string"} for name in param_names} tools.append(types.Tool( name=cmd_id, description=f"Execute {cmd_id} command", inputSchema={ "type": "object", "properties": properties, "required": param_names }, prompts=command.prompts ))
  • Generates JSON schema for 'set_pwm' input: object with 'frequency' property (type string), required.
    inputSchema={ "type": "object", "properties": properties, "required": param_names },
  • MCP tool handler that for name='set_pwm' retrieves command from config, formats with arguments (e.g. {'frequency': '50'}), sends via serial_connection.send_command, returns TextContent responses.
    @server.call_tool() async def handle_call_tool(name: str, arguments: dict[str, Any] | None) -> list[types.TextContent]: """Handle tool execution requests according to MCP protocol.""" logger.info(f"Tool call received - Name: {name}, Arguments: {arguments}") try: if name not in config.commands: error_msg = f"[MCP2Serial v{VERSION}] Error: Unknown tool '{name}'\n" error_msg += "Please check:\n" error_msg += "1. Tool name is correct\n" error_msg += "2. Tool is configured in config.yaml" return [types.TextContent( type="text", text=error_msg )] command = config.commands[name] if arguments is None: arguments = {} # 发送命令并返回 MCP 格式的响应 return serial_connection.send_command(command, arguments) except Exception as e: logger.error(f"Error handling tool call: {str(e)}") error_msg = f"[MCP2Serial v{VERSION}] Error: {str(e)}\n" error_msg += "Please check:\n" error_msg += "1. Configuration is correct\n" error_msg += "2. Device is functioning properly" return [types.TextContent( type="text", text=error_msg )]
  • SerialConnection.send_command formats and sends the serial command (e.g. 'PWM 50\r\n' for set_pwm), reads lines until timeout, returns parsed response or error message as MCP TextContent.
    def send_command(self, command: Command, arguments: Dict[str, Any]) -> list[types.TextContent]: """Send a command to the serial port and return result according to MCP protocol.""" try: # 确保连接 if not self.is_loopback and (not self.serial_port or not self.serial_port.is_open): logger.info("No active connection, attempting to connect...") if not self.connect(): error_msg = f"[MCP2Serial v{VERSION}] Failed to establish serial connection.\n" error_msg += "Please check:\n" error_msg += "1. Serial port is correctly configured in config.yaml\n" error_msg += "2. Device is properly connected\n" error_msg += "3. No other program is using the port" return [types.TextContent( type="text", text=error_msg )] # 准备命令 cmd_str = command.command.format(**arguments) # 确保命令以\r\n结尾 cmd_str = cmd_str.rstrip() + '\r\n' # 移除可能的空白字符,强制添加\r\n cmd_bytes = cmd_str.encode() logger.info(f"Sending command: {cmd_str.strip()}") logger.info(f"Command bytes ({len(cmd_bytes)} bytes): {' '.join([f'0x{b:02X}' for b in cmd_bytes])}") if self.is_loopback: # 回环模式:直接返回发送的命令和OK响应 responses = [ cmd_str.encode(), # 命令回显 f"{config.response_start_string}\r\n".encode() # OK响应 ] else: # 清空缓冲区 self.serial_port.reset_input_buffer() self.serial_port.reset_output_buffer() # 发送命令 bytes_written = self.serial_port.write(cmd_bytes) logger.info(f"Wrote {bytes_written} bytes") self.serial_port.flush() # 等待一段时间确保命令被处理 time.sleep(0.1) # 读取所有响应 responses = [] while self.serial_port.in_waiting: response = self.serial_port.readline() logger.info(f"Raw response: {response}") if response: responses.append(response) if not responses: logger.error("No response received within timeout") error_msg = f"[MCP2Serial v{VERSION}] Command timeout - no response within {self.read_timeout} second(s)\n" error_msg += f"Command sent: {cmd_str.strip()}\n" error_msg += f"Command bytes ({len(cmd_bytes)} bytes): {' '.join([f'0x{b:02X}' for b in cmd_bytes])}\n" error_msg += "Please check:\n" error_msg += "1. Device is powered and responding\n" error_msg += "2. Baud rate matches device settings\n" error_msg += "3. Serial connection is stable\n" return [types.TextContent( type="text", text=error_msg )] # 解码第一行响应 first_response = responses[0] first_line = first_response.decode().strip() logger.info(f"Decoded first response: {first_line}") # 检查是否有第二行响应 if len(responses) > 1: second_response = responses[1] if second_response.startswith(config.response_start_string.encode()): # 使用配置的应答开始字符串 if command.need_parse: return [types.TextContent( type="text", text=second_response.decode().strip() )] return [] # 如果响应不是预期的格式,返回详细的错误信息 error_msg = f"[MCP2Serial v{VERSION}] Command execution failed.\n" error_msg += f"Command sent: {cmd_str.strip()}\n" error_msg += f"Command bytes ({len(cmd_bytes)} bytes): {' '.join([f'0x{b:02X}' for b in cmd_bytes])}\n" error_msg += "Responses received:\n" for i, resp in enumerate(responses, 1): error_msg += f"{i}. Raw: {resp!r}\n Decoded: {resp.decode().strip()}\n" error_msg += "\nPossible reasons:\n" error_msg += f"- Device echoed the command but did not send {config.response_start_string} response\n" error_msg += "- Command format may be incorrect\n" error_msg += "- Device may be in wrong mode\n" return [types.TextContent( type="text", text=error_msg )]
  • Firmware-side implementation: parses 'PWM <duty>' commands from serial, validates duty 0-100, updates global duty for software PWM LED timer, responds 'OK' or 'NG'.
    if user_input.startswith("PWM"): try: duty_value = float(user_input.split(" ")[1]) # 检查占空比是否在 0 到 100 范围内 if 0 <= duty_value <= 100: duty = int(duty_value) # 更新占空比例值 print("OK") else: print("NG") except (IndexError, ValueError): print("NG")

Other Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mcp2everything/mcp2serial'

If you have feedback or need assistance with the MCP directory API, please join our Discord server