// StreamingManager.js (7.51 kB)
/**
 * In-memory manager that serves an array of facts to a consumer in
 * fixed-size chunks ("streams").
 *
 * Each stream is a plain record kept in `activeStreams`, keyed by its id, and
 * moves through the statuses `active` -> `paused` -> `active` ... ->
 * `completed` | `cancelled`. Finished streams stay in the map (so their status
 * can still be queried) until `cleanupCompletedStreams()` evicts them.
 */
export class StreamingManager {
  constructor() {
    this.activeStreams = new Map();
    this.streamCounter = 0;
    this.chunkSize = 10; // Default number of facts per chunk
    this.maxConcurrentStreams = 5;
  }

  /**
   * Builds a new stream id from the current time and a per-instance counter.
   * @returns {string} Id of the form `stream_<ms timestamp>_<counter>`.
   */
  generateStreamId() {
    this.streamCounter += 1;
    return `stream_${Date.now()}_${this.streamCounter}`;
  }

  /**
   * Looks up a stream record by id.
   * @param {string} streamId
   * @returns {object} The stored stream record.
   * @throws {Error} If no stream with that id is registered.
   */
  getStreamOrThrow(streamId) {
    const stream = this.activeStreams.get(streamId);
    if (!stream) {
      throw new Error(`Stream ${streamId} not found`);
    }
    return stream;
  }

  /**
   * Counts streams that can still deliver chunks (status `active` or `paused`).
   * @returns {number}
   */
  countOpenStreams() {
    let open = 0;
    for (const stream of this.activeStreams.values()) {
      if (stream.status === 'active' || stream.status === 'paused') {
        open += 1;
      }
    }
    return open;
  }

  /**
   * Enforces `maxConcurrentStreams`.
   *
   * Only open streams are counted: completed and cancelled records linger in
   * the map until cleanup, and counting them would block new streams long
   * after the old ones stopped doing any work.
   * @throws {Error} If the number of open streams has reached the limit.
   */
  assertCapacity() {
    if (this.countOpenStreams() >= this.maxConcurrentStreams) {
      throw new Error('Maximum concurrent streams exceeded');
    }
  }

  /**
   * Converts a position into a whole-number percentage.
   * @param {number} current - Facts delivered so far.
   * @param {number} total - Total facts in the stream.
   * @returns {number} Rounded percentage; 100 when `total` is not positive,
   *   because an empty stream has nothing left to deliver (avoids 0/0 = NaN).
   */
  calculatePercentage(current, total) {
    if (!(total > 0)) {
      return 100;
    }
    return Math.round((current / total) * 100);
  }

  /**
   * Registers a new stream over `facts`.
   * @param {Array<object>} facts - Facts to deliver, in order.
   * @param {object} [options]
   * @param {number|string} [options.chunkSize] - Facts per chunk; falsy values
   *   fall back to `this.chunkSize`. Numeric strings are converted and
   *   fractional values are floored.
   * @param {boolean} [options.includeProgress] - Progress is attached to each
   *   chunk unless this is exactly `false`.
   * @param {string} [options.query] - Stored in the stream metadata.
   * @param {string} [options.type] - Stored in the stream metadata.
   * @param {string} [options.domain] - Stored in the stream metadata.
   * @returns {Promise<string>} The id of the new stream.
   * @throws {TypeError} If `facts` is not an array.
   * @throws {Error} If the open-stream limit is reached or the chunk size is
   *   not a number >= 1.
   */
  async createStream(facts, options = {}) {
    if (!Array.isArray(facts)) {
      throw new TypeError('facts must be an array');
    }
    this.assertCapacity();

    // Coerce to a number: a string chunk size would otherwise be concatenated
    // (not added) to the current index when computing chunk boundaries.
    const chunkSize = Math.floor(Number(options.chunkSize || this.chunkSize));
    if (!(chunkSize >= 1)) {
      throw new Error('chunkSize must be a positive number');
    }

    const streamId = this.generateStreamId();
    this.activeStreams.set(streamId, {
      id: streamId,
      facts,
      totalFacts: facts.length,
      chunkSize,
      currentIndex: 0,
      includeProgress: options.includeProgress !== false,
      startTime: Date.now(),
      status: 'active',
      metadata: {
        query: options.query || '',
        type: options.type || 'all',
        domain: options.domain || 'all',
      },
    });
    return streamId;
  }

