Skip to main content
Glama
websocket_manager.py (15.3 kB)
import base64
import json
import os
import sys
import threading
from typing import Union

import cv2
import numpy as np
import websocket

# Directory and file that decoded camera frames are written to.
_OUTPUT_DIR = "./camera"
_OUTPUT_PATH = "./camera/received_image.jpeg"

# Substrings of a CompressedImage "format" value that mark already-encoded image data.
_COMPRESSED_FORMATS = ("jpeg", "jpg", "png", "bmp", "compressed")

# Substrings of a raw Image "encoding" value accepted by is_image_like()
# (sensor_msgs/Image standard encodings).
_RAW_IMAGE_ENCODINGS = (
    "rgb8",
    "rgba8",
    "bgr8",
    "bgra8",
    "mono8",
    "mono16",
    "8uc1",
    "8uc3",
    "8uc4",
    "16uc1",
    "bayer",
    "yuv",
)

# Raw encodings whose pixels are 16-bit unsigned integers.
_SIXTEEN_BIT_ENCODINGS = ("mono16", "16uc1")


def _is_compressed_format(format_str: object) -> bool:
    """Return True if *format_str* is a string naming a compressed image format."""
    if not isinstance(format_str, str):
        return False
    lowered = format_str.lower()
    return any(fmt in lowered for fmt in _COMPRESSED_FORMATS)


def parse_json(raw: Union[str, bytes] | None) -> dict | None:
    """
    Safely parse JSON from string or bytes.

    Args:
        raw: JSON string, bytes, or None

    Returns:
        Parsed dict if successful, None if raw is None, parsing fails, or result is not a dict
    """
    if raw is None:
        return None
    if isinstance(raw, bytes):
        # Undecodable bytes become U+FFFD instead of raising UnicodeDecodeError.
        raw = raw.decode("utf-8", errors="replace")
    try:
        result = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        return None
    return result if isinstance(result, dict) else None


def is_image_like(msg_content: dict) -> bool:
    """
    Check if a message looks like an image message by examining its fields.

    This checks for image-specific fields (width, height, encoding) in addition
    to the data field to distinguish images from other messages that may contain
    binary data (e.g., PointCloud2, ByteMultiArray).

    Args:
        msg_content: The message content dictionary

    Returns:
        bool: True if the message appears to be an image, False otherwise.
        Non-dict input, or non-string 'format'/'encoding' values, yield False
        rather than raising.
    """
    if not isinstance(msg_content, dict):
        return False

    # CompressedImage format: 'data' plus an image-like 'format' string.
    if "data" in msg_content and _is_compressed_format(msg_content.get("format")):
        return True

    # Raw Image format: needs 'data', 'width', 'height' and 'encoding'.
    required_fields = {"data", "width", "height", "encoding"}
    if not required_fields.issubset(msg_content.keys()):
        return False

    if not isinstance(msg_content.get("width"), int) or not isinstance(
        msg_content.get("height"), int
    ):
        return False

    encoding = msg_content.get("encoding")
    if not isinstance(encoding, str):
        return False
    encoding = encoding.lower()
    # Substring match so families such as "bayer_rggb8" or "yuv422" are accepted.
    return any(enc in encoding for enc in _RAW_IMAGE_ENCODINGS)


def parse_image(raw: Union[str, bytes] | None) -> dict | None:
    """
    Decode an image message (JSON with base64 data) and save it as ./camera/received_image.jpeg.

    Args:
        raw: JSON string, bytes, or None

    Returns:
        The full parsed message dict if the image was decoded and saved;
        None if raw is None, is not a JSON object with a dict 'msg' field,
        has no image data, or the image could not be decoded or saved.
    """
    # 1. Input validation
    if raw is None:
        return None

    # 2. Parse JSON and extract message (parse_json tolerates bad UTF-8 and non-dict JSON)
    result = parse_json(raw)
    msg = result.get("msg") if result is not None else None
    if not isinstance(msg, dict):
        print("[Image] Invalid JSON or missing 'msg' field.", file=sys.stderr)
        return None

    # 3. Extract and validate required fields
    data_b64 = msg.get("data")
    if not data_b64:
        print("[Image] Missing 'data' field in message.", file=sys.stderr)
        return None

    # 4. Ensure output directory exists
    os.makedirs(_OUTPUT_DIR, exist_ok=True)

    # 5. Determine image type and process accordingly
    image_format = msg.get("format")
    print(f"[Image] Format: {image_format}", file=sys.stderr)

    # 5a. Handle CompressedImage (already JPEG/PNG encoded)
    if _is_compressed_format(image_format):
        return _handle_compressed_image(data_b64, result)

    # 5b. Handle raw Image (rgb8, rgba8, bgr8, bgra8, mono8, 8uc1, mono16, 16uc1)
    height, width, encoding = msg.get("height"), msg.get("width"), msg.get("encoding")
    if not (height and width and encoding and isinstance(encoding, str)):
        print("[Image] Missing required fields for raw image.", file=sys.stderr)
        return None

    return _handle_raw_image(data_b64, height, width, encoding, msg, result)


