We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/sandraschi/robotics-mcp'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
import logging
import subprocess
import platform
from typing import Optional, Dict, Any
logger = logging.getLogger(__name__)
class DreameClient:
"""
Client for controlling Dreame robot vacuums.
Currently supports:
- Connectivity check (Ping)
- Authentication status check
- Placeholder for command execution (requires manual token)
"""
def __init__(self, ip: str, token: Optional[str] = None):
self.ip = ip
self.token = token
self.connected = False
self.device_info = {}
async def connect(self) -> bool:
"""
Attempts to connect to the robot.
1. Checks network reachability (verified).
2. Checks if token is valid (stub).
"""
if not self._check_ping():
logger.error(f"❌ Robot at {self.ip} is unreachable via Ping.")
return False
logger.info(f"✅ Robot at {self.ip} is reachable.")
if not self.token or self.token == "YOUR_TOKEN_HERE":
logger.warning("⚠️ Dreame Authentication Token is missing.")
logger.warning(
"Please extract the token using a 3rd party tool (e.g. VeVS Mi Home) and update config."
)
return False
# TODO: Implement actual handshake/hello if token is present
# using python-miio or custom packet logic.
# For now, we assume if token is provided, we try to use it.
try:
# from miio import DreameVacuum
# self.device = DreameVacuum(self.ip, self.token)
# info = self.device.info()
# logger.info(f"✅ Connected to Dreame Device: {info}")
self.connected = True
return True
except Exception as e:
logger.error(f"❌ Failed to connect with provided token: {e}")
return False
def _check_ping(self) -> bool:
"""Pings the robot IP."""
param = "-n" if platform.system().lower() == "windows" else "-c"
command = ["ping", param, "1", self.ip]
try:
return (
subprocess.call(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) == 0
)
except Exception:
return False
async def get_status(self) -> Dict[str, Any]:
"""Returns robot status."""
if not self.connected:
return {
"status": "disconnected",
"connectivity": "online" if self._check_ping() else "offline",
"auth": "missing_token" if not self.token else "invalid_token",
}
# Mock status for now if connected logic was real
return {"status": "docked", "battery": 100}
async def start_cleaning(self):
if not self.connected:
raise ConnectionError("Cannot start cleaning: Robot not connected.")
# Implementation for start cleaning
pass
async def return_to_base(self):
if not self.connected:
raise ConnectionError("Cannot return to base: Robot not connected.")
# Implementation for return to base
pass