We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/moneypiaorui/ros-mcp-server-minipi'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
import base64
import json
from typing import Optional
from pathlib import Path
from typing import Protocol
from datetime import datetime
import numpy as np
import cv2
class Subscriber(Protocol):
def receive_binary(self) -> bytes:
...
def send(self, message: dict) -> None:
...
class Image:
def __init__(self, subscriber: Subscriber, topic: str = "/camera/image_raw"):
self.subscriber = subscriber
self.topic = topic
def subscribe(self, save_path: Optional[str] = None) -> Optional[bytes]:
try:
subscribe_msg = {
"op": "subscribe",
"topic": self.topic,
"type": "sensor_msgs/Image"
}
self.subscriber.send(subscribe_msg)
raw = self.subscriber.receive_binary()
if not raw:
print("[Image] No data received from subscriber")
return None
if isinstance(raw, bytes):
raw = raw.decode("utf-8")
msg = json.loads(raw)
msg = msg["msg"]
# Extract metadata
height = msg["height"]
width = msg["width"]
encoding = msg["encoding"]
data_b64 = msg["data"]
# Decode base64 to raw bytes
image_bytes = base64.b64decode(data_b64)
img_np = np.frombuffer(image_bytes, dtype=np.uint8)
# Handle encoding
if encoding == "rgb8":
img_np = img_np.reshape((height, width, 3))
img_cv = cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR) # convert to BGR for OpenCV
elif encoding == "bgr8":
img_cv = img_np.reshape((height, width, 3))
elif encoding == "mono8":
img_cv = img_np.reshape((height, width))
else:
print(f"[Image] Unsupported encoding: {encoding}")
return None
# Save image
if save_path is None:
downloads_dir = Path(__file__).resolve().parents[2] / "screenshots"
if not downloads_dir.exists():
downloads_dir.mkdir(parents=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
save_path = downloads_dir / f"{timestamp}.png"
Path(save_path).parent.mkdir(parents=True, exist_ok=True)
cv2.imwrite(str(save_path), img_cv)
print(f"[Image] Saved to {save_path}")
return img_cv
except Exception as e:
print(f"[Image] Failed to receive or decode: {e}")
return None