import { getOptimizedGrpcServer } from './grpc-server.js';
import { getDataService } from './data-service.js';
import os from 'os';
// External references, injected from server.js through injectDependencies().
// Both stay null until that function has been called.
let messagesCollection = null;
let redisClient = null;
// Data service instance for real data
const dataService = getDataService();
// Active streams management
// NOTE(review): in the visible code `activeStreams` is only read (its size is
// reported by sendCurrentMetrics) and never populated — confirm it is still needed.
const activeStreams = new Map();
// Calls currently attached to the active-sessions stream.
const activeSessionStreams = new Set();
// Calls currently attached to the metrics stream.
const metricsStreams = new Set();
// Subscription id -> { call, type, project_filter, session_ids, filters }.
const updateStreams = new Map();
/**
 * Stores the dependencies created by the main server so the handlers in this
 * module can use them.
 *
 * @param {object} mongoCollection - Collection used for message queries.
 * @param {object} redis - Redis client used by the marker handler.
 */
export function injectDependencies(mongoCollection, redis) {
  [messagesCollection, redisClient] = [mongoCollection, redis];
}
/**
 * Server-streaming handler for real-time messages.
 *
 * Registers the call with the shared gRPC server and immediately writes a
 * `connection` event. Nothing else is pushed from here: continuous streaming
 * is driven by the server's broadcast.
 *
 * @param {object} call - Writable server-stream call of the connecting client.
 */
export function handleStreamMessages(call) {
  getOptimizedGrpcServer().registerClient(call);

  // Initial connection event.
  const connectionEvent = {
    type: 'connection',
    message: null,
    timestamp: Date.now()
  };
  call.write(connectionEvent);
}
/**
 * Unary handler returning the conversation tree (projects -> sessions) read
 * from the DataService.
 *
 * The DataService is initialized lazily when it reports not being connected.
 * Request fields: `project_filter`, `limit` (default 50), `hours_back`
 * (default 24).
 *
 * @param {object} call - Unary call; the request is read from `call.request`.
 * @param {Function} callback - Node-style callback `(error, response)`.
 */
export async function handleGetConversationTree(call, callback) {
  try {
    console.log('🌳 Getting REAL conversation tree from DataService...');
    const { project_filter, limit = 50, hours_back = 24 } = call.request;

    // Connect on first use.
    if (!dataService.isConnected) {
      console.log('🔗 Initializing DataService for conversation tree...');
      if (!(await dataService.initialize())) {
        console.error('❌ Failed to initialize DataService');
        return callback(new Error('DataService not available'));
      }
    }

    const tree = await dataService.getConversationTree({ project_filter, limit, hours_back });
    console.log(`✅ Retrieved REAL conversation tree: ${tree.total_projects} projects, ${tree.total_sessions} sessions, ${tree.total_messages} messages`);

    // Shape one session for the gRPC response.
    const toGrpcSession = (session) => {
      const lastActivityMs = new Date(session.last_message).getTime();
      return {
        session_id: session.session_id,
        short_id: session.session_id.substring(0, 8),
        message_count: session.message_count,
        // Estimated start time: one minute per message before the last message.
        start_time: lastActivityMs - (session.message_count * 60000),
        last_activity: lastActivityMs,
        is_active: session.status === 'active',
        is_marked: false, // TODO: Implement markers
        recent_messages: session.recent_messages || []
      };
    };

    // Shape one project, including its sessions.
    const toGrpcProject = (project) => ({
      name: project.project_name,
      message_count: project.total_messages,
      sessions: project.sessions.map(toGrpcSession),
      last_activity: new Date(project.last_activity).getTime()
    });

    callback(null, {
      projects: tree.projects.map(toGrpcProject),
      total_messages: tree.total_messages,
      total_sessions: tree.total_sessions
    });
  } catch (error) {
    console.error('❌ Error in GetConversationTree with REAL data:', error);
    callback(error);
  }
}
/**
 * Unary handler that searches stored messages.
 *
 * Builds a MongoDB query from the request (`query` as a `$text` search, plus
 * optional project / session / message-type / date-range filters), counts the
 * matches and returns one page sorted by newest first.
 *
 * `start_date` and `end_date` are parsed as base-10 integers and passed to
 * `new Date(...)`; a value that does not parse is reported through `callback`
 * as an error instead of being sent to the database as an Invalid Date.
 * The request's `only_marked` field is not applied yet (markers are a TODO).
 *
 * @param {object} call - Unary call; the request is read from `call.request`.
 * @param {Function} callback - Node-style callback `(error, response)`.
 */
