Skip to main content
Glama

Open Search MCP

by flyanima
MIT License
2
  • Apple
  • Linux
batch-content-analyzer.ts (13.5 kB)
/**
 * Batch content analysis engine - extracts content from many URLs in parallel.
 * Provides asynchronous job management, status tracking, retries with backoff
 * and basic performance metrics.
 */
import { EventEmitter } from 'events';
import { v4 as uuidv4 } from 'uuid';
import { EnhancedContentExtractor, ExtractedData, ExtractionOptions } from './enhanced-content-extractor.js';
import { Logger } from '../utils/logger.js';

export type BatchJobId = string;

/** Per-job settings supplied by the caller of `startBatchAnalysis`. */
export interface AnalysisConfig {
  /** When set, URLs are extracted with `extractWithSchema`; takes precedence over `prompt`. */
  extractionSchema?: any;
  /** Used with `extractWithPrompt` when no `extractionSchema` is given. */
  prompt?: string;
  /** Forwarded to `extractWithPrompt` together with `prompt`. */
  systemPrompt?: string;
  /** Only the `type` of each entry is forwarded to the extractor. */
  formats: OutputFormat[];
  /** Maximum number of URLs processed at the same time (values below 1 are treated as 1). */
  concurrency: number;
  retryConfig: RetryConfig;
  /** Forwarded to the extractor as its `timeout` option. */
  timeoutMs: number;
  /** When true, results are additionally buffered for `streamBatchAnalysis`. */
  enableStreaming?: boolean;
}

export interface RetryConfig {
  /** Number of additional attempts after the first one fails. */
  maxRetries: number;
  /** Delay before a retry, in milliseconds. */
  backoffMs: number;
  /** When true, the delay doubles with every retry (`backoffMs * 2^(retry - 1)`). */
  exponentialBackoff?: boolean;
}

export interface OutputFormat {
  type: 'markdown' | 'json' | 'html' | 'screenshot';
  options?: any;
}

export interface BatchStatus {
  jobId: string;
  status: 'queued' | 'processing' | 'completed' | 'failed' | 'cancelled';
  progress: {
    total: number;
    completed: number;
    failed: number;
    remaining: number;
    percentage: number;
  };
  estimatedTimeRemaining?: number;
  errors?: ProcessingError[];
  startedAt: string;
  completedAt?: string;
  performance: {
    averageProcessingTime: number;
    throughputPerSecond: number;
    totalProcessingTime: number;
  };
}

export interface BatchResults {
  jobId: string;
  results: AnalysisResult[];
  summary: {
    totalUrls: number;
    successfulExtractions: number;
    failedExtractions: number;
    averageConfidence: number;
    totalProcessingTime: number;
  };
  errors: ProcessingError[];
}

export interface AnalysisResult {
  url: string;
  success: boolean;
  data?: ExtractedData;
  error?: ProcessingError;
  /** Duration of the final extraction attempt in milliseconds. */
  processingTime: number;
  /** Number of retries performed after the first attempt. */
  retryCount: number;
}

export interface ProcessingError {
  url: string;
  error: string;
  timestamp: string;
  retryCount: number;
  errorType: 'network' | 'parsing' | 'timeout' | 'unknown';
}

/** States in which a job is no longer scheduled for further work. */
const TERMINAL_STATUSES: ReadonlySet<BatchStatus['status']> = new Set<BatchStatus['status']>([
  'completed',
  'failed',
  'cancelled'
]);

/** Interval between buffer polls in `streamBatchAnalysis`, in milliseconds. */
const STREAM_POLL_INTERVAL_MS = 100;

/**
 * Runs batches of URL extractions one job at a time, with bounded per-job
 * concurrency.
 *
 * Emitted events: `jobStarted(jobId)`, `jobCompleted(jobId)`,
 * `jobFailed(jobId, error)`, `jobCancelled(jobId)` and
 * `urlProcessed(jobId, result)`.
 */
