Skip to main content
Glama
task-streamer.ts15.1 kB
/** * Task Streaming Service * * Implements task streaming to agents with queuing, load balancing, * and real-time task distribution capabilities. */ import { AtomicTask } from '../types/task.js'; import { ProjectContext } from '../types/project-context.js'; import { AgentOrchestrator } from './agent-orchestrator.js'; import { TaskScheduler } from './task-scheduler.js'; import { AppError, ValidationError } from '../../../utils/errors.js'; import { MemoryManager } from '../../code-map-generator/cache/memoryManager.js'; import logger from '../../../logger.js'; /** * Task stream configuration */ export interface StreamConfig { batchSize: number; streamInterval: number; maxQueueSize: number; priorityThreshold: number; enableRealTimeStreaming: boolean; loadBalancingEnabled: boolean; } /** * Task stream status */ export interface StreamStatus { isActive: boolean; queuedTasks: number; streamedTasks: number; failedTasks: number; averageStreamTime: number; lastStreamAt?: Date; } /** * Task claim information */ export interface TaskClaim { taskId: string; agentId: string; claimedAt: Date; expiresAt: Date; status: 'claimed' | 'released' | 'expired'; } /** * Stream event types */ export type StreamEvent = | 'task_queued' | 'task_streamed' | 'task_claimed' | 'task_released' | 'stream_started' | 'stream_stopped' | 'queue_full' | 'agent_unavailable'; /** * Stream event data */ export interface StreamEventData { event: StreamEvent; taskId?: string; agentId?: string; timestamp: Date; metadata?: Record<string, unknown>; } /** * Task Streaming Service */ export class TaskStreamer { private static instance: TaskStreamer | null = null; private taskQueue: Array<{ task: AtomicTask; context: ProjectContext; epicTitle?: string; priority: number }> = []; private claims = new Map<string, TaskClaim>(); private agentOrchestrator: AgentOrchestrator; private taskScheduler: TaskScheduler; private memoryManager: MemoryManager; private config: StreamConfig; private streamTimer?: NodeJS.Timeout; private status: StreamStatus; private eventListeners = new Map<StreamEvent, Array<(data: StreamEventData) => void>>(); private constructor(config?: Partial<StreamConfig>) { this.config = { batchSize: 3, // Reduced from 5 for faster task distribution streamInterval: 2000, // Reduced from 5000ms for more responsive feedback maxQueueSize: 1000, priorityThreshold: 2, // High priority and above enableRealTimeStreaming: true, loadBalancingEnabled: true, ...config }; this.agentOrchestrator = AgentOrchestrator.getInstance(); this.taskScheduler = new TaskScheduler(); this.memoryManager = new MemoryManager(); this.status = { isActive: false, queuedTasks: 0, streamedTasks: 0, failedTasks: 0, averageStreamTime: 0 }; logger.info({ config: this.config }, 'Task streamer initialized'); } /** * Get singleton instance */ static getInstance(config?: Partial<StreamConfig>): TaskStreamer { if (!TaskStreamer.instance) { TaskStreamer.instance = new TaskStreamer(config); } return TaskStreamer.instance; } /** * Start task streaming */ async startStreaming(): Promise<void> { try { if (this.status.isActive) { logger.warn('Task streaming already active'); return; } this.status.isActive = true; this.status.lastStreamAt = new Date(); if (this.config.enableRealTimeStreaming) { this.streamTimer = setInterval(() => { this.processTaskQueue().catch(error => { logger.error({ err: error }, 'Error in task streaming interval'); }); }, this.config.streamInterval); } this.emitEvent('stream_started', {}); logger.info('Task streaming started'); } catch (error) { logger.error({ err: error }, 'Failed to start task streaming'); throw new AppError('Task streaming startup failed', { cause: error }); } } /** * Stop task streaming */ async stopStreaming(): Promise<void> { try { this.status.isActive = false; if (this.streamTimer) { clearInterval(this.streamTimer); this.streamTimer = undefined; } // Release all active claims await this.releaseAllClaims(); this.emitEvent('stream_stopped', {}); logger.info('Task streaming stopped'); } catch (error) { logger.error({ err: error }, 'Failed to stop task streaming'); throw new AppError('Task streaming shutdown failed', { cause: error }); } } /** * Queue task for streaming */ async queueTask( task: AtomicTask, context: ProjectContext, epicTitle?: string ): Promise<void> { try { if (this.taskQueue.length >= this.config.maxQueueSize) { this.emitEvent('queue_full', { taskId: task.id }); throw new ValidationError('Task queue is full'); } const priority = this.calculateTaskPriority(task); const queueItem = { task, context, epicTitle, priority }; // Insert task in priority order this.insertTaskByPriority(queueItem); this.status.queuedTasks = this.taskQueue.length; this.emitEvent('task_queued', { taskId: task.id }); logger.debug({ taskId: task.id, priority, queueSize: this.taskQueue.length }, 'Task queued for streaming'); // Trigger immediate processing for high-priority tasks if (priority <= this.config.priorityThreshold && this.status.isActive) { await this.processTaskQueue(); } // Memory management this.memoryManager.getMemoryStats(); } catch (error) { logger.error({ err: error, taskId: task.id }, 'Failed to queue task'); throw new AppError('Task queuing failed', { cause: error }); } } /** * Claim a task for execution */ async claimTask(taskId: string, agentId: string): Promise<TaskClaim | null> { try { // Check if task exists in queue const queueIndex = this.taskQueue.findIndex(item => item.task.id === taskId); if (queueIndex === -1) { logger.warn({ taskId, agentId }, 'Attempted to claim non-existent task'); return null; } // Check if already claimed const existingClaim = this.claims.get(taskId); if (existingClaim && existingClaim.status === 'claimed') { logger.warn({ taskId, agentId, existingAgent: existingClaim.agentId }, 'Task already claimed'); return null; } // Create claim const claim: TaskClaim = { taskId, agentId, claimedAt: new Date(), expiresAt: new Date(Date.now() + 30 * 60 * 1000), // 30 minutes status: 'claimed' }; this.claims.set(taskId, claim); this.emitEvent('task_claimed', { taskId, agentId }); logger.info({ taskId, agentId }, 'Task claimed by agent'); return claim; } catch (error) { logger.error({ err: error, taskId, agentId }, 'Failed to claim task'); throw new AppError('Task claim failed', { cause: error }); } } /** * Release a task claim */ async releaseTask(taskId: string, agentId: string): Promise<void> { try { const claim = this.claims.get(taskId); if (!claim) { logger.warn({ taskId, agentId }, 'Attempted to release non-existent claim'); return; } if (claim.agentId !== agentId) { logger.warn({ taskId, agentId, claimOwner: claim.agentId }, 'Attempted to release claim owned by different agent'); return; } claim.status = 'released'; this.claims.delete(taskId); this.emitEvent('task_released', { taskId, agentId }); logger.info({ taskId, agentId }, 'Task claim released'); } catch (error) { logger.error({ err: error, taskId, agentId }, 'Failed to release task'); throw new AppError('Task release failed', { cause: error }); } } /** * Get ready tasks for streaming */ async getReadyTasks(limit?: number): Promise<AtomicTask[]> { try { const batchSize = limit || this.config.batchSize; const readyTasks: AtomicTask[] = []; // Get tasks that are not claimed and dependencies are met for (const queueItem of this.taskQueue.slice(0, batchSize * 2)) { if (readyTasks.length >= batchSize) break; const task = queueItem.task; // Skip if already claimed const claim = this.claims.get(task.id); if (claim && claim.status === 'claimed') continue; // Check if dependencies are satisfied const dependenciesMet = await this.checkDependencies(task); if (!dependenciesMet) continue; readyTasks.push(task); } return readyTasks; } catch (error) { logger.error({ err: error }, 'Failed to get ready tasks'); throw new AppError('Ready tasks retrieval failed', { cause: error }); } } /** * Get stream status */ getStatus(): StreamStatus { return { ...this.status, queuedTasks: this.taskQueue.length }; } /** * Get task queue information */ getQueueInfo(): { totalTasks: number; highPriorityTasks: number; claimedTasks: number; oldestTaskAge: number; } { const now = new Date(); const highPriorityTasks = this.taskQueue.filter(item => item.priority <= this.config.priorityThreshold).length; const claimedTasks = Array.from(this.claims.values()).filter(claim => claim.status === 'claimed').length; let oldestTaskAge = 0; if (this.taskQueue.length > 0) { const oldestTask = this.taskQueue[this.taskQueue.length - 1]; oldestTaskAge = now.getTime() - new Date(oldestTask.task.createdAt).getTime(); } return { totalTasks: this.taskQueue.length, highPriorityTasks, claimedTasks, oldestTaskAge }; } /** * Add event listener */ addEventListener(event: StreamEvent, listener: (data: StreamEventData) => void): void { if (!this.eventListeners.has(event)) { this.eventListeners.set(event, []); } this.eventListeners.get(event)!.push(listener); } /** * Remove event listener */ removeEventListener(event: StreamEvent, listener: (data: StreamEventData) => void): void { const listeners = this.eventListeners.get(event); if (listeners) { const index = listeners.indexOf(listener); if (index > -1) { listeners.splice(index, 1); } } } /** * Process task queue and stream to agents */ private async processTaskQueue(): Promise<void> { try { if (!this.status.isActive || this.taskQueue.length === 0) { return; } const startTime = Date.now(); const readyTasks = await this.getReadyTasks(); if (readyTasks.length === 0) { return; } let streamedCount = 0; for (const task of readyTasks) { const queueItem = this.taskQueue.find(item => item.task.id === task.id); if (!queueItem) continue; try { // Attempt to assign to agent const assignment = await this.agentOrchestrator.assignTask( task, queueItem.context, queueItem.epicTitle ); if (assignment) { // Remove from queue this.removeTaskFromQueue(task.id); streamedCount++; this.emitEvent('task_streamed', { taskId: task.id, agentId: assignment.agentId }); logger.debug({ taskId: task.id, agentId: assignment.agentId }, 'Task streamed to agent'); } else { this.emitEvent('agent_unavailable', { taskId: task.id }); } } catch (error) { logger.error({ err: error, taskId: task.id }, 'Failed to stream task'); this.status.failedTasks++; } } // Update statistics if (streamedCount > 0) { this.status.streamedTasks += streamedCount; this.status.lastStreamAt = new Date(); const streamTime = Date.now() - startTime; this.status.averageStreamTime = (this.status.averageStreamTime + streamTime) / 2; } this.status.queuedTasks = this.taskQueue.length; } catch (error) { logger.error({ err: error }, 'Error processing task queue'); } } /** * Calculate task priority */ private calculateTaskPriority(task: AtomicTask): number { const priorityMap: Record<string, number> = { 'critical': 1, 'high': 2, 'medium': 3, 'low': 4 }; return priorityMap[task.priority.toLowerCase()] || 3; } /** * Insert task in priority order */ private insertTaskByPriority(queueItem: { task: AtomicTask; context: ProjectContext; epicTitle?: string; priority: number }): void { let insertIndex = this.taskQueue.length; for (let i = 0; i < this.taskQueue.length; i++) { if (queueItem.priority < this.taskQueue[i].priority) { insertIndex = i; break; } } this.taskQueue.splice(insertIndex, 0, queueItem); } /** * Remove task from queue */ private removeTaskFromQueue(taskId: string): void { const index = this.taskQueue.findIndex(item => item.task.id === taskId); if (index > -1) { this.taskQueue.splice(index, 1); } } /** * Check if task dependencies are satisfied */ private async checkDependencies(task: AtomicTask): Promise<boolean> { if (!task.dependencies || task.dependencies.length === 0) { return true; } // For now, assume dependencies are satisfied // In a full implementation, this would check actual dependency status return true; } /** * Release all active claims */ private async releaseAllClaims(): Promise<void> { for (const [taskId, claim] of this.claims.entries()) { if (claim.status === 'claimed') { claim.status = 'released'; this.emitEvent('task_released', { taskId, agentId: claim.agentId }); } } this.claims.clear(); } /** * Emit stream event */ private emitEvent(event: StreamEvent, data: Partial<StreamEventData>): void { const eventData: StreamEventData = { event, timestamp: new Date(), ...data }; const listeners = this.eventListeners.get(event); if (listeners) { listeners.forEach(listener => { try { listener(eventData); } catch (error) { logger.error({ err: error, event }, 'Error in stream event listener'); } }); } } /** * Cleanup resources */ destroy(): void { this.stopStreaming().catch(error => { logger.error({ err: error }, 'Error stopping streaming during destroy'); }); this.taskQueue = []; this.claims.clear(); this.eventListeners.clear(); TaskStreamer.instance = null; logger.info('Task streamer destroyed'); } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/freshtechbro/vibe-coder-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server