Skip to main content
Glama
engine-runner-socket.ts6.84 kB
import { assertNotNullOrUndefined, EngineResponse, EngineSocketEvent, EngineStderr, EngineStdout, isNil, SendFlowResponseRequest, UpdateRunProgressRequest } from '@activepieces/shared' import { FastifyBaseLogger } from 'fastify' import { type Socket, Server as SocketIOServer } from 'socket.io' let io: SocketIOServer | null = null const sockets: Record<string, Socket> = {} const resolvePromises: Record<string, (value: boolean) => void> = {} export const engineRunnerSocket = (log: FastifyBaseLogger) => { return { async init(): Promise<void> { try { io = new SocketIOServer({ path: '/worker/ws', maxHttpBufferSize: 1e8, }) io.listen(12345) io.on('connection', (socket: Socket) => { const workerId = socket.handshake.auth['workerId'] as string log.info('Client connected to engine socket server ' + workerId) // Clean up any existing socket for this workerId if (sockets[workerId]) { this.cleanupSocket(workerId) } sockets[workerId] = socket if (!isNil(resolvePromises[workerId])) { resolvePromises[workerId](true) // eslint-disable-next-line @typescript-eslint/no-dynamic-delete delete resolvePromises[workerId] } socket.on('disconnect', () => { log.info({ workerId }, 'Client disconnected from engine socket server') this.cleanupSocket(workerId) }) socket.on('error', (error) => { log.error({ error, workerId }, 'Socket error occurred') this.cleanupSocket(workerId) }) }) process.on('SIGTERM', () => { void this.disconnect() }) process.on('SIGINT', () => { void this.disconnect() }) } catch (error) { log.error({ error }, 'Failed to initialize socket server') throw error } }, isConnected(workerId: string): boolean { const socket = sockets[workerId] return !isNil(socket) && socket.connected }, cleanupSocket(workerId: string): void { const socket = sockets[workerId] if (socket) { socket.removeAllListeners() socket.disconnect() // eslint-disable-next-line @typescript-eslint/no-dynamic-delete delete sockets[workerId] // eslint-disable-next-line @typescript-eslint/no-dynamic-delete delete resolvePromises[workerId] } }, async waitForConnect(workerId: string): Promise<boolean> { if (!isNil(sockets[workerId])) { return sockets[workerId].connected } const promise = new Promise<boolean>((resolve) => { const timeout = setTimeout(() => { // eslint-disable-next-line @typescript-eslint/no-dynamic-delete delete resolvePromises[workerId] resolve(false) }, 30000) resolvePromises[workerId] = (value: boolean): void => { // eslint-disable-next-line @typescript-eslint/no-dynamic-delete delete resolvePromises[workerId] clearTimeout(timeout) resolve(value) } }) return promise }, send(workerId: string, message: unknown): void { const socket = sockets[workerId] assertNotNullOrUndefined(socket, 'socket') if (!socket.connected) { throw new Error(`Socket for worker ${workerId} is not connected`) } socket.emit(EngineSocketEvent.ENGINE_OPERATION, message) }, subscribe({ workerId, onResult, onStdout, onStderr, updateRunProgress, sendFlowResponse, }: { workerId: string onResult: (result: EngineResponse<unknown>) => void onStdout: (stdout: EngineStdout) => void onStderr: (stderr: EngineStderr) => void updateRunProgress: (data: UpdateRunProgressRequest, log: FastifyBaseLogger) => Promise<void> sendFlowResponse: (data: SendFlowResponseRequest, log: FastifyBaseLogger) => Promise<void> }): void { const socket = sockets[workerId] assertNotNullOrUndefined(socket, 'sockets[workerId]') // Remove any existing listeners before adding new ones this.unsubscribe(workerId) socket.on(EngineSocketEvent.ENGINE_RESPONSE, (data: EngineResponse<unknown>, callback: () => void) => { onResult(data) callback?.() }) socket.on(EngineSocketEvent.ENGINE_STDOUT, (data: EngineStdout) => { onStdout(data) }) socket.on(EngineSocketEvent.ENGINE_STDERR, (data: EngineStderr, callback: () => void) => { onStderr(data) callback?.() }) socket.on(EngineSocketEvent.UPDATE_RUN_PROGRESS, async (data: UpdateRunProgressRequest, callback: () => void) => { await updateRunProgress(data, log) callback() }) socket.on(EngineSocketEvent.SEND_FLOW_RESPONSE, async (data: SendFlowResponseRequest, callback: () => void) => { await sendFlowResponse(data, log) callback() }) }, unsubscribe(workerId: string): void { const socket = sockets[workerId] if (socket) { socket.removeAllListeners(EngineSocketEvent.ENGINE_RESPONSE) socket.removeAllListeners(EngineSocketEvent.ENGINE_STDOUT) socket.removeAllListeners(EngineSocketEvent.ENGINE_STDERR) socket.removeAllListeners(EngineSocketEvent.UPDATE_RUN_PROGRESS) socket.removeAllListeners(EngineSocketEvent.SEND_FLOW_RESPONSE) } }, async disconnect(): Promise<void> { if (io) { // Clean up all sockets Object.keys(sockets).forEach(workerId => { this.cleanupSocket(workerId) }) await io.close() io = null } }, } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/activepieces/activepieces'

If you have feedback or need assistance with the MCP directory API, please join our Discord server