Skip to main content
Glama

Open Search MCP

by flyanima
MIT License
2
  • Apple
  • Linux
api-aggregator.ts8.17 kB
/** * 多API聚合器 - Open Search MCP v2.0 阶段一核心组件 * 实现智能API调度、负载均衡、故障转移 */ import { SearchAPI, SearchOptions, SearchResult, APIHealthMetrics } from '../types.js'; import { Logger } from '../utils/logger.js'; import { LoadBalancer } from './load-balancer.js'; import { APIHealthMonitor } from './api-health-monitor.js'; import { IntelligentCache } from './intelligent-cache.js'; import { createHash } from 'crypto'; export interface APIAggregatorConfig { apis: { [key: string]: { enabled: boolean; priority: number; rateLimit: number; timeout: number; retryAttempts: number; }; }; loadBalancing: { strategy: 'round-robin' | 'weighted' | 'least-connections' | 'health-based'; healthCheckInterval: number; failoverThreshold: number; }; cache: { enabled: boolean; ttl: number; maxSize: number; }; fallback: { enabled: boolean; maxFallbackAttempts: number; fallbackDelay: number; }; } export class APIAggregator { private apis: Map<string, SearchAPI> = new Map(); private config: APIAggregatorConfig; private healthMonitor: APIHealthMonitor; private loadBalancer: LoadBalancer; private cache: IntelligentCache; private logger: Logger; constructor(config: APIAggregatorConfig) { this.config = config; this.logger = new Logger('APIAggregator'); this.healthMonitor = new APIHealthMonitor(config.loadBalancing); this.loadBalancer = new LoadBalancer(config.loadBalancing); this.cache = new IntelligentCache(config.cache); this.initializeAPIs(); this.startHealthMonitoring(); } /** * 主搜索方法 - 智能聚合多个API的搜索结果 */ async search(query: string, options: SearchOptions = {}): Promise<SearchResult[]> { const startTime = Date.now(); try { // 1. 检查缓存 (Simplified - cache not implemented) const cacheKey = this.generateCacheKey(query, options); // if (this.config.cache.enabled) { // const cachedResults = await this.cache.get(cacheKey); // if (cachedResults) { // this.logger.info(`Cache hit for query: ${query}`); // return cachedResults; // } // } // 2. 获取健康的API const healthyAPIs = await this.healthMonitor.getHealthyAPIs(); if (healthyAPIs.length === 0) { throw new Error('No healthy APIs available'); } // 3. 负载均衡选择API const selectedAPIs = this.loadBalancer.selectAPIs(healthyAPIs, options); this.logger.info(`Selected ${selectedAPIs.length} APIs for query: ${query}`); // 4. 并发执行搜索 const searchPromises = selectedAPIs.map(api => this.executeSearchWithFallback(api, query, options) ); // 5. 等待所有搜索完成 const results = await Promise.allSettled(searchPromises); // 6. 聚合和去重结果 const aggregatedResults = this.aggregateResults(results, query); // 7. 缓存结果 (Simplified - cache not implemented) // if (this.config.cache.enabled && aggregatedResults.length > 0) { // await this.cache.set(cacheKey, aggregatedResults); // } // 8. 记录性能指标 const duration = Date.now() - startTime; this.logger.info(`Search completed in ${duration}ms, found ${aggregatedResults.length} results`); return aggregatedResults; } catch (error) { this.logger.error('Search aggregation failed:', error); throw error; } } /** * 带故障转移的搜索执行 */ private async executeSearchWithFallback( api: SearchAPI, query: string, options: SearchOptions ): Promise<SearchResult[]> { const apiConfig = this.config.apis[api.name]; let lastError: Error | null = null; for (let attempt = 0; attempt < apiConfig.retryAttempts; attempt++) { try { // 设置超时 const timeoutPromise = new Promise<never>((_, reject) => { setTimeout(() => reject(new Error('API timeout')), apiConfig.timeout); }); const searchPromise = api.search(query, options); const results = await Promise.race([searchPromise, timeoutPromise]); // 记录成功 this.healthMonitor.recordSuccess(api.name); return results; } catch (error) { lastError = error as Error; this.healthMonitor.recordError(api.name, error as Error); // 如果不是最后一次尝试,等待后重试 if (attempt < apiConfig.retryAttempts - 1) { await this.delay(1000 * (attempt + 1)); // 指数退避 } } } // 尝试故障转移 if (this.config.fallback.enabled) { const fallbackAPI = this.loadBalancer.getFallbackAPI(api); if (fallbackAPI && fallbackAPI !== api) { this.logger.warn(`Attempting fallback from ${api.name} to ${fallbackAPI.name}`); await this.delay(this.config.fallback.fallbackDelay); return await this.executeSearchWithFallback(fallbackAPI, query, options); } } this.logger.error(`All attempts failed for API ${api.name}:`, lastError); return []; } /** * 聚合和去重搜索结果 */ private aggregateResults( results: PromiseSettledResult<SearchResult[]>[], query: string ): SearchResult[] { const allResults: SearchResult[] = []; const seenUrls = new Set<string>(); for (const result of results) { if (result.status === 'fulfilled' && result.value) { for (const searchResult of result.value) { // 简单去重:基于URL if (!seenUrls.has(searchResult.url)) { seenUrls.add(searchResult.url); allResults.push({ ...searchResult, // 添加聚合元数据 aggregationMetadata: { aggregatedAt: new Date().toISOString(), query: query, source: 'api-aggregator' } }); } } } } // 按相关性排序 return allResults.sort((a, b) => (b.relevanceScore || 0) - (a.relevanceScore || 0)); } /** * 生成缓存键 */ private generateCacheKey(query: string, options: SearchOptions): string { const keyData = { query: query.toLowerCase().trim(), options: { maxResults: options.maxResults, language: options.language, timeRange: options.timeRange, sources: options.sources?.sort() } }; return createHash('md5').update(JSON.stringify(keyData)).digest('hex'); } /** * 初始化API实例 */ private initializeAPIs(): void { // 这里会根据配置初始化各种API // 具体的API实现将在后续添加 this.logger.info('Initializing APIs...'); for (const [apiName, apiConfig] of Object.entries(this.config.apis)) { if (apiConfig.enabled) { this.logger.info(`API ${apiName} enabled with priority ${apiConfig.priority}`); } } } /** * 启动健康监控 */ private startHealthMonitoring(): void { setInterval(async () => { await this.healthMonitor.performHealthCheck(); }, this.config.loadBalancing.healthCheckInterval); } /** * 延迟工具函数 */ private delay(ms: number): Promise<void> { return new Promise(resolve => setTimeout(resolve, ms)); } /** * 获取聚合器状态 */ public getStatus(): { totalAPIs: number; healthyAPIs: number; cacheStats: any; healthMetrics: Map<string, APIHealthMetrics>; } { return { totalAPIs: this.apis.size, healthyAPIs: this.healthMonitor.getHealthyAPIs().length, // cacheStats: this.cache.getStats(), // Simplified - cache not implemented cacheStats: { hits: 0, misses: 0, size: 0 }, healthMetrics: this.healthMonitor.getHealthMetrics() }; } /** * 优雅关闭 */ public async shutdown(): Promise<void> { this.logger.info('Shutting down API Aggregator...'); // await this.cache.close(); // Simplified - cache not implemented // 清理其他资源 } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/flyanima/open-search-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server