/**
 * WebSocket Bridge Server - gRPC to WebSocket Bridge
 * Lets the browser dashboard connect to gRPC over WebSocket.
 * Acts as a proxy between WebSocket (client) and gRPC (backend server).
 */
import WebSocket, { WebSocketServer } from 'ws';
import { getOptimizedGrpcServer } from './grpc-server.js';
import { getDataService } from './data-service.js';
/**
 * WebSocket server that lets connected clients request data and subscribe to
 * periodic "streams" (live stats, active sessions, conversations) served by
 * the data service.
 *
 * Per-client state lives in `this.clients`, keyed by a generated client id.
 * Each entry holds the socket, activity timestamps, the set of subscribed
 * stream names and a map of running stream timers.
 */
class WebSocketGrpcBridge {
  /**
   * Create an idle bridge. Nothing is opened until initialize() is called.
   */
  constructor() {
    this.wss = null;
    // Fetched in initialize(); not otherwise used by the methods in this class.
    this.grpcServer = null;
    this.dataService = null;
    // clientId -> { id, ws, connectedAt, lastActivity, subscriptions, streams }
    this.clients = new Map();
    this.port = process.env.WS_BRIDGE_PORT || 8080;
    // Client management
    this.clientIdCounter = 0;
    // Reference point for getStats().uptime. Set here so uptime is always a
    // finite number, and reset once initialize() has the server listening.
    this.startTime = Date.now();
  }

  /**
   * Initialize the data service, create the WebSocket server on `this.port`
   * (path `/grpc-ws`) and wait until it is actually listening.
   *
   * @returns {Promise<boolean>} true once the server is listening, false if
   *   any step failed (the error is logged, never thrown).
   */
  async initialize() {
    try {
      console.log('π Initializing WebSocket-gRPC Bridge...');

      // Initialize data service
      this.dataService = getDataService();
      await this.dataService.initialize();

      // Get gRPC server instance
      this.grpcServer = getOptimizedGrpcServer();

      // Create WebSocket server
      this.wss = new WebSocketServer({
        port: this.port,
        path: '/grpc-ws'
      });

      // Setup WebSocket event handlers
      this.setupWebSocketHandlers();

      // Binding the port is asynchronous; a failure such as EADDRINUSE is
      // reported through the 'error' event, so wait for the outcome before
      // claiming the bridge is running.
      await this.waitForListening();

      this.startTime = Date.now();
      console.log(`β WebSocket-gRPC Bridge running on port ${this.port}`);
      return true;
    } catch (error) {
      console.error('β Failed to initialize WebSocket-gRPC Bridge:', error);
      return false;
    }
  }

  /**
   * Resolve when `this.wss` emits 'listening', reject if it emits 'error'
   * first. Whichever listener did not fire is removed again.
   *
   * @returns {Promise<void>}
   */
  waitForListening() {
    const wss = this.wss;
    return new Promise((resolve, reject) => {
      const onListening = () => {
        wss.removeListener('error', onError);
        resolve();
      };
      const onError = (error) => {
        wss.removeListener('listening', onListening);
        reject(error);
      };
      wss.once('listening', onListening);
      wss.once('error', onError);
    });
  }

  /**
   * Attach 'connection' and 'error' handlers to the WebSocket server. Each
   * new connection is registered under a fresh client id, wired up for
   * message/close/error events and sent a welcome message.
   */
  setupWebSocketHandlers() {
    this.wss.on('connection', (ws) => {
      const clientId = this.generateClientId();
      console.log(`π WebSocket client connected: ${clientId}`);

      // Register client
      this.registerClient(clientId, ws);

      // Handle messages from client. handleClientMessage() reports its own
      // failures, but reporting itself can throw (e.g. from ws.send), so the
      // returned promise must not be left without a rejection handler.
      ws.on('message', (data) => {
        this.handleClientMessage(clientId, data).catch((error) => {
          console.error(`β Unhandled error for client ${clientId}:`, error);
        });
      });

      // Handle client disconnect
      ws.on('close', () => {
        console.log(`π WebSocket client disconnected: ${clientId}`);
        this.unregisterClient(clientId);
      });

      // Handle errors
      ws.on('error', (error) => {
        console.error(`β WebSocket error for client ${clientId}:`, error);
        this.unregisterClient(clientId);
      });

      // Send welcome message
      this.sendToClient(clientId, {
        type: 'connection',
        status: 'connected',
        clientId: clientId,
        timestamp: Date.now()
      });
    });

    this.wss.on('error', (error) => {
      console.error('β WebSocket server error:', error);
    });
  }

