/**
* Learn Service - Main orchestration layer for LearnMCP
* Coordinates all learning content operations
*/
import fs from 'fs/promises';
import path from 'path';
import { LearningSourceRegistry } from './learning-source-registry.js';
import { ContentExtractorFactory } from './content-extractors/index.js';
import { BackgroundProcessor } from './background-processor.js';
import { ContentSummarizer } from './content-summarizer.js';
import { createLearnLogger } from './utils/custom-logger.js';
/**
 * Orchestrates learning-content operations for a project: registering source
 * URLs, queueing them for background extraction/summarization, and storing,
 * loading, aggregating and deleting the resulting summaries on disk.
 *
 * Summaries are stored as JSON files at
 * `<dataDir>/learn-content/<projectId>/summaries/<sourceId>.json`.
 */
export class LearnService {
  /**
   * @param {string} dataDir - Root data directory; learn content lives in
   *   the `learn-content` sub-directory of it.
   * @param {object|null} [logger=null] - Object exposing `info`, `debug`,
   *   `warn` and `error`; when falsy a logger is created via
   *   `createLearnLogger('LearnService')`.
   */
  constructor(dataDir, logger = null) {
    this.dataDir = dataDir;
    this.learnContentDir = path.join(dataDir, 'learn-content');
    this.logger = logger || createLearnLogger('LearnService');

    // Initialize components
    this.sourceRegistry = new LearningSourceRegistry(dataDir);
    this.extractorFactory = new ContentExtractorFactory();
    this.backgroundProcessor = new BackgroundProcessor({
      maxQueueSize: 50,
      maxConcurrent: 2,
      processingInterval: 3000,
    });
    this.contentSummarizer = new ContentSummarizer();
  }

  /**
   * Initialize the learn service: create the learn-content directory and
   * start the background processor.
   *
   * @returns {Promise<void>}
   * @throws Rethrows any error raised during setup, after logging it.
   */
  async initialize() {
    try {
      // Ensure learn content directory exists
      await this.ensureDir(this.learnContentDir);

      // Start background processor
      this.backgroundProcessor.start();

      this.logger.info('LearnService initialized', {
        dataDir: this.dataDir,
        learnContentDir: this.learnContentDir,
      });
    } catch (error) {
      this.logger.error('Failed to initialize LearnService', {
        error: error.message,
      });
      throw error;
    }
  }

  /**
   * Shutdown the learn service by stopping the background processor.
   * Best-effort: a failure is logged and deliberately not rethrown.
   *
   * @returns {Promise<void>}
   */
  async shutdown() {
    try {
      await this.backgroundProcessor.stop();
      this.logger.info('LearnService shutdown complete');
    } catch (error) {
      this.logger.error('Error during LearnService shutdown', {
        error: error.message,
      });
    }
  }

  /**
   * Add learning sources to a project.
   *
   * URLs are validated through the extractor factory; only the valid ones
   * are handed to the source registry.
   *
   * @param {string} projectId - Project the sources belong to.
   * @param {string[]} urls - Candidate source URLs.
   * @returns {Promise<{success: boolean, addedSources: Array, invalidUrls: Array, message: string}>}
   *   `invalidUrls` holds the validation results whose `valid` flag is falsy.
   * @throws {Error} 'No valid URLs provided' when no URL passes validation;
   *   any other error is logged and rethrown.
   */
  async addLearningSources(projectId, urls) {
    try {
      this.logger.info('Adding learning sources', {
        projectId,
        urlCount: urls.length,
      });

      // Validate URLs
      const validationResults = this.extractorFactory.validateUrls(urls);
      const validUrls = validationResults.filter((r) => r.valid).map((r) => r.url);
      const invalidUrls = validationResults.filter((r) => !r.valid);

      if (validUrls.length === 0) {
        throw new Error('No valid URLs provided');
      }

      // Add sources to registry
      const addedSources = await this.sourceRegistry.addSources(projectId, validUrls);

      return {
        success: true,
        addedSources,
        invalidUrls,
        message: `Added ${addedSources.length} learning sources. ${invalidUrls.length} URLs were invalid.`,
      };
    } catch (error) {
      this.logger.error('Failed to add learning sources', {
        projectId,
        error: error.message,
      });
      throw error;
    }
  }

