from fastmcp import FastMCP
import sys
import logging
import os
import ssl
from dotenv import load_dotenv
import paho.mqtt.client as mqtt
# 加载 .env 环境变量
load_dotenv(override=True)
# 日志配置
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('MQTT-Light-Controller')
# 修复 Windows 控制台编码
if sys.platform == 'win32':
sys.stderr.reconfigure(encoding='utf-8')
sys.stdout.reconfigure(encoding='utf-8')
# -------------------------- MQTT 配置 & 客户端 --------------------------
# 从环境变量读取 MQTT 配置(也可直接硬编码,建议用 .env)
MQTT_BROKER = os.getenv("MQTT_BROKER", "b11f17c5.ala.cn-hangzhou.emqxsl.cn")
MQTT_PORT = int(os.getenv("MQTT_PORT", 8883)) # TLS端口
MQTT_CONTROL_TOPIC = os.getenv("MQTT_CONTROL_TOPIC", "smartlight/control") # 控制灯的发布主题
MQTT_STATUS_TOPIC = os.getenv("MQTT_STATUS_TOPIC", "smartlight/status") # 灯状态订阅主题
MQTT_CLIENT_ID = os.getenv("MQTT_CLIENT_ID", "mcp-light-controller")
# 若 EMQX 需认证,补充账号密码(根据实际情况调整)
MQTT_USER = os.getenv("MQTT_USER", "")
MQTT_PASSWORD = os.getenv("MQTT_PASSWORD", "")
# 初始化 MQTT 客户端
mqtt_client = mqtt.Client(client_id=MQTT_CLIENT_ID)
# MQTT TLS 配置(必填,因为用 8883 端口)
def setup_mqtt_tls():
"""配置 MQTT TLS/SSL 连接"""
context = ssl.create_default_context()
# 忽略证书验证(测试用,生产环境建议加载CA证书)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
mqtt_client.tls_set_context(context)
# 若有账号密码,设置认证
if MQTT_USER and MQTT_PASSWORD:
mqtt_client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
# MQTT 连接回调
def on_connect(client, userdata, flags, rc):
if rc == 0:
logger.info(f"MQTT 连接成功 (broker: {MQTT_BROKER}:{MQTT_PORT})")
# 订阅灯状态主题(可选,接收灯的状态反馈)
client.subscribe(MQTT_STATUS_TOPIC)
else:
logger.error(f"MQTT 连接失败,错误码: {rc}")
# MQTT 消息接收回调(订阅主题的消息)
def on_message(client, userdata, msg):
logger.info(f"收到 MQTT 消息 [主题: {msg.topic}]: {msg.payload.decode('utf-8')}")
# 注册 MQTT 回调
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
# 连接 MQTT 服务器
def connect_mqtt():
try:
setup_mqtt_tls()
mqtt_client.connect(MQTT_BROKER, MQTT_PORT, keepalive=60)
# 启动 MQTT 客户端循环(非阻塞)
mqtt_client.loop_start()
except Exception as e:
logger.error(f"MQTT 连接异常: {e}")
raise
# -------------------------- MCP 工具定义 --------------------------
# 创建 MCP 服务器实例
mcp = FastMCP("MQTTLightController")
# 工具1:控制灯的开关
@mcp.tool()
def control_light(light_state: str) -> dict:
"""
控制智能灯的开关状态
:param light_state: 灯的状态,可选值: "on"(开)/ "off"(关)
:return: 操作结果
"""
try:
# 校验参数
if light_state not in ["on", "off"]:
return {"success": False, "error": "参数错误,仅支持 on/off"}
# 发布 MQTT 控制消息(JSON 格式,便于设备解析)
control_msg = f'{{"action": "{light_state}"}}'
mqtt_client.publish(MQTT_CONTROL_TOPIC, control_msg, qos=1)
logger.info(f"已发布 MQTT 控制指令 [主题: {MQTT_CONTROL_TOPIC}]: {control_msg}")
return {
"success": True,
"message": f"灯已{('开启' if light_state == 'on' else '关闭')}",
"mqtt_topic": MQTT_CONTROL_TOPIC,
"mqtt_msg": control_msg
}
except Exception as e:
logger.error(f"控制灯失败: {e}")
return {"success": False, "error": str(e)}
# 工具2:调节灯的亮度(扩展功能)
@mcp.tool()
def adjust_brightness(brightness: int) -> dict:
"""
调节智能灯的亮度
:param brightness: 亮度值(0-100)
:return: 操作结果
"""
try:
# 校验参数
if not (0 <= brightness <= 100):
return {"success": False, "error": "亮度值必须在 0-100 之间"}
# 发布亮度调节指令
control_msg = f'{{"action": "adjust_brightness", "brightness": {brightness}}}'
mqtt_client.publish(MQTT_CONTROL_TOPIC, control_msg, qos=1)
logger.info(f"已发布亮度调节指令 [主题: {MQTT_CONTROL_TOPIC}]: {control_msg}")
return {
"success": True,
"message": f"灯亮度已调节至 {brightness}%",
"mqtt_topic": MQTT_CONTROL_TOPIC,
"mqtt_msg": control_msg
}
except Exception as e:
logger.error(f"调节亮度失败: {e}")
return {"success": False, "error": str(e)}
# -------------------------- 启动入口 --------------------------
if __name__ == "__main__":
# 先连接 MQTT
connect_mqtt()
if "--mqtt-only" in sys.argv:
logger.info(f"已启动 MQTT 订阅监听,订阅主题: {MQTT_STATUS_TOPIC}")
try:
while True:
import time
time.sleep(1)
except KeyboardInterrupt:
pass
else:
# 启动 MCP 服务器(stdio 传输,适配 mcp_pipe.py)
mcp.run(transport="stdio")