Skip to main content
Glama
piece-sync-service.ts (4.2 kB)
import { AppSystemProp, apVersionUtil } from '@activepieces/server-shared'
import { PieceSyncMode, PieceType } from '@activepieces/shared'
import { FastifyBaseLogger } from 'fastify'
import pLimit from 'p-limit'
import { system } from '../helper/system/system'
import { SystemJobName } from '../helper/system-jobs/common'
import { systemJobHandlers } from '../helper/system-jobs/job-handlers'
import { systemJobsSchedule } from '../helper/system-jobs/system-job'
import { pieceMetadataService, pieceRepos } from './metadata/piece-metadata-service'

// Base URL of the cloud pieces API queried by this module.
const CLOUD_API_URL = 'https://cloud.activepieces.com/api/v1/pieces'

// Upper bound on piece metadata downloads running at the same time.
const MAX_CONCURRENT_DOWNLOADS = 20

// Read once, when the module is first loaded.
const syncMode = system.get<PieceSyncMode>(AppSystemProp.PIECES_SYNC_MODE)

/**
 * Keeps locally stored piece metadata aligned with the cloud piece registry.
 *
 * @param log Logger used for all messages emitted by the service.
 */
export const pieceSyncService = (log: FastifyBaseLogger) => ({
    /**
     * Registers the sync job handler, runs one sync immediately, and upserts a
     * repeating job that triggers the handler once per hour.
     */
    async setup(): Promise<void> {
        systemJobHandlers.registerJobHandler(SystemJobName.PIECES_SYNC, async function syncPiecesJobHandler(): Promise<void> {
            await pieceSyncService(log).sync()
        })
        await pieceSyncService(log).sync()
        await systemJobsSchedule(log).upsertJob({
            job: {
                name: SystemJobName.PIECES_SYNC,
                data: {},
            },
            schedule: {
                type: 'repeated',
                // The minute is drawn at random from 0-4 - presumably so that
                // separate installations do not all call the API at once.
                cron: `${Math.floor(Math.random() * 5)} */1 * * *`,
            },
        })
    },

    /**
     * Downloads every registry piece version missing locally and deletes
     * OFFICIAL pieces stored locally that the registry no longer lists.
     *
     * Does nothing unless the sync mode is OFFICIAL_AUTO. Never rejects:
     * failures are logged. A failure on a single piece is logged as a warning
     * and does not stop the remaining downloads or the clean-up step.
     */
    async sync(): Promise<void> {
        if (syncMode !== PieceSyncMode.OFFICIAL_AUTO) {
            log.info('Piece sync service is disabled')
            return
        }
        try {
            log.info('Starting piece synchronization')
            const startTime = performance.now()

            const [dbPieces, cloudPieces] = await Promise.all([pieceRepos().find(), listCloudPieces()])
            const dbKeys = new Set<string>(dbPieces.map(pieceKey))
            const cloudKeys = new Set<string>(cloudPieces.map(pieceKey))

            const newPiecesToFetch = cloudPieces.filter(piece => !dbKeys.has(pieceKey(piece)))
            const limit = pLimit(MAX_CONCURRENT_DOWNLOADS)
            const outcomes = await Promise.all(
                newPiecesToFetch.map(piece => limit(() => downloadAndStorePiece(log, piece))),
            )
            // Count only the pieces that were actually stored, not every attempt.
            const piecesAdded = outcomes.filter(stored => stored).length

            const officialPiecesNotOnCloud = dbPieces.filter(piece =>
                piece.pieceType === PieceType.OFFICIAL && !cloudKeys.has(pieceKey(piece)),
            )
            // Skip the call entirely when there is nothing to delete.
            if (officialPiecesNotOnCloud.length > 0) {
                await pieceMetadataService(log).bulkDelete(
                    officialPiecesNotOnCloud.map(piece => ({ name: piece.name, version: piece.version })),
                )
            }

            log.info({
                piecesAdded,
                piecesFailed: outcomes.length - piecesAdded,
                piecesDeleted: officialPiecesNotOnCloud.length,
                durationMs: Math.floor(performance.now() - startTime),
            }, 'Piece synchronization completed')
        }
        catch (error) {
            log.error({ error }, 'Error syncing pieces')
        }
    },
})

/** Builds the `name:version` key used to compare local and registry pieces. */
function pieceKey(piece: { name: string, version: string }): string {
    return `${piece.name}:${piece.version}`
}

/**
 * Downloads the metadata of one piece version and stores it.
 *
 * @returns `true` when the piece was stored; `false` when the download or the
 * store step failed. Failures are logged as warnings and never thrown, so one
 * bad piece cannot abort the rest of the batch.
 */
async function downloadAndStorePiece(log: FastifyBaseLogger, piece: PieceRegistryResponse): Promise<boolean> {
    try {
        const url = `${CLOUD_API_URL}/${piece.name}${piece.version ? '?version=' + piece.version : ''}`
        const response = await fetch(url)
        if (!response.ok) {
            log.warn({ name: piece.name, version: piece.version, status: response.status }, 'Error reading piece metadata')
            return false
        }
        const pieceMetadata = await response.json()
        await pieceMetadataService(log).create({
            pieceMetadata,
            packageType: pieceMetadata.packageType,
            pieceType: pieceMetadata.pieceType,
        })
        return true
    }
    catch (error) {
        log.warn({ name: piece.name, version: piece.version, error }, 'Error syncing piece metadata')
        return false
    }
}

/**
 * Fetches the registry listing (piece name and version pairs) for the current
 * edition and release.
 *
 * @throws Error when the registry responds with a non-OK HTTP status.
 */
async function listCloudPieces(): Promise<PieceRegistryResponse[]> {
    const queryParams = new URLSearchParams()
    queryParams.append('edition', system.getEdition())
    queryParams.append('release', await apVersionUtil.getCurrentRelease())
    const response = await fetch(`${CLOUD_API_URL}/registry?${queryParams.toString()}`)
    if (!response.ok) {
        throw new Error(`Failed to fetch cloud pieces: ${response.status}`)
    }
    return response.json()
}

// One entry of the registry listing returned by listCloudPieces().
type PieceRegistryResponse = {
    name: string
    version: string
}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/activepieces/activepieces'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.