
Activepieces MCP Server

by eldoonreval
server-api.service.ts • 8.14 kB
import path from 'path'
import { PieceMetadataModel } from '@activepieces/pieces-framework'
import {
    ApQueueJob,
    exceptionHandler,
    GetRunForWorkerRequest,
    PollJobRequest,
    QueueName,
    ResumeRunRequest,
    SavePayloadRequest,
    SendEngineUpdateRequest,
    SubmitPayloadsRequest,
    UpdateFailureCountRequest,
    UpdateJobRequest,
} from '@activepieces/server-shared'
import {
    ActivepiecesError,
    ErrorCode,
    FlowRun,
    FlowVersionId,
    FlowVersionState,
    GetFlowVersionForWorkerRequest,
    GetPieceRequestQuery,
    isNil,
    PlatformUsageMetric,
    PopulatedFlow,
    UpdateRunProgressRequest,
    WorkerMachineHealthcheckRequest,
    WorkerMachineHealthcheckResponse,
} from '@activepieces/shared'
import { FastifyBaseLogger } from 'fastify'
import { StatusCodes } from 'http-status-codes'
import pLimit from 'p-limit'
import { cacheState } from '../cache/cache-state'
import { workerMachine } from '../utils/machine'
import { ApAxiosClient } from './ap-axios'

const globalCacheFlowPath = path.resolve('cache', 'flows')
const flowCache = cacheState(globalCacheFlowPath)

const removeTrailingSlash = (url: string): string => {
    return url.endsWith('/') ? url.slice(0, -1) : url
}

export const workerApiService = (workerToken: string) => {
    const apiUrl = removeTrailingSlash(workerMachine.getInternalApiUrl())
    const client = new ApAxiosClient(apiUrl, workerToken)

    return {
        async heartbeat(): Promise<WorkerMachineHealthcheckResponse | null> {
            const request: WorkerMachineHealthcheckRequest = await workerMachine.getSystemInfo()
            try {
                return await client.post<WorkerMachineHealthcheckResponse>('/v1/worker-machines/heartbeat', request)
            }
            catch (error) {
                if (ApAxiosClient.isApAxiosError(error) && error.error.code === 'ECONNREFUSED') {
                    return null
                }
                throw error
            }
        },
        async poll(queueName: QueueName): Promise<ApQueueJob | null> {
            try {
                const request: PollJobRequest = {
                    queueName,
                }
                const response = await client.get<ApQueueJob | null>('/v1/workers/poll', {
                    params: request,
                })
                return response
            }
            catch (error) {
                await new Promise((resolve) => setTimeout(resolve, 2000))
                return null
            }
        },
        async resumeRun(request: ResumeRunRequest): Promise<void> {
            await client.post<unknown>('/v1/workers/resume-run', request)
        },
        async savePayloadsAsSampleData(request: SavePayloadRequest): Promise<void> {
            await client.post('/v1/workers/save-payloads', request)
        },
        async startRuns(request: SubmitPayloadsRequest): Promise<FlowRun[]> {
            const arrayOfPayloads = splitPayloadsIntoOneMegabyteBatches(request.payloads)
            const limit = pLimit(1)
            const promises = arrayOfPayloads.map(payloads =>
                limit(() => client.post<FlowRun[]>('/v1/workers/submit-payloads', {
                    ...request,
                    payloads,
                })),
            )
            const results = await Promise.allSettled(promises)
            const errors = results.filter((r): r is PromiseRejectedResult => r.status === 'rejected')
            if (errors.length > 0) {
                const errorMessages = errors.map(e => e.reason.message).join(', ')
                throw new Error(`Failed to start runs: ${errorMessages}`)
            }
            return results
                .filter((r): r is PromiseFulfilledResult<FlowRun[]> => r.status === 'fulfilled')
                .map(r => r.value)
                .flat()
        },
        async sendUpdate(request: SendEngineUpdateRequest): Promise<void> {
            await client.post('/v1/workers/send-engine-update', request)
        },
    }
}

function splitPayloadsIntoOneMegabyteBatches(payloads: unknown[]): unknown[][] {
    const batches: unknown[][] = [[]]
    const ONE_MB = 1024 * 1024
    let currentSize = 0
    for (const payload of payloads) {
        const payloadSize = Buffer.byteLength(JSON.stringify(payload))
        currentSize += payloadSize
        if (currentSize > ONE_MB) {
            batches.push([])
            currentSize = payloadSize
        }
        batches[batches.length - 1].push(payload)
    }
    return batches
}

async function readFlowFromCache(flowVersionIdToRun: FlowVersionId): Promise<PopulatedFlow | null> {
    try {
        const cachedFlow = await flowCache.cacheCheckState(flowVersionIdToRun)
        return cachedFlow ? JSON.parse(cachedFlow) as PopulatedFlow : null
    }
    catch (error) {
        return null
    }
}

export const engineApiService = (engineToken: string, log: FastifyBaseLogger) => {
    const apiUrl = removeTrailingSlash(workerMachine.getInternalApiUrl())
    const client = new ApAxiosClient(apiUrl, engineToken)

    return {
        async getFile(fileId: string): Promise<Buffer> {
            return client.get<Buffer>(`/v1/engine/files/${fileId}`, {
                responseType: 'arraybuffer',
            })
        },
        async updateJobStatus(request: UpdateJobRequest): Promise<void> {
            await client.post('/v1/engine/update-job', request)
        },
        async updateFailureCount(request: UpdateFailureCountRequest): Promise<void> {
            await client.post('/v1/engine/update-failure-count', request)
        },
        async getRun(request: GetRunForWorkerRequest): Promise<FlowRun> {
            return client.get<FlowRun>('/v1/engine/runs/' + request.runId, {})
        },
        async updateRunStatus(request: UpdateRunProgressRequest): Promise<void> {
            await client.post('/v1/engine/update-run', request)
        },
        async getPiece(name: string, options: GetPieceRequestQuery): Promise<PieceMetadataModel> {
            return client.get<PieceMetadataModel>(`/v1/pieces/${encodeURIComponent(name)}`, {
                params: options,
            })
        },
        async checkTaskLimit(): Promise<void> {
            try {
                await client.get<unknown>('/v1/engine/check-task-limit', {})
            }
            catch (e) {
                if (ApAxiosClient.isApAxiosError(e) && e.error.response && e.error.response.status === StatusCodes.PAYMENT_REQUIRED) {
                    throw new ActivepiecesError({
                        code: ErrorCode.QUOTA_EXCEEDED,
                        params: {
                            metric: PlatformUsageMetric.TASKS,
                        },
                    })
                }
                exceptionHandler.handle(e, log)
            }
        },
        async getFlowWithExactPieces(request: GetFlowVersionForWorkerRequest): Promise<PopulatedFlow | null> {
            const startTime = performance.now()
            log.debug({ request }, '[EngineApiService#getFlowWithExactPieces] start')
            const cachedFlow = await readFlowFromCache(request.versionId)
            if (!isNil(cachedFlow)) {
                log.debug({ request, took: performance.now() - startTime }, '[EngineApiService#getFlowWithExactPieces] cache hit')
                return cachedFlow
            }
            try {
                const flow = await client.get<PopulatedFlow | null>('/v1/engine/flows', {
                    params: request,
                })
                const isCachableFlow = !isNil(flow) && flow.version.state === FlowVersionState.LOCKED
                if (isCachableFlow) {
                    await flowCache.setCache(flow.version.id, JSON.stringify(flow))
                }
                return flow
            }
            catch (e) {
                if (ApAxiosClient.isApAxiosError(e) && e.error.response && e.error.response.status === 404) {
                    return null
                }
                throw e
            }
            finally {
                log.debug({ request, took: performance.now() - startTime }, '[EngineApiService#getFlowWithExactPieces] cache miss')
            }
        },
    }
}
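For illustration, the 1 MB batching rule that startRuns() relies on can be exercised on its own. The sketch below is a stand-alone copy of that logic; the function name splitIntoOneMegabyteBatches and the sample payloads are illustrative only, and a Node.js runtime is assumed for Buffer.

// Stand-alone sketch of the batching rule used by startRuns():
// payloads are grouped so each batch's serialized JSON stays at or under
// roughly 1 MB; a payload that would push the running total past the limit
// starts a new batch (a single payload larger than 1 MB gets its own batch).
const ONE_MB = 1024 * 1024

function splitIntoOneMegabyteBatches(payloads: unknown[]): unknown[][] {
    const batches: unknown[][] = [[]]
    let currentSize = 0
    for (const payload of payloads) {
        const payloadSize = Buffer.byteLength(JSON.stringify(payload))
        currentSize += payloadSize
        if (currentSize > ONE_MB) {
            batches.push([])
            currentSize = payloadSize
        }
        batches[batches.length - 1].push(payload)
    }
    return batches
}

// Example: ten ~300 kB payloads end up in batches of three, plus a remainder.
const samplePayloads = Array.from({ length: 10 }, () => ({ data: 'x'.repeat(300 * 1024) }))
console.log(splitIntoOneMegabyteBatches(samplePayloads).map(batch => batch.length))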

MCP directory API

We provide all of the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/eldoonreval/activepieces'
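The same endpoint can be called from code. Below is a minimal TypeScript sketch using the global fetch API (Node 18+ or a browser); the response is logged as opaque JSON, since its exact shape is not documented on this page.

// Fetch the MCP directory entry for this server and print it.
const response = await fetch('https://glama.ai/api/mcp/v1/servers/eldoonreval/activepieces')
if (!response.ok) {
    throw new Error(`Directory API request failed: ${response.status}`)
}
const server = await response.json()
console.log(server)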

If you have feedback or need assistance with the MCP directory API, please join our Discord server.