mcp2mqtt
- mcp2mqtt
- tests
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MQTT应答程序
用于接收MQTT消息并按照配置的格式进行应答
基于配置文件自动处理三种类型的命令:
PWM控制命令
PICO信息查询命令
LED控制命令
响应格式:
PWM命令:CMD PWM {value} OK
INFO命令:CMD INFO Device:Pico Status:Running OK
LED命令:CMD LED {state} OK
"""
import paho.mqtt.client as mqtt
import time
import logging
import yaml
import os
from typing import Dict, Any
# 配置日志
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class MQTTResponder:
"""MQTT应答程序类"""
def __init__(self, config_path: str = "../config.yaml"):
"""初始化MQTT应答程序
Args:
config_path: 配置文件路径
"""
self.config = self.load_config(config_path)
self.client = mqtt.Client(client_id=f"{self.config['mqtt']['client_id']}_responder")
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.on_disconnect = self.on_disconnect
self.connected = False
def load_config(self, config_path: str) -> Dict[str, Any]:
"""加载配置文件"""
try:
with open(config_path, 'r', encoding='utf-8') as f:
return yaml.safe_load(f)
except Exception as e:
logger.error(f"Error loading config: {e}")
raise
def connect(self) -> bool:
"""连接到MQTT服务器"""
try:
if self.config['mqtt']['username']:
self.client.username_pw_set(
self.config['mqtt']['username'],
self.config['mqtt']['password']
)
self.client.connect(
self.config['mqtt']['broker'],
self.config['mqtt']['port'],
self.config['mqtt']['keepalive']
)
self.client.loop_start()
logger.info(f"Connected to MQTT broker at {self.config['mqtt']['broker']}")
return True
except Exception as e:
logger.error(f"Failed to connect to MQTT broker: {e}")
return False
def on_connect(self, client, userdata, flags, rc):
"""连接回调函数"""
if rc == 0:
self.connected = True
logger.info("Connected to MQTT broker successfully")
# 订阅所有命令主题
for cmd_name, cmd in self.config['commands'].items():
if 'mqtt_topic' in cmd:
self.client.subscribe(cmd['mqtt_topic'])
logger.info(f"Subscribed to topic: {cmd['mqtt_topic']}")
else:
logger.error(f"Failed to connect to MQTT broker with result code: {rc}")
def on_message(self, client, userdata, msg):
"""消息回调函数"""
try:
payload = msg.payload.decode()
logger.info(f"Received message on topic {msg.topic}: {payload}")
# 查找对应的命令配置
for cmd_name, cmd in self.config['commands'].items():
if cmd['mqtt_topic'] == msg.topic:
# 生成响应
response = self.generate_response(cmd_name, payload)
# 发送响应
if response and 'response_topic' in cmd:
self.client.publish(cmd['response_topic'], response)
logger.info(f"Sent response to topic {cmd['response_topic']}: {response}")
break
except Exception as e:
logger.error(f"Error processing message: {e}")
def generate_response(self, cmd_name: str, payload: str) -> str:
"""生成响应消息"""
try:
cmd_config = self.config['commands'][cmd_name]
response_start = self.config['mqtt']['response_start_string']
if cmd_name == 'set_pwm':
# 解析PWM值
try:
value = int(payload.split()[-1])
return f"{response_start} PWM {value} OK"
except:
return f"{response_start} PWM ERROR"
elif cmd_name == 'get_pico_info':
# 返回设备信息
return f"{response_start} INFO Device:Pico Status:Running OK"
elif cmd_name == 'led_control':
# 解析LED状态
state = payload.split()[-1].lower()
if state in ['on', 'off']:
return f"{response_start} LED {state} OK"
return f"{response_start} LED ERROR"
return None
except Exception as e:
logger.error(f"Error generating response: {e}")
return None
def on_disconnect(self, client, userdata, rc):
"""断开连接回调函数"""
self.connected = False
logger.warning(f"Disconnected from MQTT broker with result code: {rc}")
def close(self):
"""关闭连接"""
self.client.loop_stop()
self.client.disconnect()
logger.info("Disconnected from MQTT broker")
def main():
"""主函数"""
import signal
import sys
def signal_handler(sig, frame):
logger.info("Received signal to stop...")
responder.close()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# 获取配置文件路径
config_path = '../config.yaml'
if not os.path.isfile(config_path):
config_path = os.path.join(os.path.dirname(__file__), '..', '/', 'config.yaml')
# 创建并启动应答程序
responder = MQTTResponder(config_path)
if responder.connect():
logger.info("MQTT Responder is running. Press CTRL+C to stop.")
signal.pause()
else:
logger.error("Failed to start MQTT Responder")
sys.exit(1)
if __name__ == "__main__":
main()