def _handle_compressed_image(data_b64: str, result: dict) -> dict | None:
    """Write already-encoded (JPEG/PNG/...) image bytes to disk; return *result*, or None on bad base64."""
    try:
        image_bytes = base64.b64decode(data_b64)
    except (ValueError, TypeError) as e:  # binascii.Error subclasses ValueError
        print(f"[Image] Invalid base64 image data: {e}", file=sys.stderr)
        return None

    path = _OUTPUT_PATH
    # NOTE: bytes are written verbatim, so a PNG/BMP payload still lands in a ".jpeg" file.
    with open(path, "wb") as f:
        f.write(image_bytes)
    print(f"[Image] Saved CompressedImage to {path}", file=sys.stderr)
    return result


def _handle_raw_image(
    data_b64: str, height: int, width: int, encoding: str, msg: dict, result: dict
) -> dict | None:
    """Decode raw pixel data, convert it to an OpenCV image and save it as JPEG.

    Returns *result* on success, or None if the data cannot be decoded, reshaped,
    converted or written.
    """
    try:
        image_bytes = base64.b64decode(data_b64)
    except (ValueError, TypeError) as e:  # binascii.Error subclasses ValueError
        print(f"[Image] Invalid base64 image data: {e}", file=sys.stderr)
        return None

    # 16-bit encodings are read as uint16 elements; everything else as bytes.
    dtype = np.uint16 if encoding.lower() in _SIXTEEN_BIT_ENCODINGS else np.uint8

    try:
        # frombuffer raises ValueError for an odd-length 16-bit buffer; reshape raises
        # ValueError on a size mismatch and TypeError on non-integer dimensions.
        img_np = np.frombuffer(image_bytes, dtype=dtype)
        img_cv = _decode_image_data(img_np, height, width, encoding, msg)
    except (ValueError, TypeError) as e:
        print(f"[Image] Reshape error: {e}", file=sys.stderr)
        return None
    if img_cv is None:
        return None

    # Save as JPEG with quality 95
    if not cv2.imwrite(_OUTPUT_PATH, img_cv, [cv2.IMWRITE_JPEG_QUALITY, 95]):
        return None
    print("[Image] Saved raw Image to ./camera/received_image.jpeg", file=sys.stderr)
    return result


def _is_big_endian(msg: dict) -> bool:
    """Return True if the message's 'is_bigendian' field equals 1; missing or non-numeric means False."""
    try:
        return int(msg.get("is_bigendian", 0)) == 1
    except (TypeError, ValueError):
        return False


def _decode_image_data(
    img_np: np.ndarray, height: int, width: int, encoding: str, msg: dict
) -> np.ndarray | None:
    """Reshape a flat pixel array and convert it to a BGR/grayscale 8-bit image for OpenCV.

    Returns None (after logging) for unsupported encodings. Reshape failures
    propagate as ValueError/TypeError for the caller to handle.
    """
    enc = encoding.lower()

    # 8-bit colour encodings, converted to OpenCV's BGR channel order.
    if enc == "rgb8":
        return cv2.cvtColor(img_np.reshape((height, width, 3)), cv2.COLOR_RGB2BGR)
    if enc == "bgr8":
        return img_np.reshape((height, width, 3))
    if enc == "rgba8":
        return cv2.cvtColor(img_np.reshape((height, width, 4)), cv2.COLOR_RGBA2BGR)
    if enc == "bgra8":
        return cv2.cvtColor(img_np.reshape((height, width, 4)), cv2.COLOR_BGRA2BGR)

    # 8-bit single-channel encodings.
    if enc in ("mono8", "8uc1"):
        return img_np.reshape((height, width))

    # 16-bit single-channel encodings.
    if enc in _SIXTEEN_BIT_ENCODINGS:
        img16 = img_np.reshape((height, width))
        # Reinterpret the raw bytes with the byte order declared by the message, then
        # convert to native uint16. (byteswap().newbyteorder() preserves values, so it
        # never swapped anything, and ndarray.newbyteorder is gone in NumPy 2.)
        byte_order = ">" if _is_big_endian(msg) else "<"
        img16 = img16.view(np.dtype(f"{byte_order}u2")).astype(np.uint16)
        # Normalize 16-bit depth to 8-bit [0,255] for saving/preview
        return cv2.normalize(img16, None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U)

    print(f"[Image] Unsupported encoding: {encoding}", file=sys.stderr)
    return None


