set_pwm
Control PWM signal frequency on hardware devices via MCP2Serial server by specifying the desired frequency. Enables precise hardware modulation through natural language commands.
Instructions
Execute set_pwm command
Input Schema
Table | JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| frequency | Yes | | |
Implementation Reference
# src/mcp2serial/server.py:340-363 (registration)
# Dynamically registers the 'set_pwm' MCP tool (among others) by parsing the
# command template from config.yaml to generate input schema and description.
@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
    """List available tools for the MCP service.

    One tool is produced for each entry of ``config.commands``. Every
    ``{name}`` placeholder found in the command template becomes a required
    string property of that tool's input schema.

    Returns:
        The ``types.Tool`` definitions, in the iteration order of
        ``config.commands``.
    """
    # Function-scope import, as in the original, but executed once per call
    # instead of once per configured command.
    import re

    logger.info("Listing available tools")
    placeholder_re = re.compile(r'\{(\w+)\}')

    tools = []
    for cmd_id, command in config.commands.items():
        # Extract parameter names from the command string. A template may use
        # the same placeholder more than once; dict.fromkeys() drops the
        # repeats while keeping first-seen order, because JSON Schema requires
        # the entries of "required" to be unique.
        param_names = list(dict.fromkeys(placeholder_re.findall(command.command)))
        properties = {name: {"type": "string"} for name in param_names}

        tools.append(types.Tool(
            name=cmd_id,
            description=f"Execute {cmd_id} command",
            inputSchema={
                "type": "object",
                "properties": properties,
                "required": param_names
            },
            prompts=command.prompts
        ))

    return tools
# src/mcp2serial/server.py:366-398 (handler)
# The MCP tool handler function that executes the 'set_pwm' tool by retrieving
# its command config and sending it to the serial device.
async def handle_call_tool(name: str, arguments: dict[str, Any] | None) -> list[types.TextContent]:
    """Handle tool execution requests according to MCP protocol.

    Looks ``name`` up in ``config.commands`` and forwards the matching command,
    together with ``arguments``, to ``serial_connection.send_command``.

    Args:
        name: Identifier of the tool to execute.
        arguments: Values for the command; ``None`` is treated as an empty
            dict.

    Returns:
        Whatever ``send_command`` returns, or a single text item describing
        the problem when the tool is unknown or an exception is raised.
    """
    logger.info(f"Tool call received - Name: {name}, Arguments: {arguments}")

    try:
        if name in config.commands:
            call_args = {} if arguments is None else arguments
            # Send the command and return the MCP-formatted response.
            return serial_connection.send_command(config.commands[name], call_args)

        unknown_tool_text = (
            f"[MCP2Serial v{VERSION}] Error: Unknown tool '{name}'\n"
            "Please check:\n"
            "1. Tool name is correct\n"
            "2. Tool is configured in config.yaml"
        )
        return [types.TextContent(type="text", text=unknown_tool_text)]
    except Exception as exc:
        # Boundary handler: report the failure to the caller as text rather
        # than letting the exception propagate.
        logger.error(f"Error handling tool call: {str(exc)}")
        failure_text = (
            f"[MCP2Serial v{VERSION}] Error: {str(exc)}\n"
            "Please check:\n"
            "1. Configuration is correct\n"
            "2. Device is functioning properly"
        )
        return [types.TextContent(type="text", text=failure_text)]