export async function handleSearchConversations(call, callback) {
  try {
    const {
      query,
      project_filter,
      session_filter,
      message_type_filter,
      start_date,
      end_date,
      limit = 50,
      offset = 0
    } = call.request;

    if (!messagesCollection) {
      return callback(new Error('Database not available'));
    }

    // Converts a request timestamp into a Date, or null when it is not numeric.
    const toDate = (value) => {
      const parsed = new Date(Number.parseInt(value, 10));
      return Number.isNaN(parsed.getTime()) ? null : parsed;
    };

    // Build the search query.
    const searchQuery = {};

    // Text search.
    if (query) {
      searchQuery.$text = { $search: query };
    }

    // Equality filters.
    if (project_filter) {
      searchQuery.project_name = project_filter;
    }
    if (session_filter) {
      searchQuery.session_id = session_filter;
    }
    if (message_type_filter) {
      searchQuery.message_type = message_type_filter;
    }

    // Date range filter.
    if (start_date || end_date) {
      searchQuery.timestamp = {};
      if (start_date) {
        const from = toDate(start_date);
        if (!from) {
          return callback(new Error('Invalid start_date'));
        }
        searchQuery.timestamp.$gte = from;
      }
      if (end_date) {
        const until = toDate(end_date);
        if (!until) {
          return callback(new Error('Invalid end_date'));
        }
        searchQuery.timestamp.$lte = until;
      }
    }

    // Total number of matches (ignores pagination).
    const totalCount = await messagesCollection.countDocuments(searchQuery);

    // Requested page.
    const messages = await messagesCollection
      .find(searchQuery)
      .sort({ timestamp: -1 })
      .skip(offset)
      .limit(limit)
      .toArray();

    // Format the results.
    const results = messages.map((msg) => ({
      message: formatMessage(msg),
      highlights: query ? extractHighlights(msg.content, query) : [],
      relevance_score: 1.0, // TODO: implement real scoring
      context: {
        session_id: msg.session_id,
        project_name: msg.project_name,
        total_messages: 0, // TODO: count the session's messages
        // Accept Date objects as well as raw numeric timestamps, like formatMessage.
        session_start: typeof msg.timestamp?.getTime === 'function'
          ? msg.timestamp.getTime()
          : msg.timestamp,
        is_marked: false // TODO: implement markers
      }
    }));

    callback(null, {
      results,
      total_count: totalCount,
      returned_count: results.length,
      has_more: offset + results.length < totalCount
    });
  } catch (error) {
    console.error('❌ Error in SearchConversations:', error);
    callback(error);
  }
}
/**
 * Unary handler that marks or unmarks a session as important.
 *
 * Markers are kept in Redis under `marked:<session_id>` with a 30-day expiry
 * (the full marker system is still a TODO). The response reports
 * `success: false` when no `session_id` was given or when Redis is not
 * available, so the client is never told a marker was stored when nothing
 * was written.
 *
 * @param {object} call - Unary call; reads `session_id`, `is_marked`, `note`
 *   and `tags` from `call.request`.
 * @param {Function} callback - Node-style callback `(error, response)`.
 */
