Skip to main content
Glama
orchestrator.py • 12.2 kB
"""
Intent Orchestrator
Routes voice, gesture, and UI intents to appropriate services
"""

import asyncio
import uuid
from typing import Dict, Any, Optional, List, Awaitable, Callable
from datetime import datetime, timezone

import structlog

logger = structlog.get_logger(__name__)

# Signature every intent handler is awaited with: it receives the intent's
# params dict and returns a result dict.
IntentHandler = Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]


class IntentOrchestrator:
    """Orchestrates intents from voice, gesture, and UI interactions.

    Intents are dotted names (e.g. ``"catalog.refresh"``) mapped to async
    handlers.  Each processed intent is tracked in ``active_intents`` while
    its handler runs and is then moved to ``intent_history``.
    """

    # Once the history grows past _HISTORY_LIMIT records it is cut back to
    # the most recent _HISTORY_KEEP records.
    _HISTORY_LIMIT = 100
    _HISTORY_KEEP = 50

    def __init__(self):
        self.intent_handlers: Dict[str, IntentHandler] = {}
        self.active_intents: Dict[str, Dict[str, Any]] = {}
        self.intent_history: List[Dict[str, Any]] = []

        # Register default intent handlers
        self._register_default_handlers()

        logger.info("IntentOrchestrator initialized")

    def _register_default_handlers(self):
        """Register the built-in catalog, order, UI, and system handlers."""
        # Catalog intents
        self.intent_handlers.update({
            "catalog.refresh": self._handle_catalog_refresh,
            "catalog.toggle_status": self._handle_catalog_toggle,
            "catalog.view_details": self._handle_catalog_details,
            "catalog.search": self._handle_catalog_search,
        })

        # Order intents
        self.intent_handlers.update({
            "orders.view": self._handle_orders_view,
            "orders.refresh": self._handle_orders_refresh,
            "orders.complete": self._handle_order_complete,
            "orders.refund": self._handle_order_refund,
            "orders.details": self._handle_order_details,
        })

        # UI intents
        self.intent_handlers.update({
            "ui.select": self._handle_ui_select,
            "ui.refresh": self._handle_ui_refresh,
            "ui.navigate": self._handle_ui_navigate,
        })

        # System intents
        self.intent_handlers.update({
            "system.help": self._handle_system_help,
            "system.status": self._handle_system_status,
        })

    async def process_intent(
        self,
        intent: str,
        params: Dict[str, Any],
        source: str = "unknown"
    ) -> Dict[str, Any]:
        """Process an intent and route it to the registered handler.

        Args:
            intent: Name of the intent, used as the key into
                ``intent_handlers``.
            params: Parameters passed unchanged to the handler.
            source: Label for where the intent came from; stored in the
                intent record and added to successful handler results.

        Returns:
            The handler's result dict with ``"intent"`` and ``"source"``
            added, or a ``{"status": "error", "error": ..., "intent": ...}``
            dict when no handler is registered or the handler raised.
            Exceptions derived from ``Exception`` are never propagated.
        """
        logger.info("Processing intent", intent=intent, params=params, source=source)

        started_at = datetime.now(timezone.utc)

        # Record intent.  The stored timestamp keeps the naive-UTC ISO
        # format (no offset suffix) that earlier records used.
        intent_record = {
            "intent": intent,
            "params": params,
            "source": source,
            "timestamp": started_at.replace(tzinfo=None).isoformat(),
            "status": "processing",
        }

        # The random suffix keeps ids unique when the same intent is
        # submitted more than once within a single clock tick.
        intent_id = f"{intent}_{started_at.timestamp()}_{uuid.uuid4().hex[:8]}"
        self.active_intents[intent_id] = intent_record

        try:
            # Find and execute handler
            handler = self.intent_handlers.get(intent)

            if not handler:
                logger.warning("No handler found for intent", intent=intent)
                result = {
                    "status": "error",
                    "error": f"No handler for intent: {intent}",
                    "intent": intent,
                }
            else:
                result = await handler(params)
                result["intent"] = intent
                result["source"] = source

            status = result.get("status", "completed")
        except Exception as e:
            logger.exception("Failed to process intent", intent=intent, error=str(e))
            result = {
                "status": "error",
                "error": str(e),
                "intent": intent,
            }
            status = "error"
        finally:
            # Always drop the record from the active set, including when the
            # handler raises or the task is cancelled, so it cannot leak.
            self.active_intents.pop(intent_id, None)

        # Update intent record and move it to history
        intent_record["status"] = status
        intent_record["result"] = result
        self.intent_history.append(intent_record)

        # Keep history size manageable
        if len(self.intent_history) > self._HISTORY_LIMIT:
            self.intent_history = self.intent_history[-self._HISTORY_KEEP:]

        logger.info("Intent processed", intent=intent, status=status)
        return result

    # Catalog handlers
    async def _handle_catalog_refresh(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Handle catalog refresh intent."""
        return {
            "status": "success",
            "action": "refresh_catalog",
            "message": "Refreshing catalog items...",
            "ui_update": True,
        }

    async def _handle_catalog_toggle(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Handle catalog item status toggle; requires ``params["item_id"]``."""
        item_id = params.get("item_id")
        if not item_id:
            return {"status": "error", "error": "Missing item_id"}

        return {
            "status": "success",
            "action": "toggle_item_status",
            "item_id": item_id,
            "message": f"Toggling status for item {item_id}",
            "ui_update": True,
        }

    async def _handle_catalog_details(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Handle view catalog item details; requires ``params["item_id"]``."""
        item_id = params.get("item_id")
        if not item_id:
            return {"status": "error", "error": "Missing item_id"}

        return {
            "status": "success",
            "action": "show_item_details",
            "item_id": item_id,
            "message": f"Showing details for item {item_id}",
            "ui_update": True,
        }

    async def _handle_catalog_search(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Handle catalog search; ``params["query"]`` defaults to ``""``."""
        query = params.get("query", "")

        return {
            "status": "success",
            "action": "search_catalog",
            "query": query,
            "message": f"Searching catalog for: {query}",
            "ui_update": True,
        }

    # Order handlers
    async def _handle_orders_view(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Handle view orders intent."""
        return {
            "status": "success",
            "action": "show_orders_dashboard",
            "message": "Loading recent orders...",
            "ui_update": True,
        }

    async def _handle_orders_refresh(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Handle orders refresh intent."""
        return {
            "status": "success",
            "action": "refresh_orders",
            "message": "Refreshing orders...",
            "ui_update": True,
        }

    async def _handle_order_complete(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Handle mark order complete; requires ``params["order_id"]``."""
        order_id = params.get("order_id")
        if not order_id:
            return {"status": "error", "error": "Missing order_id"}

        # str() so a non-string id (e.g. an int) can still be abbreviated.
        return {
            "status": "success",
            "action": "mark_order_complete",
            "order_id": order_id,
            "message": f"Marking order {str(order_id)[:8]}... as complete",
            "ui_update": True,
        }

    async def _handle_order_refund(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Handle process order refund; requires ``params["order_id"]``.

        ``params["reason"]`` defaults to ``"Customer request"``.
        """
        order_id = params.get("order_id")
        reason = params.get("reason", "Customer request")

        if not order_id:
            return {"status": "error", "error": "Missing order_id"}

        # str() so a non-string id (e.g. an int) can still be abbreviated.
        return {
            "status": "success",
            "action": "process_order_refund",
            "order_id": order_id,
            "reason": reason,
            "message": f"Processing refund for order {str(order_id)[:8]}...",
            "ui_update": True,
        }

    async def _handle_order_details(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Handle view order details; requires ``params["order_id"]``."""
        order_id = params.get("order_id")
        if not order_id:
            return {"status": "error", "error": "Missing order_id"}

        # str() so a non-string id (e.g. an int) can still be abbreviated.
        return {
            "status": "success",
            "action": "show_order_details",
            "order_id": order_id,
            "message": f"Showing details for order {str(order_id)[:8]}...",
            "ui_update": True,
        }

    # UI handlers
    async def _handle_ui_select(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Handle UI element selection."""
        element_id = params.get("element_id")
        element_type = params.get("element_type", "unknown")

        return {
            "status": "success",
            "action": "ui_select",
            "element_id": element_id,
            "element_type": element_type,
            "message": f"Selected {element_type}: {element_id}",
            "ui_update": True,
        }

    async def _handle_ui_refresh(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Handle UI refresh; ``params["target"]`` defaults to ``"current_view"``."""
        target = params.get("target", "current_view")

        return {
            "status": "success",
            "action": "refresh_ui",
            "target": target,
            "message": f"Refreshing {target}...",
            "ui_update": True,
        }

    async def _handle_ui_navigate(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Handle UI navigation; ``params["target"]`` defaults to ``"dashboard"``."""
        target = params.get("target", "dashboard")

        return {
            "status": "success",
            "action": "navigate",
            "target": target,
            "message": f"Navigating to {target}...",
            "ui_update": True,
        }

    # System handlers
    async def _handle_system_help(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Handle help request by returning the list of command descriptions."""
        available_commands = [
            "refresh catalog - Update catalog items",
            "show orders - Display recent orders",
            "toggle item status - Activate/deactivate items",
            "complete order - Mark order as complete",
            "refund order - Process order refund",
        ]

        return {
            "status": "success",
            "action": "show_help",
            "message": "Available commands:",
            "data": available_commands,
            "ui_update": False,
        }

    async def _handle_system_status(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Handle system status request with counts of intents and handlers."""
        return {
            "status": "success",
            "action": "system_status",
            "message": "Qanat system is running",
            "data": {
                "active_intents": len(self.active_intents),
                "processed_intents": len(self.intent_history),
                "available_handlers": len(self.intent_handlers),
            },
            "ui_update": False,
        }

    def get_intent_history(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Get the most recent intent records, oldest first.

        Args:
            limit: Maximum number of records to return.  Zero or a negative
                value yields an empty list.

        Returns:
            A new list holding up to ``limit`` of the latest records.
        """
        # Guard explicitly: a slice of [-0:] would return the entire history.
        if limit <= 0:
            return []
        return self.intent_history[-limit:]

    def get_active_intents(self) -> Dict[str, Dict[str, Any]]:
        """Get a shallow copy of the currently active intents, keyed by id."""
        return self.active_intents.copy()

    def register_handler(self, intent: str, handler: IntentHandler) -> None:
        """Register a new intent handler, replacing any existing one.

        Args:
            intent: Intent name the handler responds to.
            handler: Callable that is awaited with the params dict and
                returns a result dict.
        """
        self.intent_handlers[intent] = handler
        logger.info("Registered intent handler", intent=intent)

    def unregister_handler(self, intent: str) -> None:
        """Unregister an intent handler; unknown intents are ignored."""
        if intent in self.intent_handlers:
            del self.intent_handlers[intent]
            logger.info("Unregistered intent handler", intent=intent)


# Utility function for testing
async def test_orchestrator():
    """Test the intent orchestrator by running a few sample intents."""
    orchestrator = IntentOrchestrator()

    # Test catalog refresh
    result = await orchestrator.process_intent(
        "catalog.refresh",
        {},
        "test"
    )
    print(f"Catalog refresh: {result}")

    # Test order completion
    result = await orchestrator.process_intent(
        "orders.complete",
        {"order_id": "test_order_123"},
        "test"
    )
    print(f"Order complete: {result}")

    # Test help
    result = await orchestrator.process_intent(
        "system.help",
        {},
        "test"
    )
    print(f"Help: {result}")

    # Show history
    history = orchestrator.get_intent_history()
    print(f"Intent history: {len(history)} items")


if __name__ == "__main__":
    asyncio.run(test_orchestrator())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/professordnyc/qanat-goose-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.