Skip to main content
Glama
Benniu

emqx-mcp-server

by Benniu

publish_mqtt_message

Publish MQTT messages to EMQX clusters on EMQX Cloud or self-managed deployments for IoT and real-time communication applications.

Instructions

Publish an MQTT Message to Your EMQX Cluster on EMQX Cloud or Self-Managed Deployment

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
requestYes

Implementation Reference

  • MCP tool handler for 'publish_mqtt_message'. Extracts and validates parameters from the request (topic, payload, qos, retain), then delegates to EMQXClient.publish_message() to perform the publish.
    @mcp.tool(name="publish_mqtt_message", 
              description="Publish an MQTT Message to Your EMQX Cluster on EMQX Cloud or Self-Managed Deployment")
    async def publish(request):
        """Handle publish message request
        
        Args:
            request: MCP request containing message data
                - topic: MQTT topic 
                - payload: Message content
                - qos: Quality of Service (0, 1, or 2)
                - retain: Whether to retain the message (true or false)
        
        Returns:
            MCPResponse: Response object with publish result
        """
        self.logger.info("Handling publish request")
        
        # Extract parameters from the request
        topic = request.get("topic")
        payload = request.get("payload")
        qos = request.get("qos", 0)  # Default QoS level is 0
        retain = request.get("retain", False)  # Default is not to retain
        
        # Validate required parameters before proceeding
        if not topic:
            self.logger.error("Missing required parameter: topic")
            return f'"error": "Missing required parameter: topic"'
        
        if payload is None:
            self.logger.error("Missing required parameter: payload")
            return f'"error": "Missing required parameter: payload"'
        
        # Publish message to EMQX using the client
        result = await self.emqx_client.publish_message(
            topic=topic,
            payload=payload,
            qos=qos,
            retain=retain
        )
        
        self.logger.info(f"Message published successfully to topic: {topic}")
        return result
  • Instantiates EMQXMessageTools and calls its register_tools method on the FastMCP server instance, which defines and registers the 'publish_mqtt_message' tool using @mcp.tool decorator.
    emqx_message_tools = EMQXMessageTools(self.logger)
    emqx_message_tools.register_tools(self.mcp)
  • Supporting utility in EMQXClient class that makes the actual HTTP POST request to the EMQX broker's /publish API endpoint to publish the MQTT message.
    async def publish_message(self, topic: str, payload: str, qos: int=0, retain: bool=False):
        """
        Publish a message to an MQTT topic.
        
        Uses the EMQX HTTP API to publish a message to a specific MQTT topic.
        
        Args:
            topic (str): The MQTT topic to publish to
            payload (str): The message payload to publish
            qos (int, optional): Quality of Service level (0, 1, or 2). Defaults to 0.
            retain (bool, optional): Whether to retain the message. Defaults to False.
            
        Returns:
            dict: Response from the EMQX API or error information
        """
        url = f"{self.api_url}/publish"
        data = {
            "topic": topic,
            "payload": payload,
            "qos": qos,
            "retain": retain
        }
        self.logger.info(f"Publishing message to topic {topic}")
        async with httpx.AsyncClient() as client:
            try:
                response = await client.post(url, headers=self._get_auth_header(), json=data, timeout=30)
                response.raise_for_status()
                return self._handle_response(response)
            except Exception as e:
                self.logger.error(f"Error publishing message: {str(e)}")
                return {"error": str(e)}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Benniu/emqx-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server