#!/usr/bin/env node
/**
* Process Documents CLI Script
*
* This script provides a command-line interface for processing documents
* that have been crawled but not yet processed. It's particularly useful if
* you want to run the processing step separately from the crawling step.
*
* @example
* ```
* npm run process-docs -- --job-id 550e8400-e29b-41d4-a716-446655440000 --wait
* ```
*/
// Load environment variables from .env file
import * as dotenv from 'dotenv';
dotenv.config();
import yargs from 'yargs';
import logger from '../utils/logger';
import { JobService } from '../services/job.service';
import { DocumentService } from '../services/document.service';
import { DocumentProcessorService } from '../services/document-processor.service';
import { JobStatus, PrismaClient } from '../generated/prisma';
import { getPrismaClient } from '../config/database';
import inquirer from 'inquirer';
// Shape of the parsed yargs CLI arguments for this script.
interface CliArgs {
  'job-id': string;        // UUID of the job whose documents should be processed (required)
  wait: boolean;           // block and show live progress until the job finishes
  verbose: boolean;        // switch the logger to debug level
  reprocess: boolean;      // delete existing chunks and re-run processing for the job
  package?: string;        // package name to attach to the job metadata
  version?: string;        // package version; downstream code defaults this to "latest"
  'skip-prompts': boolean; // suppress interactive inquirer prompts
  [key: string]: unknown;  // yargs adds extra keys (e.g. _, $0)
}
// Subset of the JSON metadata stored on a Job row that this script reads.
interface JobMetadata {
  packageName?: string;    // package the crawled documentation belongs to
  packageVersion?: string; // e.g. "1.2.3" or "latest"
  // Add other fields as necessary
}
/**
 * Display job progress in real-time on the CLI.
 *
 * Polls the job row every 2 seconds and redraws a single status line on
 * stdout. The returned promise resolves — never rejects — when the job
 * reaches a terminal-ish status ('completed' | 'failed' | 'cancelled' |
 * 'paused'), when the job row cannot be found, or when a poll throws.
 *
 * @param jobId The ID of the job to monitor
 * @param prisma PrismaClient instance (left as `any` so the polled row can be
 *               read loosely; NOTE(review): consider tightening to
 *               PrismaClient once the stats JSON shape is typed)
 * @returns A promise that resolves when the job completes or fails
 */
async function displayJobProgress(jobId: string, prisma: any): Promise<void> {
  return new Promise((resolve) => {
    const progressInterval = 2000; // Update progress every 2 seconds
    let lastProgress = -1;
    // Fix: prevent overlapping ticks. If one poll takes longer than the
    // interval, setInterval would launch a second concurrent query, which can
    // interleave stdout writes and race on clearInterval/resolve.
    let tickInFlight = false;
    const interval = setInterval(async () => {
      if (tickInFlight) return;
      tickInFlight = true;
      try {
        const job = await prisma.job.findUnique({
          where: { id: jobId },
          select: {
            status: true,
            progress: true,
            stats: true,
            error: true,
            timeElapsed: true,
            timeRemaining: true
          }
        });
        if (!job) {
          console.log(`Job ${jobId} not found`);
          clearInterval(interval);
          resolve();
          return;
        }
        // Only redraw when the integer percentage actually changed
        const currentProgress = Math.floor(job.progress * 100);
        if (currentProgress !== lastProgress) {
          lastProgress = currentProgress;
          // Clear previous line and show progress
          process.stdout.write('\r\x1b[K'); // Clear line
          process.stdout.write(`Progress: ${currentProgress}% `);
          // Add status indicators
          if (job.stats) {
            process.stdout.write(`| Docs: ${job.stats.pagesProcessed || 0} processed, ${job.stats.totalChunks || 0} chunks `);
          }
          // Add time information if available
          if (job.timeElapsed !== null && job.timeRemaining !== null) {
            const formatTime = (seconds: number) => {
              if (seconds < 60) return `${seconds}s`;
              const mins = Math.floor(seconds / 60);
              const secs = seconds % 60;
              return `${mins}m ${secs}s`;
            };
            process.stdout.write(`| Time: ${formatTime(job.timeElapsed || 0)} elapsed`);
            if (job.timeRemaining) {
              process.stdout.write(`, ~${formatTime(job.timeRemaining)} remaining`);
            }
          }
        }
        // Stop polling once the job is in a terminal-ish state
        if (['completed', 'failed', 'cancelled', 'paused'].includes(job.status)) {
          clearInterval(interval);
          // Print a newline to ensure next output starts on a fresh line
          console.log('');
          resolve();
        }
      } catch (error) {
        console.error(`Error fetching job progress: ${error instanceof Error ? error.message : String(error)}`);
        clearInterval(interval);
        resolve();
      } finally {
        tickInFlight = false;
      }
    }, progressInterval);
  });
}
/**
 * Deletes all chunks associated with documents belonging to a specific job.
 *
 * Failures are logged and surfaced on the console but deliberately not
 * rethrown, so a reprocessing run can continue even if the cleanup was
 * incomplete.
 *
 * @param jobId The ID of the job whose chunks should be cleared.
 * @param prisma PrismaClient instance.
 * @returns The number of chunks deleted (0 when the job has no documents,
 *          or the partial count known so far if the delete failed).
 */
