Skip to main content
Glama

Open Search MCP

by flyanima
MIT License
2
  • Apple
  • Linux
concurrent-search-manager.ts (17.5 kB)
/**
 * Concurrent Search Manager
 *
 * Core features:
 * - Concurrent queries across multiple search engines
 * - Result aggregation and de-duplication
 * - Load balancing and failover
 * - Performance monitoring and optimization
 * - Cache management
 */
import { Logger } from '../utils/logger.js';
import { QueryExpansionEngine, ExpansionResult } from './query-expansion-engine.js';
import { EventEmitter } from 'events';

/**
 * Search engine categories.
 */
export enum SearchEngineType {
  ACADEMIC = 'academic',
  WEB = 'web',
  NEWS = 'news',
  SOCIAL = 'social',
  TECHNICAL = 'technical',
  MULTIMEDIA = 'multimedia'
}

/**
 * Static configuration of a single search engine.
 */
export interface SearchEngineConfig {
  id: string;
  name: string;
  type: SearchEngineType;
  endpoint?: string;
  /** Higher values are queried first. */
  priority: number;
  timeout: number;
  maxRetries: number;
  rateLimit: {
    requestsPerMinute: number;
    burstLimit: number;
  };
  isEnabled: boolean;
  healthCheck: {
    url?: string;
    /** Interval between health checks, in milliseconds. */
    interval: number;
    timeout: number;
  };
}

/**
 * A search request submitted to the manager.
 */
export interface SearchRequest {
  id: string;
  query: string;
  /** Ids of the engines to query; unknown, disabled or unhealthy ids are skipped. */
  engines: string[];
  options: {
    maxResults: number;
    /** Global timeout for all engine calls of this request, in milliseconds. */
    timeout: number;
    enableExpansion: boolean;
    expansionConfig?: any;
    filters?: {
      dateRange?: { start: Date; end: Date };
      language?: string;
      domain?: string;
      type?: string;
    };
  };
}

/**
 * A single result returned by one engine.
 */
export interface SearchResult {
  id: string;
  title: string;
  url: string;
  snippet: string;
  source: string;
  engine: string;
  score: number;
  timestamp: Date;
  metadata: {
    type?: string;
    author?: string;
    publishDate?: Date;
    language?: string;
    domain?: string;
  };
}

/**
 * Merged, de-duplicated and ranked results of one search request.
 */
export interface AggregatedSearchResult {
  requestId: string;
  originalQuery: string;
  expandedQueries?: string[];
  results: SearchResult[];
  metadata: {
    totalResults: number;
    enginesUsed: string[];
    /** Wall-clock duration of the search, in milliseconds. */
    processingTime: number;
    cacheHit: boolean;
    duplicatesRemoved: number;
    averageScore: number;
  };
  performance: {
    /** Summed execution time per engine across all of its queries, in milliseconds. */
    engineTimes: Record<string, number>;
    slowestEngine: string;
    fastestEngine: string;
    failedEngines: string[];
  };
}

/** One (engine, query) unit of work with its own abort controller. */
type SearchTask = { engine: SearchEngineConfig; query: string; controller: AbortController };

/** Outcome of one SearchTask; `error` is set when the engine call failed or was aborted. */
type EngineSearchOutcome = {
  engine: string;
  query: string;
  results: SearchResult[];
  time: number;
  error?: string;
};

/** A cached aggregated result together with the time (ms since epoch) it was stored. */
interface CacheEntry {
  result: AggregatedSearchResult;
  cachedAt: number;
}

/**
 * Fans a query (and optionally its expansions) out to several engines in parallel,
 * then merges, de-duplicates, ranks and caches the results.
 *
 * Events: `searchCompleted` (AggregatedSearchResult), `engineUnhealthy` (engine id).
 */
export class ConcurrentSearchManager extends EventEmitter {
  /** Cached results older than this (30 minutes) are evicted. */
  private static readonly CACHE_TTL_MS = 30 * 60 * 1000;
  /** Each engine receives at most this many of the (expanded) queries. */
  private static readonly MAX_QUERIES_PER_ENGINE = 3;

  private logger: Logger;
  private engines: Map<string, SearchEngineConfig> = new Map();
  private queryExpansion: QueryExpansionEngine;
  private cache: Map<string, CacheEntry> = new Map();
  private cacheHits = 0;
  private cacheMisses = 0;
  private rateLimiters: Map<string, { requests: number; resetTime: number }> = new Map();
  private healthStatus: Map<string, boolean> = new Map();
  private activeRequests: Map<string, AbortController> = new Map();
  private healthCheckTimers: Array<ReturnType<typeof setInterval>> = [];

