Skip to main content
Glama

Curupira

by drzln
client.ts (11.7 kB)
/**
 * @fileoverview Protocol client that connects JSON-RPC/MCP with transports
 *
 * This file provides the integration between the protocol layer and transport layer,
 * handling message serialization, deserialization, and routing.
 */

import { EventEmitter } from 'events'
import type {
  Transport,
  TransportMessage,
  TransportEvent,
  ConnectionState
} from '../transport/index.js'
import type {
  JsonRpcMessage,
  JsonRpcBatch,
  ProtocolMessage,
  ProtocolConfig,
  JsonRpcRequest,
  JsonRpcResponse
} from './types.js'
import { JsonRpcProtocol } from './jsonrpc.js'
import { McpProtocol } from './mcp.js'
import {
  createRequestId,
  createSessionId,
  createTimestamp,
  createJsonRpcMethod,
  type RequestId,
  type SessionId
} from '../types/index.js'
import { NetworkErrors, ProtocolErrors } from '../errors/index.js'
import type { Logger } from '../logging/index.js'
import { createLogger } from '../logging/index.js'

/**
 * Narrow an unknown value to a plain (non-array, non-null) object so its
 * properties can be inspected without resorting to `any`.
 */
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null && !Array.isArray(value)
}

/**
 * Protocol client configuration
 */
export interface ProtocolClientConfig extends ProtocolConfig {
  /** Transport to use */
  transport: Transport
  /** Session ID */
  sessionId?: SessionId
  /** Enable automatic reconnection handling */
  autoReconnect?: boolean
  /** Message queue size for offline buffering */
  messageQueueSize?: number
}

/**
 * Protocol client that integrates protocol with transport
 *
 * Outgoing JSON-RPC messages produced by the protocol are wrapped in a
 * `ProtocolMessage` envelope, which is in turn wrapped in a `TransportMessage`
 * and handed to the transport. Incoming transport messages are unwrapped and
 * passed to `protocol.handleMessage`.
 *
 * Emits `connected`, `disconnected` and `error`, mirroring the transport events.
 */
export class ProtocolClient extends EventEmitter {
  private readonly config: ProtocolClientConfig
  private readonly transport: Transport
  private readonly protocol: JsonRpcProtocol | McpProtocol
  private readonly logger: Logger
  private readonly sessionId: SessionId
  /** Envelopes waiting for the next `connected` event (see `queueMessage`). */
  private readonly messageQueue: ProtocolMessage<JsonRpcMessage | JsonRpcBatch>[] = []
  // Only updated from transport 'connected' / 'disconnected' events.
  // NOTE(review): a transport that is already connected when the client is
  // constructed will leave this `false` until it emits 'connected' — confirm
  // that callers always connect through this client.
  private isConnected = false

  /**
   * @param protocol - Protocol instance whose `sendMessage` is redirected to the transport
   * @param config - Client configuration; `config.transport` is required
   */
  constructor(
    protocol: JsonRpcProtocol | McpProtocol,
    config: ProtocolClientConfig
  ) {
    super()

    this.config = config
    this.transport = config.transport
    this.protocol = protocol
    // Fall back to a random base-36 session id when none is supplied
    this.sessionId =
      config.sessionId || createSessionId(Math.random().toString(36).substring(2, 15))
    this.logger = createLogger({
      level: config.debug ? 'debug' : 'info'
      // SimpleLoggerConfig
    })

    // Set up transport event handlers
    this.setupTransportHandlers()

    // Set up protocol send handler
    this.setupProtocolHandlers()
  }

  /**
   * Get connection state
   */
  get connectionState(): ConnectionState {
    return this.transport.state
  }

  /**
   * Get protocol instance
   */
  get protocolInstance(): JsonRpcProtocol | McpProtocol {
    return this.protocol
  }

  /**
   * Connect to transport
   */
  async connect(): Promise<void> {
    await this.transport.connect()
  }

  /**
   * Disconnect from transport
   *
   * @param reason - Optional reason forwarded to the transport
   */
  async disconnect(reason?: string): Promise<void> {
    await this.transport.disconnect(reason)
  }