export async function handleMarkImportant(call, callback) {
  try {
    const { session_id, is_marked, note, tags } = call.request;

    // Without an id the key would degenerate to "marked:undefined".
    if (!session_id) {
      return callback(null, {
        success: false,
        message: 'session_id es requerido'
      });
    }

    // TODO: implement a real marker system; Redis is used as a stopgap.
    if (!redisClient || !redisClient.isOpen) {
      return callback(null, {
        success: false,
        message: 'Almacenamiento de marcadores no disponible'
      });
    }

    const key = `marked:${session_id}`;
    if (is_marked) {
      await redisClient.setEx(key, 86400 * 30, JSON.stringify({ // 30 days
        marked: true,
        note: note || '',
        tags: tags || [],
        marked_at: Date.now()
      }));
    } else {
      await redisClient.del(key);
    }

    callback(null, {
      success: true,
      message: is_marked ? 'Conversación marcada como importante' : 'Marca removida'
    });
  } catch (error) {
    console.error('❌ Error in MarkImportant:', error);
    callback(error);
  }
}
/**
 * Unary handler that exports every message of a session as one document.
 *
 * Supported `format` values are `json` (default), `markdown` and `txt`.
 * The check order is: database available, session has messages, format is
 * supported. The file name uses the first eight characters of the session id.
 *
 * @param {object} call - Unary call; reads `session_id`, `format` and
 *   `include_metadata` (default true) from `call.request`.
 * @param {Function} callback - Node-style callback `(error, response)`.
 */
export async function handleExportConversation(call, callback) {
  try {
    const { session_id, format = 'json', include_metadata = true } = call.request;

    if (!messagesCollection) {
      return callback(new Error('Database not available'));
    }

    // All messages of the session, oldest first.
    const messages = await messagesCollection
      .find({ session_id })
      .sort({ timestamp: 1 })
      .toArray();

    if (messages.length === 0) {
      return callback(new Error('Session not found'));
    }

    // One entry per supported format; rendering is deferred until selected.
    const exporters = new Map([
      ['json', {
        extension: 'json',
        mimeType: 'application/json',
        render: () => JSON.stringify(messages, null, 2)
      }],
      ['markdown', {
        extension: 'md',
        mimeType: 'text/markdown',
        render: () => formatAsMarkdown(messages, include_metadata)
      }],
      ['txt', {
        extension: 'txt',
        mimeType: 'text/plain',
        render: () => formatAsText(messages, include_metadata)
      }]
    ]);

    const exporter = exporters.get(format);
    if (!exporter) {
      return callback(new Error('Unsupported format'));
    }

    const content = exporter.render();
    const filename = `conversation_${session_id.substring(0, 8)}.${exporter.extension}`;

    callback(null, {
      content,
      filename,
      mime_type: exporter.mimeType,
      message_count: messages.length
    });
  } catch (error) {
    console.error('❌ Error in ExportConversation:', error);
    callback(error);
  }
}
/**
 * Server-streaming handler for live statistics.
 *
 * Sends one statistics event immediately and then another every 5 seconds
 * until the call emits `cancelled` or `error`.
 *
 * `sendRealLiveStats` is async and its own error path writes to the call,
 * which can fail too. Every invocation therefore gets a rejection handler so
 * a failed write cannot surface as an unhandled promise rejection.
 *
 * @param {object} call - Writable server-stream call of the subscribing client.
 */
export function handleGetLiveStats(call) {
  console.log('📊 Starting real-time stats stream with REAL data');

  const pushStats = () => {
    sendRealLiveStats(call).catch((error) => {
      console.error('❌ Unhandled error while sending live stats:', error);
    });
  };

  // Initial statistics.
  pushStats();

  // Periodic updates, every 5 seconds.
  const interval = setInterval(pushStats, 5000);

  call.on('cancelled', () => {
    console.log('📊 Live stats stream cancelled');
    clearInterval(interval);
  });

  call.on('error', (error) => {
    console.error('❌ Live stats stream error:', error);
    clearInterval(interval);
  });
}
// Helper functions

