
emqx-mcp-server

by Benniu

publish_mqtt_message

Publish MQTT messages to EMQX clusters on EMQX Cloud or self-managed deployments for IoT and real-time communication applications.

Instructions

Publish an MQTT Message to Your EMQX Cluster on EMQX Cloud or Self-Managed Deployment

Input Schema

Name     Required  Description  Default
request  Yes
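
The schema lists a single `request` argument without describing its contents. Going by the handler's docstring in the implementation reference, it is expected to carry `topic`, `payload`, and optionally `qos` and `retain`. A minimal sketch of such a request follows; the helper name `normalize_request` and the sample topic and payload are illustrative assumptions, not part of the server:

```python
# Hypothetical helper: applies the same defaults the handler uses
# (qos defaults to 0, retain defaults to False).
def normalize_request(request: dict) -> dict:
    return {
        "topic": request.get("topic"),
        "payload": request.get("payload"),
        "qos": request.get("qos", 0),
        "retain": request.get("retain", False),
    }

# Sample request; topic and payload values are made up for illustration.
example_request = {"topic": "sensors/room1/temperature", "payload": "21.5"}
normalized = normalize_request(example_request)
```

Only `topic` and `payload` need to be supplied; the optional fields fall back to the handler's defaults.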

Implementation Reference

  • MCP tool handler for 'publish_mqtt_message'. Extracts topic, payload, qos, and retain from the request, checks that topic and payload are present, then delegates to EMQXClient.publish_message() to perform the publish.
    @mcp.tool(name="publish_mqtt_message", 
              description="Publish an MQTT Message to Your EMQX Cluster on EMQX Cloud or Self-Managed Deployment")
    async def publish(request):
        """Handle publish message request
        
        Args:
            request: MCP request containing message data
                - topic: MQTT topic 
                - payload: Message content
                - qos: Quality of Service (0, 1, or 2)
                - retain: Whether to retain the message (true or false)
        
        Returns:
            The result returned by EMQXClient.publish_message(), or an
            error message if a required parameter is missing
        """
        self.logger.info("Handling publish request")
        
        # Extract parameters from the request
        topic = request.get("topic")
        payload = request.get("payload")
        qos = request.get("qos", 0)  # Default QoS level is 0
        retain = request.get("retain", False)  # Default is not to retain
        
        # Validate required parameters before proceeding.
        # Errors use the same {"error": ...} dict shape as EMQXClient.publish_message().
        if not topic:
            self.logger.error("Missing required parameter: topic")
            return {"error": "Missing required parameter: topic"}
        
        if payload is None:
            self.logger.error("Missing required parameter: payload")
            return {"error": "Missing required parameter: payload"}
        
        # Publish message to EMQX using the client
        result = await self.emqx_client.publish_message(
            topic=topic,
            payload=payload,
            qos=qos,
            retain=retain
        )
        
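        # publish_message() reports failures as {"error": ...}, so only log success otherwise
        if isinstance(result, dict) and "error" in result:
            self.logger.error(f"Publish to topic {topic} failed: {result['error']}")
        else:
            self.logger.info(f"Message published successfully to topic: {topic}")
        return result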
  • Instantiates EMQXMessageTools and calls its register_tools method on the FastMCP server instance, which defines and registers the 'publish_mqtt_message' tool using the @mcp.tool decorator.
    emqx_message_tools = EMQXMessageTools(self.logger)
    emqx_message_tools.register_tools(self.mcp)
  • Supporting utility in EMQXClient class that makes the actual HTTP POST request to the EMQX broker's /publish API endpoint to publish the MQTT message.
    async def publish_message(self, topic: str, payload: str, qos: int=0, retain: bool=False):
        """
        Publish a message to an MQTT topic.
        
        Uses the EMQX HTTP API to publish a message to a specific MQTT topic.
        
        Args:
            topic (str): The MQTT topic to publish to
            payload (str): The message payload to publish
            qos (int, optional): Quality of Service level (0, 1, or 2). Defaults to 0.
            retain (bool, optional): Whether to retain the message. Defaults to False.
            
        Returns:
            dict: Response from the EMQX API or error information
        """
        url = f"{self.api_url}/publish"
        data = {
            "topic": topic,
            "payload": payload,
            "qos": qos,
            "retain": retain
        }
        self.logger.info(f"Publishing message to topic {topic}")
        async with httpx.AsyncClient() as client:
            try:
                response = await client.post(url, headers=self._get_auth_header(), json=data, timeout=30)
                response.raise_for_status()
                return self._handle_response(response)
            except Exception as e:
                self.logger.error(f"Error publishing message: {str(e)}")
                return {"error": str(e)}
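
The handler above only checks that `topic` and `payload` are present; `qos` and `retain` are forwarded to the client as given. A minimal sketch of stricter validation that could run before `publish_message()` is called, assuming the three QoS levels (0, 1, 2) named in the docstrings. The function name `validate_publish_params` and the wording of the extra error messages are hypothetical and not part of the server:

```python
from typing import Any, Optional


def validate_publish_params(request: dict[str, Any]) -> Optional[dict[str, str]]:
    """Return an error dict if the request is invalid, otherwise None."""
    if not request.get("topic"):
        return {"error": "Missing required parameter: topic"}
    if request.get("payload") is None:
        return {"error": "Missing required parameter: payload"}

    qos = request.get("qos", 0)
    # bool is a subclass of int in Python, so reject True/False explicitly
    if isinstance(qos, bool) or qos not in (0, 1, 2):
        return {"error": "Invalid qos: expected 0, 1, or 2"}

    if not isinstance(request.get("retain", False), bool):
        return {"error": "Invalid retain: expected true or false"}

    return None
```

Returning `None` on success lets the caller write `error = validate_publish_params(request)` and return early only when `error` is set. An empty-string payload is accepted here, matching the handler's `payload is None` check.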

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Benniu/emqx-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.