async function clearChunksForJob(jobId: string, prisma: PrismaClient): Promise<number> {
  let removed = 0;
  try {
    logger.debug(`Finding document IDs for job ${jobId} to clear chunks.`);
    // Collect the IDs of every document this job produced
    const docs = await prisma.document.findMany({
      where: { jobId: jobId },
      select: { id: true }
    });
    if (!docs || docs.length === 0) {
      logger.info(`No documents found for job ${jobId}, no chunks to clear.`);
      return 0;
    }
    const ids = docs.map(({ id }) => id);
    logger.debug(`Found ${ids.length} documents for job ${jobId}.`);
    logger.info(`Deleting chunks associated with ${ids.length} documents for job ${jobId}.`);
    // Bulk-delete every chunk whose parent document belongs to this job
    const result = await prisma.chunk.deleteMany({
      where: { documentId: { in: ids } }
    });
    removed = result.count;
    logger.info(`Deleted ${removed} chunks for job ${jobId}.`);
  } catch (error) {
    logger.error(`Error clearing chunks for job ${jobId}:`, error);
    console.error(`\nError deleting existing chunks: ${error instanceof Error ? error.message : String(error)}`);
    console.error('Proceeding with reprocessing, but old chunks might remain.');
    // Fall through and report whatever count we reached before the failure
  }
  return removed;
}
/**
 * Resolve the package name/version for a job, prompting interactively only
 * when needed.
 *
 * Resolution order: CLI arguments win; any missing value is asked for via
 * inquirer. When --skip-prompts is set, no prompt is ever shown and the
 * defaults ('' for the name, 'latest' for the version) fill in anything the
 * CLI did not supply.
 *
 * @param args The parsed command line arguments
 * @returns Object containing package name and version
 */
async function promptForPackageInfo(args: CliArgs): Promise<{ packageName: string; packageVersion: string }> {
  // Fix: honor --skip-prompts even when --package was not supplied.
  // Previously the early return required BOTH flags (`args.package &&
  // args['skip-prompts']`), so `--skip-prompts` alone still opened
  // interactive prompts, contradicting the flag's documented purpose.
  if (args['skip-prompts']) {
    return {
      packageName: args.package || '',
      packageVersion: args.version || 'latest'
    };
  }
  // Define answers interface
  interface PromptAnswers {
    packageName?: string;
    packageVersion?: string;
  }
  const answers: PromptAnswers = {};
  // Only ask for package name if not provided via CLI
  if (!args.package) {
    const packageNameResponse = await inquirer.prompt<{ packageName: string }>({
      type: 'input',
      name: 'packageName',
      message: 'What is the name of the package this documentation is for?',
      validate: (input: string) => {
        return input.trim().length > 0 ? true : 'Package name is required';
      }
    });
    answers.packageName = packageNameResponse.packageName;
  }
  // Only ask for version if not provided via CLI
  if (!args.version) {
    const versionResponse = await inquirer.prompt<{ packageVersion: string }>({
      type: 'input',
      name: 'packageVersion',
      message: 'What version of the package is this documentation for? (leave blank for "latest")',
      default: 'latest'
    });
    answers.packageVersion = versionResponse.packageVersion;
  }
  return {
    packageName: args.package || answers.packageName || '',
    packageVersion: args.version || answers.packageVersion || 'latest'
  };
}
/**
 * Check if job metadata contains package information.
 *
 * A job "has" package info when its metadata JSON holds a non-blank
 * `packageName`. Lookup failures are logged and reported as `false`.
 *
 * @param jobId The ID of the job to check
 * @param jobService JobService instance
 * @returns True if package metadata exists, false otherwise
 */