  /**
   * Returns the next chunk of an active stream and advances its position.
   * The stream is marked `completed` once its last fact has been delivered.
   * @param {string} streamId
   * @returns {Promise<object>} `{ streamId, chunkIndex, isLastChunk, facts,
   *   progress, metadata }`; `progress` is `undefined` when the stream was
   *   created with `includeProgress: false`.
   * @throws {Error} If the stream is unknown or its status is not `active`.
   */
  async getNextChunk(streamId) {
    const stream = this.getStreamOrThrow(streamId);
    if (stream.status !== 'active') {
      throw new Error(`Stream ${streamId} is not active`);
    }

    const startIndex = stream.currentIndex;
    const endIndex = Math.min(startIndex + stream.chunkSize, stream.totalFacts);
    const chunk = stream.facts.slice(startIndex, endIndex);
    stream.currentIndex = endIndex;

    const isLastChunk = endIndex >= stream.totalFacts;
    if (isLastChunk) {
      stream.status = 'completed';
      stream.endTime = Date.now();
    }

    const progress = {
      current: endIndex,
      total: stream.totalFacts,
      percentage: this.calculatePercentage(endIndex, stream.totalFacts),
      remainingFacts: stream.totalFacts - endIndex,
      estimatedTimeRemaining: this.calculateEstimatedTime(stream, endIndex),
    };

    return {
      streamId,
      chunkIndex: Math.floor(startIndex / stream.chunkSize),
      isLastChunk,
      facts: chunk.map((fact) => this.formatFactForStreaming(fact)),
      progress: stream.includeProgress ? progress : undefined,
      metadata: stream.metadata,
    };
  }

  /**
   * Extrapolates the remaining time from the average time per delivered fact.
   * @param {object} stream - Stream record (reads `startTime`, `totalFacts`).
   * @param {number} currentIndex - Number of facts delivered so far.
   * @returns {number|null} Estimated whole seconds remaining, or `null` when
   *   nothing has been delivered yet.
   */
  calculateEstimatedTime(stream, currentIndex) {
    if (currentIndex === 0) return null;
    const elapsedTime = Date.now() - stream.startTime;
    const avgTimePerFact = elapsedTime / currentIndex;
    const remainingFacts = stream.totalFacts - currentIndex;
    return Math.round((remainingFacts * avgTimePerFact) / 1000); // seconds
  }

  /**
   * Projects a fact onto the fields sent to stream consumers and adds a
   * `summary` (the content, cut to 150 characters plus `...` when longer).
   * @param {object} fact
   * @returns {object} `{ id, content, type, domain, qualityScore, tags,
   *   timestamp, relevanceScore, summary }`. `tags` defaults to `[]`,
   *   `relevanceScore` to `0`, and `summary` to `''` when the fact has no
   *   content.
   */
  formatFactForStreaming(fact) {
    const { content } = fact;
    let summary = content;
    if (typeof content === 'string') {
      if (content.length > 150) {
        summary = `${content.substring(0, 150)}...`;
      }
    } else if (content == null) {
      // A fact without content must not abort the whole chunk.
      summary = '';
    }

    return {
      id: fact.id,
      content,
      type: fact.type,
      domain: fact.domain,
      qualityScore: fact.qualityScore,
      tags: fact.tags || [],
      timestamp: fact.timestamp,
      relevanceScore: fact.relevanceScore || 0,
      summary,
    };
  }

  /**
   * Reports the status, progress and timing of a stream.
   * @param {string} streamId
   * @returns {Promise<object>} `{ id, status, progress, metadata, startTime,
   *   endTime, duration }`. `endTime` is `null` and `duration` is measured up
   *   to now while the stream has no end time.
   * @throws {Error} If the stream is unknown.
   */
  async getStreamStatus(streamId) {
    const stream = this.getStreamOrThrow(streamId);
    const finishedAt = stream.endTime || null;
    return {
      id: streamId,
      status: stream.status,
      progress: {
        current: stream.currentIndex,
        total: stream.totalFacts,
        percentage: this.calculatePercentage(stream.currentIndex, stream.totalFacts),
      },
      metadata: stream.metadata,
      startTime: stream.startTime,
      endTime: finishedAt,
      duration: (finishedAt || Date.now()) - stream.startTime,
    };
  }

  /**
   * Marks a stream as cancelled and stamps its end time, whatever its
   * previous status was.
   * @param {string} streamId
   * @returns {Promise<object>} `{ streamId, status: 'cancelled', processed, total }`.
   * @throws {Error} If the stream is unknown.
   */
  async cancelStream(streamId) {
    const stream = this.getStreamOrThrow(streamId);
    stream.status = 'cancelled';
    stream.endTime = Date.now();
    return {
      streamId,
      status: 'cancelled',
      processed: stream.currentIndex,
      total: stream.totalFacts,
    };
  }

  /**
   * Removes completed and cancelled streams that ended more than 30 minutes
   * ago (falling back to the start time when no end time was recorded).
   * @returns {Promise<void>}
   */
  async cleanupCompletedStreams() {
    const cutoffTime = Date.now() - (30 * 60 * 1000); // 30 minutes
    for (const [streamId, stream] of this.activeStreams.entries()) {
      const isFinished = stream.status === 'completed' || stream.status === 'cancelled';
      if (isFinished && (stream.endTime || stream.startTime) < cutoffTime) {
        this.activeStreams.delete(streamId);
      }
    }
  }

