import { AppSystemProp, exceptionHandler, fileCompressor, WorkerSystemProp } from '@activepieces/server-shared'
import {
ActivepiecesError,
apId,
assertNotNullOrUndefined,
ErrorCode,
File,
FileCompression,
FileId,
FileLocation,
FileType,
isMultipartFile,
isNil,
ProjectId,
} from '@activepieces/shared'
import dayjs from 'dayjs'
import { FastifyBaseLogger } from 'fastify'
import { In, LessThanOrEqual } from 'typeorm'
import { repoFactory } from '../core/db/repo-factory'
import { system } from '../helper/system/system'
import { FileEntity } from './file.entity'
import { s3Helper } from './s3-helper'
/**
 * Default allow-list of MIME types accepted by `uploadPublicAsset` when the caller
 * does not pass `allowedMimeTypes`.
 *
 * ICO files are listed under every spelling seen in practice: `image/x-icon` (the de-facto
 * type), `image/vnd.microsoft.icon` (the IANA-registered type) and the legacy `image/ico`
 * alias, which is kept for backward compatibility.
 */
const IMAGE_MIME_TYPES = [
    'image/jpeg',
    'image/png',
    'image/gif',
    'image/webp',
    'image/svg+xml',
    'image/tiff',
    'image/bmp',
    'image/ico',
    'image/x-icon',
    'image/vnd.microsoft.icon',
    'image/avif',
    'image/apng',
]
/** Repository accessor for `File` rows, built by `repoFactory` from `FileEntity`. */
export const fileRepo = repoFactory<File>(FileEntity)

/**
 * Retention window, in days, read once at module load from
 * `AppSystemProp.EXECUTION_DATA_RETENTION_DAYS`. `deleteStaleBulk` uses it as the age threshold.
 */
const EXECUTION_DATA_RETENTION_DAYS = system.getNumberOrThrow(AppSystemProp.EXECUTION_DATA_RETENTION_DAYS)

/**
 * The `File` columns that are identical regardless of where the content lives.
 * Location-specific fields (`location`, `data`, `s3Key`) are added at save time.
 */
type BaseFile = Pick<
    File,
    | 'id'
    | 'projectId'
    | 'platformId'
    | 'type'
    | 'fileName'
    | 'compression'
    | 'size'
    | 'metadata'
    | 'created'
    | 'updated'
>
/**
 * Stores a file with its content inline in the database.
 *
 * @param baseFile location-independent columns of the record
 * @param data file content; asserted to be non-nil before anything is written
 * @returns the entity returned by the repository's `save`
 */
async function saveFileToDb(baseFile: BaseFile, data: SaveParams['data']) {
    assertNotNullOrUndefined(data, 'data is required')
    const repository = fileRepo()
    return repository.save({
        ...baseFile,
        data,
        location: FileLocation.DB,
    })
}
/**
 * Creates the file service bound to the given logger.
 *
 * File content lives either inline in the database or in S3; the location is chosen per
 * file type by `getLocationForFile`.
 *
 * @param log logger handed to the S3 helper and used for error / progress logs
 */