  /**
   * Queue every pending source of a project for background processing.
   *
   * A source that cannot be queued is logged and skipped; it does not abort
   * the remaining ones.
   *
   * @param {string} projectId - Project whose pending sources are queued.
   * @returns {Promise<object>} `{ success, message, processedCount: 0 }` when
   *   nothing is pending, otherwise
   *   `{ success, message, queuedCount, totalPending }`.
   * @throws Rethrows (after logging) errors from the registry lookup.
   */
  async processLearningSources(projectId) {
    try {
      this.logger.info('Starting learning source processing', { projectId });

      // Get pending sources
      const pendingSources = await this.sourceRegistry.getSourcesByStatus(projectId, 'pending');

      if (pendingSources.length === 0) {
        return {
          success: true,
          message: 'No pending sources to process',
          processedCount: 0,
        };
      }

      // One bound processor is shared by every queued task.
      const processor = this.processSourceContent.bind(this);

      // Queue processing tasks
      let queuedCount = 0;
      for (const source of pendingSources) {
        try {
          await this.backgroundProcessor.addTask({
            id: `process_${source.id}`,
            type: 'content_extraction',
            data: { projectId, source },
            processor,
            priority: 1,
          });
          queuedCount++;
        } catch (error) {
          this.logger.warn('Failed to queue source for processing', {
            sourceId: source.id,
            error: error.message,
          });
        }
      }

      return {
        success: true,
        message: `Queued ${queuedCount} sources for processing`,
        queuedCount,
        totalPending: pendingSources.length,
      };
    } catch (error) {
      this.logger.error('Failed to process learning sources', {
        projectId,
        error: error.message,
      });
      throw error;
    }
  }

  /**
   * Process individual source content (background task): mark the source as
   * 'processing', extract and summarize its content, persist the summary and
   * mark the source 'completed'.
   *
   * On failure the source is marked 'failed' and the ORIGINAL error is
   * rethrown, even when recording the failed status itself fails.
   *
   * @param {{projectId: string, source: {id: string, url: string}}} task
   * @returns {Promise<{success: true, sourceId: string}>}
   * @throws The error raised by the status update, extraction, summarization
   *   or summary persistence step.
   */
  async processSourceContent({ projectId, source }) {
    try {
      this.logger.debug('Processing source content', {
        projectId,
        sourceId: source.id,
        url: source.url,
      });

      // Update status to processing
      await this.sourceRegistry.updateSourceStatus(projectId, source.id, 'processing');

      // Extract content; the source id is attached before summarization
      const extractedContent = await this.extractorFactory.extractContent(source.url);
      extractedContent.sourceId = source.id;

      // Summarize content
      const summaryData = await this.contentSummarizer.summarize(extractedContent);

      // Save summary
      await this.saveSummary(projectId, source.id, summaryData);

      // Update source status
      await this.sourceRegistry.updateSourceStatus(projectId, source.id, 'completed', {
        title: extractedContent.metadata?.title,
        duration: extractedContent.metadata?.duration,
        wordCount: extractedContent.content?.wordCount,
      });

      this.logger.info('Source processing completed', {
        projectId,
        sourceId: source.id,
        title: extractedContent.metadata?.title,
      });

      return { success: true, sourceId: source.id };
    } catch (error) {
      // Update source status to failed. This is guarded so that a registry
      // failure here cannot replace the error that actually broke processing.
      try {
        await this.sourceRegistry.updateSourceStatus(projectId, source.id, 'failed', {
          error: error.message,
        });
      } catch (statusError) {
        this.logger.warn('Failed to record failed source status', {
          projectId,
          sourceId: source.id,
          error: statusError.message,
        });
      }

      this.logger.error('Source processing failed', {
        projectId,
        sourceId: source.id,
        error: error.message,
      });
      throw error;
    }
  }