export class BatchContentAnalyzer extends EventEmitter {
  private readonly contentExtractor: EnhancedContentExtractor;
  private readonly logger: Logger;
  private readonly activeJobs: Map<BatchJobId, BatchJob> = new Map();
  private readonly jobQueue: BatchJob[] = [];
  private isProcessing = false;

  constructor() {
    super();
    this.contentExtractor = new EnhancedContentExtractor();
    this.logger = new Logger('BatchContentAnalyzer');
  }

  /**
   * Queues a new batch job and kicks off queue processing.
   *
   * @param urls URLs to analyse; the array is copied.
   * @param analysisConfig Extraction, concurrency and retry settings.
   * @returns The id of the queued job.
   */
  async startBatchAnalysis(
    urls: string[],
    analysisConfig: AnalysisConfig
  ): Promise<BatchJobId> {
    const jobId = uuidv4();

    const job: BatchJob = {
      id: jobId,
      urls: [...urls],
      config: analysisConfig,
      status: 'queued',
      results: [],
      errors: [],
      startedAt: new Date().toISOString(),
      progress: {
        total: urls.length,
        completed: 0,
        failed: 0,
        remaining: urls.length,
        percentage: 0
      },
      performance: {
        averageProcessingTime: 0,
        throughputPerSecond: 0,
        totalProcessingTime: 0
      },
      streamingResults: analysisConfig.enableStreaming ? [] : undefined
    };

    this.activeJobs.set(jobId, job);
    this.jobQueue.push(job);

    this.logger.info(`Batch analysis job ${jobId} queued with ${urls.length} URLs`);

    // Fire-and-forget: the queue runs in the background, but a rejection must
    // never surface as an unhandled promise rejection.
    this.processQueue().catch(error => {
      this.logger.error('Batch queue processing failed:', error);
    });

    return jobId;
  }

  /**
   * Reports the current state of a job.
   *
   * @throws Error when no job with the given id exists.
   */
  async checkBatchStatus(jobId: BatchJobId): Promise<BatchStatus> {
    const job = this.getJob(jobId);

    return {
      jobId: job.id,
      status: job.status,
      progress: job.progress,
      estimatedTimeRemaining: this.calculateEstimatedTime(job),
      errors: job.errors,
      startedAt: job.startedAt,
      completedAt: job.completedAt,
      performance: job.performance
    };
  }

  /**
   * Returns the results of a finished job. Cancelled jobs return whatever was
   * collected before (or while) they were cancelled.
   *
   * @throws Error when the job does not exist or is still queued/processing.
   */
  async getBatchResults(jobId: BatchJobId): Promise<BatchResults> {
    const job = this.getJob(jobId);

    if (!TERMINAL_STATUSES.has(job.status)) {
      throw new Error(`Job ${jobId} is not completed yet`);
    }

    const successfulResults = job.results.filter(r => r.success);
    const averageConfidence = successfulResults.length > 0
      ? successfulResults.reduce((sum, r) => sum + (r.data?.confidence ?? 0), 0) / successfulResults.length
      : 0;

    return {
      jobId: job.id,
      results: job.results,
      summary: {
        totalUrls: job.urls.length,
        successfulExtractions: successfulResults.length,
        failedExtractions: job.results.length - successfulResults.length,
        averageConfidence,
        totalProcessingTime: job.performance.totalProcessingTime
      },
      errors: job.errors
    };
  }

  /**
   * Starts a batch job with streaming enabled and yields each result as it
   * becomes available. The generator finishes once the job has reached a
   * terminal state and every buffered result has been yielded.
   */
  async* streamBatchAnalysis(
    urls: string[],
    config: AnalysisConfig
  ): AsyncGenerator<AnalysisResult, void, undefined> {
    const jobId = await this.startBatchAnalysis(urls, { ...config, enableStreaming: true });
    const job = this.activeJobs.get(jobId);
    if (!job) {
      return;
    }
    const buffer = job.streamingResults ?? [];

    while (true) {
      // Read the state BEFORE draining: anything pushed before the job became
      // terminal is then guaranteed to be yielded by the drain below.
      const finished = TERMINAL_STATUSES.has(job.status);

      while (buffer.length > 0) {
        const result = buffer.shift();
        if (result) {
          yield result;
        }
      }

      if (finished) {
        break;
      }

      await this.delay(STREAM_POLL_INTERVAL_MS);
    }
  }

