Skip to main content
Glama
worker-socket.ts3.67 kB
import { inspect } from 'util' import { emitWithAck, EngineGenericError, EngineOperation, EngineOperationType, EngineResponse, EngineResponseStatus, EngineSocketEvent, EngineStderr, EngineStdout, ERROR_MESSAGES_TO_REDACT, isNil, } from '@activepieces/shared' import { io, type Socket } from 'socket.io-client' import { execute } from './operations' import { utils } from './utils' const WORKER_ID = process.env.WORKER_ID const WS_URL = 'ws://127.0.0.1:12345' let socket: Socket | undefined async function executeFromSocket(operation: EngineOperation, operationType: EngineOperationType): Promise<void> { const result = await execute(operationType, operation) const resultParsed = JSON.parse(JSON.stringify(result)) await workerSocket.sendToWorkerWithAck(EngineSocketEvent.ENGINE_RESPONSE, resultParsed) } export const workerSocket = { init: (): void => { if (isNil(WORKER_ID)) { throw new EngineGenericError('WorkerIdNotSetError', 'WORKER_ID environment variable is not set') } socket = io(WS_URL, { path: '/worker/ws', auth: { workerId: WORKER_ID, }, autoConnect: true, reconnection: true, }) // Redirect console.log/error to socket const originalLog = console.log console.log = function (...args): void { const engineStdout: EngineStdout = { message: args.join(' ') + '\n', } socket?.emit(EngineSocketEvent.ENGINE_STDOUT, engineStdout) originalLog.apply(console, args) } const originalError = console.error console.error = function (...args): void { let sanitizedArgs = [...args] if (typeof args[0] === 'string' && ERROR_MESSAGES_TO_REDACT.some(errorMessage => args[0].includes(errorMessage))) { sanitizedArgs = [sanitizedArgs[0], 'REDACTED'] } const engineStderr: EngineStderr = { message: sanitizedArgs.join(' ') + '\n', } socket?.emit(EngineSocketEvent.ENGINE_STDERR, engineStderr) originalError.apply(console, sanitizedArgs) } socket.on(EngineSocketEvent.ENGINE_OPERATION, async (data: { operation: EngineOperation, operationType: EngineOperationType }) => { const { error: resultError } = await utils.tryCatchAndThrowOnEngineError(() => executeFromSocket(data.operation, data.operationType), ) if (resultError) { const engineError: EngineResponse = { response: undefined, status: EngineResponseStatus.INTERNAL_ERROR, error: utils.formatExecutionError(resultError), } console.error(utils.formatExecutionError(resultError)) await workerSocket.sendToWorkerWithAck(EngineSocketEvent.ENGINE_RESPONSE, engineError) } }) }, sendToWorkerWithAck: async ( type: EngineSocketEvent, data: unknown, ): Promise<void> => { await emitWithAck(socket, type, data, { timeoutMs: 4000, retries: 4, retryDelayMs: 1000, }) }, sendError: async (error: unknown): Promise<void> => { const engineStderr: EngineStderr = { message: inspect(error), } await emitWithAck(socket, EngineSocketEvent.ENGINE_STDERR, engineStderr, { timeoutMs: 3000, retries: 4, retryDelayMs: 1000, }) }, }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/activepieces/activepieces'

If you have feedback or need assistance with the MCP directory API, please join our Discord server