/**
* Data Service - Real data provider for gRPC handlers
* Conecta con MongoDB/Redis para obtener datos reales del sistema
* Reemplaza los datos mock del dashboard-server.cjs
*/
import { MongoClient } from 'mongodb';
import Redis from 'redis';
import fs from 'fs';
import path from 'path';
class DataService {
constructor() {
this.mongoClient = null;
this.redisClient = null;
this.db = null;
this.isConnected = false;
// Configuration
this.config = this.loadConfiguration();
this.collections = {
conversations: 'conversations',
sessions: 'sessions',
messages: 'messages',
projects: 'projects'
};
// Collection mapping for the actual schema used in the system
this.actualCollections = {
messages: 'messages', // The main collection used by server.js
conversations: 'conversations',
sessions: 'conversation_sessions' // If this exists separately
};
// Cache for performance
this.cache = new Map();
this.cacheExpiry = new Map();
this.defaultCacheTime = 30000; // 30 seconds
}
/**
* Load configuration from environment or config files
*/
loadConfiguration() {
return {
mongodb: {
url: process.env.MONGODB_URI || 'mongodb://localhost:27017/conversations',
database: 'conversations',
options: {
useUnifiedTopology: true,
serverSelectionTimeoutMS: 5000,
socketTimeoutMS: 45000,
family: 4
}
},
redis: {
host: process.env.REDIS_HOST || 'localhost',
port: process.env.REDIS_PORT || 6379,
password: process.env.REDIS_PASSWORD || null,
db: process.env.REDIS_DB || 0
}
};
}
/**
* Initialize connections to MongoDB and Redis
*/
async initialize() {
try {
console.log('🔗 Initializing data service connections...');
// Connect to MongoDB
await this.connectMongoDB();
// Connect to Redis (optional)
await this.connectRedis();
this.isConnected = true;
console.log('✅ Data service initialized successfully');
return true;
} catch (error) {
console.error('❌ Failed to initialize data service:', error);
return false;
}
}
/**
* Connect to MongoDB
*/
async connectMongoDB() {
try {
console.log(`🍃 Connecting to MongoDB: ${this.config.mongodb.url}`);
this.mongoClient = new MongoClient(this.config.mongodb.url, this.config.mongodb.options);
await this.mongoClient.connect();
// La URL ya incluye la base de datos, extraerla
this.db = this.mongoClient.db(this.config.mongodb.database);
// Test connection
await this.db.admin().ping();
console.log('✅ MongoDB connected successfully');
return true;
} catch (error) {
console.error('❌ MongoDB connection failed:', error);
throw error;
}
}
/**
* Connect to Redis
*/
async connectRedis() {
try {
console.log(`🔴 Connecting to Redis: ${this.config.redis.host}:${this.config.redis.port}`);
this.redisClient = Redis.createClient({
host: this.config.redis.host,
port: this.config.redis.port,
password: this.config.redis.password,
db: this.config.redis.db,
retry_strategy: (options) => {
if (options.error && options.error.code === 'ECONNREFUSED') {
console.warn('⚠️ Redis server connection refused');
return new Error('Redis server connection refused');
}
if (options.total_retry_time > 1000 * 60 * 60) {
return new Error('Redis retry time exhausted');
}
if (options.attempt > 10) {
return undefined;
}
return Math.min(options.attempt * 100, 3000);
}
});
this.redisClient.on('error', (err) => {
console.warn('⚠️ Redis client error:', err.message);
});
this.redisClient.on('connect', () => {
console.log('✅ Redis connected successfully');
});
await this.redisClient.connect();
return true;
} catch (error) {
console.warn('⚠️ Redis connection failed, continuing without cache:', error.message);
this.redisClient = null;
return false;
}
}
/**
* Get real-time dashboard statistics
*/
async getLiveStats() {
const cacheKey = 'live_stats';
try {
// Check cache first
if (this.isCacheValid(cacheKey)) {
return this.cache.get(cacheKey);
}
// Get actual data from MongoDB
const [
totalMessages,
totalSessions,
activeProjects,
activeSessions,
projectStats
] = await Promise.all([
this.getTotalMessages(),
this.getTotalSessions(),
this.getActiveProjects(),
this.getActiveSessions(),
this.getProjectStats()
]);
const stats = {
total_messages: totalMessages,
total_sessions: totalSessions,
active_projects: activeProjects.length,
active_sessions: activeSessions.length, // REAL count, not hardcoded 1
project_activity: projectStats,
total_cost: projectStats.reduce((sum, p) => sum + (p.cost || 0), 0),
total_tokens: projectStats.reduce((sum, p) => sum + (p.tokens || 0), 0),
messages_last_24h: await this.getMessagesLast24h(),
recent_activity: {
last_messages: await this.getRecentMessages(10),
messages_last_hour: await this.getMessagesLastHour(),
active_sessions: activeSessions.length
}
};
// Cache the results
this.setCache(cacheKey, stats, 10000); // 10 second cache for stats
return stats;
} catch (error) {
console.error('❌ Error getting live stats:', error);
// Return fallback stats if database fails
return this.getFallbackStats();
}
}
/**
* Get active sessions (REAL data from messages collection)
*/
async getActiveSessions() {
try {
const thirtyMinutesAgo = new Date(Date.now() - 30 * 60 * 1000);
// Aggregate sessions from messages collection (where the real data is)
const activeSessions = await this.db.collection('messages')
.aggregate([
// Match messages within last 30 minutes
{
$match: {
timestamp: { $gte: thirtyMinutesAgo }
}
},
// Group by session_id to get session stats
{
$group: {
_id: '$session_id',
project_name: { $first: '$project_name' },
message_count: { $sum: 1 },
first_message: { $min: '$timestamp' },
last_activity: { $max: '$timestamp' },
last_message_content: { $last: '$content' },
message_types: { $addToSet: '$message_type' }
}
},
// Sort by last activity
{ $sort: { last_activity: -1 } },
// Limit results
{ $limit: 50 }
])
.toArray();
return activeSessions.map(session => ({
session_id: session._id,
short_id: session._id.substring(0, 8),
project_name: session.project_name || 'unknown',
start_time: session.first_message,
last_activity: session.last_activity,
message_count: session.message_count,
duration_seconds: Math.floor((new Date() - new Date(session.first_message)) / 1000),
is_marked: false, // Would need to check Redis for marking info
current_status: this.calculateSessionStatusFromActivity(session.last_activity),
active_tools: session.message_types.includes('tool_use') ? ['tools'] : [],
last_message: session.last_message_content ? session.last_message_content.substring(0, 100) + '...' : null
}));
} catch (error) {
console.error('❌ Error getting active sessions:', error);
return [];
}
}
/**
* Calculate session status based on activity
*/
calculateSessionStatus(session) {
const now = new Date();
const lastActivity = new Date(session.last_activity);
const minutesSinceActivity = (now - lastActivity) / (1000 * 60);
if (minutesSinceActivity < 5) return 'active';
if (minutesSinceActivity < 30) return 'idle';
return 'ending';
}
/**
* Calculate session status from last activity timestamp
*/
calculateSessionStatusFromActivity(lastActivity) {
const now = new Date();
const lastActivityDate = new Date(lastActivity);
const minutesSinceActivity = (now - lastActivityDate) / (1000 * 60);
if (minutesSinceActivity < 5) return 'active';
if (minutesSinceActivity < 30) return 'idle';
return 'ending';
}
/**
* Get conversation tree with real data
*/
async getConversationTree(filters = {}) {
try {
const { project_filter, limit = 10, hours_back = 24 } = filters;
const timeFilter = new Date(Date.now() - hours_back * 60 * 60 * 1000);
const matchCondition = {
last_activity: { $gte: timeFilter }
};
if (project_filter) {
matchCondition.project_name = project_filter;
}
const projects = await this.db.collection(this.collections.sessions)
.aggregate([
{ $match: matchCondition },
{
$group: {
_id: '$project_name',
total_messages: { $sum: '$message_count' },
session_count: { $sum: 1 },
last_activity: { $max: '$last_activity' },
sessions: { $push: '$$ROOT' }
}
},
{ $sort: { last_activity: -1 } }
])
.toArray();
const result = {
projects: projects.map(project => ({
project_name: project._id,
total_messages: project.total_messages,
message_count: project.total_messages,
last_activity: project.last_activity,
sessions: project.sessions.slice(0, limit).map(session => ({
session_id: session._id || session.session_id,
message_count: session.message_count || 0,
last_message: session.last_activity,
description: session.description || this.generateSessionDescription(session),
status: this.calculateSessionStatus(session),
recent_messages: session.recent_messages || []
}))
})),
total_projects: projects.length,
total_sessions: projects.reduce((sum, p) => sum + p.session_count, 0),
total_messages: projects.reduce((sum, p) => sum + p.total_messages, 0)
};
return result;
} catch (error) {
console.error('❌ Error getting conversation tree:', error);
return { projects: [], total_projects: 0, total_sessions: 0, total_messages: 0 };
}
}
/**
* Generate session description from first message
*/
generateSessionDescription(session) {
if (session.first_message) {
const content = session.first_message.substring(0, 50);
return content.length > 50 ? content + '...' : content;
}
return 'Conversation session';
}
/**
* Get project statistics
*/
async getProjectStats() {
try {
const last24h = new Date(Date.now() - 24 * 60 * 60 * 1000);
const stats = await this.db.collection(this.collections.sessions)
.aggregate([
{ $match: { last_activity: { $gte: last24h } } },
{
$group: {
_id: '$project_name',
messages: { $sum: '$message_count' },
sessions: { $sum: 1 },
cost: { $sum: '$estimated_cost' },
tokens: { $sum: '$total_tokens' },
last_activity: { $max: '$last_activity' }
}
},
{ $sort: { messages: -1 } }
])
.toArray();
return stats.map(stat => ({
name: stat._id || 'unknown',
messages: stat.messages || 0,
sessions: stat.sessions || 0,
cost: Number((stat.cost || 0).toFixed(2)),
tokens: stat.tokens || 0,
last_activity: stat.last_activity
}));
} catch (error) {
console.error('❌ Error getting project stats:', error);
return [];
}
}
/**
* Helper methods for individual metrics
*/
async getTotalMessages() {
try {
// Use estimatedDocumentCount for better performance on large collections
return await this.db.collection('messages').estimatedDocumentCount();
} catch (error) {
console.error('❌ Error getting total messages:', error);
// Fallback to regular count if estimate fails
try {
return await this.db.collection('messages').countDocuments();
} catch (fallbackError) {
console.error('❌ Fallback count also failed:', fallbackError);
return 0;
}
}
}
async getTotalSessions() {
try {
// Count distinct session_ids from messages collection
const distinctSessions = await this.db.collection('messages').distinct('session_id');
return distinctSessions.length;
} catch (error) {
console.error('❌ Error getting total sessions:', error);
return 0;
}
}
async getActiveProjects() {
try {
const last24h = new Date(Date.now() - 24 * 60 * 60 * 1000);
return await this.db.collection('messages')
.distinct('project_name', { timestamp: { $gte: last24h } });
} catch (error) {
console.error('❌ Error getting active projects:', error);
return [];
}
}
async getMessagesLast24h() {
try {
const last24h = new Date(Date.now() - 24 * 60 * 60 * 1000);
return await this.db.collection(this.collections.messages)
.countDocuments({ timestamp: { $gte: last24h } });
} catch (error) {
console.error('❌ Error getting messages last 24h:', error);
return 0;
}
}
async getMessagesLastHour() {
try {
const lastHour = new Date(Date.now() - 60 * 60 * 1000);
return await this.db.collection(this.collections.messages)
.countDocuments({ timestamp: { $gte: lastHour } });
} catch (error) {
console.error('❌ Error getting messages last hour:', error);
return 0;
}
}
async getRecentMessages(limit = 10) {
try {
return await this.db.collection(this.collections.messages)
.find()
.sort({ timestamp: -1 })
.limit(limit)
.toArray();
} catch (error) {
console.error('❌ Error getting recent messages:', error);
return [];
}
}
/**
* Fallback stats when database is unavailable
*/
getFallbackStats() {
console.warn('⚠️ Using fallback stats due to database error');
return {
total_messages: 0,
total_sessions: 0,
active_projects: 0,
active_sessions: 0,
project_activity: [],
total_cost: 0,
total_tokens: 0,
messages_last_24h: 0,
recent_activity: {
last_messages: [],
messages_last_hour: 0,
active_sessions: 0
}
};
}
// === Cache Management ===
setCache(key, data, ttl = this.defaultCacheTime) {
this.cache.set(key, data);
this.cacheExpiry.set(key, Date.now() + ttl);
}
isCacheValid(key) {
if (!this.cache.has(key)) return false;
const expiry = this.cacheExpiry.get(key);
return Date.now() < expiry;
}
clearCache() {
this.cache.clear();
this.cacheExpiry.clear();
}
/**
* Close all connections
*/
async close() {
console.log('🔌 Closing data service connections...');
if (this.mongoClient) {
await this.mongoClient.close();
console.log('✅ MongoDB connection closed');
}
if (this.redisClient) {
await this.redisClient.quit();
console.log('✅ Redis connection closed');
}
this.isConnected = false;
}
/**
* Health check
*/
async healthCheck() {
const health = {
status: 'healthy',
mongodb: false,
redis: false,
timestamp: Date.now()
};
try {
if (this.db) {
await this.db.admin().ping();
health.mongodb = true;
}
} catch (error) {
console.warn('⚠️ MongoDB health check failed:', error.message);
}
try {
if (this.redisClient) {
await this.redisClient.ping();
health.redis = true;
}
} catch (error) {
console.warn('⚠️ Redis health check failed:', error.message);
}
health.status = health.mongodb ? 'healthy' : 'degraded';
return health;
}
}
// Singleton instance
let dataServiceInstance = null;
export function getDataService() {
if (!dataServiceInstance) {
dataServiceInstance = new DataService();
}
return dataServiceInstance;
}
export default DataService;