Skip to main content
Glama
Panth1823

Formula1 MCP Server

openf1-stream.service.ts10.5 kB
import mqtt from 'mqtt'; import { config } from '../config/index.js'; import { logger } from '../utils/logger.js'; import { openf1Auth } from './openf1-auth.service.js'; /** * MQTT topic subscriptions for OpenF1 live data */ export enum OpenF1Topic { CAR_DATA = 'cardata.z', POSITION = 'position.z', RACE_CONTROL = 'race_control_messages', TEAM_RADIO = 'team_radio', TIMING_DATA = 'timing_data.z', TIMING_APP_DATA = 'timing_app_data.z', WEATHER_DATA = 'weather_data.z', SESSION_INFO = 'session_info', } /** * Live data event handler callback */ export type LiveDataHandler = (topic: string, data: any) => void; /** * OpenF1 Streaming Service * Manages MQTT WebSocket connection for real-time F1 data * Handles authentication, subscriptions, and message processing */ export class OpenF1StreamService { private static instance: OpenF1StreamService; private mqttClient: mqtt.MqttClient | null = null; private isConnecting: boolean = false; private isConnected: boolean = false; private subscribedTopics: Set<string> = new Set(); private dataHandlers: Map<string, Set<LiveDataHandler>> = new Map(); private reconnectAttempts: number = 0; private readonly MAX_RECONNECT_ATTEMPTS = 5; private constructor() {} /** * Get singleton instance */ public static getInstance(): OpenF1StreamService { if (!OpenF1StreamService.instance) { OpenF1StreamService.instance = new OpenF1StreamService(); } return OpenF1StreamService.instance; } /** * Connect to OpenF1 MQTT broker */ public async connect(): Promise<void> { // Check if streaming is enabled if (!config.mqttEnabled || !config.openf1StreamingEnabled) { throw new Error('MQTT streaming not enabled. Set MQTT_ENABLED=true and OPENF1_STREAMING_ENABLED=true'); } // Already connected if (this.isConnected) { logger.debug('Already connected to OpenF1 MQTT broker'); return; } // Connection in progress if (this.isConnecting) { logger.debug('Connection to OpenF1 MQTT broker already in progress'); return; } this.isConnecting = true; try { // Get access token for authentication const accessToken = await openf1Auth.getAccessToken(); logger.info('Connecting to OpenF1 MQTT broker', { url: config.mqttUrl }); // Create MQTT client with authentication this.mqttClient = mqtt.connect(config.mqttUrl, { username: 'openf1', password: accessToken, protocolVersion: 5, // MQTT 5.0 reconnectPeriod: 5000, // Auto-reconnect every 5 seconds connectTimeout: 30000, // 30 second timeout clean: true, clientId: `f1-mcp-${Date.now()}`, }); // Setup event handlers this.setupEventHandlers(); // Wait for connection await this.waitForConnection(); logger.info('Successfully connected to OpenF1 MQTT broker'); this.isConnected = true; this.isConnecting = false; this.reconnectAttempts = 0; } catch (error) { this.isConnecting = false; this.isConnected = false; logger.error('Failed to connect to OpenF1 MQTT broker', { error }); throw error; } } /** * Wait for MQTT connection to establish */ private waitForConnection(): Promise<void> { return new Promise((resolve, reject) => { if (!this.mqttClient) { return reject(new Error('MQTT client not initialized')); } const timeout = setTimeout(() => { reject(new Error('MQTT connection timeout')); }, 30000); this.mqttClient.once('connect', () => { clearTimeout(timeout); resolve(); }); this.mqttClient.once('error', (error) => { clearTimeout(timeout); reject(error); }); }); } /** * Setup MQTT event handlers */ private setupEventHandlers(): void { if (!this.mqttClient) return; // Connection established this.mqttClient.on('connect', () => { logger.info('MQTT connection established'); this.isConnected = true; this.reconnectAttempts = 0; // Re-subscribe to topics after reconnection if (this.subscribedTopics.size > 0) { this.resubscribeToTopics(); } }); // Message received this.mqttClient.on('message', (topic: string, payload: Buffer) => { this.handleMessage(topic, payload); }); // Connection error this.mqttClient.on('error', (error) => { logger.error('MQTT connection error', { error }); // Check if authentication error (401) if (error.message.includes('401') || error.message.includes('unauthorized')) { logger.warn('MQTT authentication failed - invalidating token'); openf1Auth.invalidateToken(); } }); // Connection closed this.mqttClient.on('close', () => { logger.info('MQTT connection closed'); this.isConnected = false; }); // Reconnecting this.mqttClient.on('reconnect', () => { this.reconnectAttempts++; logger.info(`MQTT reconnecting (attempt ${this.reconnectAttempts}/${this.MAX_RECONNECT_ATTEMPTS})`); if (this.reconnectAttempts >= this.MAX_RECONNECT_ATTEMPTS) { logger.error('Max reconnection attempts reached'); this.disconnect(); } }); // Offline this.mqttClient.on('offline', () => { logger.warn('MQTT client offline'); this.isConnected = false; }); } /** * Handle incoming MQTT message */ private handleMessage(topic: string, payload: Buffer): void { try { // Parse JSON payload (OpenF1 sends JSON) const data = JSON.parse(payload.toString()); logger.debug('Received MQTT message', { topic, dataSize: payload.length }); // Call registered handlers for this topic const handlers = this.dataHandlers.get(topic); if (handlers) { handlers.forEach(handler => { try { handler(topic, data); } catch (error) { logger.error('Error in data handler', { topic, error }); } }); } // Also call wildcard handlers (registered with '*') const wildcardHandlers = this.dataHandlers.get('*'); if (wildcardHandlers) { wildcardHandlers.forEach(handler => { try { handler(topic, data); } catch (error) { logger.error('Error in wildcard handler', { topic, error }); } }); } } catch (error) { logger.error('Failed to parse MQTT message', { topic, error }); } } /** * Subscribe to MQTT topic */ public async subscribe(topic: OpenF1Topic | string, handler?: LiveDataHandler): Promise<void> { if (!this.isConnected || !this.mqttClient) { throw new Error('Not connected to MQTT broker. Call connect() first.'); } return new Promise((resolve, reject) => { this.mqttClient!.subscribe(topic, { qos: 1 }, (error) => { if (error) { logger.error('Failed to subscribe to topic', { topic, error }); reject(error); } else { logger.info('Subscribed to MQTT topic', { topic }); this.subscribedTopics.add(topic); // Register handler if provided if (handler) { this.addHandler(topic, handler); } resolve(); } }); }); } /** * Unsubscribe from MQTT topic */ public async unsubscribe(topic: OpenF1Topic | string): Promise<void> { if (!this.mqttClient) { throw new Error('MQTT client not initialized'); } return new Promise((resolve, reject) => { this.mqttClient!.unsubscribe(topic, (error) => { if (error) { logger.error('Failed to unsubscribe from topic', { topic, error }); reject(error); } else { logger.info('Unsubscribed from MQTT topic', { topic }); this.subscribedTopics.delete(topic); this.dataHandlers.delete(topic); resolve(); } }); }); } /** * Re-subscribe to all topics after reconnection */ private resubscribeToTopics(): void { logger.info(`Re-subscribing to ${this.subscribedTopics.size} topics`); this.subscribedTopics.forEach(topic => { this.mqttClient!.subscribe(topic, { qos: 1 }, (error) => { if (error) { logger.error('Failed to re-subscribe to topic', { topic, error }); } else { logger.debug('Re-subscribed to topic', { topic }); } }); }); } /** * Add data handler for topic */ public addHandler(topic: string, handler: LiveDataHandler): void { if (!this.dataHandlers.has(topic)) { this.dataHandlers.set(topic, new Set()); } this.dataHandlers.get(topic)!.add(handler); logger.debug('Added handler for topic', { topic }); } /** * Remove data handler for topic */ public removeHandler(topic: string, handler: LiveDataHandler): void { const handlers = this.dataHandlers.get(topic); if (handlers) { handlers.delete(handler); if (handlers.size === 0) { this.dataHandlers.delete(topic); } } } /** * Disconnect from MQTT broker */ public async disconnect(): Promise<void> { if (!this.mqttClient) { logger.debug('No MQTT client to disconnect'); return; } logger.info('Disconnecting from OpenF1 MQTT broker'); return new Promise((resolve) => { this.mqttClient!.end(false, {}, () => { logger.info('Disconnected from OpenF1 MQTT broker'); this.isConnected = false; this.mqttClient = null; this.subscribedTopics.clear(); this.dataHandlers.clear(); resolve(); }); }); } /** * Get connection status */ public getStatus(): { connected: boolean; connecting: boolean; subscribedTopics: string[]; reconnectAttempts: number; } { return { connected: this.isConnected, connecting: this.isConnecting, subscribedTopics: Array.from(this.subscribedTopics), reconnectAttempts: this.reconnectAttempts, }; } /** * Check if connected */ public isActive(): boolean { return this.isConnected; } } // Export singleton instance export const openf1Stream = OpenF1StreamService.getInstance();

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Panth1823/formula1-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server