@ragrabbit/mcp

by madarco
Verified
import { tasks } from "@repo/jobs";
import db from "@repo/db";
import { indexedTable } from "@repo/db/schema";
import type {
  ragGenerateEmbeddingsTask,
  ragGetContentTask,
  ragIndexAllTask,
  ragProcessingTask,
} from "@repo/jobs/tasks/rag/indexing";
import { env } from "@repo/jobs/env.mjs";
import { and, eq, inArray } from "@repo/db/drizzle";
import { setTimeout } from "timers/promises";

/**
 * Backend-agnostic contract for kicking off RAG indexing work.
 *
 * Every method resolves once the work has been handed to the backend
 * (rows updated, or tasks submitted), not when the work itself has finished.
 */
interface JobsInterface {
  /** Requests processing for the given indexed rows of one organization. */
  triggerProcessing(indexIds: number[], orgId: number): Promise<void>;
  /** Requests embedding generation for the given indexed rows of one organization. */
  triggerEmbeddings(indexIds: number[], orgId: number): Promise<void>;
  /** Requests content retrieval for the given indexed rows of one organization. */
  triggerGetContent(indexIds: number[], orgId: number): Promise<void>;
  /** Requests a full (re)index of everything belonging to one organization. */
  triggerIndexAll(orgId: number): Promise<void>;
}

/**
 * Database-backed implementation: a "job" is requested by flipping the row
 * status to "PENDING" in `indexedTable`.
 *
 * NOTE(review): whatever consumes PENDING rows is not visible in this file.
 */
class PgJobs implements JobsInterface {
  /**
   * Sets status to "PENDING" on the rows whose id is in `indexIds` and whose
   * organization matches `orgId`.
   *
   * @param indexIds ids of the `indexedTable` rows to mark
   * @param orgId organization the rows must belong to
   */
  async triggerProcessing(indexIds: number[], orgId: number): Promise<void> {
    // Nothing to mark; also avoids handing an empty list to `inArray`,
    // which some Drizzle versions reject.
    if (indexIds.length === 0) {
      return;
    }

    await db
      .update(indexedTable)
      .set({ status: "PENDING" })
      .where(and(inArray(indexedTable.id, indexIds), eq(indexedTable.organizationId, orgId)));
  }

  /** Same effect as {@link PgJobs.triggerProcessing}: the rows are marked "PENDING". */
  async triggerEmbeddings(indexIds: number[], orgId: number): Promise<void> {
    // Awaited so the promise settles only after the update ran and so that
    // database errors reach the caller instead of becoming unhandled rejections.
    await this.triggerProcessing(indexIds, orgId);
  }

  /** Same effect as {@link PgJobs.triggerProcessing}: the rows are marked "PENDING". */
  async triggerGetContent(indexIds: number[], orgId: number): Promise<void> {
    // Awaited for the same reason as in triggerEmbeddings.
    await this.triggerProcessing(indexIds, orgId);
  }

  /**
   * Sets status to "PENDING" on every row of the organization.
   *
   * @param orgId organization whose rows are marked
   */
  async triggerIndexAll(orgId: number): Promise<void> {
    await db.update(indexedTable).set({ status: "PENDING" }).where(eq(indexedTable.organizationId, orgId));
  }
}

/**
 * Task-runner implementation: work is submitted through `tasks.trigger` /
 * `tasks.batchTrigger`, with every run tagged `org:<orgId>`.
 */
class TriggerDevJobs implements JobsInterface {
  /**
   * Submits one "rag-processing" run per id, in batches of at most 100 ids,
   * pausing one second after each batch.
   *
   * @param indexIds ids to process; an empty list submits nothing
   * @param orgId organization used for the run tag
   */
  async triggerProcessing(indexIds: number[], orgId: number): Promise<void> {
    const batchSize = 100;
    const pauseMs = 1000;

    for (let start = 0; start < indexIds.length; start += batchSize) {
      const batchIds = indexIds.slice(start, start + batchSize);

      // Only the current slice is submitted; mapping over the full `indexIds`
      // here would re-trigger every id once per batch.
      await tasks.batchTrigger<typeof ragProcessingTask>(
        "rag-processing",
        batchIds.map((id) => ({
          payload: { indexedId: id },
          options: { tags: ["org:" + orgId] },
        }))
      );

      await setTimeout(pauseMs);
    }
  }

  /**
   * Submits one "rag-generate-embeddings" run per id in a single batch.
   *
   * @param indexIds ids to embed; an empty list submits nothing
   * @param orgId organization used for the run tag
   */
  async triggerEmbeddings(indexIds: number[], orgId: number): Promise<void> {
    if (indexIds.length === 0) {
      return;
    }

    await tasks.batchTrigger<typeof ragGenerateEmbeddingsTask>(
      "rag-generate-embeddings",
      indexIds.map((id) => ({
        payload: { indexedId: id },
        options: { tags: ["org:" + orgId] },
      }))
    );
  }

  /**
   * Submits one "rag-get-content" run per id in a single batch.
   *
   * @param indexIds ids to fetch content for; an empty list submits nothing
   * @param orgId organization used for the run tag
   */
  async triggerGetContent(indexIds: number[], orgId: number): Promise<void> {
    if (indexIds.length === 0) {
      return;
    }

    await tasks.batchTrigger<typeof ragGetContentTask>(
      "rag-get-content",
      indexIds.map((id) => ({
        payload: { indexedId: id },
        options: { tags: ["org:" + orgId] },
      }))
    );
  }

  /**
   * Submits a single "rag-index-all" run for the organization.
   *
   * @param orgId organization passed as payload and used for the run tag
   */
  async triggerIndexAll(orgId: number): Promise<void> {
    await tasks.trigger<typeof ragIndexAllTask>("rag-index-all", { orgId }, { tags: ["org:" + orgId] });
  }
}

/**
 * Job backend selected once at module load: the task-runner implementation
 * when `TRIGGER_SECRET_KEY` is set, otherwise the database-backed one.
 */
export const jobs: JobsInterface = env.TRIGGER_SECRET_KEY ? new TriggerDevJobs() : new PgJobs();