  /**
   * Get processing status for a project, combining the registry's view of
   * the sources with the background processor's queue state.
   *
   * @param {string} projectId
   * @returns {Promise<{projectId: string, sources: *, processor: {isRunning: *, queueSize: *, processingCount: *}, lastUpdated: string}>}
   *   `lastUpdated` is the ISO-8601 timestamp of this call.
   * @throws Rethrows (after logging) errors from the registry or processor.
   */
  async getProcessingStatus(projectId) {
    try {
      const registryStatus = await this.sourceRegistry.getProcessingStatus(projectId);
      const { isRunning, queueSize, processingCount } = this.backgroundProcessor.getStatus();

      return {
        projectId,
        sources: registryStatus,
        processor: { isRunning, queueSize, processingCount },
        lastUpdated: new Date().toISOString(),
      };
    } catch (error) {
      this.logger.error('Failed to get processing status', {
        projectId,
        error: error.message,
      });
      throw error;
    }
  }

  /**
   * List learning sources for a project.
   *
   * @param {string} projectId
   * @param {string|null} [status=null] - Passed through unchanged to the
   *   registry's `getSourcesByStatus`.
   * @returns {Promise<Array>} Whatever the registry returns.
   * @throws Rethrows (after logging) errors from the registry.
   */
  async listLearningSources(projectId, status = null) {
    try {
      return await this.sourceRegistry.getSourcesByStatus(projectId, status);
    } catch (error) {
      this.logger.error('Failed to list learning sources', {
        projectId,
        status,
        error: error.message,
      });
      throw error;
    }
  }

  /**
   * Get learning summary for a project.
   *
   * @param {string} projectId
   * @param {string|null} [sourceId=null] - When truthy, the stored summary of
   *   that single source is returned; otherwise summaries are aggregated.
   * @param {number} [tokenLimit=2000] - Token budget for the aggregated form.
   * @returns {Promise<object|null>} The summary, or `null` when none exists.
   * @throws Rethrows (after logging) errors from loading or aggregation.
   */
  async getLearningSummary(projectId, sourceId = null, tokenLimit = 2000) {
    try {
      if (sourceId) {
        // Get specific source summary
        return await this.loadSummary(projectId, sourceId);
      }
      // Get aggregated summaries
      return await this.getAggregatedSummaries(projectId, tokenLimit);
    } catch (error) {
      this.logger.error('Failed to get learning summary', {
        projectId,
        sourceId,
        error: error.message,
      });
      throw error;
    }
  }

  /**
   * Delete learning sources: remove their summary files first, then remove
   * them from the registry.
   *
   * @param {string} projectId
   * @param {string[]} sourceIds
   * @returns {Promise<{success: boolean, deletedSources: Array, message: string}>}
   * @throws Rethrows (after logging) errors from file deletion or the registry.
   */
  async deleteLearningSources(projectId, sourceIds) {
    try {
      // Delete summaries
      for (const sourceId of sourceIds) {
        await this.deleteSummary(projectId, sourceId);
      }

      // Delete from registry
      const deletedSources = await this.sourceRegistry.deleteSources(projectId, sourceIds);

      this.logger.info('Learning sources deleted', {
        projectId,
        deletedCount: deletedSources.length,
      });

      return {
        success: true,
        deletedSources,
        message: `Deleted ${deletedSources.length} learning sources`,
      };
    } catch (error) {
      this.logger.error('Failed to delete learning sources', {
        projectId,
        sourceIds,
        error: error.message,
      });
      throw error;
    }
  }

  // Helper methods

  /**
   * Create `dirPath` (and any missing parents) if it does not exist yet.
   *
   * A recursive mkdir resolves when the directory is already present, so no
   * prior existence check is needed, and genuine failures (for example a
   * path component that is a regular file) are surfaced to the caller.
   *
   * @param {string} dirPath
   * @returns {Promise<void>}
   * @throws Any error raised by `fs.mkdir`.
   */
  async ensureDir(dirPath) {
    await fs.mkdir(dirPath, { recursive: true });
  }

  /**
   * Directory holding the summary files of one project.
   *
   * NOTE(review): `projectId` is joined into the path as-is; callers should
   * make sure it cannot contain path separators or `..` segments.
   *
   * @param {string} projectId
   * @returns {string}
   */
  getSummariesDir(projectId) {
    return path.join(this.learnContentDir, projectId, 'summaries');
  }

  /**
   * Path of the JSON summary file of one source.
   *
   * NOTE(review): `sourceId` is joined into the path as-is, same caveat as
   * for `projectId`.
   *
   * @param {string} projectId
   * @param {string} sourceId
   * @returns {string}
   */
  getSummaryPath(projectId, sourceId) {
    return path.join(this.getSummariesDir(projectId), `${sourceId}.json`);
  }

