Skip to main content
Glama
protocol.js50.6 kB
import { safeParse } from '../server/zod-compat.js'; import { CancelledNotificationSchema, CreateTaskResultSchema, ErrorCode, GetTaskRequestSchema, GetTaskResultSchema, GetTaskPayloadRequestSchema, ListTasksRequestSchema, ListTasksResultSchema, CancelTaskRequestSchema, CancelTaskResultSchema, isJSONRPCErrorResponse, isJSONRPCRequest, isJSONRPCResultResponse, isJSONRPCNotification, McpError, PingRequestSchema, ProgressNotificationSchema, RELATED_TASK_META_KEY, TaskStatusNotificationSchema, isTaskAugmentedRequestParams } from '../types.js'; import { isTerminal } from '../experimental/tasks/interfaces.js'; import { getMethodLiteral, parseWithCompat } from '../server/zod-json-schema-compat.js'; /** * The default request timeout, in miliseconds. */ export const DEFAULT_REQUEST_TIMEOUT_MSEC = 60000; /** * Implements MCP protocol framing on top of a pluggable transport, including * features like request/response linking, notifications, and progress. */ export class Protocol { constructor(_options) { this._options = _options; this._requestMessageId = 0; this._requestHandlers = new Map(); this._requestHandlerAbortControllers = new Map(); this._notificationHandlers = new Map(); this._responseHandlers = new Map(); this._progressHandlers = new Map(); this._timeoutInfo = new Map(); this._pendingDebouncedNotifications = new Set(); // Maps task IDs to progress tokens to keep handlers alive after CreateTaskResult this._taskProgressTokens = new Map(); this._requestResolvers = new Map(); this.setNotificationHandler(CancelledNotificationSchema, notification => { this._oncancel(notification); }); this.setNotificationHandler(ProgressNotificationSchema, notification => { this._onprogress(notification); }); this.setRequestHandler(PingRequestSchema, // Automatic pong by default. _request => ({})); // Install task handlers if TaskStore is provided this._taskStore = _options?.taskStore; this._taskMessageQueue = _options?.taskMessageQueue; if (this._taskStore) { this.setRequestHandler(GetTaskRequestSchema, async (request, extra) => { const task = await this._taskStore.getTask(request.params.taskId, extra.sessionId); if (!task) { throw new McpError(ErrorCode.InvalidParams, 'Failed to retrieve task: Task not found'); } // Per spec: tasks/get responses SHALL NOT include related-task metadata // as the taskId parameter is the source of truth // @ts-expect-error SendResultT cannot contain GetTaskResult, but we include it in our derived types everywhere else return { ...task }; }); this.setRequestHandler(GetTaskPayloadRequestSchema, async (request, extra) => { const handleTaskResult = async () => { const taskId = request.params.taskId; // Deliver queued messages if (this._taskMessageQueue) { let queuedMessage; while ((queuedMessage = await this._taskMessageQueue.dequeue(taskId, extra.sessionId))) { // Handle response and error messages by routing them to the appropriate resolver if (queuedMessage.type === 'response' || queuedMessage.type === 'error') { const message = queuedMessage.message; const requestId = message.id; // Lookup resolver in _requestResolvers map const resolver = this._requestResolvers.get(requestId); if (resolver) { // Remove resolver from map after invocation this._requestResolvers.delete(requestId); // Invoke resolver with response or error if (queuedMessage.type === 'response') { resolver(message); } else { // Convert JSONRPCError to McpError const errorMessage = message; const error = new McpError(errorMessage.error.code, errorMessage.error.message, errorMessage.error.data); resolver(error); } } else { // Handle missing resolver gracefully with error logging const messageType = queuedMessage.type === 'response' ? 'Response' : 'Error'; this._onerror(new Error(`${messageType} handler missing for request ${requestId}`)); } // Continue to next message continue; } // Send the message on the response stream by passing the relatedRequestId // This tells the transport to write the message to the tasks/result response stream await this._transport?.send(queuedMessage.message, { relatedRequestId: extra.requestId }); } } // Now check task status const task = await this._taskStore.getTask(taskId, extra.sessionId); if (!task) { throw new McpError(ErrorCode.InvalidParams, `Task not found: ${taskId}`); } // Block if task is not terminal (we've already delivered all queued messages above) if (!isTerminal(task.status)) { // Wait for status change or new messages await this._waitForTaskUpdate(taskId, extra.signal); // After waking up, recursively call to deliver any new messages or result return await handleTaskResult(); } // If task is terminal, return the result if (isTerminal(task.status)) { const result = await this._taskStore.getTaskResult(taskId, extra.sessionId); this._clearTaskQueue(taskId); return { ...result, _meta: { ...result._meta, [RELATED_TASK_META_KEY]: { taskId: taskId } } }; } return await handleTaskResult(); }; return await handleTaskResult(); }); this.setRequestHandler(ListTasksRequestSchema, async (request, extra) => { try { const { tasks, nextCursor } = await this._taskStore.listTasks(request.params?.cursor, extra.sessionId); // @ts-expect-error SendResultT cannot contain ListTasksResult, but we include it in our derived types everywhere else return { tasks, nextCursor, _meta: {} }; } catch (error) { throw new McpError(ErrorCode.InvalidParams, `Failed to list tasks: ${error instanceof Error ? error.message : String(error)}`); } }); this.setRequestHandler(CancelTaskRequestSchema, async (request, extra) => { try { // Get the current task to check if it's in a terminal state, in case the implementation is not atomic const task = await this._taskStore.getTask(request.params.taskId, extra.sessionId); if (!task) { throw new McpError(ErrorCode.InvalidParams, `Task not found: ${request.params.taskId}`); } // Reject cancellation of terminal tasks if (isTerminal(task.status)) { throw new McpError(ErrorCode.InvalidParams, `Cannot cancel task in terminal status: ${task.status}`); } await this._taskStore.updateTaskStatus(request.params.taskId, 'cancelled', 'Client cancelled task execution.', extra.sessionId); this._clearTaskQueue(request.params.taskId); const cancelledTask = await this._taskStore.getTask(request.params.taskId, extra.sessionId); if (!cancelledTask) { // Task was deleted during cancellation (e.g., cleanup happened) throw new McpError(ErrorCode.InvalidParams, `Task not found after cancellation: ${request.params.taskId}`); } return { _meta: {}, ...cancelledTask }; } catch (error) { // Re-throw McpError as-is if (error instanceof McpError) { throw error; } throw new McpError(ErrorCode.InvalidRequest, `Failed to cancel task: ${error instanceof Error ? error.message : String(error)}`); } }); } } async _oncancel(notification) { if (!notification.params.requestId) { return; } // Handle request cancellation const controller = this._requestHandlerAbortControllers.get(notification.params.requestId); controller?.abort(notification.params.reason); } _setupTimeout(messageId, timeout, maxTotalTimeout, onTimeout, resetTimeoutOnProgress = false) { this._timeoutInfo.set(messageId, { timeoutId: setTimeout(onTimeout, timeout), startTime: Date.now(), timeout, maxTotalTimeout, resetTimeoutOnProgress, onTimeout }); } _resetTimeout(messageId) { const info = this._timeoutInfo.get(messageId); if (!info) return false; const totalElapsed = Date.now() - info.startTime; if (info.maxTotalTimeout && totalElapsed >= info.maxTotalTimeout) { this._timeoutInfo.delete(messageId); throw McpError.fromError(ErrorCode.RequestTimeout, 'Maximum total timeout exceeded', { maxTotalTimeout: info.maxTotalTimeout, totalElapsed }); } clearTimeout(info.timeoutId); info.timeoutId = setTimeout(info.onTimeout, info.timeout); return true; } _cleanupTimeout(messageId) { const info = this._timeoutInfo.get(messageId); if (info) { clearTimeout(info.timeoutId); this._timeoutInfo.delete(messageId); } } /** * Attaches to the given transport, starts it, and starts listening for messages. * * The Protocol object assumes ownership of the Transport, replacing any callbacks that have already been set, and expects that it is the only user of the Transport instance going forward. */ async connect(transport) { this._transport = transport; const _onclose = this.transport?.onclose; this._transport.onclose = () => { _onclose?.(); this._onclose(); }; const _onerror = this.transport?.onerror; this._transport.onerror = (error) => { _onerror?.(error); this._onerror(error); }; const _onmessage = this._transport?.onmessage; this._transport.onmessage = (message, extra) => { _onmessage?.(message, extra); if (isJSONRPCResultResponse(message) || isJSONRPCErrorResponse(message)) { this._onresponse(message); } else if (isJSONRPCRequest(message)) { this._onrequest(message, extra); } else if (isJSONRPCNotification(message)) { this._onnotification(message); } else { this._onerror(new Error(`Unknown message type: ${JSON.stringify(message)}`)); } }; await this._transport.start(); } _onclose() { const responseHandlers = this._responseHandlers; this._responseHandlers = new Map(); this._progressHandlers.clear(); this._taskProgressTokens.clear(); this._pendingDebouncedNotifications.clear(); const error = McpError.fromError(ErrorCode.ConnectionClosed, 'Connection closed'); this._transport = undefined; this.onclose?.(); for (const handler of responseHandlers.values()) { handler(error); } } _onerror(error) { this.onerror?.(error); } _onnotification(notification) { const handler = this._notificationHandlers.get(notification.method) ?? this.fallbackNotificationHandler; // Ignore notifications not being subscribed to. if (handler === undefined) { return; } // Starting with Promise.resolve() puts any synchronous errors into the monad as well. Promise.resolve() .then(() => handler(notification)) .catch(error => this._onerror(new Error(`Uncaught error in notification handler: ${error}`))); } _onrequest(request, extra) { const handler = this._requestHandlers.get(request.method) ?? this.fallbackRequestHandler; // Capture the current transport at request time to ensure responses go to the correct client const capturedTransport = this._transport; // Extract taskId from request metadata if present (needed early for method not found case) const relatedTaskId = request.params?._meta?.[RELATED_TASK_META_KEY]?.taskId; if (handler === undefined) { const errorResponse = { jsonrpc: '2.0', id: request.id, error: { code: ErrorCode.MethodNotFound, message: 'Method not found' } }; // Queue or send the error response based on whether this is a task-related request if (relatedTaskId && this._taskMessageQueue) { this._enqueueTaskMessage(relatedTaskId, { type: 'error', message: errorResponse, timestamp: Date.now() }, capturedTransport?.sessionId).catch(error => this._onerror(new Error(`Failed to enqueue error response: ${error}`))); } else { capturedTransport ?.send(errorResponse) .catch(error => this._onerror(new Error(`Failed to send an error response: ${error}`))); } return; } const abortController = new AbortController(); this._requestHandlerAbortControllers.set(request.id, abortController); const taskCreationParams = isTaskAugmentedRequestParams(request.params) ? request.params.task : undefined; const taskStore = this._taskStore ? this.requestTaskStore(request, capturedTransport?.sessionId) : undefined; const fullExtra = { signal: abortController.signal, sessionId: capturedTransport?.sessionId, _meta: request.params?._meta, sendNotification: async (notification) => { // Include related-task metadata if this request is part of a task const notificationOptions = { relatedRequestId: request.id }; if (relatedTaskId) { notificationOptions.relatedTask = { taskId: relatedTaskId }; } await this.notification(notification, notificationOptions); }, sendRequest: async (r, resultSchema, options) => { // Include related-task metadata if this request is part of a task const requestOptions = { ...options, relatedRequestId: request.id }; if (relatedTaskId && !requestOptions.relatedTask) { requestOptions.relatedTask = { taskId: relatedTaskId }; } // Set task status to input_required when sending a request within a task context // Use the taskId from options (explicit) or fall back to relatedTaskId (inherited) const effectiveTaskId = requestOptions.relatedTask?.taskId ?? relatedTaskId; if (effectiveTaskId && taskStore) { await taskStore.updateTaskStatus(effectiveTaskId, 'input_required'); } return await this.request(r, resultSchema, requestOptions); }, authInfo: extra?.authInfo, requestId: request.id, requestInfo: extra?.requestInfo, taskId: relatedTaskId, taskStore: taskStore, taskRequestedTtl: taskCreationParams?.ttl, closeSSEStream: extra?.closeSSEStream, closeStandaloneSSEStream: extra?.closeStandaloneSSEStream }; // Starting with Promise.resolve() puts any synchronous errors into the monad as well. Promise.resolve() .then(() => { // If this request asked for task creation, check capability first if (taskCreationParams) { // Check if the request method supports task creation this.assertTaskHandlerCapability(request.method); } }) .then(() => handler(request, fullExtra)) .then(async (result) => { if (abortController.signal.aborted) { // Request was cancelled return; } const response = { result, jsonrpc: '2.0', id: request.id }; // Queue or send the response based on whether this is a task-related request if (relatedTaskId && this._taskMessageQueue) { await this._enqueueTaskMessage(relatedTaskId, { type: 'response', message: response, timestamp: Date.now() }, capturedTransport?.sessionId); } else { await capturedTransport?.send(response); } }, async (error) => { if (abortController.signal.aborted) { // Request was cancelled return; } const errorResponse = { jsonrpc: '2.0', id: request.id, error: { code: Number.isSafeInteger(error['code']) ? error['code'] : ErrorCode.InternalError, message: error.message ?? 'Internal error', ...(error['data'] !== undefined && { data: error['data'] }) } }; // Queue or send the error response based on whether this is a task-related request if (relatedTaskId && this._taskMessageQueue) { await this._enqueueTaskMessage(relatedTaskId, { type: 'error', message: errorResponse, timestamp: Date.now() }, capturedTransport?.sessionId); } else { await capturedTransport?.send(errorResponse); } }) .catch(error => this._onerror(new Error(`Failed to send response: ${error}`))) .finally(() => { this._requestHandlerAbortControllers.delete(request.id); }); } _onprogress(notification) { const { progressToken, ...params } = notification.params; const messageId = Number(progressToken); const handler = this._progressHandlers.get(messageId); if (!handler) { this._onerror(new Error(`Received a progress notification for an unknown token: ${JSON.stringify(notification)}`)); return; } const responseHandler = this._responseHandlers.get(messageId); const timeoutInfo = this._timeoutInfo.get(messageId); if (timeoutInfo && responseHandler && timeoutInfo.resetTimeoutOnProgress) { try { this._resetTimeout(messageId); } catch (error) { // Clean up if maxTotalTimeout was exceeded this._responseHandlers.delete(messageId); this._progressHandlers.delete(messageId); this._cleanupTimeout(messageId); responseHandler(error); return; } } handler(params); } _onresponse(response) { const messageId = Number(response.id); // Check if this is a response to a queued request const resolver = this._requestResolvers.get(messageId); if (resolver) { this._requestResolvers.delete(messageId); if (isJSONRPCResultResponse(response)) { resolver(response); } else { const error = new McpError(response.error.code, response.error.message, response.error.data); resolver(error); } return; } const handler = this._responseHandlers.get(messageId); if (handler === undefined) { this._onerror(new Error(`Received a response for an unknown message ID: ${JSON.stringify(response)}`)); return; } this._responseHandlers.delete(messageId); this._cleanupTimeout(messageId); // Keep progress handler alive for CreateTaskResult responses let isTaskResponse = false; if (isJSONRPCResultResponse(response) && response.result && typeof response.result === 'object') { const result = response.result; if (result.task && typeof result.task === 'object') { const task = result.task; if (typeof task.taskId === 'string') { isTaskResponse = true; this._taskProgressTokens.set(task.taskId, messageId); } } } if (!isTaskResponse) { this._progressHandlers.delete(messageId); } if (isJSONRPCResultResponse(response)) { handler(response); } else { const error = McpError.fromError(response.error.code, response.error.message, response.error.data); handler(error); } } get transport() { return this._transport; } /** * Closes the connection. */ async close() { await this._transport?.close(); } /** * Sends a request and returns an AsyncGenerator that yields response messages. * The generator is guaranteed to end with either a 'result' or 'error' message. * * @example * ```typescript * const stream = protocol.requestStream(request, resultSchema, options); * for await (const message of stream) { * switch (message.type) { * case 'taskCreated': * console.log('Task created:', message.task.taskId); * break; * case 'taskStatus': * console.log('Task status:', message.task.status); * break; * case 'result': * console.log('Final result:', message.result); * break; * case 'error': * console.error('Error:', message.error); * break; * } * } * ``` * * @experimental Use `client.experimental.tasks.requestStream()` to access this method. */ async *requestStream(request, resultSchema, options) { const { task } = options ?? {}; // For non-task requests, just yield the result if (!task) { try { const result = await this.request(request, resultSchema, options); yield { type: 'result', result }; } catch (error) { yield { type: 'error', error: error instanceof McpError ? error : new McpError(ErrorCode.InternalError, String(error)) }; } return; } // For task-augmented requests, we need to poll for status // First, make the request to create the task let taskId; try { // Send the request and get the CreateTaskResult const createResult = await this.request(request, CreateTaskResultSchema, options); // Extract taskId from the result if (createResult.task) { taskId = createResult.task.taskId; yield { type: 'taskCreated', task: createResult.task }; } else { throw new McpError(ErrorCode.InternalError, 'Task creation did not return a task'); } // Poll for task completion while (true) { // Get current task status const task = await this.getTask({ taskId }, options); yield { type: 'taskStatus', task }; // Check if task is terminal if (isTerminal(task.status)) { if (task.status === 'completed') { // Get the final result const result = await this.getTaskResult({ taskId }, resultSchema, options); yield { type: 'result', result }; } else if (task.status === 'failed') { yield { type: 'error', error: new McpError(ErrorCode.InternalError, `Task ${taskId} failed`) }; } else if (task.status === 'cancelled') { yield { type: 'error', error: new McpError(ErrorCode.InternalError, `Task ${taskId} was cancelled`) }; } return; } // When input_required, call tasks/result to deliver queued messages // (elicitation, sampling) via SSE and block until terminal if (task.status === 'input_required') { const result = await this.getTaskResult({ taskId }, resultSchema, options); yield { type: 'result', result }; return; } // Wait before polling again const pollInterval = task.pollInterval ?? this._options?.defaultTaskPollInterval ?? 1000; await new Promise(resolve => setTimeout(resolve, pollInterval)); // Check if cancelled options?.signal?.throwIfAborted(); } } catch (error) { yield { type: 'error', error: error instanceof McpError ? error : new McpError(ErrorCode.InternalError, String(error)) }; } } /** * Sends a request and waits for a response. * * Do not use this method to emit notifications! Use notification() instead. */ request(request, resultSchema, options) { const { relatedRequestId, resumptionToken, onresumptiontoken, task, relatedTask } = options ?? {}; // Send the request return new Promise((resolve, reject) => { const earlyReject = (error) => { reject(error); }; if (!this._transport) { earlyReject(new Error('Not connected')); return; } if (this._options?.enforceStrictCapabilities === true) { try { this.assertCapabilityForMethod(request.method); // If task creation is requested, also check task capabilities if (task) { this.assertTaskCapability(request.method); } } catch (e) { earlyReject(e); return; } } options?.signal?.throwIfAborted(); const messageId = this._requestMessageId++; const jsonrpcRequest = { ...request, jsonrpc: '2.0', id: messageId }; if (options?.onprogress) { this._progressHandlers.set(messageId, options.onprogress); jsonrpcRequest.params = { ...request.params, _meta: { ...(request.params?._meta || {}), progressToken: messageId } }; } // Augment with task creation parameters if provided if (task) { jsonrpcRequest.params = { ...jsonrpcRequest.params, task: task }; } // Augment with related task metadata if relatedTask is provided if (relatedTask) { jsonrpcRequest.params = { ...jsonrpcRequest.params, _meta: { ...(jsonrpcRequest.params?._meta || {}), [RELATED_TASK_META_KEY]: relatedTask } }; } const cancel = (reason) => { this._responseHandlers.delete(messageId); this._progressHandlers.delete(messageId); this._cleanupTimeout(messageId); this._transport ?.send({ jsonrpc: '2.0', method: 'notifications/cancelled', params: { requestId: messageId, reason: String(reason) } }, { relatedRequestId, resumptionToken, onresumptiontoken }) .catch(error => this._onerror(new Error(`Failed to send cancellation: ${error}`))); // Wrap the reason in an McpError if it isn't already const error = reason instanceof McpError ? reason : new McpError(ErrorCode.RequestTimeout, String(reason)); reject(error); }; this._responseHandlers.set(messageId, response => { if (options?.signal?.aborted) { return; } if (response instanceof Error) { return reject(response); } try { const parseResult = safeParse(resultSchema, response.result); if (!parseResult.success) { // Type guard: if success is false, error is guaranteed to exist reject(parseResult.error); } else { resolve(parseResult.data); } } catch (error) { reject(error); } }); options?.signal?.addEventListener('abort', () => { cancel(options?.signal?.reason); }); const timeout = options?.timeout ?? DEFAULT_REQUEST_TIMEOUT_MSEC; const timeoutHandler = () => cancel(McpError.fromError(ErrorCode.RequestTimeout, 'Request timed out', { timeout })); this._setupTimeout(messageId, timeout, options?.maxTotalTimeout, timeoutHandler, options?.resetTimeoutOnProgress ?? false); // Queue request if related to a task const relatedTaskId = relatedTask?.taskId; if (relatedTaskId) { // Store the response resolver for this request so responses can be routed back const responseResolver = (response) => { const handler = this._responseHandlers.get(messageId); if (handler) { handler(response); } else { // Log error when resolver is missing, but don't fail this._onerror(new Error(`Response handler missing for side-channeled request ${messageId}`)); } }; this._requestResolvers.set(messageId, responseResolver); this._enqueueTaskMessage(relatedTaskId, { type: 'request', message: jsonrpcRequest, timestamp: Date.now() }).catch(error => { this._cleanupTimeout(messageId); reject(error); }); // Don't send through transport - queued messages are delivered via tasks/result only // This prevents duplicate delivery for bidirectional transports } else { // No related task - send through transport normally this._transport.send(jsonrpcRequest, { relatedRequestId, resumptionToken, onresumptiontoken }).catch(error => { this._cleanupTimeout(messageId); reject(error); }); } }); } /** * Gets the current status of a task. * * @experimental Use `client.experimental.tasks.getTask()` to access this method. */ async getTask(params, options) { // @ts-expect-error SendRequestT cannot directly contain GetTaskRequest, but we ensure all type instantiations contain it anyways return this.request({ method: 'tasks/get', params }, GetTaskResultSchema, options); } /** * Retrieves the result of a completed task. * * @experimental Use `client.experimental.tasks.getTaskResult()` to access this method. */ async getTaskResult(params, resultSchema, options) { // @ts-expect-error SendRequestT cannot directly contain GetTaskPayloadRequest, but we ensure all type instantiations contain it anyways return this.request({ method: 'tasks/result', params }, resultSchema, options); } /** * Lists tasks, optionally starting from a pagination cursor. * * @experimental Use `client.experimental.tasks.listTasks()` to access this method. */ async listTasks(params, options) { // @ts-expect-error SendRequestT cannot directly contain ListTasksRequest, but we ensure all type instantiations contain it anyways return this.request({ method: 'tasks/list', params }, ListTasksResultSchema, options); } /** * Cancels a specific task. * * @experimental Use `client.experimental.tasks.cancelTask()` to access this method. */ async cancelTask(params, options) { // @ts-expect-error SendRequestT cannot directly contain CancelTaskRequest, but we ensure all type instantiations contain it anyways return this.request({ method: 'tasks/cancel', params }, CancelTaskResultSchema, options); } /** * Emits a notification, which is a one-way message that does not expect a response. */ async notification(notification, options) { if (!this._transport) { throw new Error('Not connected'); } this.assertNotificationCapability(notification.method); // Queue notification if related to a task const relatedTaskId = options?.relatedTask?.taskId; if (relatedTaskId) { // Build the JSONRPC notification with metadata const jsonrpcNotification = { ...notification, jsonrpc: '2.0', params: { ...notification.params, _meta: { ...(notification.params?._meta || {}), [RELATED_TASK_META_KEY]: options.relatedTask } } }; await this._enqueueTaskMessage(relatedTaskId, { type: 'notification', message: jsonrpcNotification, timestamp: Date.now() }); // Don't send through transport - queued messages are delivered via tasks/result only // This prevents duplicate delivery for bidirectional transports return; } const debouncedMethods = this._options?.debouncedNotificationMethods ?? []; // A notification can only be debounced if it's in the list AND it's "simple" // (i.e., has no parameters and no related request ID or related task that could be lost). const canDebounce = debouncedMethods.includes(notification.method) && !notification.params && !options?.relatedRequestId && !options?.relatedTask; if (canDebounce) { // If a notification of this type is already scheduled, do nothing. if (this._pendingDebouncedNotifications.has(notification.method)) { return; } // Mark this notification type as pending. this._pendingDebouncedNotifications.add(notification.method); // Schedule the actual send to happen in the next microtask. // This allows all synchronous calls in the current event loop tick to be coalesced. Promise.resolve().then(() => { // Un-mark the notification so the next one can be scheduled. this._pendingDebouncedNotifications.delete(notification.method); // SAFETY CHECK: If the connection was closed while this was pending, abort. if (!this._transport) { return; } let jsonrpcNotification = { ...notification, jsonrpc: '2.0' }; // Augment with related task metadata if relatedTask is provided if (options?.relatedTask) { jsonrpcNotification = { ...jsonrpcNotification, params: { ...jsonrpcNotification.params, _meta: { ...(jsonrpcNotification.params?._meta || {}), [RELATED_TASK_META_KEY]: options.relatedTask } } }; } // Send the notification, but don't await it here to avoid blocking. // Handle potential errors with a .catch(). this._transport?.send(jsonrpcNotification, options).catch(error => this._onerror(error)); }); // Return immediately. return; } let jsonrpcNotification = { ...notification, jsonrpc: '2.0' }; // Augment with related task metadata if relatedTask is provided if (options?.relatedTask) { jsonrpcNotification = { ...jsonrpcNotification, params: { ...jsonrpcNotification.params, _meta: { ...(jsonrpcNotification.params?._meta || {}), [RELATED_TASK_META_KEY]: options.relatedTask } } }; } await this._transport.send(jsonrpcNotification, options); } /** * Registers a handler to invoke when this protocol object receives a request with the given method. * * Note that this will replace any previous request handler for the same method. */ setRequestHandler(requestSchema, handler) { const method = getMethodLiteral(requestSchema); this.assertRequestHandlerCapability(method); this._requestHandlers.set(method, (request, extra) => { const parsed = parseWithCompat(requestSchema, request); return Promise.resolve(handler(parsed, extra)); }); } /** * Removes the request handler for the given method. */ removeRequestHandler(method) { this._requestHandlers.delete(method); } /** * Asserts that a request handler has not already been set for the given method, in preparation for a new one being automatically installed. */ assertCanSetRequestHandler(method) { if (this._requestHandlers.has(method)) { throw new Error(`A request handler for ${method} already exists, which would be overridden`); } } /** * Registers a handler to invoke when this protocol object receives a notification with the given method. * * Note that this will replace any previous notification handler for the same method. */ setNotificationHandler(notificationSchema, handler) { const method = getMethodLiteral(notificationSchema); this._notificationHandlers.set(method, notification => { const parsed = parseWithCompat(notificationSchema, notification); return Promise.resolve(handler(parsed)); }); } /** * Removes the notification handler for the given method. */ removeNotificationHandler(method) { this._notificationHandlers.delete(method); } /** * Cleans up the progress handler associated with a task. * This should be called when a task reaches a terminal status. */ _cleanupTaskProgressHandler(taskId) { const progressToken = this._taskProgressTokens.get(taskId); if (progressToken !== undefined) { this._progressHandlers.delete(progressToken); this._taskProgressTokens.delete(taskId); } } /** * Enqueues a task-related message for side-channel delivery via tasks/result. * @param taskId The task ID to associate the message with * @param message The message to enqueue * @param sessionId Optional session ID for binding the operation to a specific session * @throws Error if taskStore is not configured or if enqueue fails (e.g., queue overflow) * * Note: If enqueue fails, it's the TaskMessageQueue implementation's responsibility to handle * the error appropriately (e.g., by failing the task, logging, etc.). The Protocol layer * simply propagates the error. */ async _enqueueTaskMessage(taskId, message, sessionId) { // Task message queues are only used when taskStore is configured if (!this._taskStore || !this._taskMessageQueue) { throw new Error('Cannot enqueue task message: taskStore and taskMessageQueue are not configured'); } const maxQueueSize = this._options?.maxTaskQueueSize; await this._taskMessageQueue.enqueue(taskId, message, sessionId, maxQueueSize); } /** * Clears the message queue for a task and rejects any pending request resolvers. * @param taskId The task ID whose queue should be cleared * @param sessionId Optional session ID for binding the operation to a specific session */ async _clearTaskQueue(taskId, sessionId) { if (this._taskMessageQueue) { // Reject any pending request resolvers const messages = await this._taskMessageQueue.dequeueAll(taskId, sessionId); for (const message of messages) { if (message.type === 'request' && isJSONRPCRequest(message.message)) { // Extract request ID from the message const requestId = message.message.id; const resolver = this._requestResolvers.get(requestId); if (resolver) { resolver(new McpError(ErrorCode.InternalError, 'Task cancelled or completed')); this._requestResolvers.delete(requestId); } else { // Log error when resolver is missing during cleanup for better observability this._onerror(new Error(`Resolver missing for request ${requestId} during task ${taskId} cleanup`)); } } } } } /** * Waits for a task update (new messages or status change) with abort signal support. * Uses polling to check for updates at the task's configured poll interval. * @param taskId The task ID to wait for * @param signal Abort signal to cancel the wait * @returns Promise that resolves when an update occurs or rejects if aborted */ async _waitForTaskUpdate(taskId, signal) { // Get the task's poll interval, falling back to default let interval = this._options?.defaultTaskPollInterval ?? 1000; try { const task = await this._taskStore?.getTask(taskId); if (task?.pollInterval) { interval = task.pollInterval; } } catch { // Use default interval if task lookup fails } return new Promise((resolve, reject) => { if (signal.aborted) { reject(new McpError(ErrorCode.InvalidRequest, 'Request cancelled')); return; } // Wait for the poll interval, then resolve so caller can check for updates const timeoutId = setTimeout(resolve, interval); // Clean up timeout and reject if aborted signal.addEventListener('abort', () => { clearTimeout(timeoutId); reject(new McpError(ErrorCode.InvalidRequest, 'Request cancelled')); }, { once: true }); }); } requestTaskStore(request, sessionId) { const taskStore = this._taskStore; if (!taskStore) { throw new Error('No task store configured'); } return { createTask: async (taskParams) => { if (!request) { throw new Error('No request provided'); } return await taskStore.createTask(taskParams, request.id, { method: request.method, params: request.params }, sessionId); }, getTask: async (taskId) => { const task = await taskStore.getTask(taskId, sessionId); if (!task) { throw new McpError(ErrorCode.InvalidParams, 'Failed to retrieve task: Task not found'); } return task; }, storeTaskResult: async (taskId, status, result) => { await taskStore.storeTaskResult(taskId, status, result, sessionId); // Get updated task state and send notification const task = await taskStore.getTask(taskId, sessionId); if (task) { const notification = TaskStatusNotificationSchema.parse({ method: 'notifications/tasks/status', params: task }); await this.notification(notification); if (isTerminal(task.status)) { this._cleanupTaskProgressHandler(taskId); // Don't clear queue here - it will be cleared after delivery via tasks/result } } }, getTaskResult: taskId => { return taskStore.getTaskResult(taskId, sessionId); }, updateTaskStatus: async (taskId, status, statusMessage) => { // Check if task exists const task = await taskStore.getTask(taskId, sessionId); if (!task) { throw new McpError(ErrorCode.InvalidParams, `Task "${taskId}" not found - it may have been cleaned up`); } // Don't allow transitions from terminal states if (isTerminal(task.status)) { throw new McpError(ErrorCode.InvalidParams, `Cannot update task "${taskId}" from terminal status "${task.status}" to "${status}". Terminal states (completed, failed, cancelled) cannot transition to other states.`); } await taskStore.updateTaskStatus(taskId, status, statusMessage, sessionId); // Get updated task state and send notification const updatedTask = await taskStore.getTask(taskId, sessionId); if (updatedTask) { const notification = TaskStatusNotificationSchema.parse({ method: 'notifications/tasks/status', params: updatedTask }); await this.notification(notification); if (isTerminal(updatedTask.status)) { this._cleanupTaskProgressHandler(taskId); // Don't clear queue here - it will be cleared after delivery via tasks/result } } }, listTasks: cursor => { return taskStore.listTasks(cursor, sessionId); } }; } } function isPlainObject(value) { return value !== null && typeof value === 'object' && !Array.isArray(value); } export function mergeCapabilities(base, additional) { const result = { ...base }; for (const key in additional) { const k = key; const addValue = additional[k]; if (addValue === undefined) continue; const baseValue = result[k]; if (isPlainObject(baseValue) && isPlainObject(addValue)) { result[k] = { ...baseValue, ...addValue }; } else { result[k] = addValue; } } return result; } //# sourceMappingURL=protocol.js.map

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Valerio357/bet-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server