  /**
   * Generate a client id of the form `ws_<counter>_<base36 timestamp>`.
   * The counter is incremented on every call.
   *
   * @returns {string}
   */
  generateClientId() {
    return `ws_${++this.clientIdCounter}_${Date.now().toString(36)}`;
  }

  /**
   * Store a new client record for the given socket.
   *
   * @param {string} clientId
   * @param {object} ws - socket object; must expose readyState and send().
   */
  registerClient(clientId, ws) {
    const clientInfo = {
      id: clientId,
      ws: ws,
      connectedAt: Date.now(),
      lastActivity: Date.now(),
      subscriptions: new Set(),
      streams: new Map()
    };
    this.clients.set(clientId, clientInfo);
    console.log(`π‘ Client registered: ${clientId} (${this.clients.size} total)`);
  }

  /**
   * Stop all of a client's stream timers and drop its record.
   * Does nothing for an unknown client id, so it is safe to call repeatedly.
   * The socket itself is not closed here.
   *
   * @param {string} clientId
   */
  unregisterClient(clientId) {
    const clientInfo = this.clients.get(clientId);
    if (!clientInfo) {
      return;
    }

    // Snapshot the names: stopClientStream() deletes from the same map.
    for (const streamName of Array.from(clientInfo.streams.keys())) {
      this.stopClientStream(clientId, streamName);
    }

    this.clients.delete(clientId);
    console.log(`π‘ Client unregistered: ${clientId} (${this.clients.size} remaining)`);
  }

  /**
   * Parse a raw client message as JSON and dispatch it on `message.type`.
   * Any error raised while handling is logged and reported back to the
   * client as an 'error' message.
   *
   * @param {string} clientId
   * @param {{toString(): string}} data - raw payload; converted with toString().
   * @returns {Promise<void>}
   */
  async handleClientMessage(clientId, data) {
    try {
      const message = JSON.parse(data.toString());
      const clientInfo = this.clients.get(clientId);
      if (!clientInfo) {
        console.warn(`β οΈ Message from unknown client: ${clientId}`);
        return;
      }
      clientInfo.lastActivity = Date.now();

      // Valid JSON is not necessarily an object ("null", "5", ...); reject it
      // explicitly instead of failing on a property access.
      if (message === null || typeof message !== 'object') {
        this.sendErrorToClient(clientId, 'Message must be a JSON object');
        return;
      }

      console.log(`π¨ Message from ${clientId}:`, message.type);
      switch (message.type) {
        case 'handshake':
          await this.handleHandshake(clientId, message);
          break;
        case 'subscribe':
          await this.handleSubscription(clientId, message);
          break;
        case 'get_conversation_tree':
          await this.handleGetConversationTree(clientId, message);
          break;
        case 'search_conversations':
          await this.handleSearchConversations(clientId, message);
          break;
        case 'heartbeat':
          await this.handleHeartbeat(clientId, message);
          break;
        default:
          console.warn(`β οΈ Unknown message type: ${message.type} from ${clientId}`);
      }
    } catch (error) {
      console.error(`β Error handling client message:`, error);
      this.sendErrorToClient(clientId, error.message);
    }
  }

  /**
   * Answer a handshake with the server time and the supported stream names.
   *
   * @param {string} clientId
   * @param {object} message - the handshake message (contents are not read).
   * @returns {Promise<void>}
   */
  async handleHandshake(clientId, message) {
    console.log(`π€ Handshake from client ${clientId}`);
    this.sendToClient(clientId, {
      type: 'handshake_ack',
      clientId: clientId,
      serverTime: Date.now(),
      capabilities: ['live_stats', 'active_sessions', 'conversations']
    });
  }

