"""
Camera integration for Robotics WebApp.
Provides USB webcam support for real hardware integration.
"""
import base64
import io
import logging
import threading
import time
from typing import Any
import cv2
import numpy as np
from PIL import Image
logger = logging.getLogger(__name__)
class USBCamera:
"""USB Camera integration using OpenCV."""
def __init__(self, device_id: int = 0, name: str = "usb_camera"):
self.device_id = device_id
self.name = name
self.cap = None
self.is_connected = False
self.last_frame = None
self.last_frame_time = 0
self.frame_lock = threading.Lock()
self.capture_thread = None
self.running = False
def connect(self) -> bool:
"""Connect to the USB camera."""
try:
self.cap = cv2.VideoCapture(self.device_id, cv2.CAP_DSHOW) # Use DirectShow on Windows
if not self.cap.isOpened():
logger.error(f"Failed to open camera device {self.device_id}")
return False
# Configure camera properties
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
self.cap.set(cv2.CAP_PROP_FPS, 30)
self.is_connected = True
logger.info(f"Connected to USB camera {self.name} (device {self.device_id})")
return True
except Exception as e:
logger.error(f"Error connecting to camera {self.device_id}: {e}")
return False
def disconnect(self):
"""Disconnect from the camera."""
self.running = False
if self.capture_thread and self.capture_thread.is_alive():
self.capture_thread.join(timeout=1.0)
if self.cap:
self.cap.release()
self.cap = None
self.is_connected = False
logger.info(f"Disconnected from USB camera {self.name}")
def start_capture(self):
"""Start the capture thread."""
if self.running:
return
self.running = True
self.capture_thread = threading.Thread(target=self._capture_loop, daemon=True)
self.capture_thread.start()
logger.info(f"Started capture thread for camera {self.name}")
def _capture_loop(self):
"""Background capture loop."""
while self.running and self.is_connected:
try:
if self.cap and self.cap.isOpened():
ret, frame = self.cap.read()
if ret:
with self.frame_lock:
self.last_frame = frame.copy()
self.last_frame_time = time.time()
else:
logger.warning(f"Camera {self.name} capture failed, attempting reconnect")
self.is_connected = False
break
except Exception as e:
logger.error(f"Error in capture loop for {self.name}: {e}")
self.is_connected = False
break
time.sleep(0.033) # ~30 FPS
def get_frame(self) -> np.ndarray | None:
"""Get the latest frame."""
with self.frame_lock:
return self.last_frame.copy() if self.last_frame is not None else None
def get_frame_as_jpeg(self, quality: int = 80) -> bytes | None:
"""Get the latest frame as JPEG bytes."""
frame = self.get_frame()
if frame is None:
return None
try:
# Convert BGR to RGB
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
pil_image = Image.fromarray(rgb_frame)
# Save as JPEG
buffer = io.BytesIO()
pil_image.save(buffer, format="JPEG", quality=quality)
return buffer.getvalue()
except Exception as e:
logger.error(f"Error converting frame to JPEG: {e}")
return None
def get_frame_as_base64(self, quality: int = 80) -> str | None:
"""Get the latest frame as base64-encoded JPEG."""
jpeg_data = self.get_frame_as_jpeg(quality)
if jpeg_data:
return base64.b64encode(jpeg_data).decode("utf-8")
return None
def get_status(self) -> dict[str, Any]:
"""Get camera status information."""
return {
"name": self.name,
"device_id": self.device_id,
"connected": self.is_connected,
"last_frame_time": self.last_frame_time,
"fps": 30, # Target FPS
"resolution": "640x480",
}
class CameraManager:
"""Manages multiple cameras for the robotics system."""
def __init__(self):
self.cameras: dict[str, USBCamera] = {}
self._monitor_task = None
self._running = False
def add_camera(self, device_id: int = 0, name: str = "robotics_camera") -> bool:
"""Add a USB camera to the system."""
if name in self.cameras:
logger.warning(f"Camera {name} already exists")
return False
camera = USBCamera(device_id=device_id, name=name)
if camera.connect():
camera.start_capture()
self.cameras[name] = camera
logger.info(f"Added camera {name} (device {device_id})")
return True
else:
logger.error(f"Failed to add camera {name}")
return False
def remove_camera(self, name: str):
"""Remove a camera from the system."""
if name in self.cameras:
self.cameras[name].disconnect()
del self.cameras[name]
logger.info(f"Removed camera {name}")
return True
return False
def get_camera(self, name: str) -> USBCamera | None:
"""Get a camera by name."""
return self.cameras.get(name)
def get_all_cameras(self) -> list[dict[str, Any]]:
"""Get status of all cameras."""
return [camera.get_status() for camera in self.cameras.values()]
def get_frame(self, camera_name: str, format: str = "jpeg", quality: int = 80) -> Any | None:
"""Get a frame from a specific camera."""
camera = self.get_camera(camera_name)
if not camera:
return None
if format == "base64":
return camera.get_frame_as_base64(quality)
elif format == "jpeg":
return camera.get_frame_as_jpeg(quality)
elif format == "numpy":
return camera.get_frame()
else:
return None
def start_monitoring(self):
"""Start the camera monitoring task."""
if self._running:
return
self._running = True
# Note: For a full implementation, we'd start an asyncio task here
# For now, cameras manage their own threads
logger.info("Camera monitoring started")
def stop_monitoring(self):
"""Stop camera monitoring."""
self._running = False
for camera in self.cameras.values():
camera.disconnect()
self.cameras.clear()
logger.info("Camera monitoring stopped")
# Global camera manager instance
camera_manager = CameraManager()
async def initialize_cameras():
"""Initialize cameras on startup."""
try:
# Try to add a default USB camera
success = camera_manager.add_camera(device_id=0, name="robotics_usb_camera")
if success:
camera_manager.start_monitoring()
logger.info("Camera system initialized successfully")
else:
logger.warning("No USB cameras detected or available")
except Exception as e:
logger.error(f"Failed to initialize camera system: {e}")
async def cleanup_cameras():
"""Clean up cameras on shutdown."""
camera_manager.stop_monitoring()
logger.info("Camera system cleaned up")