def parse_input(
    raw: Union[str, bytes] | None, expects_image: bool | None = None
) -> tuple[dict | None, bool]:
    """
    Parse input data with optional image hint for optimized handling.

    Logic:
    - expects_image=True: Try image parsing, fallback to JSON
    - expects_image=False: Parse as JSON only (fastest)
    - expects_image=None: Auto-detect using lightweight checks, then parse accordingly

    Args:
        raw: JSON string, bytes, or None
        expects_image: Optional hint about whether to expect image data

    Returns:
        tuple: (parsed_data, was_parsed_as_image)
            - parsed_data: Parsed dict if successful, None otherwise
            - was_parsed_as_image: True if data was successfully parsed as image
    """
    # 1. Input validation
    if raw is None:
        return None, False

    # 2. Parse as JSON first (always needed as fallback)
    parsed_data = parse_json(raw)
    if parsed_data is None:
        return None, False

    # 3. Handle explicit hints
    if expects_image is True:
        return _handle_image_hint(raw, parsed_data)
    if expects_image is False:
        return _handle_json_hint(parsed_data)
    return _handle_auto_detection(raw, parsed_data)


def _handle_image_hint(raw: Union[str, bytes], parsed_data: dict) -> tuple[dict | None, bool]:
    """Handle explicit image hint - try image parsing first, fall back to the parsed JSON."""
    print("[Input] Hinted to parse as image", file=sys.stderr)
    result = parse_image(raw)
    if result is not None:
        return result, True
    return parsed_data, False


def _handle_json_hint(parsed_data: dict) -> tuple[dict | None, bool]:
    """Handle explicit JSON hint - skip image parsing."""
    print("[Input] Hinted to parse as JSON", file=sys.stderr)
    return parsed_data, False


def _handle_auto_detection(raw: Union[str, bytes], parsed_data: dict) -> tuple[dict | None, bool]:
    """Handle auto-detection - image-parse only 'publish' ops whose 'msg' looks like an image."""
    print("[Input] Auto-detecting image", file=sys.stderr)

    # Check if this is a publish message that might contain image data
    if parsed_data.get("op") == "publish" and is_image_like(parsed_data.get("msg", {})):
        result = parse_image(raw)
        if result is not None:
            return result, True

    # Return the already parsed JSON
    return parsed_data, False


