Skip to main content
Glama
livekit_handler.py3.63 kB
import logging import os from typing import Optional, Callable, Dict import asyncio from livekit.rtc import Room, RemoteParticipant, DataPacketKind logger = logging.getLogger('livekit_handler') class LiveKitHandler: def __init__(self): self.room: Optional[Room] = None self._message_handlers: Dict[str, Callable] = {} # LiveKit configuration self.url = os.getenv('LIVEKIT_URL') self.api_key = os.getenv('LIVEKIT_API_KEY') self.api_secret = os.getenv('LIVEKIT_API_SECRET') if not all([self.url, self.api_key, self.api_secret]): logger.warning("LiveKit environment variables not fully configured") return logger.info("LiveKit configuration loaded") def register_message_handler(self, message_type: str, handler: Callable): """Register a handler for a specific message type""" self._message_handlers[message_type] = handler logger.info(f"Registered handler for message type: {message_type}") async def handle_data_message(self, data: bytes, participant: RemoteParticipant): """Handle incoming data messages""" try: message = data.decode('utf-8') logger.info(f"Received data message from {participant.identity}: {message}") # Call appropriate handler if registered if message in self._message_handlers: await self._message_handlers[message](participant) except Exception as e: logger.error(f"Error handling data message: {str(e)}") async def start(self, room_name: str, token: str): """Start LiveKit connection""" if not all([self.url, self.api_key, self.api_secret]): logger.error("LiveKit environment variables not configured") return False try: self.room = Room() @self.room.on("participant_connected") def on_participant_connected(participant: RemoteParticipant): logger.info(f"participant connected: {participant.sid} {participant.identity}") @self.room.on("data_received") def on_data_received(data: bytes, participant: RemoteParticipant): asyncio.create_task(self.handle_data_message(data, participant)) # Connect to the room with auto_subscribe disabled since we only need data channel await self.room.connect(self.url, token, auto_subscribe=False) logger.info(f"Connected to LiveKit room: {room_name}") return True except Exception as e: logger.error(f"Failed to start LiveKit: {str(e)}") return False async def send_data(self, message: str, reliable: bool = True): """Send data to all participants in the room""" if not self.room: logger.error("Room not initialized") return False try: await self.room.local_participant.publish_data( message.encode('utf-8'), kind=DataPacketKind.RELIABLE if reliable else DataPacketKind.LOSSY ) return True except Exception as e: logger.error(f"Failed to send data: {str(e)}") return False async def stop(self): """Stop LiveKit connection""" if self.room: try: await self.room.disconnect() logger.info("Disconnected from LiveKit room") except Exception as e: logger.error(f"Error disconnecting from LiveKit: {str(e)}")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/baryhuang/mcp-remote-macos-use'

If you have feedback or need assistance with the MCP directory API, please join our Discord server