
Open Search MCP

by flyanima
MIT License
concurrency-manager.ts (11.6 kB)
import { defaultLogger as logger } from './logger.js';

export interface QueuedRequest {
  id: string;
  toolName: string;
  priority: number;
  timestamp: number;
  resolve: (value: any) => void;
  reject: (error: Error) => void;
  executor: () => Promise<any>;
}

export interface ConcurrencyStats {
  activeRequests: number;
  queuedRequests: number;
  completedRequests: number;
  failedRequests: number;
  averageWaitTime: number;
  averageExecutionTime: number;
  throughput: number; // requests per second
}

export interface ConcurrencyConfig {
  maxConcurrentRequests: number;
  maxQueueSize: number;
  requestTimeout: number;
  priorityLevels: number;
  enablePrioritization: boolean;
}

/**
 * Concurrency Manager for controlling parallel tool executions
 * Implements request queuing, prioritization, and rate limiting
 */
export class ConcurrencyManager {
  private activeRequests: Map<string, { startTime: number; toolName: string }> = new Map();
  private requestQueue: QueuedRequest[] = [];
  private config: ConcurrencyConfig;
  private stats = {
    completed: 0,
    failed: 0,
    totalWaitTime: 0,
    totalExecutionTime: 0,
    startTime: Date.now()
  };

  constructor(config?: Partial<ConcurrencyConfig>) {
    this.config = {
      maxConcurrentRequests: parseInt(process.env.MAX_CONCURRENT_REQUESTS || '5'),
      maxQueueSize: parseInt(process.env.MAX_QUEUE_SIZE || '100'),
      requestTimeout: parseInt(process.env.REQUEST_TIMEOUT || '30000'),
      priorityLevels: 5,
      enablePrioritization: true,
      ...config
    };
  }

  /**
   * Execute a tool with concurrency control
   */
  public async execute<T>(
    toolName: string,
    executor: () => Promise<T>,
    options: {
      priority?: number;
      timeout?: number;
      id?: string;
    } = {}
  ): Promise<T> {
    const requestId = options.id || this.generateRequestId();
    const priority = options.priority || 0;
    const timeout = options.timeout || this.config.requestTimeout;

    return new Promise<T>((resolve, reject) => {
      const queuedRequest: QueuedRequest = {
        id: requestId,
        toolName,
        priority,
        timestamp: Date.now(),
        resolve,
        reject,
        executor
      };

      // Check if we can execute immediately
      if (this.activeRequests.size < this.config.maxConcurrentRequests) {
        this.executeRequest(queuedRequest, timeout);
      } else {
        this.queueRequest(queuedRequest, timeout);
      }
    });
  }

  /**
   * Queue a request for later execution
   */
  private queueRequest(request: QueuedRequest, timeout: number): void {
    // Check queue size limit
    if (this.requestQueue.length >= this.config.maxQueueSize) {
      request.reject(new Error(`Queue is full (${this.config.maxQueueSize} requests)`));
      this.stats.failed++;
      return;
    }

    // Add to queue with prioritization
    if (this.config.enablePrioritization) {
      this.insertByPriority(request);
    } else {
      this.requestQueue.push(request);
    }

    // Set timeout for queued request
    setTimeout(() => {
      const index = this.requestQueue.findIndex(r => r.id === request.id);
      if (index !== -1) {
        this.requestQueue.splice(index, 1);
        request.reject(new Error(`Request timeout while queued (${timeout}ms)`));
        this.stats.failed++;
      }
    }, timeout);

    logger.debug(`Request queued: ${request.toolName} (priority: ${request.priority}, queue size: ${this.requestQueue.length})`);
  }

  /**
   * Insert request into queue by priority
   */
  private insertByPriority(request: QueuedRequest): void {
    let insertIndex = this.requestQueue.length;

    // Find insertion point (higher priority = lower number = earlier execution)
    for (let i = 0; i < this.requestQueue.length; i++) {
      if (request.priority < this.requestQueue[i].priority) {
        insertIndex = i;
        break;
      }
    }

    this.requestQueue.splice(insertIndex, 0, request);
  }

  /**
   * Execute a request immediately
   */
  private async executeRequest(request: QueuedRequest, timeout: number): Promise<void> {
    const startTime = Date.now();
    const waitTime = startTime - request.timestamp;

    this.activeRequests.set(request.id, { startTime, toolName: request.toolName });

    // Set execution timeout
    const timeoutHandle = setTimeout(() => {
      this.activeRequests.delete(request.id);
      request.reject(new Error(`Request execution timeout (${timeout}ms)`));
      this.stats.failed++;
      this.processQueue();
    }, timeout);

    try {
      logger.debug(`Executing request: ${request.toolName} (wait time: ${waitTime}ms)`);

      const result = await request.executor();

      clearTimeout(timeoutHandle);
      this.activeRequests.delete(request.id);

      const executionTime = Date.now() - startTime;
      this.updateStats(waitTime, executionTime, true);

      request.resolve(result);

      logger.debug(`Request completed: ${request.toolName} (execution time: ${executionTime}ms)`);
    } catch (error) {
      clearTimeout(timeoutHandle);
      this.activeRequests.delete(request.id);

      const executionTime = Date.now() - startTime;
      this.updateStats(waitTime, executionTime, false);

      request.reject(error instanceof Error ? error : new Error(String(error)));

      logger.error(`Request failed: ${request.toolName} (execution time: ${executionTime}ms)`, error);
    } finally {
      // Process next request in queue
      this.processQueue();
    }
  }

  /**
   * Process the next request in queue
   */
  private processQueue(): void {
    if (this.requestQueue.length === 0 || this.activeRequests.size >= this.config.maxConcurrentRequests) {
      return;
    }

    const nextRequest = this.requestQueue.shift();
    if (nextRequest) {
      this.executeRequest(nextRequest, this.config.requestTimeout);
    }
  }

  /**
   * Update performance statistics
   */
  private updateStats(waitTime: number, executionTime: number, success: boolean): void {
    this.stats.totalWaitTime += waitTime;
    this.stats.totalExecutionTime += executionTime;

    if (success) {
      this.stats.completed++;
    } else {
      this.stats.failed++;
    }
  }

  /**
   * Get current concurrency statistics
   */
  public getStats(): ConcurrencyStats {
    const totalRequests = this.stats.completed + this.stats.failed;
    const uptime = (Date.now() - this.stats.startTime) / 1000; // seconds

    return {
      activeRequests: this.activeRequests.size,
      queuedRequests: this.requestQueue.length,
      completedRequests: this.stats.completed,
      failedRequests: this.stats.failed,
      averageWaitTime: totalRequests > 0 ? this.stats.totalWaitTime / totalRequests : 0,
      averageExecutionTime: totalRequests > 0 ? this.stats.totalExecutionTime / totalRequests : 0,
      throughput: uptime > 0 ? totalRequests / uptime : 0
    };
  }

  /**
   * Get active requests information
   */
  public getActiveRequests(): Array<{ id: string; toolName: string; duration: number }> {
    const now = Date.now();
    return Array.from(this.activeRequests.entries()).map(([id, info]) => ({
      id,
      toolName: info.toolName,
      duration: now - info.startTime
    }));
  }

  /**
   * Get queued requests information
   */
  public getQueuedRequests(): Array<{ id: string; toolName: string; priority: number; waitTime: number }> {
    const now = Date.now();
    return this.requestQueue.map(request => ({
      id: request.id,
      toolName: request.toolName,
      priority: request.priority,
      waitTime: now - request.timestamp
    }));
  }

  /**
   * Cancel a specific request
   */
  public cancelRequest(requestId: string): boolean {
    // Check if it's an active request
    if (this.activeRequests.has(requestId)) {
      // Cannot cancel active requests directly
      return false;
    }

    // Check if it's in the queue
    const queueIndex = this.requestQueue.findIndex(r => r.id === requestId);
    if (queueIndex !== -1) {
      const request = this.requestQueue.splice(queueIndex, 1)[0];
      request.reject(new Error('Request cancelled'));
      this.stats.failed++;
      logger.debug(`Request cancelled: ${request.toolName}`);
      return true;
    }

    return false;
  }

  /**
   * Clear all queued requests
   */
  public clearQueue(): number {
    const clearedCount = this.requestQueue.length;

    this.requestQueue.forEach(request => {
      request.reject(new Error('Queue cleared'));
      this.stats.failed++;
    });

    this.requestQueue = [];

    if (clearedCount > 0) {
      logger.info(`Cleared ${clearedCount} queued requests`);
    }

    return clearedCount;
  }

  /**
   * Update concurrency configuration
   */
  public updateConfig(newConfig: Partial<ConcurrencyConfig>): void {
    const oldConfig = { ...this.config };
    this.config = { ...this.config, ...newConfig };

    // If max concurrent requests increased, process more from queue
    if (newConfig.maxConcurrentRequests && newConfig.maxConcurrentRequests > oldConfig.maxConcurrentRequests) {
      this.processQueue();
    }

    // If queue size decreased, remove excess requests
    if (newConfig.maxQueueSize && newConfig.maxQueueSize < this.requestQueue.length) {
      const excessRequests = this.requestQueue.splice(newConfig.maxQueueSize);
      excessRequests.forEach(request => {
        request.reject(new Error('Queue size reduced'));
        this.stats.failed++;
      });
    }

    logger.info('Concurrency configuration updated', newConfig);
  }

  /**
   * Get current configuration
   */
  public getConfig(): ConcurrencyConfig {
    return { ...this.config };
  }

  /**
   * Generate unique request ID
   */
  private generateRequestId(): string {
    return `req_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
  }

  /**
   * Reset statistics
   */
  public resetStats(): void {
    this.stats = {
      completed: 0,
      failed: 0,
      totalWaitTime: 0,
      totalExecutionTime: 0,
      startTime: Date.now()
    };
    logger.info('Concurrency statistics reset');
  }

  /**
   * Get health status
   */
  public getHealthStatus(): {
    healthy: boolean;
    issues: string[];
    recommendations: string[];
  } {
    const stats = this.getStats();
    const issues: string[] = [];
    const recommendations: string[] = [];

    // Check queue size
    if (stats.queuedRequests > this.config.maxQueueSize * 0.8) {
      issues.push(`Queue is ${((stats.queuedRequests / this.config.maxQueueSize) * 100).toFixed(1)}% full`);
      recommendations.push('Consider increasing maxConcurrentRequests or maxQueueSize');
    }

    // Check failure rate
    const totalRequests = stats.completedRequests + stats.failedRequests;
    if (totalRequests > 0) {
      const failureRate = stats.failedRequests / totalRequests;
      if (failureRate > 0.1) { // 10% failure rate
        issues.push(`High failure rate: ${(failureRate * 100).toFixed(1)}%`);
        recommendations.push('Check for timeout issues or increase request timeout');
      }
    }

    // Check average wait time
    if (stats.averageWaitTime > 5000) { // 5 seconds
      issues.push(`High average wait time: ${stats.averageWaitTime.toFixed(0)}ms`);
      recommendations.push('Consider increasing maxConcurrentRequests');
    }

    return {
      healthy: issues.length === 0,
      issues,
      recommendations
    };
  }
}

// Global concurrency manager instance
export const concurrencyManager = new ConcurrencyManager();
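
A minimal usage sketch of the shared `concurrencyManager` instance exported above. Only the `execute` signature and `getStats` come from the file; the tool name, the stubbed executor, and the wrapper function are hypothetical placeholders, not part of the repository:

import { concurrencyManager } from './concurrency-manager.js';

async function runExampleTool(): Promise<void> {
  // Wrap any Promise-returning work in execute(); it handles queuing, priority, and timeout.
  const result = await concurrencyManager.execute(
    'example_search',                  // hypothetical tool name, used for logging and stats
    async () => {
      // Placeholder executor: real tool logic (an API call, a crawl, ...) would go here.
      await new Promise(resolve => setTimeout(resolve, 100));
      return { ok: true };
    },
    { priority: 1, timeout: 10_000 }   // lower priority number runs earlier
  );

  console.log(result, concurrencyManager.getStats());
}

runExampleTool().catch(console.error);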

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/flyanima/open-search-mcp'
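
The same request from TypeScript, as a sketch (assumes Node 18+ where a global fetch is available; the response shape is not documented here, so it is logged as-is):

async function fetchServerInfo(): Promise<unknown> {
  const res = await fetch('https://glama.ai/api/mcp/v1/servers/flyanima/open-search-mcp');
  if (!res.ok) throw new Error(`Request failed with status ${res.status}`);
  return res.json();
}

fetchServerInfo().then(info => console.log(info)).catch(console.error);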

If you have feedback or need assistance with the MCP directory API, please join our Discord server.