export const fileService = (log: FastifyBaseLogger) => ({
    /**
     * Persists a file record, uploading its content to S3 first when S3 is the location
     * selected for `params.type`.
     *
     * When the S3 path fails, the error is reported through `exceptionHandler` and the file
     * is stored in the database instead, provided there is content to store.
     *
     * @param params record fields and content; `fileId` defaults to a fresh `apId()`
     * @returns the saved file entity
     * @throws the original S3-path error when that path fails and `params.data` is nil
     */
    async save(params: SaveParams): Promise<File> {
        // One timestamp for both columns so a freshly inserted row has created === updated.
        const now = dayjs().toISOString()
        const baseFile: BaseFile = {
            id: params.fileId ?? apId(),
            projectId: params.projectId,
            platformId: params.platformId,
            type: params.type,
            fileName: params.fileName,
            compression: params.compression,
            size: params.size,
            metadata: params.metadata,
            created: now,
            updated: now,
        }
        const location = getLocationForFile(params.type)
        switch (location) {
            case FileLocation.DB: {
                return saveFileToDb(baseFile, params.data)
            }
            case FileLocation.S3: {
                try {
                    const s3Key = await s3Helper(log).constructS3Key(params.platformId, params.projectId, params.type, baseFile.id)
                    // A nil `data` still creates the record with its key; only the upload is skipped.
                    if (!isNil(params.data)) {
                        await s3Helper(log).uploadFile(s3Key, params.data)
                    }
                    return await fileRepo().save({
                        ...baseFile,
                        location: FileLocation.S3,
                        s3Key,
                    })
                }
                catch (error) {
                    exceptionHandler.handle(error, log)
                    // Without content the DB fallback can only fail on its "data is required"
                    // assertion, which would hide the real cause; surface the original error.
                    if (isNil(params.data)) {
                        throw error
                    }
                    return saveFileToDb(baseFile, params.data)
                }
            }
        }
    },

    /**
     * Tells whether a file matching the given id (and project / type, when provided) exists.
     */
    async exists(params: GetOneParams): Promise<boolean> {
        const file = await fileRepo().findOneBy({
            projectId: params.projectId,
            id: params.fileId,
            type: params.type,
        })
        return !isNil(file)
    },

    /**
     * Looks up a file record by id (and project / type, when provided).
     *
     * @returns the record, or `null` when nothing matches
     */
    async getFile({ projectId, fileId, type }: GetOneParams): Promise<File | null> {
        return fileRepo().findOneBy({
            projectId,
            id: fileId,
            type,
        })
    },

    /**
     * Same lookup as `getFile`, but a missing record is an error.
     *
     * @throws ActivepiecesError with `ErrorCode.FILE_NOT_FOUND` when nothing matches
     */
    async getFileOrThrow(params: GetOneParams): Promise<File> {
        const file = await this.getFile(params)
        if (isNil(file)) {
            throw new ActivepiecesError({
                code: ErrorCode.FILE_NOT_FOUND,
                params: {
                    id: params.fileId,
                },
            })
        }
        return file
    },

    /**
     * Best-effort variant of `getDataOrThrow`: any failure is logged and turned into `undefined`.
     */
    async getDataOrUndefined({ projectId, fileId, type }: GetOneParams): Promise<GetDataResponse | undefined> {
        try {
            return await this.getDataOrThrow({ projectId, fileId, type })
        }
        catch (error) {
            log.error({
                error,
            }, '[FileService#getData] error')
            return undefined
        }
    },

    /**
     * Loads a file's content from its storage location and decompresses it.
     *
     * @returns the decompressed content together with the record's metadata and file name
     * @throws ActivepiecesError with `ErrorCode.FILE_NOT_FOUND` when no record matches
     * @throws Error when the record is not DB-located and has no `s3Key`
     */
    async getDataOrThrow({ projectId, fileId, type }: GetOneParams): Promise<GetDataResponse> {
        const file = await fileRepo().findOneBy({
            projectId,
            id: fileId,
            type,
        })
        if (isNil(file)) {
            throw new ActivepiecesError({
                code: ErrorCode.FILE_NOT_FOUND,
                params: {
                    id: fileId,
                },
            })
        }
        // Runtime guard instead of a non-null assertion: a record without a key fails with a
        // clear message rather than sending an undefined key to S3.
        const downloadFromS3 = async () => {
            assertNotNullOrUndefined(file.s3Key, 'file.s3Key')
            return s3Helper(log).getFile(file.s3Key)
        }
        const data = await fileCompressor.decompress({
            data: file.location === FileLocation.DB ? file.data : await downloadFromS3(),
            compression: file.compression,
        })
        return {
            metadata: file.metadata,
            data,
            fileName: file.fileName,
        }
    },

    /**
     * Deletes files of the given types created on or before
     * now minus `EXECUTION_DATA_RETENTION_DAYS`, in batches of 4000.
     *
     * For each batch the S3 objects are removed first, then the database rows. The loop
     * continues while a batch deletes exactly the batch size.
     *
     * @param types file types eligible for deletion
     */
    async deleteStaleBulk(types: FileType[]) {
        const retentionDateBoundary = dayjs().subtract(EXECUTION_DATA_RETENTION_DAYS, 'days').toISOString()
        const maximumFilesToDeletePerIteration = 4000
        let affected: undefined | number = undefined
        let totalAffected = 0
        while (isNil(affected) || affected === maximumFilesToDeletePerIteration) {
            const staleFiles = await fileRepo().find({
                select: ['id', 'created', 's3Key'],
                where: {
                    type: In(types),
                    created: LessThanOrEqual(retentionDateBoundary),
                },
                take: maximumFilesToDeletePerIteration,
            })
            affected = 0
            // Nothing matched: skip the no-op S3 call and the `IN ()` delete entirely.
            if (staleFiles.length > 0) {
                const s3Keys = staleFiles
                    .map(staleFile => staleFile.s3Key)
                    .filter((key): key is string => !isNil(key))
                // DB-located rows carry no key; avoid an empty bulk-delete request.
                if (s3Keys.length > 0) {
                    await s3Helper(log).deleteFiles(s3Keys)
                }
                const result = await fileRepo().delete({
                    type: In(types),
                    created: LessThanOrEqual(retentionDateBoundary),
                    id: In(staleFiles.map(staleFile => staleFile.id)),
                })
                affected = result.affected ?? 0
            }
            totalAffected += affected
            log.info({
                counts: affected,
                types,
            }, '[FileService#deleteStaleBulk] iteration completed')
        }
        log.info({
            totalAffected,
            types,
        }, '[FileService#deleteStaleBulk] completed')
    },

    /**
     * Validates an uploaded multipart file and stores it uncompressed via `save`.
     *
     * @param params upload plus constraints; `allowedMimeTypes` defaults to `IMAGE_MIME_TYPES`
     * @returns the asset URL under the configured frontend URL, or `undefined` when `file` is nil
     * @throws ActivepiecesError with `ErrorCode.VALIDATION` when the value is not a multipart
     *         file, its MIME type is not allowed, or it exceeds `maxFileSizeInBytes`
     */
    async uploadPublicAsset(params: UploadPublicAssetParams): Promise<string | undefined> {
        const { file, type, platformId, allowedMimeTypes = IMAGE_MIME_TYPES, maxFileSizeInBytes, metadata } = params
        if (isNil(file)) {
            return undefined
        }
        if (!isMultipartFile(file)) {
            throw new ActivepiecesError({
                code: ErrorCode.VALIDATION,
                params: {
                    message: 'File must be a multipart file',
                },
            })
        }
        // A missing MIME type is treated as the empty string, which no allow-list entry matches.
        const mimetype = file.mimetype ?? ''
        if (!allowedMimeTypes.includes(mimetype)) {
            throw new ActivepiecesError({
                code: ErrorCode.VALIDATION,
                params: {
                    message: `Invalid file type. Allowed types: ${allowedMimeTypes.join(', ')}`,
                },
            })
        }
        if (!isNil(maxFileSizeInBytes) && file.data.length > maxFileSizeInBytes) {
            throw new ActivepiecesError({
                code: ErrorCode.VALIDATION,
                params: {
                    message: `File size exceeds ${Math.round(maxFileSizeInBytes / (1024 * 1024))}MB limit`,
                },
            })
        }
        const savedFile = await this.save({
            data: file.data,
            size: file.data.length,
            type,
            compression: FileCompression.NONE,
            platformId,
            fileName: file.filename,
            metadata: {
                ...metadata,
                mimetype,
            },
        })
        return `${system.get(WorkerSystemProp.FRONTEND_URL)}/api/v1/platforms/assets/${savedFile.id}`
    },
})
/** Result of reading a file's content through `getDataOrThrow` / `getDataOrUndefined`. */
type GetDataResponse = {
    /** Metadata copied from the file record, when it has any. */
    metadata?: Record<string, string>
    /** File content after decompression. */
    data: Buffer
    /** File name copied from the file record, when it has one. */
    fileName?: string
}
/**
 * Chooses where a file of the given type is stored.
 *
 * Types for which `isExecutionDataFileThatExpires` returns true go to the location configured in
 * `AppSystemProp.FILE_STORAGE_LOCATION`; every other supported type is kept in the database.
 */