async function hasPackageMetadata(jobId: string, jobService: JobService): Promise<boolean> {
  try {
    const job = await jobService.findJobById(jobId);
    const meta = (job?.metadata ?? null) as Record<string, any> | null;
    if (!meta) return false;
    const name = meta.packageName;
    return Boolean(name && name.trim().length > 0);
  } catch (error) {
    logger.error(`Error checking package metadata for job ${jobId}:`, error);
    return false;
  }
}
/**
 * Process all documents for a specific job.
 *
 * Runs the second half of the pipeline: loads every document row the job
 * crawled, sends each through DocumentProcessorService, advances job
 * progress linearly from 0.5 to 1.0, then stamps the job 'completed' with
 * the real chunk count taken from the database.
 *
 * Per-document failures are logged and recorded on the failing document's
 * metadata, but do not abort the batch.
 *
 * @param jobId  ID of the job whose documents should be processed
 * @param prisma Shared PrismaClient used to construct all services
 * @throws Error if the job does not exist
 */
async function processDocumentsForJob(jobId: string, prisma: PrismaClient): Promise<void> {
  const jobService = new JobService(prisma);
  const documentService = new DocumentService(prisma);
  const documentProcessorService = new DocumentProcessorService(prisma);
  // Get job details (fail fast if the ID is unknown)
  const job = await jobService.findJobById(jobId);
  if (!job) {
    throw new Error(`Job with ID ${jobId} not found`);
  }
  // Mark the job as entering the processing stage. Progress starts at 0.5
  // because the crawl phase is treated as the first half of the pipeline.
  await jobService.updateJobProgress(jobId, 'running', 0.5);
  await jobService.updateJobMetadata(jobId, { stage: 'processing' });
  logger.info(`Starting document processing for job ${jobId}`);
  // Get all documents created by this specific job
  const documents = await prisma.document.findMany({
    where: {
      jobId: jobId
    }
  });
  logger.info(`Found ${documents.length} documents to process for job ${jobId}`);
  // Process each document sequentially
  const totalDocs = documents.length;
  let processedDocs = 0;
  for (const document of documents) {
    try {
      logger.info(`Processing document ${document.id} (${document.title}) for job ${jobId}`);
      // Fall back to defaults derived from the job's metadata when the
      // document carries none of its own.
      // NOTE(review): `||` only applies the defaults when metadata is
      // null/undefined — an existing empty object {} is passed through as-is.
      const metadata = document.metadata || {
        package: (job.metadata as JobMetadata)?.packageName || '', // Default package name
        version: (job.metadata as JobMetadata)?.packageVersion || 'latest', // Default version
        type: 'documentation', // Default type
        tags: ['default'], // Default tags
      };
      // Process the document HTML and generate embeddings
      await documentProcessorService.processDocument(document.id, document.content, metadata);
      // Advance overall progress linearly across the second half (0.5 → 1.0)
      processedDocs++;
      const progress = 0.5 + (0.5 * (processedDocs / totalDocs));
      await jobService.updateJobProgress(jobId, 'running', progress);
      // Update job stats — re-read the row first so other stat keys survive
      // the spread below.
      const currentStats = (await jobService.findJobById(jobId))?.stats as Record<string, number>;
      await jobService.updateJobStats(jobId, {
        ...currentStats,
        pagesProcessed: processedDocs,
        pagesSkipped: 0,
        totalChunks: processedDocs, // Will be refined later with actual chunk count
      });
      logger.info(`Successfully processed document ${document.id} for job ${jobId}`);
    } catch (error) {
      logger.error(`Error processing document ${document.id} for job ${jobId}:`, error);
      // Record the failure on the document's metadata so it can be inspected
      await documentService.updateDocument(document.id, {
        metadata: {
          ...document.metadata as Record<string, any>,
          processingError: error instanceof Error ? error.message : 'Unknown error during processing',
        },
      });
      // Continue with other documents despite the error
    }
  }
  // Count the chunks actually written for this job's documents — the
  // authoritative number, unlike the running estimate kept during the loop.
  const stats = await prisma.$queryRaw<[{ count: BigInt }]>`
    SELECT COUNT(*) as count FROM chunks c
    INNER JOIN documents d ON d.id = c.document_id
    WHERE d.job_id = ${jobId}
  `;
  // Convert BigInt to Number for stats (COUNT(*) comes back as a BigInt)
  const chunkCount = stats[0]?.count ? Number(stats[0].count) : processedDocs;
  // Mark job as completed. Documents that errored are reported as skipped.
  await jobService.updateJobProgress(jobId, 'completed', 1.0);
  await jobService.updateJobStats(jobId, {
    pagesProcessed: processedDocs,
    pagesSkipped: totalDocs - processedDocs,
    totalChunks: Number(chunkCount),
  });
  logger.info(`Document processing and embedding generation completed for job ${jobId}`);
  logger.info(`Job statistics: ${processedDocs} documents processed, ${chunkCount} chunks created`);
}
/**
 * Main function to parse arguments and execute the document processing.
 *
 * Flow: parse CLI args → look up the job → (with --reprocess) clear old
 * chunks, settle package metadata, and reset job state → run processing
 * either fire-and-forget or, with --wait, alongside a live progress display.
 * Exit codes: 0 success/cancelled/paused, 1 failure, 2 database connection
 * problems.
 */
