mcp2mqtt
- mcp2mqtt
- tests
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MQTT测试服务器
用于测试mcp2mqtt的MQTT功能
"""
import paho.mqtt.client as mqtt
import time
import json
import logging
import yaml
import os
from typing import Dict, Any
# 配置日志
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class MQTTTestServer:
"""MQTT测试服务器类"""
def __init__(self, config_path: str = "../config.yaml"):
"""初始化MQTT测试服务器
Args:
config_path: 配置文件路径
"""
self.config = self.load_config(config_path)
self.client = mqtt.Client(client_id=f"{self.config['mqtt']['client_id']}_server")
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.on_disconnect = self.on_disconnect
self.connected = False
# 存储最后收到的消息
self.last_message = {
'topic': None,
'payload': None,
'timestamp': None
}
def load_config(self, config_path: str) -> Dict[str, Any]:
"""加载配置文件
Args:
config_path: 配置文件路径
Returns:
配置字典
"""
try:
with open(config_path, 'r', encoding='utf-8') as f:
return yaml.safe_load(f)
except Exception as e:
logger.error(f"Error loading config: {e}")
raise
def connect(self) -> bool:
"""连接到MQTT服务器
Returns:
bool: 连接是否成功
"""
try:
# 如果配置了用户名和密码,则设置认证
if self.config['mqtt']['username']:
self.client.username_pw_set(
self.config['mqtt']['username'],
self.config['mqtt']['password']
)
# 连接到服务器
self.client.connect(
self.config['mqtt']['broker'],
self.config['mqtt']['port'],
self.config['mqtt']['keepalive']
)
# 启动循环
self.client.loop_start()
logger.info(f"Connected to MQTT broker at {self.config['mqtt']['broker']}")
return True
except Exception as e:
logger.error(f"Failed to connect to MQTT broker: {e}")
return False
def on_connect(self, client, userdata, flags, rc):
"""连接回调函数"""
if rc == 0:
self.connected = True
logger.info("Connected to MQTT broker successfully")
# 订阅所有命令的响应主题
topics = []
for cmd_name, cmd in self.config['commands'].items():
if 'mqtt_topic' in cmd:
topics.append(cmd['mqtt_topic'])
if 'response_topic' in cmd:
topics.append(cmd['response_topic'])
for topic in topics:
self.client.subscribe(topic)
logger.info(f"Subscribed to topic: {topic}")
else:
logger.error(f"Failed to connect to MQTT broker with result code: {rc}")
def on_message(self, client, userdata, msg):
"""消息回调函数"""
try:
payload = msg.payload.decode()
logger.info(f"Received message on topic {msg.topic}: {payload}")
# 存储接收到的消息
self.last_message = {
'topic': msg.topic,
'payload': payload,
'timestamp': time.time()
}
# 发送响应消息
response_topic = None
if msg.topic.endswith('/command'):
response_topic = msg.topic.replace('/command', '/response')
elif msg.topic.endswith('/status'):
response_topic = msg.topic.replace('/status', '/control')
if response_topic:
response = {
'original_topic': msg.topic,
'original_message': payload,
'status': 'received',
'timestamp': time.time()
}
self.client.publish(response_topic, json.dumps(response))
logger.info(f"Sent response to topic {response_topic}")
except Exception as e:
logger.error(f"Error processing message: {e}")
def on_disconnect(self, client, userdata, rc):
"""断开连接回调函数"""
self.connected = False
logger.warning(f"Disconnected from MQTT broker with result code: {rc}")
def publish_test_message(self, topic: str, message: str):
"""发布测试消息
Args:
topic: 主题
message: 消息内容
"""
if not self.connected:
logger.error("Not connected to MQTT broker")
return False
try:
result = self.client.publish(topic, message)
if result.rc == mqtt.MQTT_ERR_SUCCESS:
logger.info(f"Published test message to {topic}: {message}")
return True
else:
logger.error(f"Failed to publish message: {result.rc}")
return False
except Exception as e:
logger.error(f"Error publishing message: {e}")
return False
def close(self):
"""关闭连接"""
if self.connected:
self.client.loop_stop()
self.client.disconnect()
logger.info("Disconnected from MQTT broker")
def main():
"""主函数"""
# 获取配置文件的绝对路径
current_dir = os.path.dirname(os.path.abspath(__file__))
config_path = os.path.join(current_dir, "..", "config.yaml")
# 创建测试服务器实例
server = MQTTTestServer(config_path)
try:
# 连接到MQTT服务器
if not server.connect():
logger.error("Failed to connect to MQTT broker")
return
# 发送测试消息到每个命令的主题
test_messages = [
{
'topic': 'mcp2mqtt/pwm',
'message': 'PWM 100' # 设置PWM到最大
},
{
'topic': 'mcp2mqtt/pwm',
'message': 'PWM 0' # 关闭PWM
},
{
'topic': 'mcp2mqtt/info',
'message': 'INFO' # 查询信息
},
{
'topic': 'mcp2mqtt/led',
'message': 'LED on' # 打开LED
},
{
'topic': 'mcp2mqtt/led',
'message': 'LED off' # 关闭LED
}
]
# 每隔5秒发送一条测试消息
for msg in test_messages:
server.publish_test_message(msg['topic'], msg['message'])
logger.info(f"Published test message: {msg['message']} to topic: {msg['topic']}")
time.sleep(5)
# 保持运行一段时间以接收响应
logger.info("Waiting for responses...")
time.sleep(10)
except KeyboardInterrupt:
logger.info("Received keyboard interrupt")
finally:
server.close()
if __name__ == "__main__":
main()