  constructor() {
    super();
    this.logger = new Logger('ConcurrentSearchManager');
    this.queryExpansion = new QueryExpansionEngine();
    this.initializeEngines();
    this.startHealthChecks();
  }

  /**
   * Run a concurrent search across the requested engines.
   *
   * Results come from the cache when a fresh entry exists. While running, the
   * request can be aborted with `cancelSearch(request.id)`.
   *
   * @param request - the query, target engines and options.
   * @returns the aggregated, de-duplicated and score-sorted results.
   * @throws Error('No available search engines') when none of the requested
   *   engines is enabled, healthy and within its rate limit.
   */
  async search(request: SearchRequest): Promise<AggregatedSearchResult> {
    const startTime = Date.now();
    this.logger.info(`Starting concurrent search for: "${request.query}"`);

    // Check the cache; expired entries count as misses.
    const cacheKey = this.generateCacheKey(request);
    const cachedResult = this.getCachedResult(cacheKey, startTime);
    if (cachedResult) {
      this.cacheHits++;
      this.logger.debug(`Cache hit for query: ${request.query}`);
      return {
        ...cachedResult,
        metadata: { ...cachedResult.metadata, cacheHit: true }
      };
    }
    this.cacheMisses++;

    // Register the request so cancelSearch() can abort its in-flight engine calls.
    const requestController = new AbortController();
    this.activeRequests.set(request.id, requestController);

    try {
      // Query expansion
      let expandedQueries: string[] = [request.query];
      if (request.options.enableExpansion) {
        const expansionResult = await this.queryExpansion.expandQuery(
          request.query,
          request.options.expansionConfig
        );
        const queries = expansionResult.expandedQueries.map(eq => eq.query);
        // Keep the original query if expansion produced nothing; otherwise no tasks would run.
        if (queries.length > 0) {
          expandedQueries = queries;
        }
      }

      // Select the engines that can serve this request.
      const availableEngines = this.selectAvailableEngines(request.engines);
      if (availableEngines.length === 0) {
        throw new Error('No available search engines');
      }

      const searchTasks = this.createSearchTasks(request, expandedQueries, availableEngines);
      const searchResults = await this.executeSearchTasks(
        searchTasks,
        request.options.timeout,
        requestController.signal
      );

      const aggregatedResult = this.aggregateResults(
        request,
        expandedQueries,
        searchResults,
        startTime
      );

      // A result truncated by an explicit cancellation must not be served from cache later.
      if (!requestController.signal.aborted) {
        this.cache.set(cacheKey, { result: aggregatedResult, cachedAt: Date.now() });
      }

      // Evict expired cache entries.
      this.cleanupCache();

      this.logger.info(`Concurrent search completed in ${aggregatedResult.metadata.processingTime}ms`);
      this.emit('searchCompleted', aggregatedResult);

      return aggregatedResult;
    } finally {
      // Only remove our own controller; a newer request may have reused the same id.
      if (this.activeRequests.get(request.id) === requestController) {
        this.activeRequests.delete(request.id);
      }
    }
  }

  /**
   * Return the cached result for `key`, or undefined when it is missing or expired.
   * Expired entries are removed.
   */
  private getCachedResult(key: string, now: number): AggregatedSearchResult | undefined {
    const entry = this.cache.get(key);
    if (!entry) {
      return undefined;
    }
    if (now - entry.cachedAt > ConcurrentSearchManager.CACHE_TTL_MS) {
      this.cache.delete(key);
      return undefined;
    }
    return entry.result;
  }

  /**
   * Filter the requested engine ids down to known, enabled, healthy engines
   * that are within their rate limit. Results are ordered by descending priority.
   */
  private selectAvailableEngines(requestedEngines: string[]): SearchEngineConfig[] {
    const available: SearchEngineConfig[] = [];

    for (const engineId of requestedEngines) {
      const engine = this.engines.get(engineId);
      if (engine && engine.isEnabled && this.healthStatus.get(engineId)) {
        if (this.checkRateLimit(engineId)) {
          available.push(engine);
        }
      }
    }

    // Sort by priority, highest first.
    return available.sort((a, b) => b.priority - a.priority);
  }

  /**
   * Build one task per (engine, query) pair.
   * Each engine receives at most MAX_QUERIES_PER_ENGINE queries.
   */
  private createSearchTasks(
    request: SearchRequest,
    queries: string[],
    engines: SearchEngineConfig[]
  ): SearchTask[] {
    const tasks: SearchTask[] = [];

    for (const engine of engines) {
      for (const query of queries.slice(0, ConcurrentSearchManager.MAX_QUERIES_PER_ENGINE)) {
        tasks.push({ engine, query, controller: new AbortController() });
      }
    }

    return tasks;
  }

