Skip to main content
Glama
taskManager.ts (11.4 kB)
/**
 * MCP Task Manager
 * Handles creation, tracking, and lifecycle management of tasks
 * Based on MCP Specification 2025-11-25
 */

import { randomUUID } from 'crypto';
import { decodeCursor, encodeCursor } from '../types/pagination.js';
import {
  DEFAULT_POLL_INTERVAL,
  DEFAULT_TASK_TTL,
  MAX_TASK_TTL,
  TASK_PAGE_SIZE,
  TERMINAL_STATUSES,
  type CancelTaskParams,
  type CreateTaskResult,
  type GetTaskParams,
  type GetTaskResultParams,
  type ListTasksParams,
  type ListTasksResult,
  type StoredTask,
  type Task,
  type TaskParams,
  type TaskStatus,
} from '../types/task.js';

/**
 * Error code stored on a task when no more specific numeric code is
 * available (the JSON-RPC 2.0 "Internal error" code).
 */
const INTERNAL_ERROR_CODE = -32603;

/** How often expired tasks are swept from memory, in milliseconds. */
const CLEANUP_INTERVAL_MS = 60000;

/**
 * Task execution function type
 * Returns a promise that resolves with the result or rejects with an error
 */
export type TaskExecutor<T = unknown> = () => Promise<T>;

/**
 * Task status change callback
 */
export type TaskStatusCallback = (task: Task) => void | Promise<void>;

/**
 * TaskManager handles the lifecycle of long-running tasks.
 *
 * Tasks are kept in memory, keyed by task ID. A task is considered expired
 * once more than `ttl` milliseconds have elapsed since its `createdAt`
 * timestamp; expired tasks are evicted lazily on lookup and periodically by a
 * background timer. Call {@link TaskManager.dispose} to stop that timer.
 */
export class TaskManager {
  private readonly tasks: Map<string, StoredTask> = new Map();
  private statusCallbacks: TaskStatusCallback[] = [];
  private cleanupInterval: NodeJS.Timeout | null = null;

  constructor() {
    // Start periodic cleanup of expired tasks
    this.startCleanupInterval();
  }

  /**
   * Register a callback for task status changes.
   *
   * @param callback - Invoked with the public view of the task after each
   *   status change made through this manager.
   */
  onStatusChange(callback: TaskStatusCallback): void {
    this.statusCallbacks.push(callback);
  }

  /**
   * Remove a status change callback. Unknown callbacks are ignored.
   *
   * @param callback - The exact function previously passed to `onStatusChange`.
   */
  offStatusChange(callback: TaskStatusCallback): void {
    const index = this.statusCallbacks.indexOf(callback);
    if (index !== -1) {
      this.statusCallbacks.splice(index, 1);
    }
  }

  /**
   * Create a new task for a request. The task starts in the `working` status.
   *
   * @param method - Name of the request method the task belongs to.
   * @param requestParams - Original request parameters, stored with the task.
   * @param taskParams - Optional task options; `ttl` is honoured up to
   *   `MAX_TASK_TTL`. A `NaN` or negative `ttl` falls back to
   *   `DEFAULT_TASK_TTL`.
   * @param progressToken - Optional progress token stored with the task.
   * @returns The public view of the newly created task.
   */
  createTask(
    method: string,
    requestParams: unknown,
    taskParams?: TaskParams,
    progressToken?: string | number
  ): CreateTaskResult {
    const taskId = randomUUID();
    const now = new Date().toISOString();

    // Calculate TTL (bounded by MAX_TASK_TTL). NaN would make the expiry
    // comparison always false (task never expires) and a negative value would
    // expire the task immediately, so both fall back to the default.
    const requestedTtl = taskParams?.ttl ?? DEFAULT_TASK_TTL;
    const usableTtl =
      Number.isNaN(requestedTtl) || requestedTtl < 0 ? DEFAULT_TASK_TTL : requestedTtl;
    const ttl = Math.min(usableTtl, MAX_TASK_TTL);

    const storedTask: StoredTask = {
      taskId,
      status: 'working',
      createdAt: now,
      lastUpdatedAt: now,
      ttl,
      pollInterval: DEFAULT_POLL_INTERVAL,
      method,
      requestParams,
      progressToken,
    };

    this.tasks.set(taskId, storedTask);

    return {
      task: this.toPublicTask(storedTask),
    };
  }

