import logging
import os
from typing import Optional, Callable, Dict
import asyncio
from livekit.rtc import Room, RemoteParticipant, DataPacketKind
logger = logging.getLogger('livekit_handler')
class LiveKitHandler:
def __init__(self):
self.room: Optional[Room] = None
self._message_handlers: Dict[str, Callable] = {}
# LiveKit configuration
self.url = os.getenv('LIVEKIT_URL')
self.api_key = os.getenv('LIVEKIT_API_KEY')
self.api_secret = os.getenv('LIVEKIT_API_SECRET')
if not all([self.url, self.api_key, self.api_secret]):
logger.warning("LiveKit environment variables not fully configured")
return
logger.info("LiveKit configuration loaded")
def register_message_handler(self, message_type: str, handler: Callable):
"""Register a handler for a specific message type"""
self._message_handlers[message_type] = handler
logger.info(f"Registered handler for message type: {message_type}")
async def handle_data_message(self, data: bytes, participant: RemoteParticipant):
"""Handle incoming data messages"""
try:
message = data.decode('utf-8')
logger.info(f"Received data message from {participant.identity}: {message}")
# Call appropriate handler if registered
if message in self._message_handlers:
await self._message_handlers[message](participant)
except Exception as e:
logger.error(f"Error handling data message: {str(e)}")
async def start(self, room_name: str, token: str):
"""Start LiveKit connection"""
if not all([self.url, self.api_key, self.api_secret]):
logger.error("LiveKit environment variables not configured")
return False
try:
self.room = Room()
@self.room.on("participant_connected")
def on_participant_connected(participant: RemoteParticipant):
logger.info(f"participant connected: {participant.sid} {participant.identity}")
@self.room.on("data_received")
def on_data_received(data: bytes, participant: RemoteParticipant):
asyncio.create_task(self.handle_data_message(data, participant))
# Connect to the room with auto_subscribe disabled since we only need data channel
await self.room.connect(self.url, token, auto_subscribe=False)
logger.info(f"Connected to LiveKit room: {room_name}")
return True
except Exception as e:
logger.error(f"Failed to start LiveKit: {str(e)}")
return False
async def send_data(self, message: str, reliable: bool = True):
"""Send data to all participants in the room"""
if not self.room:
logger.error("Room not initialized")
return False
try:
await self.room.local_participant.publish_data(
message.encode('utf-8'),
kind=DataPacketKind.RELIABLE if reliable else DataPacketKind.LOSSY
)
return True
except Exception as e:
logger.error(f"Failed to send data: {str(e)}")
return False
async def stop(self):
"""Stop LiveKit connection"""
if self.room:
try:
await self.room.disconnect()
logger.info("Disconnected from LiveKit room")
except Exception as e:
logger.error(f"Error disconnecting from LiveKit: {str(e)}")