class WebSocketManager:
    """Thread-safe (RLock-guarded) client for a single rosbridge WebSocket connection.

    The connection is opened lazily by send()/receive()/request() and dropped on
    any send/receive failure, so the next call reconnects.
    """

    def __init__(self, ip: str, port: int, default_timeout: float = 2.0):
        self.ip = ip
        self.port = port
        self.default_timeout = default_timeout  # seconds; used for connect and recv
        self.ws = None  # active websocket connection, or None when disconnected
        self.lock = threading.RLock()  # reentrant: send/receive call connect/close under it

    def set_ip(self, ip: str, port: int):
        """
        Set the IP and port for the WebSocket connection.

        If the endpoint changes, any open connection is closed so the next
        operation connects to the new address.
        """
        with self.lock:
            changed = (ip, port) != (self.ip, self.port)
            self.ip = ip
            self.port = port
            if changed:
                # connect() treats a live socket as "already connected"; drop the
                # old-endpoint socket so it is not silently reused.
                self.close()
        print(f"[WebSocket] IP set to {self.ip}:{self.port}", file=sys.stderr)

    def connect(self) -> str | None:
        """
        Attempt to establish a WebSocket connection.

        Returns:
            None if successful, or an error message string if connection failed.
        """
        with self.lock:
            if self.ws is not None and self.ws.connected:
                return None  # already connected, no error
            try:
                url = f"ws://{self.ip}:{self.port}"
                self.ws = websocket.create_connection(url, timeout=self.default_timeout)
                print(
                    f"[WebSocket] Connected ({self.default_timeout}s timeout)", file=sys.stderr
                )
                return None  # no error
            except Exception as e:
                error_msg = f"[WebSocket] Connection error: {e}"
                print(error_msg, file=sys.stderr)
                self.ws = None
                return error_msg

    def send(self, message: dict) -> str | None:
        """
        Send a JSON-serializable message over WebSocket.

        The message is serialized before connecting, so an unserializable
        message neither opens nor closes a connection.

        Returns:
            None if successful, or an error message string if send failed.
        """
        try:
            json_msg = json.dumps(message)  # ensure it's JSON-serializable
        except (TypeError, ValueError) as e:  # ValueError: e.g. circular reference
            error_msg = f"[WebSocket] JSON serialization error: {e}"
            print(error_msg, file=sys.stderr)
            return error_msg

        with self.lock:
            conn_error = self.connect()
            if conn_error:
                return conn_error  # failed to connect
            if not self.ws:
                return "[WebSocket] Not connected, send aborted."
            try:
                self.ws.send(json_msg)
                return None  # no error
            except Exception as e:
                error_msg = f"[WebSocket] Send error: {e}"
                print(error_msg, file=sys.stderr)
                self.close()
                return error_msg

    def receive(self, timeout: float | None = None) -> Union[str, bytes] | None:
        """
        Receive a single message from rosbridge within the given timeout.

        Args:
            timeout (float | None): Seconds to wait before timing out.
                                    If None, uses the default timeout.

        Returns:
            str | bytes | None: Frame received from rosbridge, or None on timeout/error
            (the connection is closed on error).
        """
        with self.lock:
            self.connect()
            if not self.ws:
                return None

            # Use default timeout if none specified
            actual_timeout = timeout if timeout is not None else self.default_timeout
            print(f"[WebSocket] Using timeout of {actual_timeout} seconds", file=sys.stderr)

            try:
                # Temporarily set the receive timeout
                self.ws.settimeout(actual_timeout)
                raw = self.ws.recv()  # rosbridge sends JSON as a string
            except Exception as e:
                print(f"[WebSocket] Receive error or timeout: {e}", file=sys.stderr)
                self.close()
                return None

            # Restore the default so later sends/receives are not affected by a one-off timeout.
            if actual_timeout != self.default_timeout:
                try:
                    self.ws.settimeout(self.default_timeout)
                except Exception as e:
                    print(f"[WebSocket] Could not restore default timeout: {e}", file=sys.stderr)
                    self.close()
            return raw

    def request(self, message: dict, timeout: float | None = None) -> dict:
        """
        Send a request to Rosbridge and return the response.

        Send and receive run under one lock acquisition so concurrent callers
        cannot pick up each other's responses.

        Args:
            message (dict): The Rosbridge message dictionary to send.
            timeout (float | None): Seconds to wait for a response.
                                    If None, uses the default timeout.

        Returns:
            dict:
                - Parsed JSON response if successful.
                - {"error": "<error message>"} if connection/send/receive fails.
                - {"error": "invalid_json", "raw": <response>} if decoding fails.
        """
        with self.lock:
            # Attempt to send the message (connect() is called internally in send())
            send_error = self.send(message)
            if send_error:
                return {"error": send_error}

            # Attempt to receive a response (connect() is called internally in receive())
            response = self.receive(timeout=timeout)

        if response is None:
            return {"error": "no response or timeout from rosbridge"}

        # Attempt to parse response (auto-detect images, but services rarely return images)
        parsed_response, _ = parse_input(response, expects_image=None)
        if parsed_response is None:
            print(f"[WebSocket] JSON decode error for response: {response}", file=sys.stderr)
            return {"error": "invalid_json", "raw": response}
        return parsed_response

    def close(self):
        """Close the connection if open; always drops the socket reference."""
        with self.lock:
            ws, self.ws = self.ws, None
            if ws is None or not ws.connected:
                return
            try:
                ws.close()
                print("[WebSocket] Closed", file=sys.stderr)
            except Exception as e:
                print(f"[WebSocket] Close error: {e}", file=sys.stderr)

    def __enter__(self):
        """Context manager entry. Does not connect: request()/send()/receive() connect lazily."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - automatically closes the connection."""
        self.close()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/robotmcp/ros-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.