  /**
   * Execute a task asynchronously.
   *
   * Awaits `executor` and records its outcome on the task: a resolved value
   * completes the task, a rejection fails it. If the task reached a terminal
   * status in the meantime (for example it was cancelled), the outcome is
   * discarded and the terminal status is preserved.
   *
   * @param taskId - ID of a task previously created with `createTask`.
   * @param executor - Function performing the actual work.
   * @throws Error if no task with `taskId` is stored.
   */
  async executeTask<T>(taskId: string, executor: TaskExecutor<T>): Promise<void> {
    if (!this.tasks.has(taskId)) {
      throw new Error(`Task not found: ${taskId}`);
    }

    try {
      const result = await executor();
      await this.completeTask(taskId, result);
    } catch (error) {
      await this.failTask(taskId, error);
    }
  }

  /**
   * Get a task by ID.
   *
   * @returns The public view of the task, or `null` if it does not exist or
   *   has expired (expired tasks are evicted as a side effect).
   */
  getTask(params: GetTaskParams): Task | null {
    const task = this.getLiveTask(params.taskId);
    return task ? this.toPublicTask(task) : null;
  }

  /**
   * Get the result of a task.
   *
   * @returns `null` if the task does not exist or has expired. Otherwise:
   *   - `completed`: `{ result, isTerminal: true }`
   *   - `failed`: `{ error, isTerminal: true }` (a generic error if none was stored)
   *   - `cancelled`: `{ error, isTerminal: true }`
   *   - any other status: `{ task, isTerminal: false }` with the current public task
   */
  getTaskResult(params: GetTaskResultParams): {
    result?: unknown;
    error?: { code: number; message: string; data?: unknown };
    task?: Task;
    isTerminal: boolean;
  } | null {
    const storedTask = this.getLiveTask(params.taskId);
    if (!storedTask) {
      return null;
    }

    switch (storedTask.status) {
      case 'completed':
        return {
          result: storedTask.result,
          isTerminal: true,
        };
      case 'failed':
        return {
          error: storedTask.error ?? {
            code: INTERNAL_ERROR_CODE,
            message: 'Task failed with unknown error',
          },
          isTerminal: true,
        };
      case 'cancelled':
        return {
          error: {
            code: INTERNAL_ERROR_CODE,
            message: 'Task was cancelled',
          },
          isTerminal: true,
        };
      default:
        // For non-terminal statuses (working, input_required)
        return {
          task: this.toPublicTask(storedTask),
          isTerminal: false,
        };
    }
  }

  /**
   * List all non-expired tasks, newest first, one page at a time.
   *
   * @param params - `cursor` (optional) selects the page; a missing,
   *   undecodable, negative or non-integer cursor offset starts from the
   *   first page.
   * @returns Up to `TASK_PAGE_SIZE` tasks plus `nextCursor` when more remain.
   */
  listTasks(params: ListTasksParams): ListTasksResult {
    // Evict expired tasks first so the snapshot below contains live tasks only.
    this.cleanupExpiredTasks();

    const allTasks = Array.from(this.tasks.values()).sort(
      (a, b) => new Date(b.createdAt).getTime() - new Date(a.createdAt).getTime()
    );

    // A cursor is client-supplied: never let a negative or fractional offset
    // reach `slice`, where it would silently select the wrong window.
    const decoded = params.cursor ? decodeCursor(params.cursor) : null;
    const rawOffset = decoded?.offset ?? 0;
    const offset = Number.isInteger(rawOffset) && rawOffset >= 0 ? rawOffset : 0;

    const pageEnd = offset + TASK_PAGE_SIZE;
    const result: ListTasksResult = {
      tasks: allTasks.slice(offset, pageEnd).map((t) => this.toPublicTask(t)),
    };

    if (pageEnd < allTasks.length) {
      result.nextCursor = encodeCursor({
        offset: pageEnd,
        limit: TASK_PAGE_SIZE,
      });
    }

    return result;
  }

