emqx-mcp-server
by Benniu
- src
- emqx_mcp_server
"""
EMQX HTTP API Client Module
This module provides a client for interacting with the EMQX MQTT broker's HTTP API.
It handles authentication, request formatting, and response processing.
"""
import httpx
import base64
import logging
from .config import EMQX_API_URL, EMQX_API_KEY, EMQX_API_SECRET
class EMQXClient:
"""
EMQX HTTP API Client
Provides methods to interact with EMQX Cloud or self-hosted EMQX broker
through its HTTP API. Handles authentication and error processing.
Attributes:
api_url (str): The base URL for the EMQX HTTP API
api_key (str): API key for authentication
api_secret (str): API secret for authentication
logger (Logger): Logger instance for logging messages
"""
def __init__(self, logger: logging.Logger):
self.api_url = EMQX_API_URL
self.api_key = EMQX_API_KEY
self.api_secret = EMQX_API_SECRET
self.logger = logger
def _get_auth_header(self):
"""Create authorization header for EMQX Cloud API"""
auth_string = f"{self.api_key}:{self.api_secret}"
encoded_auth = base64.b64encode(auth_string.encode()).decode()
return {
"Authorization": f"Basic {encoded_auth}",
"Content-Type": "application/json"
}
def _handle_response(self, response):
"""Process API response, extract data and handle errors"""
try:
if response.status_code >= 200 and response.status_code < 300:
return response.json()
else:
error_msg = f"EMQX API Error: {response.status_code} - {response.text}"
self.logger.error(error_msg)
return {"error": error_msg}
except Exception as e:
error_msg = f"Error processing response: {str(e)}"
self.logger.error(error_msg)
return {"error": error_msg}
async def publish_message(self, topic: str, payload: str, qos: int=0, retain: bool=False):
"""
Publish a message to an MQTT topic.
Uses the EMQX HTTP API to publish a message to a specific MQTT topic.
Args:
topic (str): The MQTT topic to publish to
payload (str): The message payload to publish
qos (int, optional): Quality of Service level (0, 1, or 2). Defaults to 0.
retain (bool, optional): Whether to retain the message. Defaults to False.
Returns:
dict: Response from the EMQX API or error information
"""
url = f"{self.api_url}/publish"
data = {
"topic": topic,
"payload": payload,
"qos": qos,
"retain": retain
}
self.logger.info(f"Publishing message to topic {topic}")
async with httpx.AsyncClient() as client:
try:
response = await client.post(url, headers=self._get_auth_header(), json=data, timeout=30)
response.raise_for_status()
return self._handle_response(response)
except Exception as e:
self.logger.error(f"Error publishing message: {str(e)}")
return {"error": str(e)}
async def list_clients(self, params=None):
"""
Get a list of connected MQTT clients.
Uses the EMQX HTTP API to retrieve information about connected clients.
Args:
params (dict, optional): Query parameters to filter results:
- page: Page number (default: 1)
- limit: Results per page, max 10000 (default: 10)
- clientid: Client ID
- username: Username
- ip_address: Client IP address
- conn_state: Connection state
- clean_start: Clean start flag
- proto_ver: Protocol version
- like_clientid: Fuzzy search by client ID pattern
- like_username: Fuzzy search by username pattern
- like_ip_address: Fuzzy search by IP address pattern
Returns:
dict: Response from the EMQX API containing client data or error information
"""
url = f"{self.api_url}/clients"
# Default params if none provided
if params is None:
params = {"page": 1, "limit": 10}
self.logger.info("Retrieving list of MQTT clients")
async with httpx.AsyncClient() as client:
try:
response = await client.get(
url,
headers=self._get_auth_header(),
params=params,
timeout=30
)
response.raise_for_status()
return self._handle_response(response)
except Exception as e:
self.logger.error(f"Error retrieving clients: {str(e)}")
return {"error": str(e)}
async def get_client_info(self, clientid: str):
"""
Get detailed information about a specific MQTT client by client ID.
Uses the EMQX HTTP API to retrieve detailed information about a specific
client identified by its client ID.
Args:
clientid (str): The unique identifier of the client to retrieve
Returns:
dict: Response from the EMQX API containing client data or error information
"""
url = f"{self.api_url}/clients/{clientid}"
self.logger.info(f"Retrieving information for client ID: {clientid}")
async with httpx.AsyncClient() as client:
try:
response = await client.get(
url,
headers=self._get_auth_header(),
timeout=30
)
response.raise_for_status()
return self._handle_response(response)
except Exception as e:
self.logger.error(f"Error retrieving client info for {clientid}: {str(e)}")
return {"error": str(e)}
async def kick_client(self, clientid: str):
"""
Kick out (disconnect) a client from the MQTT broker.
Uses the EMQX HTTP API to forcibly disconnect a client identified by its client ID.
Args:
clientid (str): The unique identifier of the client to disconnect
Returns:
dict: Response from the EMQX API or error information
"""
url = f"{self.api_url}/clients/{clientid}"
self.logger.info(f"Kicking out client with ID: {clientid}")
async with httpx.AsyncClient() as client:
try:
response = await client.delete(
url,
headers=self._get_auth_header(),
timeout=30
)
response.raise_for_status()
# For successful delete operations, return a success message
if response.status_code == 204: # No Content
return {"success": True, "message": f"Client {clientid} has been disconnected"}
return self._handle_response(response)
except Exception as e:
self.logger.error(f"Error kicking out client {clientid}: {str(e)}")
return {"error": str(e)}