  /**
   * Run all tasks in parallel.
   *
   * Every task is aborted when `timeout` ms elapse or when `parentSignal` aborts.
   * Failed or aborted tasks are reported with an `error` field rather than by throwing.
   *
   * @param parentSignal - optional request-level signal, used by cancelSearch().
   */
  private async executeSearchTasks(
    tasks: SearchTask[],
    timeout: number,
    parentSignal?: AbortSignal
  ): Promise<EngineSearchOutcome[]> {
    const results: EngineSearchOutcome[] = [];
    const abortAll = (): void => {
      tasks.forEach(task => task.controller.abort());
    };

    // Global timeout for the whole batch.
    const globalTimeout = setTimeout(abortAll, timeout);

    // Forward request-level cancellation to every task.
    if (parentSignal?.aborted) {
      abortAll();
    } else {
      parentSignal?.addEventListener('abort', abortAll, { once: true });
    }

    try {
      const promises = tasks.map(async (task): Promise<EngineSearchOutcome> => {
        const startTime = Date.now();

        try {
          const searchResults = await this.executeEngineSearch(
            task.engine,
            task.query,
            task.controller.signal
          );

          const executionTime = Date.now() - startTime;
          this.updateRateLimit(task.engine.id);

          return {
            engine: task.engine.id,
            query: task.query,
            results: searchResults,
            time: executionTime
          };
        } catch (error) {
          const executionTime = Date.now() - startTime;
          this.logger.warn(`Search failed for engine ${task.engine.id}: ${error}`);

          return {
            engine: task.engine.id,
            query: task.query,
            results: [],
            time: executionTime,
            error: error instanceof Error ? error.message : String(error)
          };
        }
      });

      const settledResults = await Promise.allSettled(promises);
      for (const result of settledResults) {
        if (result.status === 'fulfilled') {
          results.push(result.value);
        }
      }
    } finally {
      clearTimeout(globalTimeout);
      parentSignal?.removeEventListener('abort', abortAll);
    }

    return results;
  }

  /**
   * Query a single engine.
   *
   * This is currently a simulation: two synthetic results arrive after a random
   * 500-1500 ms delay. Rejects with Error('Search aborted') if `signal` is
   * already aborted or aborts before the results arrive.
   */
  private async executeEngineSearch(
    engine: SearchEngineConfig,
    query: string,
    signal: AbortSignal
  ): Promise<SearchResult[]> {
    // Simulated search engine call.
    return new Promise((resolve, reject) => {
      // An already-aborted signal never fires 'abort' again, so check it up front.
      if (signal.aborted) {
        reject(new Error('Search aborted'));
        return;
      }

      const timer = setTimeout(() => {
        signal.removeEventListener('abort', onAbort);

        const mockResults: SearchResult[] = [
          {
            id: `${engine.id}_${Date.now()}_1`,
            title: `${query} - 搜索结果1`,
            url: `https://example.com/result1?q=${encodeURIComponent(query)}`,
            snippet: `这是关于"${query}"的详细信息和分析...`,
            source: engine.name,
            engine: engine.id,
            score: Math.random() * 0.3 + 0.7, // 0.7-1.0
            timestamp: new Date(),
            metadata: {
              type: engine.type,
              language: 'zh-CN',
              domain: 'example.com'
            }
          },
          {
            id: `${engine.id}_${Date.now()}_2`,
            title: `${query} - 深度分析`,
            url: `https://example.com/result2?q=${encodeURIComponent(query)}`,
            snippet: `深入探讨"${query}"的各个方面和应用...`,
            source: engine.name,
            engine: engine.id,
            score: Math.random() * 0.3 + 0.6, // 0.6-0.9
            timestamp: new Date(),
            metadata: {
              type: engine.type,
              language: 'zh-CN',
              domain: 'example.com'
            }
          }
        ];

        resolve(mockResults);
      }, Math.random() * 1000 + 500); // 500-1500 ms simulated network latency

      const onAbort = (): void => {
        clearTimeout(timer);
        reject(new Error('Search aborted'));
      };
      signal.addEventListener('abort', onAbort, { once: true });
    });
  }