  /**
   * Subscribe a client to one of the known streams and start it.
   * Unknown stream names are logged and ignored; they are not recorded, so a
   * client cannot grow its subscription set with arbitrary names.
   *
   * @param {string} clientId
   * @param {{stream: string, filters?: object}} message
   * @returns {Promise<void>}
   */
  async handleSubscription(clientId, message) {
    const { stream, filters } = message;
    console.log(`π‘ Client ${clientId} subscribing to stream: ${stream}`);

    const clientInfo = this.clients.get(clientId);
    if (!clientInfo) {
      return;
    }

    let startStream;
    switch (stream) {
      case 'live_stats':
        startStream = () => this.startLiveStatsStream(clientId);
        break;
      case 'active_sessions':
        startStream = () => this.startActiveSessionsStream(clientId);
        break;
      case 'conversations':
        startStream = () => this.startConversationsStream(clientId, filters);
        break;
      default:
        console.warn(`β οΈ Unknown stream type: ${stream}`);
        return;
    }

    clientInfo.subscriptions.add(stream);
    await startStream();
  }

  /**
   * Fetch the conversation tree from the data service and send it back,
   * echoing the request id. Failures are logged and reported to the client.
   *
   * @param {string} clientId
   * @param {{requestId?: *, filters?: object}} message
   * @returns {Promise<void>}
   */
  async handleGetConversationTree(clientId, message) {
    try {
      const { requestId, filters } = message;
      console.log(`π³ Getting conversation tree for client ${clientId}`);
      const treeData = await this.dataService.getConversationTree(filters);
      this.sendToClient(clientId, {
        type: 'conversation_tree_response',
        requestId: requestId,
        payload: treeData,
        timestamp: Date.now()
      });
    } catch (error) {
      console.error('β Error getting conversation tree:', error);
      this.sendErrorToClient(clientId, 'Failed to get conversation tree');
    }
  }

  /**
   * Answer a search request. Search is not implemented yet: the response
   * always carries an empty result list together with the echoed query.
   *
   * @param {string} clientId
   * @param {{requestId?: *, query?: string}} message
   * @returns {Promise<void>}
   */
  async handleSearchConversations(clientId, message) {
    try {
      const { requestId, query } = message;
      console.log(`π Searching conversations for client ${clientId}: "${query}"`);

      // For now, return empty results (would implement actual search)
      const searchResults = {
        results: [],
        total_count: 0,
        query: query
      };
      this.sendToClient(clientId, {
        type: 'search_response',
        requestId: requestId,
        payload: searchResults,
        timestamp: Date.now()
      });
    } catch (error) {
      console.error('β Error searching conversations:', error);
      this.sendErrorToClient(clientId, 'Failed to search conversations');
    }
  }

  /**
   * Echo a heartbeat back to the client.
   *
   * @param {string} clientId
   * @param {object} message - the heartbeat message (contents are not read).
   * @returns {Promise<void>}
   */
  async handleHeartbeat(clientId, message) {
    this.sendToClient(clientId, {
      type: 'heartbeat',
      timestamp: Date.now(),
      clientId: clientId
    });
  }

  /**
   * Shared implementation of the start*Stream methods: (re)start a named
   * periodic stream for a client and push one update immediately.
   *
   * The timer is registered synchronously, before the first update is
   * awaited. That way a repeated or concurrent subscription always finds the
   * previous timer in `streams` and replaces it, instead of leaving an
   * orphaned interval running for as long as the client stays connected.
   *
   * @param {string} clientId
   * @param {string} streamName - key used in the client's `streams` map.
   * @param {number} intervalMs - delay between updates in milliseconds.
   * @param {function(): Promise<void>} pushUpdate - sends one update.
   * @param {object} [extra] - extra fields stored next to the timer handle.
   * @returns {Promise<void>}
   */
  async startPeriodicStream(clientId, streamName, intervalMs, pushUpdate, extra = {}) {
    const clientInfo = this.clients.get(clientId);
    if (!clientInfo) {
      return;
    }

    // Replace, never stack, timers for the same stream.
    this.stopClientStream(clientId, streamName);

    const interval = setInterval(async () => {
      if (this.clients.has(clientId)) {
        await pushUpdate();
      } else {
        // The client disappeared without its timer being stopped.
        clearInterval(interval);
      }
    }, intervalMs);
    clientInfo.streams.set(streamName, { interval, ...extra });

    // Send initial data
    await pushUpdate();
  }