/**
 * Converts a stored message into the shape sent over gRPC.
 *
 * - `id` comes from `_id.toString()` when present, otherwise from `id`.
 * - `timestamp` is converted with `getTime()` when the value offers that
 *   method (e.g. a Date); any other value, including a missing timestamp,
 *   is passed through unchanged instead of throwing.
 * - `metadata` is null when the message has none; otherwise its fields are
 *   defaulted to '' / 0, and `usage` is null when absent.
 *
 * @param {object} msg - Message document.
 * @returns {object} Plain object for the gRPC response.
 */
function formatMessage(msg) {
  const { timestamp, metadata } = msg;
  const usage = metadata?.usage;

  return {
    id: msg._id?.toString() || msg.id,
    session_id: msg.session_id,
    project_name: msg.project_name,
    message_type: msg.message_type,
    content: msg.content,
    hook_event: msg.hook_event,
    timestamp: typeof timestamp?.getTime === 'function' ? timestamp.getTime() : timestamp,
    metadata: metadata ? {
      source: metadata.source || '',
      model: metadata.model || '',
      usage: usage ? {
        input_tokens: usage.input_tokens || 0,
        output_tokens: usage.output_tokens || 0,
        cache_creation_input_tokens: usage.cache_creation_input_tokens || 0,
        cache_read_input_tokens: usage.cache_read_input_tokens || 0,
        service_tier: usage.service_tier || ''
      } : null,
      cost_usd: metadata.cost_usd || 0,
      duration_ms: metadata.duration_ms || 0
    } : null
  };
}
/**
 * Extracts short snippets of `content` around each word of `query`.
 *
 * The query is lower-cased and split on whitespace; words of two characters
 * or fewer are ignored. Each remaining word is matched case-insensitively and
 * literally: regex metacharacters are escaped, so input such as "c++" or
 * "foo(" cannot produce an invalid pattern. Every snippet carries up to 50
 * characters of context on each side, and at most 3 snippets are kept per word.
 *
 * @param {string} content - Text to search; anything that is not a non-empty
 *   string yields no highlights.
 * @param {string} query - Search query as typed by the client.
 * @returns {string[]} Matching snippets, grouped by query word in order.
 */
function extractHighlights(content, query) {
  if (typeof content !== 'string' || typeof query !== 'string') return [];
  if (!content || !query) return [];

  const words = query.toLowerCase().split(/\s+/).filter((word) => word.length > 2);
  const highlights = [];

  for (const word of words) {
    // Escape regex metacharacters so the word is matched literally.
    const literal = word.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
    const matches = content.match(new RegExp(`(.{0,50})${literal}(.{0,50})`, 'gi'));
    if (matches) {
      highlights.push(...matches.slice(0, 3)); // max 3 highlights per word
    }
  }

  return highlights;
}
/**
 * Renders a session's messages as a Markdown document.
 *
 * The header (project, session id, start time, message count) is taken from
 * the first message, falling back to 'Unknown' / the current time. Each
 * message becomes a section; sections are separated by a horizontal rule.
 *
 * @param {object[]} messages - Messages in display order.
 * @param {boolean} includeMetadata - When truthy, a message's metadata is
 *   appended as a collapsible JSON block.
 * @returns {string} Markdown text.
 */