  /**
   * Merge per-task outcomes into one ranked, de-duplicated result
   * and compute performance metrics.
   */
  private aggregateResults(
    request: SearchRequest,
    expandedQueries: string[],
    searchResults: EngineSearchOutcome[],
    startTime: number
  ): AggregatedSearchResult {
    const allResults: SearchResult[] = [];
    const engineTimes: Record<string, number> = {};
    const enginesUsed: string[] = [];
    const failedEngines: string[] = [];

    for (const result of searchResults) {
      if (result.error) {
        // An engine with several failing queries is reported only once.
        if (!failedEngines.includes(result.engine)) {
          failedEngines.push(result.engine);
        }
      } else {
        allResults.push(...result.results);
        if (!enginesUsed.includes(result.engine)) {
          enginesUsed.push(result.engine);
        }
      }

      engineTimes[result.engine] = (engineTimes[result.engine] || 0) + result.time;
    }

    // De-duplicate
    const uniqueResults = this.deduplicateResults(allResults);
    const duplicatesRemoved = allResults.length - uniqueResults.length;

    // Rank by score and cap at maxResults.
    const sortedResults = uniqueResults
      .sort((a, b) => b.score - a.score)
      .slice(0, request.options.maxResults);

    // Performance metrics
    const processingTime = Date.now() - startTime;
    // Guard against 0/0 (NaN) when no results survived.
    const averageScore = sortedResults.length > 0
      ? sortedResults.reduce((sum, r) => sum + r.score, 0) / sortedResults.length
      : 0;

    const engineTimeEntries = Object.entries(engineTimes);
    const slowestEngine = engineTimeEntries.reduce(
      (max, [engine, time]) => (time > engineTimes[max] ? engine : max),
      engineTimeEntries[0]?.[0] || ''
    );
    const fastestEngine = engineTimeEntries.reduce(
      (min, [engine, time]) => (time < engineTimes[min] ? engine : min),
      engineTimeEntries[0]?.[0] || ''
    );

    return {
      requestId: request.id,
      originalQuery: request.query,
      expandedQueries: expandedQueries.length > 1 ? expandedQueries : undefined,
      results: sortedResults,
      metadata: {
        totalResults: sortedResults.length,
        enginesUsed,
        processingTime,
        cacheHit: false,
        duplicatesRemoved,
        averageScore
      },
      performance: {
        engineTimes,
        slowestEngine,
        fastestEngine,
        failedEngines
      }
    };
  }

  /**
   * Drop results whose (url, title) pair was already seen.
   * The first occurrence wins.
   */
  private deduplicateResults(results: SearchResult[]): SearchResult[] {
    const seen = new Set<string>();
    const unique: SearchResult[] = [];

    for (const result of results) {
      // The URL + title combination is the de-duplication key.
      const key = `${result.url}|${result.title}`;
      if (!seen.has(key)) {
        seen.add(key);
        unique.push(result);
      }
    }

    return unique;
  }

  /**
   * Return whether `engineId` is still below its per-minute request budget.
   * Starts a new one-minute window when none exists or the current one has expired.
   */
  private checkRateLimit(engineId: string): boolean {
    const engine = this.engines.get(engineId);
    if (!engine) return false;

    const now = Date.now();
    const limiter = this.rateLimiters.get(engineId);

    if (!limiter || now > limiter.resetTime) {
      this.rateLimiters.set(engineId, {
        requests: 0,
        resetTime: now + 60000 // 1 minute
      });
      return true;
    }

    return limiter.requests < engine.rateLimit.requestsPerMinute;
  }

  /**
   * Count one request against the engine's current rate-limit window.
   */
  private updateRateLimit(engineId: string): void {
    const limiter = this.rateLimiters.get(engineId);
    if (limiter) {
      limiter.requests++;
    }
  }

  /**
   * Build a cache key from the query, engine set and options.
   * The engine order does not affect the key.
   */
  private generateCacheKey(request: SearchRequest): string {
    const key = {
      query: request.query,
      // Sort a copy: Array#sort mutates in place and must not reorder the caller's array.
      engines: [...request.engines].sort(),
      options: request.options
    };
    return Buffer.from(JSON.stringify(key)).toString('base64');
  }

  /**
   * Remove cache entries stored more than CACHE_TTL_MS ago.
   */
  private cleanupCache(): void {
    const now = Date.now();

    for (const [key, entry] of this.cache.entries()) {
      if (now - entry.cachedAt > ConcurrentSearchManager.CACHE_TTL_MS) {
        this.cache.delete(key);
      }
    }
  }