  /**
   * Start the live stats stream for a client (update every 5 seconds).
   *
   * @param {string} clientId
   * @returns {Promise<void>}
   */
  async startLiveStatsStream(clientId) {
    console.log(`π Starting live stats stream for client ${clientId}`);
    await this.startPeriodicStream(clientId, 'live_stats', 5000, () =>
      this.sendLiveStatsToClient(clientId)
    );
  }

  /**
   * Start the active sessions stream for a client (update every 10 seconds).
   *
   * @param {string} clientId
   * @returns {Promise<void>}
   */
  async startActiveSessionsStream(clientId) {
    console.log(`π₯ Starting active sessions stream for client ${clientId}`);
    await this.startPeriodicStream(clientId, 'active_sessions', 10000, () =>
      this.sendActiveSessionsToClient(clientId)
    );
  }

  /**
   * Start the conversations stream for a client (update every 15 seconds).
   * The filters are stored with the stream and reused for every update.
   *
   * @param {string} clientId
   * @param {object} [filters] - passed through to the data service.
   * @returns {Promise<void>}
   */
  async startConversationsStream(clientId, filters) {
    console.log(`π¬ Starting conversations stream for client ${clientId}`);
    await this.startPeriodicStream(
      clientId,
      'conversations',
      15000,
      () => this.sendConversationsToClient(clientId, filters),
      { filters }
    );
  }

  /**
   * Fetch live stats and send them to one client. Errors are logged only.
   *
   * @param {string} clientId
   * @returns {Promise<void>}
   */
  async sendLiveStatsToClient(clientId) {
    try {
      const stats = await this.dataService.getLiveStats();
      this.sendToClient(clientId, {
        type: 'live_stats',
        payload: stats,
        timestamp: Date.now()
      });
    } catch (error) {
      console.error(`β Error sending live stats to client ${clientId}:`, error);
    }
  }

  /**
   * Fetch active sessions and send them, with their count, to one client.
   * Errors are logged only.
   *
   * @param {string} clientId
   * @returns {Promise<void>}
   */
  async sendActiveSessionsToClient(clientId) {
    try {
      const activeSessions = await this.dataService.getActiveSessions();
      this.sendToClient(clientId, {
        type: 'active_sessions',
        payload: { sessions: activeSessions, count: activeSessions.length },
        timestamp: Date.now()
      });
    } catch (error) {
      console.error(`β Error sending active sessions to client ${clientId}:`, error);
    }
  }

  /**
   * Fetch the conversation tree and send it to one client as a
   * 'conversations_update'. Errors are logged only.
   *
   * @param {string} clientId
   * @param {object} [filters={}] - passed through to the data service.
   * @returns {Promise<void>}
   */
  async sendConversationsToClient(clientId, filters = {}) {
    try {
      const conversations = await this.dataService.getConversationTree(filters);
      this.sendToClient(clientId, {
        type: 'conversations_update',
        payload: conversations,
        timestamp: Date.now()
      });
    } catch (error) {
      console.error(`β Error sending conversations to client ${clientId}:`, error);
    }
  }

  /**
   * Serialize a message as JSON and send it to one client, if that client is
   * known and its socket is OPEN. Updates the client's lastActivity on send.
   * Errors thrown by serialization or by the socket propagate to the caller.
   *
   * @param {string} clientId
   * @param {object} message
   * @returns {boolean} true if the message was handed to the socket, false
   *   if the client is unknown or its socket is not open.
   */
  sendToClient(clientId, message) {
    const clientInfo = this.clients.get(clientId);
    if (!clientInfo || clientInfo.ws.readyState !== WebSocket.OPEN) {
      return false;
    }
    clientInfo.ws.send(JSON.stringify(message));
    clientInfo.lastActivity = Date.now();
    return true;
  }

  /**
   * Send an 'error' message to one client.
   *
   * @param {string} clientId
   * @param {string} errorMessage
   */
  sendErrorToClient(clientId, errorMessage) {
    this.sendToClient(clientId, {
      type: 'error',
      message: errorMessage,
      timestamp: Date.now()
    });
  }