function formatAsMarkdown(messages, includeMetadata) {
  const first = messages[0];
  const project = first?.project_name || 'Unknown';
  const sessionId = first?.session_id || 'Unknown';
  const startTime = new Date(first?.timestamp || Date.now()).toLocaleString();

  const header = [
    `# Conversación: ${project}\n\n`,
    `**Session ID**: \`${sessionId}\`\n`,
    `**Iniciada**: ${startTime}\n`,
    `**Mensajes**: ${messages.length}\n\n---\n\n`
  ].join('');

  // Icon shown next to the section title.
  const iconFor = (type) => {
    if (type === 'user') return '👤';
    if (type === 'assistant') return '🤖';
    return '🔧';
  };

  const sections = messages.map((msg) => {
    const when = new Date(msg.timestamp).toLocaleString();
    const type = msg.message_type;
    const title = type.charAt(0).toUpperCase() + type.slice(1);

    const parts = [
      `## ${iconFor(type)} ${title}\n`,
      `*${when}*\n\n`,
      `${msg.content}\n\n`
    ];
    if (includeMetadata && msg.metadata) {
      parts.push(`<details>\n<summary>Metadata</summary>\n\n`);
      parts.push(`\`\`\`json\n${JSON.stringify(msg.metadata, null, 2)}\n\`\`\`\n\n</details>\n\n`);
    }
    return parts.join('');
  });

  // A rule goes between sections, never after the last one.
  return header + sections.join('---\n\n');
}
/**
 * Renders a session's messages as plain text.
 *
 * The header (project, session id, start time, message count) is taken from
 * the first message, falling back to 'Unknown' / the current time. Every
 * message is written as `[time] TYPE: content` and followed by a dashed
 * separator line.
 *
 * @param {object[]} messages - Messages in display order.
 * @param {boolean} includeMetadata - When truthy, a message's metadata is
 *   appended as pretty-printed JSON.
 * @returns {string} Plain-text document.
 */
function formatAsText(messages, includeMetadata) {
  const first = messages[0];
  const project = first?.project_name || 'Unknown';
  const sessionId = first?.session_id || 'Unknown';
  const startTime = new Date(first?.timestamp || Date.now()).toLocaleString();

  const pieces = [
    `CONVERSACIÓN: ${project}\n`,
    `Session ID: ${sessionId}\n`,
    `Iniciada: ${startTime}\n`,
    `Mensajes: ${messages.length}\n`,
    '='.repeat(60) + '\n\n'
  ];

  for (const msg of messages) {
    const when = new Date(msg.timestamp).toLocaleString();
    pieces.push(`[${when}] ${msg.message_type.toUpperCase()}: ${msg.content}\n`);
    if (includeMetadata && msg.metadata) {
      pieces.push(`METADATA: ${JSON.stringify(msg.metadata, null, 2)}\n`);
    }
    pieces.push('\n' + '-'.repeat(40) + '\n\n');
  }

  return pieces.join('');
}
/**
 * Writes one `live_update` statistics event to the call, using figures read
 * from the DataService.
 *
 * The DataService is initialized lazily. When initialization fails, or when
 * anything throws while the event is being built, the zeroed fallback event
 * from createFallbackStats() is written instead.
 *
 * @param {object} call - Writable server-stream call to send the event on.
 * @returns {Promise<void>}
 */
async function sendRealLiveStats(call) {
  try {
    console.log('📊 Sending REAL live stats from DataService...');

    // Connect on first use.
    if (!dataService.isConnected) {
      console.log('🔗 Initializing DataService for real data...');
      const ready = await dataService.initialize();
      if (!ready) {
        console.error('❌ Failed to initialize DataService, using fallback');
        call.write(createFallbackStats());
        return;
      }
    }

    const realStats = await dataService.getLiveStats();
    const recent = realStats.recent_activity;

    const projectStats = realStats.project_activity.map((project) => ({
      name: project.name,
      message_count: project.messages,
      session_count: project.sessions,
      last_activity: project.last_activity
    }));

    const stats = {
      total_messages: realStats.total_messages,
      total_sessions: realStats.total_sessions,
      active_projects: realStats.active_projects,
      total_cost_usd: realStats.total_cost,
      total_tokens: realStats.total_tokens,
      project_stats: projectStats,
      recent_activity: {
        // Only the first ten recent messages are forwarded.
        last_messages: recent.last_messages.slice(0, 10),
        messages_last_hour: recent.messages_last_hour,
        active_sessions: realStats.active_sessions
      }
    };

    console.log(`✅ Sending REAL stats: ${realStats.active_sessions} active sessions, ${realStats.total_messages} total messages`);
    call.write({ type: 'live_update', stats, timestamp: Date.now() });
  } catch (error) {
    console.error('❌ Error sending REAL live stats:', error);
    // Fall back to the zeroed event on any failure.
    call.write(createFallbackStats());
  }
}
/**
 * Builds the `live_update` event used when the DataService is unavailable:
 * every counter is zero and every list is empty. Logs a warning on each call.
 *
 * @returns {object} Event with `type`, zeroed `stats` and the current
 *   `timestamp` in milliseconds.
 */