  /**
   * Cancels a queued or running job. URLs that have not started yet are
   * skipped; extractions already in flight are allowed to finish. Cancelling
   * an already cancelled job is a no-op.
   *
   * @throws Error when the job does not exist or has already completed/failed.
   */
  async cancelBatchJob(jobId: BatchJobId): Promise<void> {
    const job = this.getJob(jobId);

    if (job.status === 'completed' || job.status === 'failed') {
      throw new Error(`Job ${jobId} is already ${job.status}`);
    }
    if (job.status === 'cancelled') {
      return;
    }

    job.status = 'cancelled';
    job.completedAt = new Date().toISOString();
    if (job.processingStartedAtMs !== undefined) {
      job.performance.totalProcessingTime = Date.now() - job.processingStartedAtMs;
    }

    this.logger.info(`Batch job ${jobId} cancelled`);
    this.emit('jobCancelled', jobId);
  }

  /** Looks up a job by id, throwing when it is unknown. */
  private getJob(jobId: BatchJobId): BatchJob {
    const job = this.activeJobs.get(jobId);
    if (!job) {
      throw new Error(`Job ${jobId} not found`);
    }
    return job;
  }

  /**
   * Whether the job has been cancelled. Kept as a method so the check is not
   * affected by control-flow narrowing after a local `job.status = ...`
   * assignment (the status can change during any `await`).
   */
  private isCancelled(job: BatchJob): boolean {
    return job.status === 'cancelled';
  }

  /**
   * Drains the job queue sequentially. Re-entrant calls return immediately
   * while a drain is already in progress.
   */
  private async processQueue(): Promise<void> {
    if (this.isProcessing || this.jobQueue.length === 0) {
      return;
    }

    this.isProcessing = true;
    try {
      let job: BatchJob | undefined;
      while ((job = this.jobQueue.shift()) !== undefined) {
        // Jobs cancelled while still queued must never start.
        if (this.isCancelled(job)) {
          continue;
        }
        try {
          await this.processJob(job);
        } catch (error) {
          // One broken job (e.g. a throwing event listener) must not block
          // the jobs queued behind it.
          this.logger.error(`Batch job ${job.id} failed:`, error);
        }
      }
    } finally {
      // Always release the flag, otherwise no later job could ever run.
      this.isProcessing = false;
    }
  }

  /** Processes every URL of one job with bounded concurrency. */
  private async processJob(job: BatchJob): Promise<void> {
    job.status = 'processing';
    const startTime = Date.now();
    job.processingStartedAtMs = startTime;

    try {
      this.logger.info(`Starting batch job ${job.id} with ${job.urls.length} URLs`);
      this.emit('jobStarted', job.id);

      // The semaphore caps how many URLs are in flight at once.
      const semaphore = new Semaphore(job.config.concurrency);
      const processingPromises = job.urls.map(url =>
        this.processUrl(url, job, semaphore)
      );

      // Wait for every URL to settle; individual failures are recorded per URL.
      await Promise.allSettled(processingPromises);

      // A cancelled job was already finalised by cancelBatchJob; do not
      // overwrite its status with 'completed'.
      if (this.isCancelled(job)) {
        return;
      }

      job.status = 'completed';
      job.completedAt = new Date().toISOString();
      job.performance.totalProcessingTime = Date.now() - startTime;

      this.logger.info(`Batch job ${job.id} completed: ${job.progress.completed}/${job.progress.total} successful`);
      this.emit('jobCompleted', job.id);
    } catch (error) {
      if (!this.isCancelled(job)) {
        job.status = 'failed';
        job.completedAt = new Date().toISOString();
        job.performance.totalProcessingTime = Date.now() - startTime;
      }

      this.logger.error(`Batch job ${job.id} failed:`, error);
      this.emit('jobFailed', job.id, error);
    }
  }

