Skip to main content
Glama

Curupira

by drzln
jsonrpc.ts (16.4 kB)
/**
 * @fileoverview JSON-RPC 2.0 protocol implementation
 *
 * This file provides a complete implementation of the JSON-RPC 2.0 protocol
 * with support for batching, notifications, and error handling.
 */

import { EventEmitter } from 'events'
import type {
  JsonRpcRequest,
  JsonRpcNotification,
  JsonRpcResponse,
  JsonRpcError,
  JsonRpcMessage,
  JsonRpcBatch,
  JsonRpcErrorCode,
  RequestHandler,
  NotificationHandler,
  RequestContext,
  ProtocolConfig,
  ProtocolStats,
  ProtocolMiddleware,
  ProtocolCapabilities,
  ProtocolEvent,
  JSON_RPC_VERSION
} from './types.js'
import {
  createJsonRpcId,
  createRequestId,
  createSessionId,
  createTimestamp,
  isJsonRpcId,
  type JsonRpcId,
  type JsonRpcMethod
} from '../types/index.js'
import { ProtocolErrors, ValidationErrors } from '../errors/index.js'
import type { Logger } from '../logging/index.js'
import { createLogger } from '../logging/index.js'

/**
 * Default protocol configuration
 */
const DEFAULT_CONFIG: ProtocolConfig = {
  version: '2.0',
  requestTimeout: 60000, // 1 minute
  batching: true,
  maxBatchSize: 100,
  cancellation: true,
  progress: true,
  strictMode: true,
  debug: false
}

/** JSON-RPC 2.0 "Method not found" error code. */
const METHOD_NOT_FOUND_CODE = -32601

/** JSON-RPC 2.0 "Internal error" error code, used as the fallback code. */
const INTERNAL_ERROR_CODE = -32603

/**
 * JSON-RPC 2.0 protocol implementation
 *
 * Outgoing messages are handed to {@link JsonRpcProtocol.sendMessage}, which by
 * default only emits a `'send'` event; incoming messages are fed in through
 * {@link JsonRpcProtocol.handleMessage}.
 */
export class JsonRpcProtocol extends EventEmitter {
  private readonly config: Required<ProtocolConfig>
  private readonly logger: Logger
  private readonly handlers = new Map<string, RequestHandler>()
  private readonly notificationHandlers = new Map<string, NotificationHandler>()
  // `any` is required here: PendingRequest<T> holds a `(value: T) => void`
  // property, so PendingRequest<TResult> is not assignable to PendingRequest<unknown>.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private readonly pendingRequests = new Map<JsonRpcId, PendingRequest<any>>()
  private readonly middlewares: ProtocolMiddleware[] = []
  private readonly sessionId = createSessionId(Math.random().toString(36).substring(2, 15))

  /** Monotonic per-instance counter that keeps generated ids unique. */
  private idSequence = 0

  private _stats: ProtocolStats = {
    requestsSent: 0,
    requestsReceived: 0,
    responsesSent: 0,
    responsesReceived: 0,
    notificationsSent: 0,
    notificationsReceived: 0,
    errors: 0,
    pendingRequests: 0
  }

  /**
   * @param config - Overrides merged on top of the default configuration.
   */
  constructor(config: ProtocolConfig = {}) {
    super()
    this.config = { ...DEFAULT_CONFIG, ...config } as Required<ProtocolConfig>
    this.logger = createLogger({
      level: config.debug ? 'debug' : 'info'
      // SimpleLoggerConfig
    })
  }

  /**
   * Get protocol capabilities
   */
  get capabilities(): ProtocolCapabilities {
    return {
      batch: this.config.batching,
      notifications: true,
      cancellation: this.config.cancellation,
      progress: this.config.progress,
      maxBatchSize: this.config.maxBatchSize,
      versions: ['2.0']
    }
  }

  /**
   * Get protocol statistics
   *
   * @returns A copy of the counters with `pendingRequests` set to the current
   * number of in-flight requests.
   */
  get stats(): ProtocolStats {
    return {
      ...this._stats,
      pendingRequests: this.pendingRequests.size
    }
  }

  /**
   * Register a request handler
   *
   * @throws If a handler is already registered for `method`.
   */
  registerHandler(method: string, handler: RequestHandler): void {
    if (this.handlers.has(method)) {
      throw ValidationErrors.invalidConfiguration(
        `Handler already registered for method: ${method}`
      )
    }

    this.handlers.set(method, handler)
    this.logger.debug({ method }, 'Registered request handler')
  }