function createFallbackStats() {
  console.warn('⚠️ Using fallback stats - DataService unavailable');

  const emptyActivity = {
    last_messages: [],
    messages_last_hour: 0,
    active_sessions: 0
  };

  const emptyStats = {
    total_messages: 0,
    total_sessions: 0,
    active_projects: 0,
    total_cost_usd: 0,
    total_tokens: 0,
    project_stats: [],
    recent_activity: emptyActivity
  };

  return { type: 'live_update', stats: emptyStats, timestamp: Date.now() };
}
/**
 * Broadcasts a `new_message` event, carrying the formatted message, through
 * the shared gRPC server. Called from server.js.
 *
 * @param {object} message - Message document to broadcast.
 */
export function broadcastNewMessage(message) {
  const server = getOptimizedGrpcServer();
  const payload = formatMessage(message);

  server.broadcast({
    type: 'new_message',
    message: payload,
    timestamp: Date.now()
  });
}
/**
 * Placeholder for a statistics broadcast. It currently only looks up the
 * shared gRPC server and sends nothing.
 */
export function broadcastStatsUpdate() {
  // A statistics broadcast could be implemented here.
  getOptimizedGrpcServer();
}
// === NEW HANDLERS REPLACING WEBSOCKET ===
/**
 * Server-streaming handler for active sessions in real time.
 *
 * Tracks the call in `activeSessionStreams`, sends the sessions that are
 * currently active, and stops tracking the call once it emits `cancelled`
 * or `error`.
 *
 * @param {object} call - Writable server-stream call of the connecting client.
 */
export function handleStreamActiveSessions(call) {
  console.log('📡 New client connected to active sessions stream');
  activeSessionStreams.add(call);

  // Initial snapshot of active sessions.
  sendCurrentActiveSessions(call);

  const untrack = () => activeSessionStreams.delete(call);

  call.on('cancelled', () => {
    console.log('📡 Client disconnected from active sessions stream');
    untrack();
  });

  call.on('error', (error) => {
    console.error('❌ Active sessions stream error:', error);
    untrack();
  });
}
/**
 * Server-streaming handler for subscriptions to specific updates.
 *
 * Stores the subscription (call plus its filters) in `updateStreams` under a
 * newly generated id, confirms it to the client with a
 * `subscription_confirmed` event, and removes it once the call emits
 * `cancelled` or `error`.
 *
 * @param {object} call - Writable server-stream call; reads `type`,
 *   `project_filter`, `session_ids` and `filters` from `call.request`.
 */
export function handleSubscribeToUpdates(call) {
  console.log('📡 New client subscribed to updates');

  const request = call.request;
  const subscriptionId = generateSubscriptionId();

  // Remember what this client asked for.
  const subscription = {
    call,
    type: request.type,
    project_filter: request.project_filter,
    session_ids: request.session_ids || [],
    filters: request.filters || {}
  };
  updateStreams.set(subscriptionId, subscription);

  // Confirmation; the subscription id is carried in `session_id`.
  call.write({
    type: 'subscription_confirmed',
    session_id: subscriptionId,
    project_name: request.project_filter || 'all',
    message: null,
    session_metadata: null,
    timestamp: Date.now()
  });

  const unsubscribe = () => updateStreams.delete(subscriptionId);

  call.on('cancelled', () => {
    console.log('📡 Client unsubscribed from updates');
    unsubscribe();
  });

  call.on('error', (error) => {
    console.error('❌ Updates stream error:', error);
    unsubscribe();
  });
}
/**
 * Server-streaming handler for system metrics.
 *
 * Tracks the call in `metricsStreams`, sends the metrics once immediately and
 * then every 5 seconds while the call is still tracked. On `cancelled` or
 * `error` the call is untracked and the timer is cleared.
 *
 * @param {object} call - Writable server-stream call of the connecting client.
 */