  /** Processes one URL once a concurrency permit is available and records the outcome on the job. */
  private async processUrl(url: string, job: BatchJob, semaphore: Semaphore): Promise<void> {
    await semaphore.acquire();

    try {
      // Skip work that had not started when the job was cancelled.
      if (this.isCancelled(job)) {
        return;
      }

      const result = await this.processUrlWithRetry(url, job);

      job.results.push(result);

      if (result.success) {
        job.progress.completed++;
      } else {
        job.progress.failed++;
      }

      job.progress.remaining--;
      job.progress.percentage = (job.progress.completed + job.progress.failed) / job.progress.total * 100;

      this.updatePerformanceMetrics(job, result.processingTime);

      // Buffer the result for streamBatchAnalysis consumers.
      if (job.config.enableStreaming && job.streamingResults) {
        job.streamingResults.push(result);
      }

      this.emit('urlProcessed', job.id, result);
    } finally {
      semaphore.release();
    }
  }

  /**
   * Extracts one URL, retrying failed attempts according to the job's retry
   * configuration. Never rejects: failures are returned as unsuccessful
   * results and appended to `job.errors`.
   */
  private async processUrlWithRetry(url: string, job: BatchJob): Promise<AnalysisResult> {
    const { config } = job;

    // A missing schema/prompt is a configuration error; retrying cannot fix it.
    if (!config.extractionSchema && !config.prompt) {
      return this.recordFailure(
        url,
        job,
        new Error('Either extractionSchema or prompt must be provided'),
        0,
        0
      );
    }

    // Negative or NaN values mean "no retries"; the first attempt always runs.
    const maxRetries = Math.max(0, Math.floor(config.retryConfig.maxRetries) || 0);

    let lastError: Error | null = null;
    let lastAttemptMs = 0;
    let retryCount = 0;

    for (let attempt = 0; attempt <= maxRetries; attempt++) {
      if (attempt > 0) {
        await this.delay(this.calculateRetryDelay(attempt, config.retryConfig));
        // Do not start another attempt for a job cancelled during backoff.
        if (this.isCancelled(job)) {
          break;
        }
        this.logger.warn(`Retrying URL ${url} (attempt ${attempt}/${maxRetries})`);
        retryCount = attempt;
      }

      const startTime = Date.now();
      try {
        const extractedData = await this.extract(url, config);
        return {
          url,
          success: true,
          data: extractedData,
          processingTime: Date.now() - startTime,
          retryCount
        };
      } catch (error) {
        lastError = error instanceof Error ? error : new Error(String(error));
        lastAttemptMs = Date.now() - startTime;
      }
    }

    return this.recordFailure(url, job, lastError, lastAttemptMs, retryCount);
  }

  /** Runs a single extraction, preferring the schema over the prompt when both are set. */
  private async extract(url: string, config: AnalysisConfig): Promise<ExtractedData> {
    const options = this.buildExtractionOptions(config);

    if (config.extractionSchema) {
      return this.contentExtractor.extractWithSchema(url, config.extractionSchema, options);
    }
    if (config.prompt) {
      return this.contentExtractor.extractWithPrompt(url, config.prompt, config.systemPrompt, options);
    }
    throw new Error('Either extractionSchema or prompt must be provided');
  }

  /** Appends a processing error to the job and builds the matching unsuccessful result. */
  private recordFailure(
    url: string,
    job: BatchJob,
    error: Error | null,
    processingTime: number,
    retryCount: number
  ): AnalysisResult {
    const processingError: ProcessingError = {
      url,
      // `||` on purpose: an empty message is as uninformative as a missing one.
      error: error?.message || 'Unknown error',
      timestamp: new Date().toISOString(),
      retryCount,
      errorType: this.classifyError(error)
    };

    job.errors.push(processingError);

    return {
      url,
      success: false,
      error: processingError,
      processingTime,
      retryCount
    };
  }

  /** Maps the job configuration onto the extractor's options. */
  private buildExtractionOptions(config: AnalysisConfig): ExtractionOptions {
    return {
      formats: config.formats.map(f => f.type),
      onlyMainContent: true,
      timeout: config.timeoutMs
    };
  }

