Skip to main content
Glama
mqtt_mcp_server.py8.95 kB
import argparse import os import time import json import paho.mqtt.client as mqtt from mcp.server.fastmcp import FastMCP import threading from typing import List, Dict, Any # --- Configuration --- parser = argparse.ArgumentParser(description="MCP Server for MQTT Operations") # MQTT Configuration parser.add_argument('--broker', default=os.getenv('MQTT_BROKER_ADDRESS', 'localhost'), help='MQTT broker address') parser.add_argument('--port', type=int, default=int(os.getenv('MQTT_PORT', 1883)), help='MQTT broker port') parser.add_argument('--client-id', default=os.getenv('MQTT_CLIENT_ID', 'mcp-mqtt-client'), help='MQTT client ID') parser.add_argument('--username', default=os.getenv('MQTT_USERNAME'), help='MQTT username') parser.add_argument('--password', default=os.getenv('MQTT_PASSWORD'), help='MQTT password') # Transport Configuration parser.add_argument('--transport', choices=['stdio', 'streamable-http', 'sse'], default='stdio', help='Transport type (default: stdio)') args = parser.parse_args() # --- MCP Server Setup --- mcp = FastMCP("MQTT Bridge") # --- MQTT Client Helper --- def get_mqtt_client(client_id_suffix: str = "") -> mqtt.Client: """Creates and configures an MQTT client based on parsed args.""" client_id = f"{args.client_id}{client_id_suffix}" client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id=client_id) if args.username: client.username_pw_set(args.username, args.password) return client # --- MCP Tools --- @mcp.tool() def mqtt_publish(topic: str, message: str, qos: int = 0, retain: bool = False) -> str: """ Publishes a message to a specific MQTT topic. Args: topic: The MQTT topic to publish to. message: The message payload to send. qos: The Quality of Service level (0, 1, or 2). Defaults to 0. retain: Whether the message should be retained by the broker. Defaults to False. Returns: A confirmation message string. """ if qos not in [0, 1, 2]: return "Error: QoS must be 0, 1, or 2." client = get_mqtt_client("-publisher") result = None error_message = None def on_connect(client, userdata, flags, reason_code, properties): nonlocal result nonlocal error_message if reason_code == 0: print(f"Publisher connected to {args.broker}:{args.port}") publish_info = client.publish(topic, message, qos=qos, retain=retain) # For QoS 1 and 2, wait for PUBACK/PUBCOMP. For QoS 0, it returns immediately. if qos > 0: # Wait a reasonable amount of time for ACK. In a real app, might use on_publish callback. publish_info.wait_for_publish(timeout=5) if publish_info.is_published(): result = f"Message published to topic \'{topic}\' (QoS {qos}, Retain: {retain})" else: error_message = f"Failed to publish message to topic \'{topic}\' (Timeout or Error)" else: # QoS 0 result = f"Message published to topic \'{topic}\' (QoS 0, Retain: {retain})" client.loop_stop() # Stop loop after publishing client.disconnect() else: error_message = f"Failed to connect to broker: {reason_code}" client.loop_stop() # Stop loop on connection failure def on_disconnect(client, userdata, disconnect_flags, reason_code, properties): print(f"Publisher disconnected: {reason_code}") client.on_connect = on_connect client.on_disconnect = on_disconnect try: client.connect(args.broker, args.port, 60) client.loop_start() # Start network loop in background thread # Wait for connection and publish attempt # The loop will be stopped in on_connect or on_disconnect # Give it a bit more time than the publish timeout loop_timeout = 10 # seconds start_time = time.monotonic() while client.is_connected() or (time.monotonic() - start_time < loop_timeout): if result is not None or error_message is not None: break time.sleep(0.1) # Prevent busy-waiting # Ensure loop is stopped if timeout occurred before callback finished if client._thread and client._thread.is_alive(): client.loop_stop() client.disconnect() # Attempt disconnect if loop stopped prematurely except Exception as e: error_message = f"MQTT connection or publish error: {e}" if client._thread and client._thread.is_alive(): client.loop_stop() return result if result is not None else error_message or "Publish operation finished with unknown state." @mcp.tool() def mqtt_subscribe(topic: str, num_messages: int = 1, timeout: int = 10) -> List[Dict[str, Any]]: """ Subscribes to an MQTT topic and receives a specified number of messages or waits for a timeout. Args: topic: The MQTT topic to subscribe to (can include wildcards like + or #). num_messages: The maximum number of messages to receive. Defaults to 1. timeout: The maximum time (in seconds) to wait for messages. Defaults to 10. Returns: A list of dictionaries, where each dictionary represents a received message with 'topic' and 'payload' keys. """ client = get_mqtt_client("-subscriber") received_messages: List[Dict[str, Any]] = [] received_count = 0 connection_error = None stop_event = threading.Event() # Used to signal when to stop the loop def on_connect(client, userdata, flags, reason_code, properties): nonlocal connection_error if reason_code == 0: print(f"Subscriber connected to {args.broker}:{args.port}") # Subscribe upon successful connection client.subscribe(topic) print(f"Subscribed to topic: {topic}") else: connection_error = f"Failed to connect: {reason_code}" stop_event.set() # Signal to stop if connection fails def on_message(client, userdata, msg): nonlocal received_count message_data = { "topic": msg.topic, "payload": msg.payload.decode('utf-8', errors='ignore'), # Decode payload safely "qos": msg.qos, "retain": msg.retain } received_messages.append(message_data) received_count += 1 print(f"Received message {received_count}/{num_messages} on topic '{msg.topic}'") if received_count >= num_messages: stop_event.set() # Signal to stop once desired messages are received def on_disconnect(client, userdata, disconnect_flags, reason_code, properties): print(f"Subscriber disconnected: {reason_code}") # If disconnected unexpectedly, signal stop if not stop_event.is_set(): print("Unexpected disconnection, stopping subscribe operation.") stop_event.set() client.on_connect = on_connect client.on_message = on_message client.on_disconnect = on_disconnect try: client.connect(args.broker, args.port, 60) client.loop_start() # Wait for the stop event to be set (by timeout, message count, or error) stop_event.wait(timeout=timeout) client.loop_stop() client.disconnect() except Exception as e: connection_error = f"MQTT connection or subscribe error: {e}" if client._thread and client._thread.is_alive(): client.loop_stop() if connection_error: # We might have received some messages before the error, but indicate failure # Alternatively, raise an exception or return an error structure print(f"Error during subscription: {connection_error}") # Returning what we got, but the caller should check status/logs # Or return a specific error object: return {"error": connection_error, "received": received_messages} # For simplicity here, just returning the messages collected so far. print(f"Subscribe operation finished. Received {len(received_messages)} messages.") return received_messages def main(): print(f"🚀 Starting MQTT MCP Server...") print(f"📡 MQTT Broker: {args.broker}:{args.port}") print(f"🚚 Transport: {args.transport}") if args.transport == 'stdio': print("📺 Running in STDIO mode (for local development/Claude Desktop)") mcp.run() elif args.transport == 'streamable-http': print("💡 Recommended for web deployments") mcp.run( transport="streamable-http" ) elif args.transport == 'sse': print("⚠️ WARNING: SSE transport is deprecated. Consider using streamable-http.") mcp.run( transport="sse", ) # --- Main Execution --- if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Omniscience-Labs/OMNI-MQTT-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server