Skip to main content
Glama
mock_iracing_stream.pyโ€ข6.21 kB
import asyncio import json import math import random import time from dataclasses import dataclass, asdict from typing import Dict, Any @dataclass class MockTelemetry: """Simplified telemetry snapshot matching iRacing fields.""" SessionTime: float Lap: int LapCurrentLapTime: float LapDist: float Speed: float RPM: float Gear: int Throttle: float Brake: float SteeringWheelAngle: float TrackTemp: float AirTemp: float FuelLevel: float IsOnTrack: bool SessionState: str SessionFlags: str class MockIRacingDataGenerator: """Generate somewhat realistic iRacing-like telemetry data.""" def __init__(self, track_length_m: float = 5000.0, tick_rate: float = 10.0): self.track_length = track_length_m self.tick_rate = tick_rate self.start_time = time.time() self.lap_start_time = self.start_time self.lap = 1 self.lap_dist = 0.0 self.speed = 0.0 self.rpm = 0.0 self.gear = 1 self.throttle = 0.0 self.brake = 0.0 self.fuel_level = 18.0 self.track_temp = 85.0 self.air_temp = 72.0 self.steering = 0.0 def _update_state(self, dt: float) -> None: # Randomly adjust throttle and brake for variability if random.random() < 0.1: self.throttle = max(0.0, min(1.0, self.throttle + random.uniform(-0.3, 0.3))) if random.random() < 0.05: self.brake = max(0.0, min(1.0, random.uniform(0.0, 1.0))) else: self.brake = max(0.0, self.brake - 0.1) # Update speed based on throttle and brake accel = (self.throttle - self.brake) * 5.0 # m/s^2 self.speed = max(0.0, self.speed + accel * dt) self.speed = min(self.speed, 80.0) # cap around 180 mph (~80 m/s) # Convert speed to mph for telemetry speed_mph = self.speed * 2.23694 # Simple gear estimation based on speed self.gear = min(6, max(1, int(speed_mph // 30) + 1)) # RPM tied to speed and gear with some noise gear_ratio = [0, 3.2, 2.1, 1.5, 1.2, 1.0, 0.85][self.gear] self.rpm = min(9000.0, speed_mph * gear_ratio * 50 + random.uniform(-50, 50)) # Steering oscillates slightly with noise self.steering = math.sin(time.time() * 0.5) * 0.2 + random.uniform(-0.02, 0.02) # Update lap distance self.lap_dist += self.speed * dt if self.lap_dist >= self.track_length: self.lap += 1 self.lap_dist -= self.track_length self.lap_start_time = time.time() # Fuel consumption self.fuel_level = max(0.0, self.fuel_level - self.speed * dt * 0.0005) def generate(self) -> Dict[str, Any]: now = time.time() dt = now - self.start_time - self.session_time if hasattr(self, 'session_time') else 1.0 / self.tick_rate self.session_time = now - self.start_time self._update_state(dt) # Simulate occasional flag changes and off-track events for AI testing elapsed = self.session_time # Simulate flag changes every 30-60 seconds flag_cycle = elapsed % 60 if flag_cycle < 5: session_flags = "Yellow" # Caution elif flag_cycle < 8: session_flags = "Red" # Red flag elif flag_cycle < 55: session_flags = "Green" # Normal racing else: session_flags = "Checkered" # End of session # Simulate going off track occasionally (every 45 seconds for 3 seconds) off_track_cycle = elapsed % 45 is_on_track = not (42 <= off_track_cycle <= 45) lap_time = now - self.lap_start_time telemetry = MockTelemetry( SessionTime=self.session_time, Lap=self.lap, LapCurrentLapTime=lap_time, LapDist=self.lap_dist, Speed=self.speed * 2.23694, # mph RPM=self.rpm, Gear=self.gear, Throttle=self.throttle, Brake=self.brake, SteeringWheelAngle=self.steering, TrackTemp=self.track_temp, AirTemp=self.air_temp, FuelLevel=self.fuel_level, IsOnTrack=is_on_track, SessionState="Racing", SessionFlags=session_flags, ) return asdict(telemetry) class MockIRacingStreamServer: """TCP server emitting JSON telemetry frames to connected clients.""" def __init__(self, host: str = "127.0.0.1", port: int = 9000, tick_rate: float = 10.0): self.generator = MockIRacingDataGenerator(tick_rate=tick_rate) self.host = host self.port = port self.tick_rate = tick_rate async def _handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: try: while True: data = json.dumps(self.generator.generate()).encode() + b"\n" writer.write(data) await writer.drain() await asyncio.sleep(1.0 / self.tick_rate) except asyncio.CancelledError: # pragma: no cover - server shutdown pass finally: writer.close() await writer.wait_closed() async def start(self) -> None: server = await asyncio.start_server(self._handle_client, self.host, self.port) async with server: await server.serve_forever() def main() -> None: """Entry point for running the mock telemetry stream.""" import argparse parser = argparse.ArgumentParser(description="Mock iRacing telemetry stream") parser.add_argument("--host", default="127.0.0.1", help="Host to bind server") parser.add_argument("--port", type=int, default=9000, help="Port to bind server") parser.add_argument("--rate", type=float, default=10.0, help="Telemetry frames per second") args = parser.parse_args() server = MockIRacingStreamServer(host=args.host, port=args.port, tick_rate=args.rate) try: asyncio.run(server.start()) except KeyboardInterrupt: # pragma: no cover - manual stop pass if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/consolecowboy0/race-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server