Skip to main content
Glama

emqx-mcp-server

by Benniu
emqx_message_tools.py2.39 kB
""" EMQX Message Tools Module This module provides tools for publishing MQTT messages to an EMQX broker. It registers these tools with the MCP server, making them available for clients to use through the MCP protocol. """ import logging from typing import Any from ..emqx_client import EMQXClient class EMQXMessageTools: def __init__(self, logger: logging.Logger): self.logger = logger self.emqx_client = EMQXClient(logger) def register_tools(self, mcp: Any): """Register EMQX Publish tools.""" @mcp.tool(name="publish_mqtt_message", description="Publish an MQTT Message to Your EMQX Cluster on EMQX Cloud or Self-Managed Deployment") async def publish(request): """Handle publish message request Args: request: MCP request containing message data - topic: MQTT topic - payload: Message content - qos: Quality of Service (0, 1, or 2) - retain: Whether to retain the message (true or false) Returns: MCPResponse: Response object with publish result """ self.logger.info("Handling publish request") # Extract parameters from the request topic = request.get("topic") payload = request.get("payload") qos = request.get("qos", 0) # Default QoS level is 0 retain = request.get("retain", False) # Default is not to retain # Validate required parameters before proceeding if not topic: self.logger.error("Missing required parameter: topic") return f'"error": "Missing required parameter: topic"' if payload is None: self.logger.error("Missing required parameter: payload") return f'"error": "Missing required parameter: payload"' # Publish message to EMQX using the client result = await self.emqx_client.publish_message( topic=topic, payload=payload, qos=qos, retain=retain ) self.logger.info(f"Message published successfully to topic: {topic}") return result

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Benniu/emqx-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server