Skip to main content
Glama
Benniu
by Benniu

publish_mqtt_message

Publish MQTT messages to EMQX clusters on EMQX Cloud or self-managed deployments using the Model Context Protocol for MQTT broker interaction.

Instructions

Publish an MQTT Message to Your EMQX Cluster on EMQX Cloud or Self-Managed Deployment

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
requestYes

Implementation Reference

  • The MCP tool handler function 'publish' that processes the publish_mqtt_message request, extracts parameters, validates them, and calls the EMQXClient to publish the MQTT message.
    @mcp.tool(name="publish_mqtt_message", description="Publish an MQTT Message to Your EMQX Cluster on EMQX Cloud or Self-Managed Deployment") async def publish(request): """Handle publish message request Args: request: MCP request containing message data - topic: MQTT topic - payload: Message content - qos: Quality of Service (0, 1, or 2) - retain: Whether to retain the message (true or false) Returns: MCPResponse: Response object with publish result """ self.logger.info("Handling publish request") # Extract parameters from the request topic = request.get("topic") payload = request.get("payload") qos = request.get("qos", 0) # Default QoS level is 0 retain = request.get("retain", False) # Default is not to retain # Validate required parameters before proceeding if not topic: self.logger.error("Missing required parameter: topic") return f'"error": "Missing required parameter: topic"' if payload is None: self.logger.error("Missing required parameter: payload") return f'"error": "Missing required parameter: payload"' # Publish message to EMQX using the client result = await self.emqx_client.publish_message( topic=topic, payload=payload, qos=qos, retain=retain ) self.logger.info(f"Message published successfully to topic: {topic}") return result
  • Instantiates EMQXMessageTools and calls register_tools on the MCP server instance, which registers the publish_mqtt_message tool via its decorator.
    emqx_message_tools = EMQXMessageTools(self.logger) emqx_message_tools.register_tools(self.mcp)
  • The EMQXClient helper method that performs the actual HTTP POST to the EMQX API /publish endpoint to send the MQTT message.
    async def publish_message(self, topic: str, payload: str, qos: int=0, retain: bool=False): """ Publish a message to an MQTT topic. Uses the EMQX HTTP API to publish a message to a specific MQTT topic. Args: topic (str): The MQTT topic to publish to payload (str): The message payload to publish qos (int, optional): Quality of Service level (0, 1, or 2). Defaults to 0. retain (bool, optional): Whether to retain the message. Defaults to False. Returns: dict: Response from the EMQX API or error information """ url = f"{self.api_url}/publish" data = { "topic": topic, "payload": payload, "qos": qos, "retain": retain } self.logger.info(f"Publishing message to topic {topic}") async with httpx.AsyncClient() as client: try: response = await client.post(url, headers=self._get_auth_header(), json=data, timeout=30) response.raise_for_status() return self._handle_response(response) except Exception as e: self.logger.error(f"Error publishing message: {str(e)}") return {"error": str(e)}

Other Tools

Related Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Benniu/emqx-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server