Skip to main content
Glama

Bangalore BMTC Mobility Connectivity Platform

by ajeetraina
data-integration-service.js22.2 kB
// BMTC Data Integration Service // // This service connects to various BMTC data sources and transforms them // into standardized formats for use in the MCP platform. // // Main responsibilities: // 1. Connect to BMTC GPS/ATD tracking systems // 2. Fetch and transform schedule data // 3. Generate GTFS feeds // 4. Maintain a data lake for analytics const fs = require('fs'); const path = require('path'); const axios = require('axios'); const csv = require('csv-parser'); const { Parser } = require('json2csv'); const schedule = require('node-schedule'); const mongoose = require('mongoose'); const { MongoClient } = require('mongodb'); const logger = require('./utils/logger'); const config = require('./config'); // MongoDB models (imported from main server) const { Bus, Route, Stop, BusLocation } = require('./models'); // Connect to MongoDB mongoose.connect(config.mongodb.uri, { useNewUrlParser: true, useUnifiedTopology: true }) .then(() => logger.info('MongoDB connected')) .catch(err => logger.error('MongoDB connection error:', err)); // Initialize the data integration service class BMTCDataIntegrationService { constructor() { this.isRunning = false; this.stats = { lastRun: null, recordsProcessed: 0, errors: 0 }; } // Start the service with scheduled jobs async start() { if (this.isRunning) { logger.warn('Data integration service is already running'); return; } this.isRunning = true; logger.info('Starting BMTC data integration service'); try { // Initialize connections to data sources await this.initializeDataSources(); // Schedule jobs this.scheduleJobs(); // Run initial data import await this.performFullDataImport(); logger.info('BMTC data integration service started successfully'); } catch (error) { logger.error('Failed to start data integration service:', error); this.isRunning = false; throw error; } } // Initialize connections to various data sources async initializeDataSources() { logger.info('Initializing connections to data sources'); // Test connection to BMTC GPS API try { await axios.get(config.bmtcApi.healthEndpoint, { headers: { 'Authorization': `Bearer ${config.bmtcApi.apiKey}` } }); logger.info('Successfully connected to BMTC GPS API'); } catch (error) { logger.error('Failed to connect to BMTC GPS API:', error.message); // We'll continue anyway and retry later } // Test connection to BMTC Schedule Database try { const client = new MongoClient(config.bmtcScheduleDb.uri); await client.connect(); await client.db().command({ ping: 1 }); await client.close(); logger.info('Successfully connected to BMTC Schedule Database'); } catch (error) { logger.error('Failed to connect to BMTC Schedule Database:', error.message); // We'll continue anyway and retry later } // Initialize local data directories const dirs = [ config.dataDirectories.raw, config.dataDirectories.processed, config.dataDirectories.gtfs, config.dataDirectories.logs ]; for (const dir of dirs) { if (!fs.existsSync(dir)) { fs.mkdirSync(dir, { recursive: true }); logger.info(`Created directory: ${dir}`); } } } // Schedule recurring jobs scheduleJobs() { logger.info('Setting up scheduled jobs'); // Fetch GPS data every minute schedule.scheduleJob('*/1 * * * *', async () => { try { await this.fetchRealtimeGpsData(); } catch (error) { logger.error('Error in GPS data fetch job:', error); this.stats.errors++; } }); // Update schedules daily at 1:00 AM schedule.scheduleJob('0 1 * * *', async () => { try { await this.updateScheduleData(); } catch (error) { logger.error('Error in schedule update job:', error); this.stats.errors++; } }); // Generate GTFS static feed weekly on Sunday at 2:00 AM schedule.scheduleJob('0 2 * * 0', async () => { try { await this.generateGtfsStaticFeed(); } catch (error) { logger.error('Error in GTFS static feed generation job:', error); this.stats.errors++; } }); // Clean up old data monthly on the 1st at 3:00 AM schedule.scheduleJob('0 3 1 * *', async () => { try { await this.cleanupOldData(); } catch (error) { logger.error('Error in data cleanup job:', error); this.stats.errors++; } }); logger.info('All scheduled jobs have been set up'); } // Perform a full data import (typically run on startup) async performFullDataImport() { logger.info('Starting full data import'); try { // Import static data first await this.importStops(); await this.importRoutes(); await this.importBuses(); // Then import dynamic data await this.fetchRealtimeGpsData(); // Generate initial GTFS feed await this.generateGtfsStaticFeed(); this.stats.lastRun = new Date(); logger.info('Full data import completed successfully'); } catch (error) { logger.error('Error during full data import:', error); this.stats.errors++; throw error; } } // Import bus stops from BMTC data source async importStops() { logger.info('Importing bus stops data'); try { // In a real implementation, this would fetch from BMTC's system // For this example, we'll simulate fetching from their API const response = await axios.get(config.bmtcApi.stopsEndpoint, { headers: { 'Authorization': `Bearer ${config.bmtcApi.apiKey}` } }); const stops = response.data; // Save raw data for audit trail const timestamp = new Date().toISOString().replace(/:/g, '-'); fs.writeFileSync( path.join(config.dataDirectories.raw, `stops_${timestamp}.json`), JSON.stringify(stops, null, 2) ); // Process and transform stops data const transformedStops = stops.map(stop => ({ stopId: stop.stop_id.toString(), stopName: stop.stop_name, location: { type: 'Point', coordinates: [parseFloat(stop.longitude), parseFloat(stop.latitude)] }, address: stop.address || '', amenities: { hasShelter: Boolean(stop.has_shelter), hasSeating: Boolean(stop.has_seating), hasDisplayBoard: Boolean(stop.has_display), isAccessible: Boolean(stop.is_accessible) }, routes: stop.routes || [] })); // Batch insert into MongoDB // Using bulk operations for better performance const operations = transformedStops.map(stop => ({ updateOne: { filter: { stopId: stop.stopId }, update: { $set: stop }, upsert: true } })); if (operations.length > 0) { const result = await Stop.bulkWrite(operations); logger.info(`Stops import completed: ${result.upsertedCount} inserted, ${result.modifiedCount} updated`); } else { logger.warn('No stops data to import'); } this.stats.recordsProcessed += transformedStops.length; return transformedStops.length; } catch (error) { logger.error('Error importing stops:', error); this.stats.errors++; throw error; } } // Import routes from BMTC data source async importRoutes() { logger.info('Importing routes data'); try { // In a real implementation, this would fetch from BMTC's system const response = await axios.get(config.bmtcApi.routesEndpoint, { headers: { 'Authorization': `Bearer ${config.bmtcApi.apiKey}` } }); const routes = response.data; // Save raw data const timestamp = new Date().toISOString().replace(/:/g, '-'); fs.writeFileSync( path.join(config.dataDirectories.raw, `routes_${timestamp}.json`), JSON.stringify(routes, null, 2) ); // Transform routes data const transformedRoutes = routes.map(route => ({ routeId: route.route_id.toString(), routeName: route.route_name, origin: route.origin, destination: route.destination, distance: parseFloat(route.distance), frequency: parseInt(route.frequency, 10), stops: (route.stops || []).map(stop => ({ stopId: stop.stop_id.toString(), stopName: stop.stop_name, sequenceNumber: parseInt(stop.sequence, 10), distanceFromOrigin: parseFloat(stop.distance), expectedTravelTime: parseInt(stop.travel_time, 10) })), schedule: (route.schedule || []).map(sched => ({ dayType: sched.day_type, trips: (sched.trips || []).map(trip => ({ departureTime: trip.departure_time, arrivalTime: trip.arrival_time, busType: trip.bus_type })) })) })); // Batch insert into MongoDB const operations = transformedRoutes.map(route => ({ updateOne: { filter: { routeId: route.routeId }, update: { $set: route }, upsert: true } })); if (operations.length > 0) { const result = await Route.bulkWrite(operations); logger.info(`Routes import completed: ${result.upsertedCount} inserted, ${result.modifiedCount} updated`); } else { logger.warn('No routes data to import'); } this.stats.recordsProcessed += transformedRoutes.length; return transformedRoutes.length; } catch (error) { logger.error('Error importing routes:', error); this.stats.errors++; throw error; } } // Import buses from BMTC data source async importBuses() { logger.info('Importing buses data'); try { // In a real implementation, this would fetch from BMTC's system const response = await axios.get(config.bmtcApi.busesEndpoint, { headers: { 'Authorization': `Bearer ${config.bmtcApi.apiKey}` } }); const buses = response.data; // Save raw data const timestamp = new Date().toISOString().replace(/:/g, '-'); fs.writeFileSync( path.join(config.dataDirectories.raw, `buses_${timestamp}.json`), JSON.stringify(buses, null, 2) ); // Transform buses data const transformedBuses = buses.map(bus => ({ vehicleId: bus.vehicle_id.toString(), registrationNumber: bus.registration_number, routeId: bus.route_id.toString(), capacity: parseInt(bus.capacity, 10), isActive: Boolean(bus.is_active), lastServiceDate: bus.last_service_date ? new Date(bus.last_service_date) : null, features: { isAC: Boolean(bus.is_ac), isElectric: Boolean(bus.is_electric), hasWifi: Boolean(bus.has_wifi), isWheelchairAccessible: Boolean(bus.is_wheelchair_accessible) } })); // Batch insert into MongoDB const operations = transformedBuses.map(bus => ({ updateOne: { filter: { vehicleId: bus.vehicleId }, update: { $set: bus }, upsert: true } })); if (operations.length > 0) { const result = await Bus.bulkWrite(operations); logger.info(`Buses import completed: ${result.upsertedCount} inserted, ${result.modifiedCount} updated`); } else { logger.warn('No buses data to import'); } this.stats.recordsProcessed += transformedBuses.length; return transformedBuses.length; } catch (error) { logger.error('Error importing buses:', error); this.stats.errors++; throw error; } } // Fetch real-time GPS data from BMTC tracking system async fetchRealtimeGpsData() { logger.info('Fetching real-time GPS data'); try { // In a real implementation, this would fetch from BMTC's tracking system const response = await axios.get(config.bmtcApi.gpsEndpoint, { headers: { 'Authorization': `Bearer ${config.bmtcApi.apiKey}` } }); const gpsData = response.data; // Save raw data (optional - could be a lot of data) const timestamp = new Date().toISOString().replace(/:/g, '-'); fs.writeFileSync( path.join(config.dataDirectories.raw, `gps_${timestamp}.json`), JSON.stringify(gpsData, null, 2) ); // Transform GPS data const transformedGpsData = gpsData.map(data => ({ vehicleId: data.vehicle_id.toString(), routeId: data.route_id.toString(), location: { type: 'Point', coordinates: [parseFloat(data.longitude), parseFloat(data.latitude)] }, speed: parseFloat(data.speed), heading: parseFloat(data.heading), nextStopId: data.next_stop_id ? data.next_stop_id.toString() : null, delay: parseFloat(data.delay || 0), timestamp: new Date(data.timestamp) })); // Insert into MongoDB if (transformedGpsData.length > 0) { await BusLocation.insertMany(transformedGpsData); logger.info(`GPS data import completed: ${transformedGpsData.length} locations inserted`); } else { logger.warn('No GPS data to import'); } this.stats.recordsProcessed += transformedGpsData.length; return transformedGpsData.length; } catch (error) { logger.error('Error fetching GPS data:', error); this.stats.errors++; throw error; } } // Update schedule data from BMTC source async updateScheduleData() { logger.info('Updating schedule data'); try { // This method would update the schedule data by fetching from BMTC's system // For this example, we'll just call the importRoutes method which already includes schedule data const count = await this.importRoutes(); logger.info(`Schedule data update completed for ${count} routes`); return count; } catch (error) { logger.error('Error updating schedule data:', error); this.stats.errors++; throw error; } } // Generate GTFS static feed async generateGtfsStaticFeed() { logger.info('Generating GTFS static feed'); try { const timestamp = new Date().toISOString().replace(/:/g, '-'); const outputDir = path.join(config.dataDirectories.gtfs, `gtfs_${timestamp}`); // Create output directory if (!fs.existsSync(outputDir)) { fs.mkdirSync(outputDir, { recursive: true }); } // Generate agency.txt const agency = [{ agency_id: 'BMTC', agency_name: 'Bangalore Metropolitan Transport Corporation', agency_url: 'https://mybmtc.karnataka.gov.in', agency_timezone: 'Asia/Kolkata', agency_lang: 'en', agency_phone: '1800-425-1663', agency_fare_url: 'https://mybmtc.karnataka.gov.in/fare', }]; this.writeGtfsCsv(outputDir, 'agency.txt', agency); // Generate stops.txt const stops = await Stop.find({}); const stopsGtfs = stops.map(stop => ({ stop_id: stop.stopId, stop_name: stop.stopName, stop_lat: stop.location.coordinates[1], stop_lon: stop.location.coordinates[0], wheelchair_boarding: stop.amenities.isAccessible ? 1 : 0, })); this.writeGtfsCsv(outputDir, 'stops.txt', stopsGtfs); // Generate routes.txt const routes = await Route.find({}); const routesGtfs = routes.map(route => ({ route_id: route.routeId, agency_id: 'BMTC', route_short_name: route.routeId, route_long_name: route.routeName, route_type: 3, // Bus service route_color: '0000FF', route_text_color: 'FFFFFF', })); this.writeGtfsCsv(outputDir, 'routes.txt', routesGtfs); // Generate trips.txt, stop_times.txt, and calendar.txt // These are more complex and would require more detailed schedule data // For this example, we'll just create a simplified version const trips = []; const stopTimes = []; const calendar = []; // Create a simplified calendar (every day service) const defaultCalendar = { service_id: 'everyday', monday: 1, tuesday: 1, wednesday: 1, thursday: 1, friday: 1, saturday: 1, sunday: 1, start_date: '20240101', end_date: '20241231', }; calendar.push(defaultCalendar); // Create trips and stop times for each route routes.forEach(route => { // We'll create one trip per day type for simplicity const dayTypes = ['WEEKDAY', 'SATURDAY', 'SUNDAY']; dayTypes.forEach(dayType => { const scheduleForDayType = route.schedule.find(s => s.dayType === dayType); if (scheduleForDayType && scheduleForDayType.trips.length > 0) { // Use the first trip of the day as an example const firstTrip = scheduleForDayType.trips[0]; const tripId = `${route.routeId}_${dayType}_1`; trips.push({ route_id: route.routeId, service_id: 'everyday', trip_id: tripId, trip_headsign: route.destination, direction_id: 0, shape_id: route.routeId, }); // Create stop times for each stop in the route if (route.stops && route.stops.length > 0) { route.stops.forEach((stop, index) => { // Simplified time calculation based on expectedTravelTime const departureTime = index === route.stops.length - 1 ? firstTrip.arrivalTime : this.addMinutesToTime(firstTrip.departureTime, stop.expectedTravelTime); const arrivalTime = departureTime; // Simplified - arrival and departure are the same stopTimes.push({ trip_id: tripId, arrival_time: arrivalTime, departure_time: departureTime, stop_id: stop.stopId, stop_sequence: stop.sequenceNumber, pickup_type: 0, drop_off_type: 0, }); }); } } }); }); this.writeGtfsCsv(outputDir, 'trips.txt', trips); this.writeGtfsCsv(outputDir, 'stop_times.txt', stopTimes); this.writeGtfsCsv(outputDir, 'calendar.txt', calendar); // Create a zip file of the GTFS feed // In a real implementation, we would use the 'archiver' package to create a zip file // For this example, we'll just log that the feed was generated logger.info(`GTFS static feed generated at ${outputDir}`); return outputDir; } catch (error) { logger.error('Error generating GTFS static feed:', error); this.stats.errors++; throw error; } } // Helper method to write GTFS CSV files writeGtfsCsv(outputDir, filename, data) { const parser = new Parser({ header: true }); const csv = parser.parse(data); fs.writeFileSync(path.join(outputDir, filename), csv); } // Helper method to add minutes to a time string (HH:MM:SS) addMinutesToTime(timeStr, minutes) { const [hours, mins, secs] = timeStr.split(':').map(Number); const date = new Date(); date.setHours(hours, mins, secs); date.setMinutes(date.getMinutes() + minutes); return `${date.getHours().toString().padStart(2, '0')}:${date.getMinutes().toString().padStart(2, '0')}:${date.getSeconds().toString().padStart(2, '0')}`; } // Clean up old data (TTL index should handle this, but this is a backup) async cleanupOldData() { logger.info('Cleaning up old data'); try { // Delete bus locations older than 24 hours const oneDayAgo = new Date(Date.now() - 24 * 60 * 60 * 1000); const result = await BusLocation.deleteMany({ timestamp: { $lt: oneDayAgo } }); logger.info(`Deleted ${result.deletedCount} old bus locations`); // Clean up old raw data files (keep the last 7 days) this.cleanupOldFiles(config.dataDirectories.raw, 7); // Clean up old GTFS feeds (keep the last 30 days) this.cleanupOldFiles(config.dataDirectories.gtfs, 30); logger.info('Data cleanup completed'); } catch (error) { logger.error('Error cleaning up old data:', error); this.stats.errors++; throw error; } } // Helper method to clean up old files cleanupOldFiles(directory, daysToKeep) { const files = fs.readdirSync(directory); const now = Date.now(); const daysInMs = daysToKeep * 24 * 60 * 60 * 1000; files.forEach(file => { const filePath = path.join(directory, file); const stats = fs.statSync(filePath); if (now - stats.mtimeMs > daysInMs) { fs.unlinkSync(filePath); logger.info(`Deleted old file: ${filePath}`); } }); } // Get service statistics getStats() { return { ...this.stats, isRunning: this.isRunning, uptime: this.isRunning ? (new Date() - this.stats.lastRun) / 1000 : 0, currentTime: new Date() }; } // Stop the service async stop() { if (!this.isRunning) { logger.warn('Data integration service is not running'); return; } logger.info('Stopping BMTC data integration service'); schedule.gracefulShutdown(); this.isRunning = false; logger.info('BMTC data integration service stopped'); } } // Export the service module.exports = BMTCDataIntegrationService;

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ajeetraina/bengaluru-bmtc-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server