  /**
   * Write a summary as pretty-printed JSON, creating the project's summaries
   * directory when needed.
   *
   * @param {string} projectId
   * @param {string} sourceId
   * @param {*} summaryData - Any JSON-serializable value.
   * @returns {Promise<void>}
   */
  async saveSummary(projectId, sourceId, summaryData) {
    await this.ensureDir(this.getSummariesDir(projectId));
    const summaryPath = this.getSummaryPath(projectId, sourceId);
    await fs.writeFile(summaryPath, JSON.stringify(summaryData, null, 2));
  }

  /**
   * Read and parse the stored summary of a source.
   *
   * @param {string} projectId
   * @param {string} sourceId
   * @returns {Promise<*|null>} Parsed summary, or `null` when the file does
   *   not exist.
   * @throws Read errors other than ENOENT, and JSON parse errors.
   */
  async loadSummary(projectId, sourceId) {
    try {
      const data = await fs.readFile(this.getSummaryPath(projectId, sourceId), 'utf8');
      return JSON.parse(data);
    } catch (error) {
      if (error.code === 'ENOENT') {
        return null;
      }
      throw error;
    }
  }

  /**
   * Remove the stored summary of a source; a missing file is not an error.
   *
   * @param {string} projectId
   * @param {string} sourceId
   * @returns {Promise<void>}
   * @throws Unlink errors other than ENOENT.
   */
  async deleteSummary(projectId, sourceId) {
    try {
      await fs.unlink(this.getSummaryPath(projectId, sourceId));
    } catch (error) {
      if (error.code !== 'ENOENT') {
        throw error;
      }
    }
  }

  /**
   * Combine the stored summaries of a project into one text, highest
   * `metadata.relevanceScore` first, within a token budget.
   *
   * Tokens are estimated as one per four characters of summary text.
   * Aggregation stops at the first summary that would exceed the budget, so
   * lower-ranked summaries are not considered after that point.
   *
   * @param {string} projectId
   * @param {number} tokenLimit - Maximum estimated tokens to include.
   * @returns {Promise<object|null>} `null` when the project has no summaries
   *   (or no summaries directory); otherwise
   *   `{ aggregatedSummary, tokenCount, sourcesIncluded, totalSources,
   *   metadata: { created, tokenLimit, truncated } }`.
   * @throws Errors other than ENOENT from reading the directory or files.
   */
  async getAggregatedSummaries(projectId, tokenLimit) {
    const extension = '.json';

    try {
      const files = await fs.readdir(this.getSummariesDir(projectId));
      const summaries = [];

      for (const file of files) {
        if (!file.endsWith(extension)) {
          continue;
        }
        // Strip only the trailing extension: a source id may itself contain
        // the text ".json".
        const sourceId = file.slice(0, -extension.length);
        const summary = await this.loadSummary(projectId, sourceId);
        if (summary) {
          summaries.push({ sourceId, ...summary });
        }
      }

      if (summaries.length === 0) {
        return null;
      }

      // Sort by relevance score
      summaries.sort(
        (a, b) => (b.metadata?.relevanceScore || 0) - (a.metadata?.relevanceScore || 0)
      );

      // Aggregate within token limit
      let aggregatedText = '';
      let tokenCount = 0;
      const includedSources = [];

      for (const summary of summaries) {
        const summaryText = summary.summary || '';
        const estimatedTokens = Math.ceil(summaryText.length / 4);

        if (tokenCount + estimatedTokens > tokenLimit) {
          break;
        }

        aggregatedText += `\n\n**${summary.metadata?.title || summary.sourceId}**\n${summaryText}`;
        tokenCount += estimatedTokens;
        includedSources.push(summary.sourceId);
      }

      return {
        aggregatedSummary: aggregatedText.trim(),
        tokenCount,
        sourcesIncluded: includedSources,
        totalSources: summaries.length,
        metadata: {
          created: new Date().toISOString(),
          tokenLimit,
          truncated: includedSources.length < summaries.length,
        },
      };
    } catch (error) {
      if (error.code === 'ENOENT') {
        return null;
      }
      throw error;
    }
  }
}