event_handler.py • 4.18 kB
"""Event handling for telemetry and driver interactions.""" from __future__ import annotations import asyncio import contextlib import logging from typing import Any, Dict, Optional from .openai_client import OpenAIClient class MCPEventHandler: """Process telemetry events and driver messages, calling OpenAI when needed.""" def __init__(self, openai_client: OpenAIClient): self.openai_client = openai_client self.telemetry_queue: "asyncio.Queue[Dict[str, Any]]" = asyncio.Queue() self.voice_queue: "asyncio.Queue[bytes]" = asyncio.Queue() self.last_flag_state = "Green" self._telemetry_task: Optional[asyncio.Task[None]] = None self._voice_task: Optional[asyncio.Task[None]] = None self._running = False async def start(self) -> None: if not self._telemetry_task: self._running = True self._telemetry_task = asyncio.create_task(self._process_telemetry()) if not self._voice_task: self._voice_task = asyncio.create_task(self._process_voice()) async def stop(self) -> None: self._running = False for task_name in ("_telemetry_task", "_voice_task"): task = getattr(self, task_name) if task: task.cancel() with contextlib.suppress(Exception): await task setattr(self, task_name, None) async def on_telemetry(self, telemetry: Dict[str, Any]) -> None: await self.telemetry_queue.put(telemetry) async def on_voice_input(self, audio: bytes) -> None: """Queue raw audio bytes for processing.""" await self.voice_queue.put(audio) async def handle_user_message(self, message: str) -> str: messages = [ {"role": "system", "content": "You are a helpful racing coach."}, {"role": "user", "content": message}, ] response = await self.openai_client.chat(messages) return response.get("content", "") async def handle_voice_input(self, audio: bytes) -> str: """Transcribe audio and handle it as a user message.""" transcript = await self.openai_client.transcribe_audio(audio) if not transcript: return "" return await self.handle_user_message(transcript) async def _process_telemetry(self) -> None: while self._running: telemetry = await self.telemetry_queue.get() try: await self._evaluate_telemetry(telemetry) except Exception as exc: # noqa: BLE001 logging.error("Telemetry processing error: %s", exc) async def _process_voice(self) -> None: while self._running: audio = await self.voice_queue.get() try: response = await self.handle_voice_input(audio) if response: logging.info("Voice response: %s", response) except Exception as exc: # noqa: BLE001 logging.error("Voice processing error: %s", exc) async def _evaluate_telemetry(self, telemetry: Dict[str, Any]) -> None: flag_state = telemetry.get("flag_state", "Green") if flag_state != self.last_flag_state: self.last_flag_state = flag_state messages = [ {"role": "system", "content": "You are a professional racing spotter."}, { "role": "user", "content": f"Flag changed to {flag_state}. Offer concise advice.", }, ] response = await self.openai_client.chat(messages) logging.info("Flag change advice: %s", response.get("content", "")) if not telemetry.get("is_on_track", True): messages = [ {"role": "system", "content": "You are a racing coach."}, { "role": "user", "content": "Driver off track, give recovery tips.", }, ] response = await self.openai_client.chat(messages) logging.info("Off-track advice: %s", response.get("content", ""))