  /**
   * Returns the status report of every registered stream, in insertion order.
   * @returns {Promise<object[]>} One `getStreamStatus` result per stream.
   */
  async getAllStreams() {
    const streamIds = Array.from(this.activeStreams.keys());
    return Promise.all(streamIds.map((streamId) => this.getStreamStatus(streamId)));
  }

  /**
   * Pauses an active stream; `getNextChunk` rejects until it is resumed.
   * @param {string} streamId
   * @returns {Promise<object>} `{ streamId, status: 'paused', progress }`.
   * @throws {Error} If the stream is unknown or not active.
   */
  async pauseStream(streamId) {
    const stream = this.getStreamOrThrow(streamId);
    if (stream.status !== 'active') {
      throw new Error(`Stream ${streamId} is not active`);
    }
    stream.status = 'paused';
    stream.pausedAt = Date.now();
    return {
      streamId,
      status: 'paused',
      progress: {
        current: stream.currentIndex,
        total: stream.totalFacts,
      },
    };
  }

  /**
   * Resumes a paused stream.
   * @param {string} streamId
   * @returns {Promise<object>} `{ streamId, status: 'active', progress }`.
   * @throws {Error} If the stream is unknown or not paused.
   */
  async resumeStream(streamId) {
    const stream = this.getStreamOrThrow(streamId);
    if (stream.status !== 'paused') {
      throw new Error(`Stream ${streamId} is not paused`);
    }
    stream.status = 'active';
    if (stream.pausedAt) {
      // Shift the start time forward by the pause duration so that the time
      // spent paused does not inflate elapsed-time based estimates.
      stream.startTime += Date.now() - stream.pausedAt;
      delete stream.pausedAt;
    }
    return {
      streamId,
      status: 'active',
      progress: {
        current: stream.currentIndex,
        total: stream.totalFacts,
      },
    };
  }

  /**
   * Batch streaming for very large queries: pages through
   * `factStore.queryFacts` and opens a stream over the collected facts.
   *
   * Paging stops when a page is empty, when a page is shorter than
   * `batchSize`, or when `maxResults` facts have been collected.
   * @param {object} queryParams - Passed to every `queryFacts` call with
   *   `limit` and `offset` added; `query`, `type` and `domain` also become
   *   the stream metadata.
   * @param {{queryFacts: function(object): Promise<{facts: Array}>}} factStore
   * @param {object} [options] - `batchSize` (default 100), `maxResults`
   *   (default 1000), plus any `createStream` option.
   * @returns {Promise<string>} The id of the new stream.
   * @throws {Error} If the open-stream limit is reached.
   */
  async createBatchStream(queryParams, factStore, options = {}) {
    // Fail before querying: there is no point paging through the store when
    // the resulting stream could not be registered anyway.
    this.assertCapacity();

    const batchSize = options.batchSize || 100;
    const maxResults = options.maxResults || 1000;

    const allFacts = [];
    let offset = 0;
    while (allFacts.length < maxResults) {
      const result = await factStore.queryFacts({
        ...queryParams,
        limit: Math.min(batchSize, maxResults - allFacts.length),
        offset,
      });
      const facts = (result && result.facts) || [];
      if (facts.length === 0) {
        break;
      }
      // Append in place; re-copying the accumulator per page is quadratic.
      for (const fact of facts) {
        allFacts.push(fact);
      }
      offset += facts.length;
      if (facts.length < batchSize) {
        break;
      }
    }

    // A store that ignores `limit` must not push the stream past maxResults.
    if (allFacts.length > maxResults) {
      allFacts.length = maxResults;
    }

    return this.createStream(allFacts, {
      ...options,
      query: queryParams.query || '',
      type: queryParams.type || 'all',
      domain: queryParams.domain || 'all',
    });
  }

  /**
   * Aggregates counters over every registered stream.
   * @returns {object} Stream counts (total and per status) plus the summed
   *   total and delivered fact counts.
   */
  getStats() {
    const stats = {
      totalStreams: 0,
      activeStreams: 0,
      pausedStreams: 0,
      completedStreams: 0,
      cancelledStreams: 0,
      totalFactsStreaming: 0,
      totalFactsProcessed: 0,
    };
    const counterByStatus = {
      active: 'activeStreams',
      paused: 'pausedStreams',
      completed: 'completedStreams',
      cancelled: 'cancelledStreams',
    };
    for (const stream of this.activeStreams.values()) {
      stats.totalStreams += 1;
      const counter = counterByStatus[stream.status];
      if (counter) {
        stats[counter] += 1;
      }
      stats.totalFactsStreaming += stream.totalFacts;
      stats.totalFactsProcessed += stream.currentIndex;
    }
    return stats;
  }
}