pwm
Control PWM frequency for hardware devices through natural language commands, enabling precise adjustment of pulse-width modulation signals via TCP connections.
Instructions
把PWM调到最大 (frequency=100)
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| frequency | Yes | Parameter frequency for the pwm command |
Implementation Reference
- src/mcp2tcp/server.py:58-107 (registration)Dynamically generates the list of available MCP tools, including 'pwm' if config has a 'CMD_PWM' command. Extracts tool name by removing 'CMD_' prefix and lowercasing, builds inputSchema from command parameters or placeholders in the command string.async def handle_list_tools() -> List[types.Tool]: """List available tools.""" if config is None: return [] tools = [] for cmd_id, command in config.commands.items(): # 从命令字符串中提取命令名(去掉CMD_前缀) cmd_name = command.command.split()[0].replace("CMD_", "").lower() # 构建参数描述 properties = {} required = [] # 使用配置文件中的参数定义 if hasattr(command, 'parameters'): for param in command.parameters: properties[param['name']] = { "type": param['type'], "description": param['description'], **({"enum": param['enum']} if 'enum' in param else {}) } if param.get('required', False): required.append(param['name']) else: # 如果没有参数定义,从命令字符串中提取 import re param_names = re.findall(r'\{(\w+)\}', command.command) for param_name in param_names: properties[param_name] = { "type": "string", "description": f"Parameter {param_name} for the {cmd_name} command", "examples": [p.format(**{param_name: "value"}) for p in command.prompts if "{" + param_name + "}" in p] } required.append(param_name) tool = types.Tool( name=cmd_name, # 使用命令名作为工具名 description=command.prompts[0] if command.prompts else f"Execute {cmd_name} command", inputSchema={ "type": "object", "properties": properties, "required": required, "additionalProperties": False } ) tools.append(tool) logger.debug(f"Registered tool: {cmd_name} with parameters: {properties}") return tools
- src/mcp2tcp/server.py:110-192 (handler)MCP tool call handler. For tool 'pwm', locates the corresponding config command (e.g., 'CMD_PWM {frequency}'), validates and formats arguments into the command string, sends to TCP device server, handles errors, and returns the response.async def handle_call_tool(name: str, arguments: Dict[str, Any] | None) -> List[types.TextContent]: """Handle tool execution requests.""" if config is None: return [types.TextContent( type="text", text="Error: Configuration not loaded" )] try: logger.info(f"Tool call received - Name: {name}, Arguments: {arguments}") # 查找对应的命令 cmd_found = None for cmd_id, command in config.commands.items(): cmd_name = command.command.split()[0].replace("CMD_", "").lower() if cmd_name == name: cmd_found = command break if cmd_found is None: error_msg = f"Error: Unknown tool '{name}'\n" error_msg += "Please check:\n" error_msg += "1. Tool name is correct\n" error_msg += "2. Tool is configured in config.yaml" logger.error(error_msg) return [types.TextContent( type="text", text=error_msg )] if arguments is None: arguments = {} # 验证必需的参数 import re param_names = re.findall(r'\{(\w+)\}', cmd_found.command) missing_params = [param for param in param_names if param not in arguments] if missing_params: error_msg = f"Error: Missing required parameters: {', '.join(missing_params)}\n" error_msg += "Please provide all required parameters." logger.error(error_msg) return [types.TextContent( type="text", text=error_msg )] # 发送命令并等待响应 try: response = tcp_connection.send_command(cmd_found, arguments) logger.debug(f"Command response: {response}") return response except ConnectionError as e: error_msg = f"Error: Connection failed - {str(e)}\n" error_msg += "Please check:\n" error_msg += "1. TCP server is running\n" error_msg += "2. Connection settings are correct" logger.error(error_msg) return [types.TextContent( type="text", text=error_msg )] except TimeoutError as e: error_msg = f"Error: Command timeout - {str(e)}\n" error_msg += "Please check:\n" error_msg += "1. Device is responding\n" error_msg += "2. Timeout settings are appropriate" logger.error(error_msg) return [types.TextContent( type="text", text=error_msg )] except Exception as e: error_msg = f"Error: {str(e)}\n" error_msg += "Please check:\n" error_msg += "1. Configuration is correct\n" error_msg += "2. Device is functioning properly" logger.error(f"Unexpected error: {str(e)}", exc_info=True) return [types.TextContent( type="text", text=error_msg )]
- src/mcp2tcp/server.py:284-360 (helper)Helper function that executes the formatted PWM command over TCP: formats placeholders in command string with arguments (e.g., frequency), sends bytes, receives response, validates it starts with configured response string (default 'OK'), returns as MCP TextContent.def send_command(self, command: Command, arguments: dict[str, Any] | None) -> list[types.TextContent]: """Send a command to the TCP server and return result according to MCP protocol.""" try: if not self.socket: if not self.connect(): return [types.TextContent( type="text", text=f"Failed to connect to TCP server at {self.remote_ip}:{self.port}" )] if command.data_type == "ascii": # 准备命令 cmd_str = command.command.format(**arguments) # 确保命令以\r\n结尾 cmd_str = cmd_str.rstrip() + '\r\n' # 移除可能的空白字符,强制添加\r\n command_bytes = cmd_str.encode() logger.info(f"Sending command: {cmd_str.strip()}") logger.info(f"Sent command: {command_bytes.strip().decode('ascii')}") self.socket.sendall(command_bytes) elif command.data_type == "hex": command_bytes = bytes.fromhex(command.command.replace(" ", "")) logger.info(f"Sent command: {command.command}") self.socket.sendall(command_bytes) self.socket.settimeout(self.receive_timeout) responses = [] while True: try: response = self.socket.recv(4096) if response: logger.debug(f"Received data: {response}") responses.append(response) if command.data_type == "ascii" and response.endswith(b'\r\n'): break elif command.data_type == "hex": break else: break except socket.timeout as e: logger.error(f"TCP receive timeout: {str(e)}") return [types.TextContent( type="text", text=f"TCP receive timeout: {str(e)}" )] if not responses: return [types.TextContent( type="text", text=f"No response received from TCP server within {self.receive_timeout} seconds" )] first_response = responses[0].decode().strip() logger.info(f"Received response: {first_response}") if self.response_start_string in first_response: return [types.TextContent( type="text", text=first_response )] else: return [types.TextContent( type="text", text=f"Invalid response: {first_response}" )] except socket.timeout as e: logger.error(f"TCP send timeout: {str(e)}") return [types.TextContent( type="text", text=f"TCP send timeout: {str(e)}" )] except Exception as e: logger.error(f"TCP error: {str(e)}") return [types.TextContent( type="text", text=f"TCP error: {str(e)}" )]
- tests/tcp_server.py:20-25 (helper)Mock device state handler for PWM in the test TCP server that receives 'CMD_PWM' commands from the MCP server. Sets internal pwm_frequency state and returns confirmation or error.def set_pwm(self, frequency): try: self.pwm_frequency = int(frequency) return f"CMD PWM frequency set to {frequency}Hz\r\n" except ValueError: return f"CMD Error: Invalid frequency value\r\n"