  /**
   * Cancel a task.
   *
   * @returns The public view of the cancelled task, or `null` if the task
   *   does not exist or has expired.
   * @throws Error if the task is already in a terminal status.
   */
  async cancelTask(params: CancelTaskParams): Promise<Task | null> {
    const task = this.getLiveTask(params.taskId);
    if (!task) {
      return null;
    }

    // Cannot cancel tasks in terminal status
    if (this.isTerminal(task.status)) {
      throw new Error(`Cannot cancel task: already in terminal status '${task.status}'`);
    }

    task.status = 'cancelled';
    task.statusMessage = 'The task was cancelled by request.';
    task.lastUpdatedAt = new Date().toISOString();

    await this.notifyStatusChange(task);

    return this.toPublicTask(task);
  }

  /**
   * Update task status.
   *
   * @param taskId - ID of the task to update.
   * @param status - New status.
   * @param statusMessage - New status message; when omitted the existing
   *   message is left untouched.
   * @returns The updated public task, or `null` if the task does not exist,
   *   has expired, or is already in a terminal status.
   */
  async updateTaskStatus(
    taskId: string,
    status: TaskStatus,
    statusMessage?: string
  ): Promise<Task | null> {
    const task = this.getLiveTask(taskId);

    // Cannot transition from terminal status
    if (!task || this.isTerminal(task.status)) {
      return null;
    }

    task.status = status;
    if (statusMessage !== undefined) {
      task.statusMessage = statusMessage;
    }
    task.lastUpdatedAt = new Date().toISOString();

    await this.notifyStatusChange(task);

    return this.toPublicTask(task);
  }

  /**
   * Mark a task as requiring input.
   *
   * @param message - Status message; defaults to a generic prompt.
   * @returns Same as {@link TaskManager.updateTaskStatus}.
   */
  async setInputRequired(taskId: string, message?: string): Promise<Task | null> {
    return this.updateTaskStatus(taskId, 'input_required', message ?? 'Input required to continue');
  }

  /**
   * Resume a task from input_required status.
   *
   * @returns The updated public task, or `null` if the task does not exist,
   *   has expired, or is not currently in the `input_required` status.
   */
  async resumeTask(taskId: string): Promise<Task | null> {
    const task = this.getLiveTask(taskId);
    if (!task || task.status !== 'input_required') {
      return null;
    }

    return this.updateTaskStatus(taskId, 'working', 'Task resumed');
  }

  /**
   * Complete a task with a result.
   *
   * No-op if the task is gone or already terminal, so a late executor result
   * cannot overwrite a cancellation.
   */
  private async completeTask(taskId: string, result: unknown): Promise<void> {
    const task = this.tasks.get(taskId);
    if (!task || this.isTerminal(task.status)) {
      return;
    }

    task.status = 'completed';
    task.result = result;
    task.lastUpdatedAt = new Date().toISOString();
    delete task.statusMessage;

    await this.notifyStatusChange(task);
  }

  /**
   * Fail a task with an error.
   *
   * No-op if the task is gone or already terminal, so a late executor
   * rejection cannot overwrite a cancellation. The thrown value is normalised
   * into `{ code, message, data? }`:
   * - `Error` instances keep their message and use the internal error code;
   * - other objects contribute `code`/`message`/`data` when `code` is a
   *   number and `message` is a string, with defaults otherwise;
   * - anything else is stringified into the message.
   */
  private async failTask(taskId: string, error: unknown): Promise<void> {
    const task = this.tasks.get(taskId);
    if (!task || this.isTerminal(task.status)) {
      return;
    }

    task.status = 'failed';
    task.lastUpdatedAt = new Date().toISOString();

    if (error instanceof Error) {
      task.error = {
        code: INTERNAL_ERROR_CODE,
        message: error.message,
      };
    } else if (typeof error === 'object' && error !== null) {
      const errObj = error as { code?: unknown; message?: unknown; data?: unknown };
      task.error = {
        // Only trust fields that actually have the declared type.
        code: typeof errObj.code === 'number' ? errObj.code : INTERNAL_ERROR_CODE,
        message: typeof errObj.message === 'string' ? errObj.message : 'Unknown error',
        data: errObj.data,
      };
    } else {
      task.error = {
        code: INTERNAL_ERROR_CODE,
        message: String(error),
      };
    }
    task.statusMessage = task.error.message;

    await this.notifyStatusChange(task);
  }