  /** Delay before retry number `retryCount` (1-based), in milliseconds. */
  private calculateRetryDelay(retryCount: number, retryConfig: RetryConfig): number {
    if (retryConfig.exponentialBackoff) {
      return retryConfig.backoffMs * Math.pow(2, retryCount - 1);
    }
    return retryConfig.backoffMs;
  }

  /** Classifies an error by keywords found in its lower-cased message. */
  private classifyError(error: Error | null): ProcessingError['errorType'] {
    if (!error) return 'unknown';

    const message = error.message.toLowerCase();
    if (message.includes('timeout')) return 'timeout';
    if (message.includes('network') || message.includes('connection')) return 'network';
    if (message.includes('parse') || message.includes('invalid')) return 'parsing';
    return 'unknown';
  }

  /**
   * Estimates the remaining time in milliseconds, or `undefined` while no URL
   * has been processed yet. Remaining URLs are assumed to run in waves of at
   * most `concurrency` URLs, each wave taking the average processing time.
   */
  private calculateEstimatedTime(job: BatchJob): number | undefined {
    const processed = job.progress.completed + job.progress.failed;
    if (processed === 0) return undefined;

    const remaining = Math.max(0, job.progress.remaining);
    const parallelism = Math.min(normalizePermits(job.config.concurrency), Math.max(1, remaining));
    return job.performance.averageProcessingTime * Math.ceil(remaining / parallelism);
  }

  /** Folds one more processing time into the running average and refreshes the throughput. */
  private updatePerformanceMetrics(job: BatchJob, processingTime: number): void {
    const processed = job.progress.completed + job.progress.failed;
    if (processed <= 0) return;

    job.performance.averageProcessingTime =
      (job.performance.averageProcessingTime * (processed - 1) + processingTime) / processed;

    // Measure from the moment processing began; time spent waiting in the
    // queue would otherwise deflate the throughput.
    const startedMs = job.processingStartedAtMs ?? new Date(job.startedAt).getTime();
    const elapsedMs = Date.now() - startedMs;
    // Guard against a zero-length interval, which would yield Infinity.
    job.performance.throughputPerSecond = elapsedMs > 0 ? processed / (elapsedMs / 1000) : 0;
  }

  private delay(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

/** Coerces a requested permit count to at least 1 so that work can always make progress. */
function normalizePermits(value: number): number {
  return Number.isNaN(value) ? 1 : Math.max(1, Math.floor(value));
}

/**
 * Minimal counting semaphore. `acquire` resolves immediately while permits are
 * available; otherwise the caller waits until another holder calls `release`.
 */
class Semaphore {
  private permits: number;
  private readonly waitQueue: Array<() => void> = [];

  constructor(permits: number) {
    // With zero/negative/NaN permits acquire() would never resolve.
    this.permits = normalizePermits(permits);
  }

  async acquire(): Promise<void> {
    if (this.permits > 0) {
      this.permits--;
      return;
    }

    return new Promise<void>(resolve => {
      this.waitQueue.push(resolve);
    });
  }

  release(): void {
    // Hand the permit straight to the next waiter when there is one.
    const next = this.waitQueue.shift();
    if (next) {
      next();
    } else {
      this.permits++;
    }
  }
}

/** Internal bookkeeping record for one batch job. */
interface BatchJob {
  id: string;
  urls: string[];
  config: AnalysisConfig;
  status: BatchStatus['status'];
  results: AnalysisResult[];
  errors: ProcessingError[];
  /** ISO timestamp of when the job was queued. */
  startedAt: string;
  completedAt?: string;
  /** Epoch milliseconds at which processing actually began; unset while queued. */
  processingStartedAtMs?: number;
  progress: BatchStatus['progress'];
  performance: BatchStatus['performance'];
  /** Buffer consumed by `streamBatchAnalysis`; only present when streaming is enabled. */
  streamingResults?: AnalysisResult[];
}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/flyanima/open-search-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.