  /**
   * Send a message to every connected client, optionally restricted by a
   * predicate over the client record. A client whose send throws is
   * unregistered and counted as an error.
   *
   * @param {object} message
   * @param {?function(object): boolean} [filter=null] - receives the client
   *   record; the client is skipped when it returns a falsy value.
   * @returns {{successCount: number, errorCount: number}} successCount only
   *   counts messages actually handed to an open socket.
   */
  broadcast(message, filter = null) {
    let successCount = 0;
    let errorCount = 0;

    for (const [clientId, clientInfo] of this.clients) {
      try {
        if (filter && !filter(clientInfo)) {
          continue;
        }
        // sendToClient() silently skips sockets that are not open; those
        // must not be reported as delivered.
        if (this.sendToClient(clientId, message)) {
          successCount++;
        }
      } catch (error) {
        console.error(`β Error broadcasting to client ${clientId}:`, error);
        this.unregisterClient(clientId);
        errorCount++;
      }
    }

    if (successCount > 0 || errorCount > 0) {
      console.log(`π‘ Broadcast completed: ${successCount} success, ${errorCount} errors`);
    }
    return { successCount, errorCount };
  }

  /**
   * Stop one named stream of a client: clear its timer and remove it from
   * the client's `streams` map. No-op if the client or stream is unknown.
   *
   * @param {string} clientId
   * @param {string} streamName
   */
  stopClientStream(clientId, streamName) {
    const clientInfo = this.clients.get(clientId);
    if (!clientInfo || !clientInfo.streams.has(streamName)) {
      return;
    }

    const streamData = clientInfo.streams.get(streamName);
    if (streamData.interval) {
      clearInterval(streamData.interval);
    }
    clientInfo.streams.delete(streamName);
    console.log(`π Stopped ${streamName} stream for client ${clientId}`);
  }

  /**
   * Snapshot of the bridge state.
   *
   * @returns {{connectedClients: number, clients: object[], uptime: number, port: (string|number)}}
   *   `uptime` is in milliseconds since the last successful initialize(), or
   *   since construction if initialize() has not succeeded yet.
   */
  getStats() {
    return {
      connectedClients: this.clients.size,
      clients: Array.from(this.clients.values()).map(client => ({
        id: client.id,
        connectedAt: client.connectedAt,
        lastActivity: client.lastActivity,
        subscriptions: Array.from(client.subscriptions),
        activeStreams: Array.from(client.streams.keys())
      })),
      uptime: Date.now() - this.startTime,
      port: this.port
    };
  }

  /**
   * Shut the bridge down: notify clients, stop their streams, close their
   * sockets, close the WebSocket server and close the data service.
   *
   * @returns {Promise<void>}
   */
  async shutdown() {
    console.log('π Shutting down WebSocket-gRPC Bridge...');

    // Notify all clients of shutdown
    this.broadcast({
      type: 'server_shutdown',
      message: 'Bridge server is shutting down',
      timestamp: Date.now()
    });

    // Close all client connections. Unregistering only stops the timers and
    // drops the record, so each socket is closed explicitly as well
    // (1001 = "going away"). NOTE(review): per the ws docs, wss.close() does
    // not close established connections in ws >= 8 - confirm the installed
    // version.
    for (const [clientId, clientInfo] of Array.from(this.clients)) {
      this.unregisterClient(clientId);
      try {
        clientInfo.ws.close(1001, 'Bridge server is shutting down');
      } catch (error) {
        console.error(`β Error closing connection for client ${clientId}:`, error);
      }
    }

    // Close WebSocket server
    if (this.wss) {
      this.wss.close();
    }

    // Close data service
    if (this.dataService) {
      await this.dataService.close();
    }

    console.log('β WebSocket-gRPC Bridge shut down successfully');
  }
}
// Module-wide bridge instance, created on first request.
let bridgeInstance = null;

/**
 * Return the shared WebSocketGrpcBridge, constructing it on the first call.
 * Every later call returns that same instance.
 *
 * @returns {WebSocketGrpcBridge}
 */
export function getWebSocketGrpcBridge() {
  if (bridgeInstance) {
    return bridgeInstance;
  }
  bridgeInstance = new WebSocketGrpcBridge();
  return bridgeInstance;
}
export default WebSocketGrpcBridge;