local-piece-cache.ts (3.93 kB)
import { AppSystemProp, memoryLock, rejectedPromiseHandler } from '@activepieces/server-shared'
import { ApEnvironment, isNil, Result, tryCatch } from '@activepieces/shared'
import dayjs from 'dayjs'
import { FastifyBaseLogger } from 'fastify'
import cron from 'node-cron'
import semVer from 'semver'
import { repoFactory } from '../../core/db/repo-factory'
import { pubsub } from '../../helper/pubsub'
import { system } from '../../helper/system/system'
import { PieceMetadataEntity, PieceMetadataSchema } from './piece-metadata-entity'

// In-memory cache of all piece metadata rows, populated by updateCache().
let cache: PieceMetadataSchema[] | null = null

const repo = repoFactory(PieceMetadataEntity)
const environment = system.get<ApEnvironment>(AppSystemProp.ENVIRONMENT)

export const REDIS_REFRESH_LOCAL_PIECES_CHANNEL = 'refresh-local-pieces-cache'

// Shape of the aggregate row used to detect whether the database holds newer data than the cache.
type State = {
    recentUpdate: string | undefined
    count: string
}

export const localPieceCache = (log: FastifyBaseLogger) => ({
    async setup(): Promise<void> {
        await updateCache(log)
        // Refresh every 15 minutes, and whenever another node publishes on the Redis channel.
        cron.schedule('*/15 * * * *', () => {
            log.info('[localPieceCache] Refreshing pieces cache via cron job')
            rejectedPromiseHandler(updateCache(log), log)
        })
        await pubsub.subscribe(REDIS_REFRESH_LOCAL_PIECES_CHANNEL, () => {
            log.info('[localPieceCache] Refreshing pieces cache via pubsub')
            rejectedPromiseHandler(updateCache(log), log)
        })
    },
    async getSortedbyNameAscThenVersionDesc(): Promise<PieceMetadataSchema[]> {
        // In tests there is no background refresh, so read straight from the database.
        if (environment === ApEnvironment.TESTING) {
            const { data, error } = await fetchPieces()
            if (error) {
                throw error
            }
            return data
        }
        if (isNil(cache)) {
            throw new Error('The cache is not yet initialized, this should not happen')
        }
        return cache
    },
})

async function updateCache(log: FastifyBaseLogger): Promise<void> {
    const piecesResult = await fetchPieces()
    if (piecesResult.error) {
        log.error({ error: piecesResult.error }, '[localPieceCache] Error fetching local pieces')
        throw piecesResult.error
    }
    cache = piecesResult.data
}

async function fetchPieces(): Promise<Result<PieceMetadataSchema[], Error>> {
    return memoryLock.runExclusive({
        key: 'fetch-pieces',
        fn: async () => {
            // Compare the newest `updated` timestamp and the row count against the cache
            // so the full reload can be skipped when nothing has changed.
            const newestState: State | undefined = await repo()
                .createQueryBuilder()
                .select('MAX(updated)', 'recentUpdate')
                .addSelect('count(*)', 'count')
                .getRawOne()
            if (!isNil(newestState) && cache !== null) {
                const newestInCache = cache.reduce((acc, piece) => {
                    return Math.max(dayjs(piece.updated).unix(), acc)
                }, 0)
                const needUpdate = dayjs(newestState.recentUpdate).unix() !== newestInCache
                    || Number(newestState.count) !== cache.length
                if (!needUpdate) {
                    return { data: cache as PieceMetadataSchema[], error: null }
                }
            }
            // Reload everything, sorted by name ascending, then by semver version descending.
            return tryCatch(async () => {
                const piecesFromDatabase = await repo().find()
                return piecesFromDatabase.sort((a, b) => {
                    if (a.name !== b.name) {
                        return a.name.localeCompare(b.name)
                    }
                    const aValid = semVer.valid(a.version)
                    const bValid = semVer.valid(b.version)
                    if (!aValid && !bValid) {
                        return b.version.localeCompare(a.version)
                    }
                    if (!aValid) {
                        return 1
                    }
                    if (!bValid) {
                        return -1
                    }
                    return semVer.rcompare(a.version, b.version)
                })
            })
        },
    })
}
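
For orientation, here is a minimal sketch of how this module might be consumed at server startup. The call site, the import paths, and the use of a pubsub publish method to force a refresh from another node are assumptions; only localPieceCache, setup, getSortedbyNameAscThenVersionDesc, and REDIS_REFRESH_LOCAL_PIECES_CHANNEL come from the file above.

// Hypothetical call site (not part of local-piece-cache.ts); import paths are assumptions.
import { FastifyBaseLogger } from 'fastify'
import { pubsub } from '../../helper/pubsub'
import { localPieceCache, REDIS_REFRESH_LOCAL_PIECES_CHANNEL } from './local-piece-cache'

export async function bootPieceCache(log: FastifyBaseLogger): Promise<void> {
    const pieceCache = localPieceCache(log)

    // Fills the cache once, then keeps it fresh via the cron schedule and the Redis channel.
    await pieceCache.setup()

    // After setup() resolves, reads are served from memory.
    const pieces = await pieceCache.getSortedbyNameAscThenVersionDesc()
    log.info({ count: pieces.length }, '[bootPieceCache] pieces loaded')

    // Assumption: the pubsub helper exposes a publish() counterpart to subscribe(),
    // which another node could call to trigger an immediate refresh.
    await pubsub.publish(REDIS_REFRESH_LOCAL_PIECES_CHANNEL, '{}')
}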

MCP directory API

We provide all the information about MCP servers through our MCP directory API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/activepieces/activepieces'
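
The same request from TypeScript, as a sketch using the built-in fetch API (Node 18+); the shape of the JSON payload is not documented on this page, so it is logged as-is rather than destructured.

// Sketch only: fetches the server entry shown in the curl example above.
const res = await fetch('https://glama.ai/api/mcp/v1/servers/activepieces/activepieces')
if (!res.ok) {
    throw new Error(`Glama MCP API request failed with status ${res.status}`)
}
const server: unknown = await res.json()
console.log(server)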

If you have feedback or need assistance with the MCP directory API, please join our Discord server.