async function main() {
  // npm inserts a literal '--' between its own args and ours when invoked as
  // 'npm run process-docs -- --args'; strip it so yargs parses cleanly.
  const filteredArgs = process.argv.filter(arg => arg !== '--');
  // Parse command line arguments using yargs and the filtered arguments
  const argv = yargs(filteredArgs)
    .usage('Usage: $0 --job-id <jobId> [--reprocess] [options]')
    .option('job-id', {
      type: 'string',
      demandOption: true,
      describe: 'The ID of the job to process documents for'
    })
    .option('wait', {
      type: 'boolean',
      default: false,
      describe: 'Wait for the job to complete before exiting'
    })
    .option('verbose', {
      type: 'boolean',
      alias: 'v',
      default: false,
      describe: 'Enable more detailed logging'
    })
    .option('reprocess', {
      type: 'boolean',
      default: false,
      describe: 'Delete existing chunks and reprocess all documents for the specified job'
    })
    .option('package', {
      type: 'string',
      describe: 'The name of the package this documentation is for'
    })
    .option('version', {
      type: 'string',
      describe: 'The version of the package this documentation is for (defaults to "latest")'
    })
    .option('skip-prompts', {
      type: 'boolean',
      default: false,
      describe: 'Skip interactive prompts and use provided CLI arguments or defaults'
    })
    .epilogue('For more information, see the documentation in the README.md file.')
    .help()
    .alias('help', 'h')
    .parseSync() as CliArgs;
  try {
    // Enable verbose logging if the flag is set
    if (argv.verbose) {
      logger.level = 'debug';
      logger.debug('Verbose logging enabled');
      logger.debug('Parsed CLI arguments:', argv);
    }
    logger.info(`Starting document processing for job: ${argv['job-id']}`);
    try {
      // Connect to the database
      const prisma = getPrismaClient();
      // Initialize the job service
      const jobService = new JobService(prisma);
      // Check if job exists before doing anything destructive
      const job = await jobService.findJobById(argv['job-id']);
      if (!job) {
        console.error(`Error: Job ${argv['job-id']} not found`);
        process.exit(1);
      }
      console.log(`Found job ${job.id} (${job.status})`);
      // --- Reprocessing Logic ---
      if (argv.reprocess) {
        logger.info(`Reprocess flag set for job ${job.id}. Deleting existing chunks...`);
        console.log(`Reprocessing requested. Deleting existing chunks for job ${job.id}...`);
        const deletedCount = await clearChunksForJob(job.id, prisma);
        console.log(`Deleted ${deletedCount} existing chunks.`);
        // Check if the job already carries package metadata
        const hasPackageInfo = await hasPackageMetadata(job.id, jobService);
        // If package info was provided via CLI, override existing metadata
        if (argv.package) {
          const currentMetadata = (job.metadata as Record<string, any>) || {};
          await jobService.updateJobMetadata(job.id, {
            ...currentMetadata,
            packageName: argv.package,
            packageVersion: argv.version || 'latest'
          });
          console.log(`\nPackage mapping ${hasPackageInfo ? 'updated' : 'set'} from arguments: ${argv.package}@${argv.version || 'latest'}`);
        }
        // If no package info and not skipping prompts, ask the user
        else if (!hasPackageInfo && !argv['skip-prompts']) {
          console.log(`\nNo package metadata found for job ${job.id}.`);
          console.log(`Package metadata helps properly categorize and map documentation.`);
          // Get package information through prompting
          const packageInfo = await promptForPackageInfo(argv);
          if (packageInfo.packageName) {
            // Merge the answers into the existing job metadata
            const currentMetadata = (job.metadata as Record<string, any>) || {};
            await jobService.updateJobMetadata(job.id, {
              ...currentMetadata,
              packageName: packageInfo.packageName,
              packageVersion: packageInfo.packageVersion
            });
            console.log(`\nPackage mapping set: ${packageInfo.packageName}@${packageInfo.packageVersion}`);
          } else {
            console.log(`\nNo package mapping set. Documents will not be mapped to a specific package.`);
          }
        }
        // Otherwise just display the existing metadata
        else if (hasPackageInfo) {
          // Show existing package metadata
          const metadata = (job.metadata as Record<string, any>) || {};
          console.log(`\nUsing existing package mapping: ${metadata.packageName}@${metadata.packageVersion || 'latest'}`);
        }
        // Reset job progress and stats before reprocessing
        logger.info(`Resetting progress and stats for job ${job.id} before reprocessing.`);
        await jobService.updateJobProgress(job.id, 'pending', 0); // Reset status to pending, progress to 0
        await jobService.updateJobStats(job.id, { // Reset stats (removed 'errors')
          pagesProcessed: 0,
          totalChunks: 0,
          pagesSkipped: 0,
        });
        // Update job metadata stage if needed, e.g., back to 'crawling' or keep as 'processing'?
        // Let's leave the stage as is for now, assuming reprocessing stays within the 'processing' context.
        logger.info(`Existing chunks deleted and job state reset for job ${job.id}. Starting reprocessing.`);
        console.log(`Job state reset. Starting reprocessing...`);
      }
      // --- End Reprocessing Logic ---
      if (!argv.wait) {
        // Start the processing without awaiting it
        processDocumentsForJob(job.id, prisma).catch(async (error) => {
          logger.error(`Unhandled error in background job ${job.id}:`, error);
          // Update job status to failed and store error message
          const errorMsg = `Background processing error: ${error instanceof Error ? error.message : String(error)}`;
          try {
            // First, update status and progress
            await jobService.updateJobProgress(job.id, 'failed', 0);
            // Then, update metadata to store the error message
            await jobService.updateJobMetadata(job.id, { error: errorMsg });
          } catch (e: any) {
            logger.error(`Failed to update job ${job.id} status/metadata after background error: ${e}`);
          }
        });
        console.log(`Document processing started in the background.`);
        console.log(`Use 'mcp_docmcp_local_stdio_get_job_status' with jobId=${job.id} to check progress.`);
        // NOTE(review): process.exit() terminates Node immediately, and the
        // processing promise above runs in THIS process — exiting here kills
        // it before the work completes, so "background" processing will not
        // actually continue. Confirm whether this should await the promise
        // (or spawn a detached child) instead of exiting.
        process.exit(0);
      } else {
        // --wait: run processing in the foreground with a live progress line
        console.log(`Waiting for job ${job.id} to complete...`);
        // When --wait is specified, start displaying progress
        console.log(`Real-time progress display enabled. Press Ctrl+C to stop (job will continue in background).`);
        // Start the process
        const processPromise = processDocumentsForJob(job.id, prisma);
        // Display progress while job is running
        const progressDisplayPromise = displayJobProgress(job.id, prisma);
        // Wait for both the process and progress display to finish
        try {
          await Promise.all([processPromise, progressDisplayPromise]);
          // Re-read the job for the final status report
          const finalJob = await jobService.findJobById(job.id);
          if (!finalJob) {
            console.error(`Error: Job ${job.id} not found`);
            process.exit(1);
          }
          // Final status report
          console.log(`\nJob ${job.id} ${finalJob.status}!`);
          if (finalJob.status === 'completed') {
            const jobStats = finalJob.stats as Record<string, any>;
            if (jobStats) {
              console.log(`Processed ${jobStats.pagesProcessed || 0} documents`);
              console.log(`Created ${jobStats.totalChunks || 0} chunks`);
            }
            console.log(`\nDocumentation processing complete.`);
            process.exit(0);
          } else {
            console.error(`\nJob ended with status: ${finalJob.status}`);
            if (finalJob.error) {
              console.error(`Error details: ${finalJob.error}`);
            }
            // cancelled/paused are user-initiated, not failures — exit 0
            process.exit(finalJob.status === 'cancelled' || finalJob.status === 'paused' ? 0 : 1);
          }
        } catch (error) {
          console.error(`\nError during job execution or progress display: ${error instanceof Error ? error.message : String(error)}`);
          // Attempt to mark job as failed and store error message
          const errorMsg = `Processing error: ${error instanceof Error ? error.message : String(error)}`;
          try {
            // First, update status and progress
            await jobService.updateJobProgress(job.id, 'failed', 0);
            // Then, update metadata with the error
            await jobService.updateJobMetadata(job.id, { error: errorMsg });
          } catch (e: any) {
            logger.error(`Failed to update job ${job.id} status/metadata after error: ${e}`);
          }
          process.exit(1);
        }
      }
    } catch (dbError) {
      // Detect connection errors by message sniffing.
      // NOTE(review): matching on "connection" is broad and may catch
      // unrelated errors — confirm against the Prisma error codes in use.
      const errorMessage = dbError instanceof Error ? dbError.message : String(dbError);
      if (errorMessage.includes("Can't reach database server") ||
          errorMessage.includes("connection") ||
          errorMessage.includes("ECONNREFUSED")) {
        logger.error('Database connection error');
        console.error('\n❌ Database Connection Error');
        console.error('-----------------------------');
        console.error('Could not connect to the PostgreSQL database.');
        console.error('\nPossible solutions:');
        console.error('1. Make sure the PostgreSQL server is running');
        console.error('2. Check that the database connection settings in .env are correct');
        console.error('3. If using Docker, ensure the database container is up and running:');
        console.error(' $ ./docker-start.sh');
        console.error('\nSee README.md for more information on setting up the environment.');
        process.exit(2); // Special exit code for database connection issues
      }
      // Re-throw other database errors to the outer handler
      throw dbError;
    }
  } catch (error) {
    logger.error('Error during script initialization or argument parsing:', error);
    console.error(`Initialization Error: ${error instanceof Error ? error.message : 'Unknown error'}`);
    process.exit(1);
  }
}
// Entry point: run the CLI; any rejection that escapes main() is fatal.
void main().catch((err: unknown) => {
  logger.error('Unhandled error in process-docs script:', err);
  console.error(`Fatal error: ${err instanceof Error ? err.message : 'Unknown error'}`);
  process.exit(1);
});