  /**
   * Send a request
   *
   * @param method - Method name; converted with `createJsonRpcMethod`
   * @param params - Optional request parameters
   * @param options - Optional timeout / abort signal forwarded to the protocol
   * @returns Whatever `protocol.request` resolves with
   * @throws `NetworkErrors.notConnected` when no 'connected' event has been seen
   */
  async request<TParams = unknown, TResult = unknown>(
    method: string,
    params?: TParams,
    options?: {
      timeout?: number
      signal?: AbortSignal
    }
  ): Promise<TResult> {
    if (!this.isConnected) {
      throw NetworkErrors.notConnected('Protocol client is not connected')
    }

    return this.protocol.request<TParams, TResult>(createJsonRpcMethod(method), params, options)
  }

  /**
   * Send a notification
   *
   * @param method - Method name; converted with `createJsonRpcMethod`
   * @param params - Optional notification parameters
   * @throws `NetworkErrors.notConnected` when no 'connected' event has been seen
   */
  async notify<TParams = unknown>(
    method: string,
    params?: TParams
  ): Promise<void> {
    if (!this.isConnected) {
      throw NetworkErrors.notConnected('Protocol client is not connected')
    }

    return this.protocol.notify(createJsonRpcMethod(method), params)
  }

  /**
   * Set up transport event handlers
   */
  private setupTransportHandlers(): void {
    // Handle connection events
    this.transport.on('connected', () => {
      this.isConnected = true
      this.logger.info('Transport connected')
      this.emit('connected')

      // Flush message queue. Deliberately not awaited: the event handler is
      // synchronous and flushMessageQueue logs its own per-message failures.
      void this.flushMessageQueue()
    })

    this.transport.on('disconnected', (event: TransportEvent) => {
      this.isConnected = false
      this.logger.info({ event }, 'Transport disconnected')
      this.emit('disconnected', event)
    })

    this.transport.on('error', (event: TransportEvent) => {
      if (event.type === 'error') {
        this.logger.error({ error: event.error }, 'Transport error')
        this.emit('error', event.error)
      }
    })

    // Handle incoming messages
    this.transport.onMessage(async (transportMessage: TransportMessage) => {
      try {
        await this.handleIncomingMessage(transportMessage)
      } catch (error) {
        this.logger.error(
          { error, transportMessage },
          'Error handling incoming message'
        )
      }
    })
  }