  /**
   * Notify all registered callbacks of a status change.
   *
   * Callbacks are awaited one after another; an error thrown by one callback
   * is logged and does not prevent the remaining callbacks from running.
   */
  private async notifyStatusChange(task: StoredTask): Promise<void> {
    const publicTask = this.toPublicTask(task);

    // Iterate over a snapshot: a callback may (un)register callbacks while we
    // are notifying, which would otherwise shift indices and skip listeners.
    for (const callback of [...this.statusCallbacks]) {
      try {
        await callback(publicTask);
      } catch (error) {
        console.error('Error in task status callback:', error);
      }
    }
  }

  /**
   * Convert a stored task to a public task (without internal fields such as
   * the request parameters, result and error).
   */
  private toPublicTask(task: StoredTask): Task {
    return {
      taskId: task.taskId,
      status: task.status,
      statusMessage: task.statusMessage,
      createdAt: task.createdAt,
      lastUpdatedAt: task.lastUpdatedAt,
      ttl: task.ttl,
      pollInterval: task.pollInterval,
    };
  }

  /**
   * Whether `status` is one of `TERMINAL_STATUSES`.
   */
  private isTerminal(status: TaskStatus): boolean {
    return TERMINAL_STATUSES.includes(status);
  }

  /**
   * Check if a task has expired, i.e. more than `ttl` milliseconds have
   * passed since it was created.
   */
  private isExpired(task: StoredTask): boolean {
    const createdAt = new Date(task.createdAt).getTime();
    return Date.now() - createdAt > task.ttl;
  }

  /**
   * Look up a task, evicting it if it has expired.
   *
   * @returns The stored task, or `undefined` if it is missing or expired.
   */
  private getLiveTask(taskId: string): StoredTask | undefined {
    const task = this.tasks.get(taskId);
    if (task && this.isExpired(task)) {
      this.tasks.delete(taskId);
      return undefined;
    }
    return task;
  }

  /**
   * Clean up expired tasks.
   */
  private cleanupExpiredTasks(): void {
    for (const [taskId, task] of this.tasks) {
      if (this.isExpired(task)) {
        this.tasks.delete(taskId);
      }
    }
  }

  /**
   * Start periodic cleanup of expired tasks.
   */
  private startCleanupInterval(): void {
    // Clean up every minute
    this.cleanupInterval = setInterval(() => {
      this.cleanupExpiredTasks();
    }, CLEANUP_INTERVAL_MS);

    // Ensure the interval doesn't prevent the process from exiting
    if (this.cleanupInterval.unref) {
      this.cleanupInterval.unref();
    }
  }

  /**
   * Stop the cleanup interval, clear all tasks and drop all status callbacks.
   */
  dispose(): void {
    if (this.cleanupInterval) {
      clearInterval(this.cleanupInterval);
      this.cleanupInterval = null;
    }
    this.tasks.clear();
    this.statusCallbacks = [];
  }

  /**
   * Get the number of stored, non-expired tasks (expired tasks are evicted
   * first). Tasks in terminal statuses are included until they expire.
   */
  getActiveTaskCount(): number {
    this.cleanupExpiredTasks();
    return this.tasks.size;
  }

  /**
   * Check if a task exists and is not expired.
   */
  hasTask(taskId: string): boolean {
    return this.getLiveTask(taskId) !== undefined;
  }

  /**
   * Get the stored task (internal use only).
   *
   * @returns The stored task including internal fields, or `null` if it is
   *   missing or expired.
   */
  getStoredTask(taskId: string): StoredTask | null {
    return this.getLiveTask(taskId) ?? null;
  }
}

// Export singleton instance
export const taskManager = new TaskManager();

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ssdeanx/ssd-ai'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.