- src/mcp2serial/server.py:206-304 (helper)Helper method in SerialConnection that formats and sends the 'set_pwm' command over serial, reads response, and formats MCP TextContent response.def send_command(self, command: Command, arguments: Dict[str, Any]) -> list[types.TextContent]: """Send a command to the serial port and return result according to MCP protocol.""" try: # 确保连接 if not self.is_loopback and (not self.serial_port or not self.serial_port.is_open): logger.info("No active connection, attempting to connect...") if not self.connect(): error_msg = f"[MCP2Serial v{VERSION}] Failed to establish serial connection.\n" error_msg += "Please check:\n" error_msg += "1. Serial port is correctly configured in config.yaml\n" error_msg += "2. Device is properly connected\n" error_msg += "3. No other program is using the port" return [types.TextContent( type="text", text=error_msg )] # 准备命令 cmd_str = command.command.format(**arguments) # 确保命令以\r\n结尾 cmd_str = cmd_str.rstrip() + '\r\n' # 移除可能的空白字符,强制添加\r\n cmd_bytes = cmd_str.encode() logger.info(f"Sending command: {cmd_str.strip()}") logger.info(f"Command bytes ({len(cmd_bytes)} bytes): {' '.join([f'0x{b:02X}' for b in cmd_bytes])}") if self.is_loopback: # 回环模式:直接返回发送的命令和OK响应 responses = [ cmd_str.encode(), # 命令回显 f"{config.response_start_string}\r\n".encode() # OK响应 ] else: # 清空缓冲区 self.serial_port.reset_input_buffer() self.serial_port.reset_output_buffer() # 发送命令 bytes_written = self.serial_port.write(cmd_bytes) logger.info(f"Wrote {bytes_written} bytes") self.serial_port.flush() # 等待一段时间确保命令被处理 time.sleep(0.1) # 读取所有响应 responses = [] while self.serial_port.in_waiting: response = self.serial_port.readline() logger.info(f"Raw response: {response}") if response: responses.append(response) if not responses: logger.error("No response received within timeout") error_msg = f"[MCP2Serial v{VERSION}] Command timeout - no response within {self.read_timeout} second(s)\n" error_msg += f"Command sent: {cmd_str.strip()}\n" 
error_msg += f"Command bytes ({len(cmd_bytes)} bytes): {' '.join([f'0x{b:02X}' for b in cmd_bytes])}\n" error_msg += "Please check:\n" error_msg += "1. Device is powered and responding\n" error_msg += "2. Baud rate matches device settings\n" error_msg += "3. Serial connection is stable\n" return [types.TextContent( type="text", text=error_msg )] # 解码第一行响应 first_response = responses[0] first_line = first_response.decode().strip() logger.info(f"Decoded first response: {first_line}") # 检查是否有第二行响应 if len(responses) > 1: second_response = responses[1] if second_response.startswith(config.response_start_string.encode()): # 使用配置的应答开始字符串 if command.need_parse: return [types.TextContent( type="text", text=second_response.decode().strip() )] return [] # 如果响应不是预期的格式,返回详细的错误信息 error_msg = f"[MCP2Serial v{VERSION}] Command execution failed.\n" error_msg += f"Command sent: {cmd_str.strip()}\n" error_msg += f"Command bytes ({len(cmd_bytes)} bytes): {' '.join([f'0x{b:02X}' for b in cmd_bytes])}\n" error_msg += "Responses received:\n" for i, resp in enumerate(responses, 1): error_msg += f"{i}. Raw: {resp!r}\n Decoded: {resp.decode().strip()}\n" error_msg += "\nPossible reasons:\n" error_msg += f"- Device echoed the command but did not send {config.response_start_string} response\n" error_msg += "- Command format may be incorrect\n" error_msg += "- Device may be in wrong mode\n" return [types.TextContent( type="text", text=error_msg )]
# src/mcp2serial/server.py:74-129 (helper)
# Loads the configuration including the 'set_pwm' command definition from
# config.yaml into config.commands['set_pwm']
# NOTE(review): presumably a static method of Config; the decorator and the
# enclosing class are outside this excerpt - confirm.
def load(config_path: str = "config.yaml") -> 'Config':
    """Load configuration from YAML file.

    Candidate locations are tried in order: the given path, the current
    working directory, ``~/.mcp2serial/`` and a system-wide directory
    (``%ProgramData%\\mcp2serial`` when ``os.name == 'nt'``, otherwise
    ``/etc/mcp2serial``). The first existing file that loads without an
    exception is used.

    Args:
        config_path: Path of the configuration file; its base name is also
            used when probing the fallback directories.

    Returns:
        A ``Config`` filled from the file, or a default ``Config()`` when no
        candidate could be loaded.
    """
    config_name = os.path.basename(config_path)

    # System-wide directory differs between Windows and Linux/Mac.
    if os.name == 'nt':
        system_path = os.path.join(
            os.environ.get("ProgramData", "C:\\ProgramData"),
            "mcp2serial",
            config_name,
        )
    else:
        system_path = f"/etc/mcp2serial/{config_name}"

    candidates = [
        config_path,                                         # explicitly requested path first
        os.path.join(os.getcwd(), config_name),              # current working directory
        os.path.expanduser(f"~/.mcp2serial/{config_name}"),  # user home directory
        system_path,                                         # system-wide directory
    ]

    for candidate in candidates:
        if not os.path.exists(candidate):
            continue

        try:
            with open(candidate, 'r', encoding='utf-8') as handle:
                raw = yaml.safe_load(handle)
            logger.info(f"Loading configuration from {candidate}")

            # Serial settings, with defaults for everything except the port.
            serial_section = raw.get('serial', {})
            loaded = Config(
                port=serial_section.get('port'),
                baud_rate=serial_section.get('baud_rate', 115200),
                timeout=serial_section.get('timeout', 1.0),
                read_timeout=serial_section.get('read_timeout', 1.0),
                # String that marks the start of a reply from the device.
                response_start_string=serial_section.get('response_start_string', 'OK')
            )

            # Command definitions, keyed by command id.
            for cmd_id, cmd_data in raw.get('commands', {}).items():
                raw_command = cmd_data.get('command', '')
                logger.debug(f"Loading command {cmd_id}: {repr(raw_command)}")
                loaded.commands[cmd_id] = Command(
                    command=raw_command,
                    need_parse=cmd_data.get('need_parse', False),
                    prompts=cmd_data.get('prompts', [])
                )
                logger.debug(f"Loaded command {cmd_id}: {repr(loaded.commands[cmd_id].command)}")

            return loaded
        except Exception as exc:
            # Best effort: a broken file is reported and the next location is tried.
            logger.warning(f"Error loading config from {candidate}: {exc}")

    logger.info("No valid config file found, using defaults")
    return Config()
# firmware/src/main.py:94-106 (handler)
# Firmware-side handler that receives and executes the PWM (set_pwm) command
# over serial, updates LED duty cycle.
if user_input.startswith("PWM"):
    # The value is the text that follows the first space after "PWM".
    try:
        duty_value = float(user_input.split(" ")[1])
    except (IndexError, ValueError):
        # Nothing after "PWM", or the value is not a number.
        print("NG")
    else:
        # Check that the duty cycle is within the 0 to 100 range.
        if 0 <= duty_value <= 100:
            duty = int(duty_value)  # update the duty-cycle value
            print("OK")
        else:
            print("NG")