export function handleBroadcastMetrics(call) {
  console.log('📊 New client connected to metrics stream');
  metricsStreams.add(call);

  // Initial metrics.
  sendCurrentMetrics(call);

  // Periodic updates, every 5 seconds.
  const tick = () => {
    if (!metricsStreams.has(call)) return;
    sendCurrentMetrics(call);
  };
  const metricsInterval = setInterval(tick, 5000);

  const release = () => {
    metricsStreams.delete(call);
    clearInterval(metricsInterval);
  };

  call.on('cancelled', () => {
    console.log('📊 Client disconnected from metrics stream');
    release();
  });

  call.on('error', (error) => {
    console.error('❌ Metrics stream error:', error);
    release();
  });
}
/**
 * Writes one `session_started` event per currently active session to the call.
 *
 * The DataService is initialized lazily; when that fails nothing is sent.
 * Errors are logged and never propagate to the caller.
 *
 * @param {object} call - Writable server-stream call to send the events on.
 * @returns {Promise<void>}
 */
async function sendCurrentActiveSessions(call) {
  try {
    console.log('👥 Sending REAL active sessions from DataService...');

    // Connect on first use.
    if (!dataService.isConnected) {
      console.log('🔗 Initializing DataService for active sessions...');
      const ready = await dataService.initialize();
      if (!ready) {
        console.error('❌ Failed to initialize DataService for active sessions');
        return;
      }
    }

    const sessions = await dataService.getActiveSessions();
    console.log(`✅ Found ${sessions.length} REAL active sessions`);

    // Copy only the fields that belong in the event.
    const toEvent = (session) => ({
      type: 'session_started',
      session: {
        session_id: session.session_id,
        short_id: session.short_id,
        project_name: session.project_name,
        start_time: session.start_time,
        last_activity: session.last_activity,
        message_count: session.message_count,
        duration_seconds: session.duration_seconds,
        is_marked: session.is_marked,
        current_status: session.current_status,
        active_tools: session.active_tools,
        last_message: session.last_message
      },
      timestamp: Date.now()
    });

    for (const session of sessions) {
      call.write(toEvent(session));
    }
  } catch (error) {
    console.error('❌ Error sending REAL active sessions:', error);
  }
}
/**
 * Writes one system-metrics event to the call.
 *
 * CPU, memory and uptime come from the `os` module. Most performance,
 * database and cache figures are still placeholders (see the TODOs).
 * `active_connections` counts every stream collection kept by this module,
 * including update subscriptions. Both cache hit-rate placeholders are
 * reported only when a Redis client has been injected. Errors are logged and
 * never propagate to the caller.
 *
 * @param {object} call - Writable server-stream call to send the event on.
 */