  /**
   * Unregister a request handler
   */
  unregisterHandler(method: string): void {
    this.handlers.delete(method)
    this.logger.debug({ method }, 'Unregistered request handler')
  }

  /**
   * Register a notification handler
   *
   * @throws If a notification handler is already registered for `method`.
   */
  registerNotificationHandler(
    method: string,
    handler: NotificationHandler
  ): void {
    if (this.notificationHandlers.has(method)) {
      throw ValidationErrors.invalidConfiguration(
        `Notification handler already registered for method: ${method}`
      )
    }

    this.notificationHandlers.set(method, handler)
    this.logger.debug({ method }, 'Registered notification handler')
  }

  /**
   * Send a request and wait for response
   *
   * @param method - Remote method name.
   * @param params - Optional parameters forwarded as-is.
   * @param options - `timeout` overrides the configured request timeout (a
   * value of 0 or undefined falls back to the configured one); `signal`
   * cancels the request when aborted.
   * @returns The `result` member of the matching response.
   * @throws The remote error, a timeout error, a cancellation error, or
   * whatever `sendMessage` throws.
   */
  async request<TParams = unknown, TResult = unknown>(
    method: JsonRpcMethod,
    params?: TParams,
    options: {
      timeout?: number
      signal?: AbortSignal
    } = {}
  ): Promise<TResult> {
    const id = this.nextId()
    const { signal } = options

    const request: JsonRpcRequest<TParams> = {
      jsonrpc: '2.0',
      id,
      method,
      params
    }

    // Create pending request
    const pending = new PendingRequest<TResult>(
      id,
      options.timeout || this.config.requestTimeout
    )
    this.pendingRequests.set(id, pending)

    const onAbort = (): void => {
      this.cancelRequest(id)
    }

    try {
      if (signal?.aborted) {
        // An already-aborted signal never fires 'abort' again, so cancel now
        // and skip sending altogether.
        this.cancelRequest(id)
      } else {
        signal?.addEventListener('abort', onAbort, { once: true })
        this._stats.requestsSent++

        // Apply middleware and send
        await this.sendMessage(request)
      }

      // Wait for response
      const result = await pending.promise
      this._stats.responsesReceived++
      return result
    } catch (error) {
      this._stats.errors++
      throw error
    } finally {
      // Always release the listener, the timer and the map entry so a failed
      // send cannot leave a timer behind that rejects an unobserved promise.
      signal?.removeEventListener('abort', onAbort)
      pending.dispose()
      this.pendingRequests.delete(id)
    }
  }

  /**
   * Send a notification (no response expected)
   */
  async notify<TParams = unknown>(
    method: JsonRpcMethod,
    params?: TParams
  ): Promise<void> {
    const notification: JsonRpcNotification<TParams> = {
      jsonrpc: '2.0',
      method,
      params
    }

    this._stats.notificationsSent++
    await this.sendMessage(notification)
  }

  /**
   * Send a batch of requests/notifications
   *
   * @param messages - Entries to send; entries flagged `isNotification` get
   * no id and resolve to `undefined` in the returned array.
   * @returns One entry per input message, in input order.
   * @throws If batching is disabled, the batch exceeds `maxBatchSize`,
   * sending fails, or any request in the batch is rejected (the first
   * rejection in input order is thrown once every request has settled).
   */
  async batch(
    messages: Array<{
      method: JsonRpcMethod
      params?: unknown
      isNotification?: boolean
    }>
  ): Promise<Array<unknown | void>> {
    if (!this.config.batching) {
      throw ProtocolErrors.unsupportedOperation('Batching is not enabled')
    }

    if (messages.length > this.config.maxBatchSize) {
      throw ValidationErrors.outOfRange(
        'batch size',
        messages.length,
        1,
        this.config.maxBatchSize
      )
    }

    const batch: JsonRpcBatch = []
    const pendingResults: Array<Promise<unknown> | void> = []
    const created: Array<PendingRequest<unknown>> = []

    for (const msg of messages) {
      if (msg.isNotification) {
        batch.push({
          jsonrpc: '2.0',
          method: msg.method,
          params: msg.params
        } as JsonRpcNotification)
        pendingResults.push(undefined)
        this._stats.notificationsSent++
      } else {
        // Each request needs its own id; a timestamp alone repeats within a
        // single loop and would overwrite earlier pending entries.
        const id = this.nextId()
        const request: JsonRpcRequest = {
          jsonrpc: '2.0',
          id,
          method: msg.method,
          params: msg.params
        }
        batch.push(request)

        const pending = new PendingRequest<unknown>(id, this.config.requestTimeout)
        this.pendingRequests.set(id, pending)
        created.push(pending)
        pendingResults.push(pending.promise)
        this._stats.requestsSent++
      }
    }

    try {
      await this.sendMessage(batch)

      // Wait for all responses
      const results = await Promise.allSettled(pendingResults)

      return results.map(result => {
        if (result.status === 'fulfilled') {
          return result.value
        }
        throw result.reason
      })
    } finally {
      // Release timers and map entries whether the batch succeeded or not.
      for (const pending of created) {
        pending.dispose()
        this.pendingRequests.delete(pending.id)
      }
    }
  }

