gesture_agent.py • 13.3 kB
""" Gesture Agent - MediaPipe Integration Handles hand gesture recognition for UI control """ import asyncio import cv2 import numpy as np from typing import Dict, Any, Optional, Callable, Tuple from datetime import datetime, timedelta import structlog try: import mediapipe as mp MEDIAPIPE_AVAILABLE = True except ImportError: MEDIAPIPE_AVAILABLE = False logger = structlog.get_logger(__name__) class GestureAgent: """Gesture recognition agent using MediaPipe""" def __init__(self, config: Dict[str, Any], orchestrator=None): self.config = config self.mediapipe_config = config.get("mediapipe", {}) self.confidence_threshold = float(self.mediapipe_config.get("confidence_threshold", 0.7)) self.orchestrator = orchestrator # MediaPipe setup if MEDIAPIPE_AVAILABLE: self.mp_hands = mp.solutions.hands self.mp_drawing = mp.solutions.drawing_utils self.hands = self.mp_hands.Hands( static_image_mode=False, max_num_hands=2, model_complexity=1, min_detection_confidence=0.5, min_tracking_confidence=0.5 ) else: self.mp_hands = None self.hands = None # Gesture mappings self.gesture_mappings = { "thumb_up": { "intent": "catalog.toggle_status", "description": "Toggle item active/inactive status", "cooldown_ms": 1000 }, "point_index": { "intent": "ui.select", "description": "Select table row or UI element", "cooldown_ms": 500 }, "open_palm": { "intent": "ui.refresh", "description": "Refresh current dashboard", "cooldown_ms": 2000 }, "peace_sign": { "intent": "ui.navigate", "description": "Switch between catalog and orders view", "cooldown_ms": 1500 } } # Gesture state self.last_gesture_time = {} self.current_gesture = None self.gesture_confidence = 0.0 self.is_detecting = False self.camera = None self.selected_item_id = None # For context-aware gestures logger.info("GestureAgent initialized", mediapipe_available=MEDIAPIPE_AVAILABLE, confidence_threshold=self.confidence_threshold) async def initialize(self): """Initialize the gesture agent""" if not MEDIAPIPE_AVAILABLE: logger.warning("MediaPipe not available, gesture features disabled") return logger.info("GestureAgent initialized with MediaPipe") async def start_detection(self, callback: Optional[Callable] = None, camera_id: int = 0): """Start gesture detection""" if not MEDIAPIPE_AVAILABLE: logger.error("MediaPipe not available, cannot start gesture detection") return try: self.is_detecting = True self.camera = cv2.VideoCapture(camera_id) if not self.camera.isOpened(): logger.error("Failed to open camera", camera_id=camera_id) return self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, 640) self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, 480) self.camera.set(cv2.CAP_PROP_FPS, 30) logger.info("Gesture detection started", camera_id=camera_id) while self.is_detecting: try: ret, frame = self.camera.read() if not ret: logger.warning("Failed to read camera frame") await asyncio.sleep(0.1) continue # Process frame for gestures gesture_result = await self._process_frame(frame) if gesture_result and callback: await callback(gesture_result) # Brief pause to prevent overprocessing await asyncio.sleep(0.033) # ~30 FPS except Exception as e: logger.error("Error during gesture detection", error=str(e)) await asyncio.sleep(0.1) except Exception as e: logger.error("Failed to start gesture detection", error=str(e)) finally: if self.camera: self.camera.release() async def stop_detection(self): """Stop gesture detection""" self.is_detecting = False if self.camera: self.camera.release() logger.info("Gesture detection stopped") async def _process_frame(self, frame: np.ndarray) -> 
Optional[Dict[str, Any]]: """Process a single frame for gesture recognition""" try: # Convert BGR to RGB rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) # Process with MediaPipe results = self.hands.process(rgb_frame) if not results.multi_hand_landmarks: return None # Analyze first hand (for demo simplicity) hand_landmarks = results.multi_hand_landmarks[0] # Detect gesture gesture, confidence = self._classify_gesture(hand_landmarks) if gesture and confidence > self.confidence_threshold: return await self._handle_gesture(gesture, confidence) return None except Exception as e: logger.error("Failed to process frame", error=str(e)) return None def _classify_gesture(self, landmarks) -> Tuple[Optional[str], float]: """Classify hand gesture from landmarks""" try: # Extract landmark positions landmark_points = [] for lm in landmarks.landmark: landmark_points.append([lm.x, lm.y]) landmark_array = np.array(landmark_points) # Simple gesture classification based on landmark positions # (In production, you'd use more sophisticated ML models) # Thumb up detection thumb_tip = landmark_array[4] thumb_mcp = landmark_array[2] index_tip = landmark_array[8] # Check if thumb is extended upward if thumb_tip[1] < thumb_mcp[1] - 0.05: # Thumb pointing up other_fingers_down = all( landmark_array[tip][1] > landmark_array[tip-2][1] for tip in [8, 12, 16, 20] # Other finger tips ) if other_fingers_down: return "thumb_up", 0.85 # Point index detection index_extended = index_tip[1] < landmark_array[6][1] - 0.05 other_fingers_folded = all( landmark_array[tip][1] > landmark_array[tip-2][1] for tip in [12, 16, 20] ) if index_extended and other_fingers_folded: return "point_index", 0.8 # Open palm detection (all fingers extended) all_extended = all( landmark_array[tip][1] < landmark_array[tip-2][1] - 0.03 for tip in [8, 12, 16, 20] ) if all_extended: return "open_palm", 0.75 # Peace sign (index and middle extended) index_extended = landmark_array[8][1] < landmark_array[6][1] - 0.05 middle_extended = landmark_array[12][1] < landmark_array[10][1] - 0.05 others_folded = all( landmark_array[tip][1] > landmark_array[tip-2][1] for tip in [16, 20] ) if index_extended and middle_extended and others_folded: return "peace_sign", 0.8 return None, 0.0 except Exception as e: logger.error("Failed to classify gesture", error=str(e)) return None, 0.0 async def _handle_gesture(self, gesture: str, confidence: float) -> Optional[Dict[str, Any]]: """Handle detected gesture""" try: # Check cooldown now = datetime.utcnow() mapping = self.gesture_mappings.get(gesture) if not mapping: return None last_time = self.last_gesture_time.get(gesture) cooldown_ms = mapping["cooldown_ms"] if last_time: time_diff = (now - last_time).total_seconds() * 1000 if time_diff < cooldown_ms: return None # Still in cooldown # Update last gesture time self.last_gesture_time[gesture] = now logger.info("Gesture detected", gesture=gesture, confidence=confidence) # Prepare gesture parameters params = {} # Add context for specific gestures if gesture == "thumb_up" and self.selected_item_id: params["item_id"] = self.selected_item_id elif gesture == "ui.navigate": params["target"] = "toggle_catalog_orders" elif gesture == "ui.refresh": params["target"] = "current_view" # Create result result = { "status": "success", "gesture": gesture, "confidence": confidence, "intent": mapping["intent"], "params": params, "description": mapping["description"], "timestamp": now.isoformat() } # Execute through orchestrator if self.orchestrator: orchestrator_result = await 
self.orchestrator.process_intent( mapping["intent"], params, "gesture" ) result["orchestrator_result"] = orchestrator_result return result except Exception as e: logger.error("Failed to handle gesture", gesture=gesture, error=str(e)) return None async def test_gesture(self, gesture: str) -> Dict[str, Any]: """Test a gesture without camera input""" try: mapping = self.gesture_mappings.get(gesture) if not mapping: return { "status": "error", "error": f"Unknown gesture: {gesture}" } # Simulate gesture detection result = await self._handle_gesture(gesture, 0.9) if not result: result = { "status": "cooldown", "gesture": gesture, "message": "Gesture in cooldown period" } return result except Exception as e: logger.error("Failed to test gesture", gesture=gesture, error=str(e)) return { "status": "error", "gesture": gesture, "error": str(e) } def set_selected_item(self, item_id: Optional[str]): """Set the currently selected item for context-aware gestures""" self.selected_item_id = item_id logger.info("Selected item updated", item_id=item_id) def get_available_gestures(self) -> Dict[str, str]: """Get list of available gestures""" return { gesture: info["description"] for gesture, info in self.gesture_mappings.items() } async def close(self): """Close the gesture agent""" await self.stop_detection() logger.info("GestureAgent closed") # Utility function for testing async def test_gesture_agent(): """Test the gesture agent""" from .orchestrator import IntentOrchestrator config = { "mediapipe": { "confidence_threshold": "0.7" } } orchestrator = IntentOrchestrator() agent = GestureAgent(config, orchestrator) await agent.initialize() # Test gestures test_gestures = [ "thumb_up", "point_index", "open_palm", "peace_sign", "unknown_gesture" ] for gesture in test_gestures: result = await agent.test_gesture(gesture) print(f"Gesture: '{gesture}' -> {result['status']}") # Show available gestures gestures = agent.get_available_gestures() print(f"Available gestures: {list(gestures.keys())}") await agent.close() if __name__ == "__main__": asyncio.run(test_gesture_agent())
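
The built-in test utility exercises gestures without a camera. For live detection, a minimal usage sketch follows, assuming gesture_agent.py is importable from the current directory, a webcam is available at index 0, and no orchestrator is attached (results are then returned to the callback but not dispatched); the script name, callback name, and timing values are illustrative, not part of the module.

# live_detection_example.py - hedged usage sketch, not part of gesture_agent.py
import asyncio

from gesture_agent import GestureAgent


async def on_gesture(result):
    # Print each recognized gesture and the intent it maps to
    print(f"{result['gesture']} ({result['confidence']:.2f}) -> {result['intent']}")


async def main():
    config = {"mediapipe": {"confidence_threshold": 0.7}}
    agent = GestureAgent(config)  # no orchestrator: intents are reported, not executed
    await agent.initialize()

    # Run the detection loop as a background task, stop it after ~30 seconds
    detection_task = asyncio.create_task(
        agent.start_detection(callback=on_gesture, camera_id=0)
    )
    await asyncio.sleep(30)
    await agent.stop_detection()
    await detection_task
    await agent.close()


if __name__ == "__main__":
    asyncio.run(main())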