  /**
   * Set up protocol send handler
   */
  private setupProtocolHandlers(): void {
    // Override protocol's sendMessage to use transport.
    // `sendMessage` is not part of the protocol's public type, hence the cast.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    (this.protocol as any).sendMessage = async (
      message: JsonRpcMessage | JsonRpcBatch
    ): Promise<void> => {
      await this.sendProtocolMessage(message)
    }
  }

  /**
   * Handle incoming transport message
   *
   * Unwraps the protocol envelope and passes the inner JSON-RPC payload to the
   * protocol. Failures are logged and, when the failing message was a request,
   * answered with a JSON-RPC error response; this method itself does not rethrow.
   */
  private async handleIncomingMessage(
    transportMessage: TransportMessage
  ): Promise<void> {
    try {
      // Extract protocol message
      const protocolMessage = transportMessage.payload as ProtocolMessage

      // Validate protocol message
      if (!protocolMessage?.payload) {
        throw ProtocolErrors.invalidMessage(
          'Invalid protocol message structure',
          transportMessage
        )
      }

      // Handle the JSON-RPC message
      await this.protocol.handleMessage(protocolMessage.payload)
    } catch (error) {
      this.logger.error(
        { error, transportMessage },
        'Failed to process incoming message'
      )

      // Try to send error response if it was a request
      const target = this.findErrorResponseTarget(transportMessage)
      if (target) {
        // NOTE(review): -32700 is kept from the original; JSON-RPC 2.0 reserves
        // it for unparseable JSON, so -32600/-32603 may be more accurate here.
        const errorResponse: JsonRpcResponse = {
          jsonrpc: '2.0',
          id: target.id as JsonRpcResponse['id'],
          error: {
            code: -32700,
            message: 'Parse error',
            data: error instanceof Error ? error.message : 'Unknown error'
          }
        }

        await this.sendProtocolMessage(errorResponse).catch(err => {
          this.logger.error({ err }, 'Failed to send error response')
        })
      }
    }
  }

  /**
   * Decide whether a failed incoming message should be answered with an error
   * response and, if so, which id that response must carry.
   *
   * The transport payload is normally a protocol envelope whose own `id` is the
   * envelope id minted by the sender's `sendProtocolMessage`, not the JSON-RPC
   * request id. The peer correlates responses by the JSON-RPC id, so the inner
   * message is inspected first:
   *
   * - inner message is a single request (`method` and `id` present): reply with its id
   * - inner message is a notification or a response: no reply, which also stops
   *   two peers from bouncing error responses at each other
   * - no single inner message (missing, batch, or an unwrapped payload): fall back
   *   to a top-level `id` if there is one, as the code did before
   *
   * @returns `{ id }` when an error response should be sent, otherwise `undefined`
   */
  private findErrorResponseTarget(
    transportMessage: TransportMessage
  ): { id: unknown } | undefined {
    const envelope: unknown = transportMessage.payload
    if (!isRecord(envelope)) {
      return undefined
    }

    const inner = envelope.payload
    if (isRecord(inner)) {
      const isRequest = 'method' in inner && 'id' in inner
      return isRequest ? { id: inner.id } : undefined
    }

    return 'id' in envelope ? { id: envelope.id } : undefined
  }

  /**
   * Send protocol message via transport
   *
   * While connected the message is sent immediately; a failed send is queued
   * (when queueing is configured) and the error is rethrown. While disconnected
   * the message is queued when `autoReconnect` and `messageQueueSize` are both
   * set, otherwise the call fails.
   *
   * @throws `NetworkErrors.notConnected` when disconnected and queueing is not configured
   * @throws Whatever `transport.send` throws while connected
   */
  private async sendProtocolMessage(
    message: JsonRpcMessage | JsonRpcBatch
  ): Promise<void> {
    const protocolMessage: ProtocolMessage<JsonRpcMessage | JsonRpcBatch> = {
      // Envelope id: millisecond timestamp plus a short random suffix
      id: createRequestId(Date.now().toString() + '-' + Math.random().toString(36).substring(2, 9)),
      sessionId: this.sessionId,
      timestamp: createTimestamp(),
      version: this.config.version || '2.0',
      payload: message,
      metadata: {}
    }

    const transportMessage: TransportMessage<ProtocolMessage<JsonRpcMessage | JsonRpcBatch>> = {
      id: protocolMessage.id,
      timestamp: protocolMessage.timestamp,
      payload: protocolMessage,
      metadata: {
        sessionId: this.sessionId
      }
    }

    const canQueue = Boolean(this.config.autoReconnect && this.config.messageQueueSize)

    if (this.isConnected) {
      try {
        await this.transport.send(transportMessage)
      } catch (error) {
        this.logger.error({ error, message }, 'Failed to send message')

        // Queue message if configured
        if (canQueue) {
          this.queueMessage(protocolMessage)
        }

        throw error
      }
    } else if (canQueue) {
      // Queue message for later delivery
      this.queueMessage(protocolMessage)
    } else {
      throw NetworkErrors.notConnected('Transport is not connected')
    }
  }

  /**
   * Queue message for later delivery
   *
   * The queue is bounded by `config.messageQueueSize` (100 when unset); once
   * full, the oldest entry is dropped to make room.
   */
  private queueMessage(message: ProtocolMessage<JsonRpcMessage | JsonRpcBatch>): void {
    if (this.messageQueue.length >= (this.config.messageQueueSize || 100)) {
      // Remove oldest message
      this.messageQueue.shift()
    }

    this.messageQueue.push(message)
    this.logger.debug(
      { queueSize: this.messageQueue.length },
      'Message queued for delivery'
    )
  }

  /**
   * Flush queued messages
   *
   * Sends every queued envelope in order. The queue is emptied before sending,
   * so a message whose send fails is logged and not retried.
   */
  private async flushMessageQueue(): Promise<void> {
    if (this.messageQueue.length === 0) {
      return
    }

    this.logger.info(
      { count: this.messageQueue.length },
      'Flushing message queue'
    )

    // Snapshot and clear first so messages queued during the flush are kept
    const messages = [...this.messageQueue]
    this.messageQueue.length = 0

    for (const message of messages) {
      try {
        const transportMessage: TransportMessage<ProtocolMessage<JsonRpcMessage | JsonRpcBatch>> = {
          id: message.id,
          timestamp: message.timestamp,
          payload: message,
          metadata: {
            sessionId: this.sessionId
          }
        }

        await this.transport.send(transportMessage)
      } catch (error) {
        this.logger.error(
          { error, message },
          'Failed to send queued message'
        )
      }
    }
  }

  /**
   * Destroy the client
   *
   * Disconnects the transport, then removes this emitter's listeners and clears
   * the protocol. The cleanup runs even when disconnecting fails; the disconnect
   * error still propagates to the caller.
   */
  async destroy(): Promise<void> {
    try {
      await this.disconnect('Client destroyed')
    } finally {
      this.removeAllListeners()
      this.protocol.clear()
    }
  }
}