  /**
   * Handle incoming message
   *
   * Arrays are treated as batches, objects with `method` as requests (when
   * they carry `id`) or notifications, and everything else as responses.
   * Failures are counted and logged; an error response is sent back only
   * when the failing message was a request with a non-null id.
   */
  async handleMessage(message: JsonRpcMessage | JsonRpcBatch): Promise<void> {
    try {
      if (Array.isArray(message)) {
        await this.handleBatch(message)
      } else if ('method' in message) {
        if ('id' in message) {
          await this.handleRequest(message as JsonRpcRequest)
        } else {
          await this.handleNotification(message as JsonRpcNotification)
        }
      } else {
        await this.handleResponse(message as JsonRpcResponse)
      }
    } catch (error) {
      this._stats.errors++
      this.logger.error({ error, message }, 'Error handling message')

      // Send error response if it was a request. Responses also carry an id
      // but must never be answered, and 0 is a valid id, so test for
      // `method` and compare against null instead of relying on truthiness.
      if (
        !Array.isArray(message) &&
        'method' in message &&
        'id' in message &&
        message.id != null
      ) {
        const errorResponse = this.createErrorResponse(
          message.id,
          error instanceof Error ? error : new Error('Unknown error')
        )
        await this.sendMessage(errorResponse)
      }
    }
  }

  /**
   * Cancel a pending request
   *
   * Does nothing when no request with `id` is pending; otherwise rejects it,
   * forgets it and emits a `cancelled` protocol event.
   */
  cancelRequest(id: JsonRpcId): void {
    const pending = this.pendingRequests.get(id)
    if (pending) {
      pending.cancel()
      this.pendingRequests.delete(id)

      this.emit('protocol:event', {
        type: 'cancelled',
        requestId: id
      } as ProtocolEvent)
    }
  }

  /**
   * Use middleware
   */
  use(middleware: ProtocolMiddleware): void {
    this.middlewares.push(middleware)
  }

  /**
   * Clear all handlers
   *
   * Also cancels every pending request.
   */
  clear(): void {
    this.handlers.clear()
    this.notificationHandlers.clear()

    // Cancel all pending requests
    for (const pending of this.pendingRequests.values()) {
      pending.cancel()
    }
    this.pendingRequests.clear()
  }

  /**
   * Produce the next request id.
   *
   * The id is a digit-only string (millisecond timestamp followed by a
   * per-instance sequence number), so ids created within the same
   * millisecond still differ.
   */
  private nextId(): JsonRpcId {
    this.idSequence += 1
    return createJsonRpcId(`${Date.now().toString()}${this.idSequence.toString()}`)
  }

  /**
   * Handle incoming request
   *
   * Runs the request and sends the resulting response, if any.
   */
  private async handleRequest(request: JsonRpcRequest): Promise<void> {
    const response = await this.processRequest(request)
    if (response) {
      await this.sendMessage(response)
      this._stats.responsesSent++
    }
  }