function sendCurrentMetrics(call) {
  try {
    const totalMem = os.totalmem();
    const freeMem = os.freemem();
    const uptime = os.uptime();

    // Placeholder hit rate; zero when there is no cache to hit.
    const placeholderHitRate = redisClient ? 85.0 : 0;

    const openStreams =
      activeStreams.size +
      activeSessionStreams.size +
      metricsStreams.size +
      updateStreams.size;

    const metrics = {
      type: 'system',
      system: {
        cpu_usage: getCpuUsage(), // Simplified - would need proper CPU monitoring
        memory_usage: ((totalMem - freeMem) / totalMem) * 100,
        active_connections: openStreams,
        uptime_seconds: uptime
      },
      performance: {
        avg_response_time: 0, // TODO: Implement response time tracking
        requests_per_second: 0, // TODO: Implement request rate tracking
        error_rate: 0, // TODO: Implement error rate tracking
        cache_hit_rate: placeholderHitRate // Placeholder
      },
      database: {
        total_documents: 0, // TODO: Get from MongoDB
        documents_per_second: 0, // TODO: Implement tracking
        avg_query_time: 0, // TODO: Implement query time tracking
        active_connections: 1 // Simplified
      },
      cache: {
        total_keys: 0, // TODO: Get from Redis
        hit_rate: placeholderHitRate, // Placeholder
        memory_usage_bytes: 0, // TODO: Get from Redis
        ops_per_second: 0 // TODO: Implement tracking
      },
      timestamp: Date.now()
    };

    call.write(metrics);
  } catch (error) {
    console.error('❌ Error sending metrics:', error);
  }
}
/**
 * Simplified CPU usage calculation.
 *
 * Sums the time counters reported by `os.cpus()` over all cores and returns
 * the non-idle share as a percentage. Because it is derived from a single
 * snapshot of those counters, it is not a measurement over a recent interval.
 *
 * @returns {number} Percentage in the range 0-100; 0 when no CPU times are
 *   available (for example when `os.cpus()` returns an empty array), instead
 *   of NaN from a division by zero.
 */
function getCpuUsage() {
  let idle = 0;
  let total = 0;

  for (const { times } of os.cpus()) {
    for (const elapsed of Object.values(times)) {
      total += elapsed;
    }
    idle += times.idle;
  }

  if (total === 0) {
    return 0;
  }
  return ((total - idle) / total) * 100;
}
/**
 * Generates a subscription id of the form `sub_<ms timestamp>_<suffix>`,
 * where the suffix is up to nine base-36 characters taken from
 * `Math.random()`.
 *
 * @returns {string} Subscription id.
 */
function generateSubscriptionId() {
  const stamp = Date.now();
  // Skip the leading "0." of the base-36 representation.
  const suffix = Math.random().toString(36).slice(2, 11);
  return `sub_${stamp}_${suffix}`;
}
/**
 * Broadcasts an active-session update to every client attached to the
 * active-sessions stream. A call whose write throws is logged and dropped
 * from the stream set.
 *
 * @param {string} type - Event type to send.
 * @param {object} session - Session payload, sent as-is.
 */
export function broadcastActiveSessionUpdate(type, session) {
  const update = { type, session, timestamp: Date.now() };

  for (const call of activeSessionStreams) {
    try {
      call.write(update);
    } catch (error) {
      console.error('❌ Error broadcasting session update:', error);
      activeSessionStreams.delete(call);
    }
  }
}
/**
 * Broadcasts a conversation update to the subscribers whose filters match.
 *
 * A subscription matches when its `project_filter` is empty or equals
 * `projectName`, and its `session_ids` list is empty or contains `sessionId`.
 * A subscription whose write throws is logged and removed.
 *
 * @param {string} type - Event type to send.
 * @param {string} sessionId - Session the update belongs to.
 * @param {string} projectName - Project the update belongs to.
 * @param {object|null} [message=null] - Message document; formatted before sending.
 * @param {object|null} [metadata=null] - Session metadata, sent as-is.
 */
export function broadcastConversationUpdate(type, sessionId, projectName, message = null, metadata = null) {
  const update = {
    type,
    session_id: sessionId,
    project_name: projectName,
    message: message ? formatMessage(message) : null,
    session_metadata: metadata,
    timestamp: Date.now()
  };

  for (const [subscriptionId, subscription] of updateStreams) {
    const { call, project_filter, session_ids } = subscription;

    // Both filters are evaluated for every subscription.
    const wrongProject = project_filter && project_filter !== projectName;
    const wrongSession = session_ids.length > 0 && !session_ids.includes(sessionId);
    if (wrongProject || wrongSession) {
      continue;
    }

    try {
      call.write(update);
    } catch (error) {
      console.error('❌ Error broadcasting conversation update:', error);
      updateStreams.delete(subscriptionId);
    }
  }
}