function getLocationForFile(type: FileType) {
    // Read the setting first so a missing configuration fails before the type check runs.
    const configuredLocation = system.getOrThrow<FileLocation>(AppSystemProp.FILE_STORAGE_LOCATION)
    return isExecutionDataFileThatExpires(type) ? configuredLocation : FileLocation.DB
}
/**
 * Classifies a file type as one that follows the configured storage location (`true`)
 * or one that always stays in the database (`false`).
 *
 * @throws Error for a type that appears in neither list
 */
function isExecutionDataFileThatExpires(type: FileType) {
    const followsConfiguredLocation: readonly FileType[] = [
        FileType.FLOW_RUN_LOG,
        FileType.FLOW_STEP_FILE,
        FileType.TRIGGER_PAYLOAD,
        FileType.TRIGGER_EVENT_FILE,
        FileType.PLATFORM_ASSET,
        FileType.USER_PROFILE_PICTURE,
    ]
    if (followsConfiguredLocation.includes(type)) {
        return true
    }
    const alwaysInDatabase: readonly FileType[] = [
        FileType.SAMPLE_DATA,
        FileType.SAMPLE_DATA_INPUT,
        FileType.PACKAGE_ARCHIVE,
        FileType.PROJECT_RELEASE,
        FileType.FLOW_VERSION_BACKUP,
    ]
    if (alwaysInDatabase.includes(type)) {
        return false
    }
    throw new Error(`File type ${type} is not supported`)
}
/** Input of `fileService.save`. */
type SaveParams = {
    /** Id to store the file under; a new id is generated when omitted. */
    fileId?: FileId | undefined
    projectId?: ProjectId
    /**
     * File content. Required when the file ends up in the database; when it is stored in S3
     * a nil value skips the upload and only the record is written.
     */
    data: Buffer | null
    size: number
    type: FileType
    platformId?: string
    fileName?: string
    compression: FileCompression
    metadata?: Record<string, string>
}

/** Lookup criteria shared by the read methods; `projectId` and `type` are optional filters. */
type GetOneParams = {
    fileId: FileId
    projectId?: ProjectId
    type?: FileType
}

/** Input of `fileService.uploadPublicAsset`. */
type UploadPublicAssetParams = {
    /** Raw upload value; it is checked with `isMultipartFile` before use. */
    file: unknown
    type: FileType
    platformId: string
    /** Accepted MIME types; the default image allow-list is used when omitted. */
    allowedMimeTypes?: string[]
    /** Upper bound compared against the upload's byte length; no limit when omitted. */
    maxFileSizeInBytes?: number
    /** Extra metadata stored with the file alongside its `mimetype`. */
    metadata?: Record<string, string>
}