  /**
   * Run a request through its handler and build the response without
   * sending it, so single requests and batches can share the logic.
   *
   * @returns The success or error response, or `undefined` when no handler
   * is registered and `strictMode` is off.
   */
  private async processRequest(
    request: JsonRpcRequest
  ): Promise<JsonRpcResponse | undefined> {
    this._stats.requestsReceived++

    const context: RequestContext = {
      requestId: request.id,
      sessionId: this.sessionId,
      method: request.method,
      metadata: {}
    }

    this.emit('protocol:event', {
      type: 'request',
      request,
      context
    } as ProtocolEvent)

    // Check if handler exists
    const handler = this.handlers.get(request.method)
    if (!handler) {
      if (!this.config.strictMode) {
        return undefined
      }
      this._stats.errors++
      return this.createErrorResponse(
        request.id,
        this.createJsonRpcError(
          METHOD_NOT_FOUND_CODE,
          `Method not found: ${request.method}`
        )
      )
    }

    try {
      // Apply middleware and execute handler
      const result = await this.executeHandler(handler, request, context)
      const response: JsonRpcResponse = {
        jsonrpc: '2.0',
        id: request.id,
        result
      }
      return response
    } catch (error) {
      this._stats.errors++
      return this.createErrorResponse(request.id, error)
    }
  }

  /**
   * Handle incoming notification
   *
   * Handler failures are counted and logged, never propagated.
   */
  private async handleNotification(
    notification: JsonRpcNotification
  ): Promise<void> {
    this._stats.notificationsReceived++

    const context: RequestContext = {
      requestId: this.nextId(), // Dummy ID for notifications
      sessionId: this.sessionId,
      method: notification.method,
      metadata: {}
    }

    this.emit('protocol:event', {
      type: 'notification',
      notification,
      context
    } as ProtocolEvent)

    try {
      const handler = this.notificationHandlers.get(notification.method)
      if (handler) {
        await this.executeNotificationHandler(handler, notification, context)
      } else if (this.config.strictMode) {
        this.logger.warn(
          { method: notification.method },
          'No handler for notification'
        )
      }
    } catch (error) {
      this._stats.errors++
      this.logger.error(
        { error, notification },
        'Error handling notification'
      )
    }
  }

  /**
   * Handle incoming response
   *
   * Settles the pending request whose id matches; responses without an id or
   * for unknown ids are logged and dropped.
   */
  private async handleResponse(response: JsonRpcResponse): Promise<void> {
    // Compare against null/undefined only: 0 is a valid JSON-RPC id.
    if (response.id == null) {
      this.logger.warn('Received response without ID')
      return
    }

    const pending = this.pendingRequests.get(response.id)
    if (!pending) {
      this.logger.warn(
        { id: response.id },
        'Received response for unknown request'
      )
      return
    }

    this.emit('protocol:event', {
      type: 'response',
      response,
      requestId: response.id
    } as ProtocolEvent)

    if ('error' in response) {
      pending.reject(
        ProtocolErrors.remoteError(
          response.error.message,
          response.error.code,
          response.error.data
        )
      )
    } else {
      pending.resolve(response.result)
    }
  }

  /**
   * Handle batch of messages
   *
   * Entries are processed in order. Responses to the requests in the batch
   * are collected and sent back together as one batch.
   */
  private async handleBatch(batch: JsonRpcBatch): Promise<void> {
    this.emit('protocol:event', {
      type: 'batch',
      messages: batch
    } as ProtocolEvent)

    const responses: JsonRpcMessage[] = []

    for (const message of batch) {
      if ('method' in message) {
        if ('id' in message) {
          // Request - collect response
          const request = message as JsonRpcRequest
          try {
            const response = await this.processRequest(request)
            if (response) {
              responses.push(response)
            }
          } catch (error) {
            this._stats.errors++
            responses.push(this.createErrorResponse(request.id, error))
          }
        } else {
          // Notification - no response
          await this.handleNotification(message as JsonRpcNotification)
        }
      } else {
        // Response
        await this.handleResponse(message as JsonRpcResponse)
      }
    }

    // Send batch response if there are any
    if (responses.length > 0) {
      await this.sendMessage(responses as JsonRpcBatch)
      this._stats.responsesSent += responses.length
    }
  }

  /**
   * Execute request handler with middleware
   *
   * Middlewares run in registration order; each receives a `next` callback
   * that continues the chain and finally invokes `handler`. Middlewares
   * without `handleRequest` are skipped.
   */
  private async executeHandler(
    handler: RequestHandler,
    request: JsonRpcRequest,
    context: RequestContext
  ): Promise<unknown> {
    let index = 0
    // Snapshot so middleware registered mid-request does not affect this call.
    const middlewares = [...this.middlewares]

    const next = async (): Promise<unknown> => {
      if (index >= middlewares.length) {
        return handler(request.params, context)
      }

      const middleware = middlewares[index++]
      if (middleware.handleRequest) {
        return middleware.handleRequest(request, context, next)
      }

      return next()
    }

    return next()
  }

