Skip to main content
Glama
GrpcClientFactory.ts7.9 kB
import * as grpc from "@grpc/grpc-js"; import { CancellationToken } from "../../Common/CancellationToken.js"; import { ListenOnlyTypedEvent, TypedEvent } from "../../Common/EventHandler.js"; import { obj2String } from "../../Common/Json.js"; import { Logger } from "../../Common/Logger.js"; import { ErrorStatusObject } from "../ProtocolBuffer/ProtocolBuffer.js"; import { ProtocolBufferPackageDefinition } from "../ProtocolBuffer/ProtocolBufferPackageDefinition.js"; import { ClientCallOptions, IGrpcClient } from "./GrpcClient.js"; export interface IGrpcClientFactory { create( address: string, serviceName: string, proto: ProtocolBufferPackageDefinition, option?: grpc.ClientOptions, ): IGrpcClient; } type ClientCallCallback = (error: grpc.StatusObject | undefined, response: unknown | undefined) => void; type ClientCall = (request: unknown, metadata: grpc.Metadata | {}, callback: ClientCallCallback) => { cancel(): void }; type ServiceClient = { [methodName: string]: ClientCall | Function; close(): void; }; class GrpcClient implements IGrpcClient { private isClosed = false; private readonly client: ServiceClient; private _closed: TypedEvent = new TypedEvent(); get closed(): ListenOnlyTypedEvent { return this._closed; } constructor( readonly address: string, readonly serviceName: string, readonly proto: ProtocolBufferPackageDefinition, readonly option?: grpc.ClientOptions, ) { if (!this.checkProto(serviceName, proto)) { throw new Error(`Not contains service: ${serviceName} ${this.getProtoInformation(proto)}`); } this.client = this.createClient(address, serviceName, proto, option); } callAsync( method: string, request: unknown, headers?: Record<string, string>, options?: ClientCallOptions, token?: CancellationToken, ): Promise<any | ErrorStatusObject> { if (this.isClosed) { throw new Error("Client closed."); } if (!this.checkClient(method)) { throw new Error(`Not contains method: ${method} ${this.getProtoInformation()}`); } const callInfo = this.proto.protocolBuffer.packageName + "." + this.serviceName + "/" + method; Logger.debug("GrpcClient call -> " + callInfo); Logger.debug("GrpcClient request -> " + obj2String(request)); Logger.debug("GrpcClient request header -> " + obj2String(headers)); const methodFunc = this.client[method] as ClientCall; return new Promise((resolve, reject) => { let timerId: NodeJS.Timeout; try { const call = methodFunc.call( this.client, request, this.createMetaData(headers, options), (error?: grpc.StatusObject, value?: unknown) => { let response; if (error) { response = { code: error.code, details: error.details, }; } else { response = value; } Logger.debug("GrpcClient response -> " + obj2String(response)); clearInterval(timerId); resolve(response); }, ); if (token) { timerId = setInterval(() => { if (token.isCancellationRequested) { clearInterval(timerId); call.cancel(); const message = "GrpcClient cancel request -> " + callInfo; Logger.debug(message); reject(new Error(message)); } }, 100); } } catch (e) { const message = `GrpcClient call failed. address: ${this.address} service: ${this.serviceName} ${this.getProtoInformation()} \r\n ${e}`; Logger.warn(message); Logger.warn(e); reject(new Error(message)); } }); } close(): void { if (this.isClosed) { return; } this.isClosed = true; try { this.client.close(); } catch (e) { Logger.warn(e); } finally { this._closed.emit(undefined); } } private checkProto(serviceName: string, proto: ProtocolBufferPackageDefinition): boolean { if (!Object.keys(proto.packageDefinition).includes(proto.protocolBuffer.packageName + "." + serviceName)) { return false; } return true; } private checkClient(method: string): boolean { if (typeof this.client[method] !== "function") { return false; } return true; } private getProtoInformation(proto?: ProtocolBufferPackageDefinition): string { const p = proto ?? this.proto; return `(path: ${p.protocolBuffer.metadata.protoPath}, name: ${p.protocolBuffer.metadata.protoFileName})`; } private createClient( address: string, serviceName: string, proto: ProtocolBufferPackageDefinition, option?: grpc.ClientOptions, ): ServiceClient { const serviceDefinition = proto.packageDefinition[ proto.protocolBuffer.packageName + "." + serviceName ] as grpc.ServiceDefinition; const clientConstructor = grpc.makeClientConstructor(serviceDefinition, serviceName); return new clientConstructor( address, this.createCredentials(option), this.createOptions(address, option), ) as unknown as ServiceClient; } private createMetaData(headers?: Record<string, string>, options?: ClientCallOptions): grpc.Metadata { const metadata = new grpc.Metadata(); if (headers) { const metaHeaders = new grpc.Metadata(); Object.keys(headers).forEach((k) => metaHeaders.add(k, headers[k])); metadata.merge(metaHeaders); } if (options?.deadlineMsec) { metadata.set( "deadline", new Date().setMilliseconds(new Date().getMilliseconds() + options?.deadlineMsec).toString(), ); } return metadata; } private createCredentials(option?: grpc.ClientOptions): grpc.ChannelCredentials { if (option?.SSL) { return grpc.credentials.createSsl(); } else { return grpc.credentials.createInsecure(); } } private createOptions(address: string, option?: grpc.ClientOptions): grpc.ChannelOptions { let options: grpc.ChannelOptions = {}; const httpProxy = process.env.HTTPS_PROXY ?? process.env.HTTP_PROXY; if (httpProxy) { options = { ...options, "grpc.enable_http_proxy": 0, // "grpc.http_proxy": httpProxy, }; } // if (option?.SSL) { // options = { // ...options, // "grpc.ssl_target_name_override": address, // "grpc.default_authority": address, // }; // } return options; } } class GrpcClientFactory implements IGrpcClientFactory { create( address: string, serviceName: string, proto: ProtocolBufferPackageDefinition, option?: grpc.ClientOptions, ): IGrpcClient { return new GrpcClient(address, serviceName, proto, option); } } export const grpcClientFactory: IGrpcClientFactory = new GrpcClientFactory();

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Yuki4-dev/grpc-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server