  /**
   * Register the built-in engines and mark each of them healthy initially.
   */
  private initializeEngines(): void {
    const engines: SearchEngineConfig[] = [
      {
        id: 'duckduckgo',
        name: 'DuckDuckGo',
        type: SearchEngineType.WEB,
        priority: 8,
        timeout: 5000,
        maxRetries: 2,
        rateLimit: { requestsPerMinute: 60, burstLimit: 10 },
        isEnabled: true,
        healthCheck: { interval: 60000, timeout: 3000 }
      },
      {
        id: 'arxiv',
        name: 'arXiv',
        type: SearchEngineType.ACADEMIC,
        priority: 9,
        timeout: 8000,
        maxRetries: 3,
        rateLimit: { requestsPerMinute: 30, burstLimit: 5 },
        isEnabled: true,
        healthCheck: { interval: 120000, timeout: 5000 }
      },
      {
        id: 'github',
        name: 'GitHub',
        type: SearchEngineType.TECHNICAL,
        priority: 7,
        timeout: 6000,
        maxRetries: 2,
        rateLimit: { requestsPerMinute: 40, burstLimit: 8 },
        isEnabled: true,
        healthCheck: { interval: 90000, timeout: 4000 }
      },
      {
        id: 'techcrunch',
        name: 'TechCrunch',
        type: SearchEngineType.NEWS,
        priority: 6,
        timeout: 4000,
        maxRetries: 2,
        rateLimit: { requestsPerMinute: 50, burstLimit: 10 },
        isEnabled: true,
        healthCheck: { interval: 60000, timeout: 3000 }
      }
    ];

    for (const engine of engines) {
      this.engines.set(engine.id, engine);
      this.healthStatus.set(engine.id, true); // Assume healthy until the first check.
    }

    this.logger.info(`Initialized ${engines.length} search engines`);
  }

  /**
   * Start one periodic health check per engine.
   * The timer handles are kept so that dispose() can stop them.
   */
  private startHealthChecks(): void {
    for (const [engineId, engine] of this.engines.entries()) {
      const timer = setInterval(() => {
        void this.performHealthCheck(engineId, engine);
      }, engine.healthCheck.interval);
      this.healthCheckTimers.push(timer);
    }
  }

  /**
   * Update the health status of one engine.
   * This is currently simulated with a 90% chance of reporting healthy.
   */
  private async performHealthCheck(engineId: string, engine: SearchEngineConfig): Promise<void> {
    try {
      // Simulated health check: 90% healthy.
      const isHealthy = Math.random() > 0.1;
      this.healthStatus.set(engineId, isHealthy);

      if (!isHealthy) {
        this.logger.warn(`Health check failed for engine: ${engineId}`);
        this.emit('engineUnhealthy', engineId);
      }
    } catch (error) {
      this.healthStatus.set(engineId, false);
      this.logger.error(`Health check error for engine ${engineId}:`, error);
    }
  }

  /**
   * Snapshot of each engine's enabled flag, health and priority.
   */
  getEngineStatus(): Record<string, { enabled: boolean; healthy: boolean; priority: number }> {
    const status: Record<string, { enabled: boolean; healthy: boolean; priority: number }> = {};

    for (const [engineId, engine] of this.engines.entries()) {
      status[engineId] = {
        enabled: engine.isEnabled,
        healthy: this.healthStatus.get(engineId) || false,
        priority: engine.priority
      };
    }

    return status;
  }

  /**
   * Enable or disable an engine. Unknown ids are ignored.
   */
  setEngineEnabled(engineId: string, enabled: boolean): void {
    const engine = this.engines.get(engineId);
    if (engine) {
      engine.isEnabled = enabled;
      this.logger.info(`Engine ${engineId} ${enabled ? 'enabled' : 'disabled'}`);
    }
  }

  /**
   * Abort the in-flight engine calls of the search with the given request id.
   * Does nothing if no such search is running.
   */
  cancelSearch(requestId: string): void {
    const controller = this.activeRequests.get(requestId);
    if (controller) {
      controller.abort();
      this.activeRequests.delete(requestId);
      this.logger.info(`Cancelled search request: ${requestId}`);
    }
  }

  /**
   * Cache size and the hit rate observed since construction.
   * The hit rate is 0 before any lookup.
   */
  getCacheStats(): { size: number; hitRate: number } {
    const lookups = this.cacheHits + this.cacheMisses;
    return {
      size: this.cache.size,
      hitRate: lookups === 0 ? 0 : this.cacheHits / lookups
    };
  }

  /**
   * Remove all cached results.
   */
  clearCache(): void {
    this.cache.clear();
    this.logger.info('Cache cleared');
  }

  /**
   * Stop the periodic health checks and abort all in-flight searches.
   * Active intervals otherwise keep the Node.js event loop alive indefinitely.
   */
  dispose(): void {
    for (const timer of this.healthCheckTimers) {
      clearInterval(timer);
    }
    this.healthCheckTimers = [];

    for (const controller of this.activeRequests.values()) {
      controller.abort();
    }
    this.activeRequests.clear();
  }
}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/flyanima/open-search-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.