  /**
   * Execute notification handler
   */
  private async executeNotificationHandler(
    handler: NotificationHandler,
    notification: JsonRpcNotification,
    context: RequestContext
  ): Promise<void> {
    await handler(notification.params, context)
  }

  /**
   * Create error response
   *
   * Uses the configured `errorTransformer` when present. Otherwise an
   * `Error` carrying a non-zero numeric `code` keeps that code (and its
   * `data`); anything else is reported as an internal error.
   */
  private createErrorResponse(
    id: JsonRpcId | null,
    error: unknown
  ): JsonRpcResponse {
    let jsonRpcError: JsonRpcError

    if (this.config.errorTransformer) {
      jsonRpcError = this.config.errorTransformer(error)
    } else if (error instanceof Error && 'code' in error) {
      const coded = error as Error & { code?: unknown; data?: unknown }
      // JSON-RPC error codes must be numbers; string codes such as Node's
      // 'ECONNRESET' are replaced by the internal-error code.
      const code =
        typeof coded.code === 'number' && coded.code
          ? coded.code
          : INTERNAL_ERROR_CODE
      jsonRpcError = {
        code,
        message: error.message,
        data: coded.data
      }
    } else if (error instanceof Error) {
      jsonRpcError = {
        code: INTERNAL_ERROR_CODE,
        message: error.message
      }
    } else {
      jsonRpcError = {
        code: INTERNAL_ERROR_CODE,
        message: 'Internal error'
      }
    }

    return {
      jsonrpc: '2.0',
      id,
      error: jsonRpcError
    }
  }

  /**
   * Create JSON-RPC error
   *
   * @returns An `Error` with `code` and `data` attached as own properties.
   */
  private createJsonRpcError(
    code: JsonRpcErrorCode | number,
    message: string,
    data?: unknown
  ): Error {
    return Object.assign(new Error(message), { code, data })
  }

  /**
   * Send message (to be implemented by subclasses)
   */
  protected async sendMessage(
    message: JsonRpcMessage | JsonRpcBatch
  ): Promise<void> {
    // This method should be overridden by transport-specific implementations
    this.emit('send', message)
  }
}

/**
 * Pending request tracker
 *
 * Wraps a promise that settles when the request is resolved, rejected,
 * cancelled, or when `timeoutMs` elapses, whichever happens first.
 */
class PendingRequest<T = unknown> {
  public readonly promise: Promise<T>
  private resolveFunc!: (value: T) => void
  private rejectFunc!: (error: Error) => void
  private timeout?: ReturnType<typeof setTimeout>

  constructor(
    public readonly id: JsonRpcId,
    timeoutMs: number
  ) {
    this.promise = new Promise<T>((resolve, reject) => {
      this.resolveFunc = resolve
      this.rejectFunc = reject
    })

    // The request may time out or be cancelled before the caller starts
    // awaiting `promise` (e.g. while the message is still being sent). This
    // no-op handler keeps such a rejection from being reported as unhandled;
    // callers awaiting `promise` still observe it.
    this.promise.catch(() => undefined)

    // Set timeout
    this.timeout = setTimeout(() => {
      this.timeout = undefined
      this.rejectFunc(
        ProtocolErrors.timeout(`Request ${id} timed out after ${timeoutMs}ms`)
      )
    }, timeoutMs)
  }

  /** Stop the timeout timer without settling the promise. */
  dispose(): void {
    if (this.timeout) {
      clearTimeout(this.timeout)
      this.timeout = undefined
    }
  }

  /** Reject the promise with a cancellation error. */
  cancel(): void {
    this.dispose()
    this.rejectFunc(
      ProtocolErrors.cancelled(`Request ${this.id} was cancelled`)
    )
  }

  /** Fulfil the promise with `value`. */
  resolve(value: T): void {
    this.dispose()
    this.resolveFunc(value)
  }

  /** Reject the promise with `error`. */
  reject(error: Error): void {
    this.dispose()
    this.rejectFunc(error)
  }
}

/**
 * Create JSON-RPC protocol instance
 */
export function createJsonRpcProtocol(
  config?: ProtocolConfig
): JsonRpcProtocol {
  return new JsonRpcProtocol(config)
}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/drzln/curupira'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.