/**
 * Create a protocol client for JSON-RPC
 *
 * @param config - Client configuration, also passed to the `JsonRpcProtocol` constructor
 */
export function createJsonRpcClient(
  config: ProtocolClientConfig
): ProtocolClient {
  const protocol = new JsonRpcProtocol(config)
  return new ProtocolClient(protocol, config)
}

/**
 * Create a protocol client for MCP
 *
 * @param name - Name passed to the `McpProtocol` constructor
 * @param version - Version passed to the `McpProtocol` constructor (overrides `config.version` there)
 * @param config - Client configuration
 */
export function createMcpClient(
  name: string,
  version: string,
  config: ProtocolClientConfig
): ProtocolClient {
  const protocol = new McpProtocol({
    ...config,
    name,
    version
  })
  return new ProtocolClient(protocol, config)
}

/**
 * Protocol client builder for convenient setup
 *
 * Defaults to JSON-RPC with auto-reconnect enabled and a queue size of 100.
 */
export class ProtocolClientBuilder {
  private config: Partial<ProtocolClientConfig> = {
    autoReconnect: true,
    messageQueueSize: 100
  }
  private protocolType: 'jsonrpc' | 'mcp' = 'jsonrpc'
  private mcpName?: string
  private mcpVersion?: string

  /**
   * Use JSON-RPC protocol
   */
  useJsonRpc(): this {
    this.protocolType = 'jsonrpc'
    return this
  }

  /**
   * Use MCP protocol
   */
  useMcp(name: string, version: string): this {
    this.protocolType = 'mcp'
    this.mcpName = name
    this.mcpVersion = version
    return this
  }

  /**
   * Set transport
   */
  withTransport(transport: Transport): this {
    this.config.transport = transport
    return this
  }

  /**
   * Set session ID
   */
  withSessionId(sessionId: SessionId): this {
    this.config.sessionId = sessionId
    return this
  }

  /**
   * Enable/disable auto-reconnect
   *
   * @param enabled - Whether auto-reconnect handling is enabled
   * @param queueSize - Optional queue size; the current value is kept when omitted
   */
  withAutoReconnect(enabled: boolean, queueSize?: number): this {
    this.config.autoReconnect = enabled
    if (queueSize !== undefined) {
      this.config.messageQueueSize = queueSize
    }
    return this
  }

  /**
   * Set protocol configuration
   */
  withProtocolConfig(config: Partial<ProtocolConfig>): this {
    Object.assign(this.config, config)
    return this
  }

  /**
   * Build protocol client
   *
   * @throws Error when no transport was set, or when MCP was selected without a name and version
   */
  build(): ProtocolClient {
    if (!this.config.transport) {
      throw new Error('Transport is required')
    }

    // Hand the client its own copy so later with*() calls on this builder
    // cannot mutate the configuration of an already-built client.
    const fullConfig = { ...this.config } as ProtocolClientConfig

    if (this.protocolType === 'mcp') {
      if (!this.mcpName || !this.mcpVersion) {
        throw new Error('MCP name and version are required')
      }
      return createMcpClient(this.mcpName, this.mcpVersion, fullConfig)
    }

    return createJsonRpcClient(fullConfig)
  }
}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/drzln/curupira'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.