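/**
 * Content processing pipeline for indexed pages: crawls sitemap origins,
 * scrapes page content, generates embeddings, and cleans up outdated pages
 * once a crawl completes.
 */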
import { UserError } from "@repo/core";
import db from "@repo/db";
import { and, eq } from "@repo/db/drizzle";
import { Indexed, indexedTable } from "@repo/db/schema";
import { logger } from "@repo/logger";
import { generateEmbeddings } from "../indexing/llamaindex";
import { scrapeDbItem } from "../scraping/db";
import { crawlDbItem } from "../scraping/dbCrawl";
import { setTimeout } from "timers/promises";
export type ProcessResult = {
  newIndexedIds: number[];
  success: boolean;
  error?: string;
};
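/**
 * Processes an indexed item, retrying with linear backoff (1s, 2s, ...) on failure.
 * After the last failed attempt, the error is persisted on the row via saveDbItemFailure.
 */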
export async function processWithRetry(indexedId: number, maxRetries = 1): Promise<ProcessResult> {
  for (let i = 0; i < maxRetries; i++) {
    try {
      return await processDbItem(indexedId);
    } catch (e) {
      const error = e instanceof Error ? e : new Error(String(e));
      logger.error({ indexedId, error: error.message }, "Error processing content");
      if (i === maxRetries - 1) {
        return await saveDbItemFailure(indexedId, error);
      } else {
        // Linear backoff before the next attempt:
        await setTimeout(1000 * (i + 1));
      }
    }
  }
  // Only reachable when maxRetries < 1:
  return await saveDbItemFailure(indexedId, new Error("No processing attempts were made"));
}
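/**
 * Processes a single indexed row by status and type: crawl origins pending
 * cleanup are finalized, sitemap origins are crawled for page URLs, and
 * regular pages are scraped and embedded.
 */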
export async function processDbItem(indexedId: number): Promise<ProcessResult> {
  logger.info({ indexedId }, "Processing content");
  const indexed = await db.query.indexedTable.findFirst({
    where: eq(indexedTable.id, indexedId),
  });
  if (!indexed) {
    throw new UserError("Index page not found");
  }
  if (indexed.status === "PENDING_CLEAN") {
    return await cleanDbCrawlItem(indexed as Indexed);
  }
  await db
    .update(indexedTable)
    .set({
      status: "PROCESSING",
      indexedAt: new Date(),
    } as Indexed)
    .where(eq(indexedTable.id, indexed.id));
  let newIndexedIds: number[] = [];
  if (indexed.doCrawl && indexed.isSitemap) {
    // NB: only sitemap pages are crawled here; normal pages are crawled during scraping
    logger.info({ indexedId, isSitemap: indexed.isSitemap }, "Crawling from item");
    // Fetch all page URLs:
    await crawlDbItem(indexed.id);
    // Mark the sitemap origin itself as DONE:
    const newPages = await db
      .update(indexedTable)
      .set({
        status: "DONE",
        indexedAt: new Date(),
      } as Indexed)
      .where(eq(indexedTable.id, indexed.id))
      .returning();
    newIndexedIds = newPages.map((p) => p.id);
  } else {
    // Fetch page content and generate embeddings
    // (the destructured `indexed` shadows the outer row with the one returned by the scrape):
    const { indexed, scrapeData, success, newIndexedIds: newIndexedIdsFromScrape } = await scrapeDbItem(indexedId);
    if (!scrapeData || !scrapeData.content || !success) {
      // An unsuccessful scrape is not an error; the page is simply skipped:
      logger.info({ indexedId }, "Skipping page");
      return {
        newIndexedIds: [],
        success: true,
      };
    }
    newIndexedIds = newIndexedIdsFromScrape;
    const { title, content, description, links } = scrapeData;
    await generateEmbeddings(content, {
      id: indexed.id.toString(),
      url: indexed.url,
      title,
      description,
      organizationId: indexed.organizationId,
    });
    await db
      .update(indexedTable)
      .set({
        title,
        description,
        // NB: if it is a crawl origin, we mark it as pending clean to delete removed pages at the end:
        status: indexed.doCrawl ? "PENDING_CLEAN" : "DONE",
        indexedAt: new Date(),
      } as Indexed)
      .where(eq(indexedTable.id, indexed.id));
  }
  return {
    newIndexedIds,
    success: true,
  };
}
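/**
 * Records a terminal failure: marks the row as ERROR with the error message
 * and returns a failed ProcessResult.
 */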
export async function saveDbItemFailure(indexedId: number, error: Error): Promise<ProcessResult> {
  logger.error({ indexedId, error: error.message }, "Error processing content");
  await db
    .update(indexedTable)
    .set({
      status: "ERROR",
      error: error.message,
    } as Indexed)
    .where(eq(indexedTable.id, indexedId));
  return {
    newIndexedIds: [],
    success: false,
    error: error.message,
  };
}
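/**
 * Finalizes a completed crawl: deletes pages found in a previous crawl that
 * were marked OUTDATED and never re-encountered, then marks the crawl origin
 * as DONE.
 */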
export async function cleanDbCrawlItem(indexed: Indexed): Promise<ProcessResult> {
  logger.info({ indexedId: indexed.id }, "Cleaning completed crawl item");
  // Delete any items that were marked as outdated and were not processed during the crawl:
  await db
    .delete(indexedTable)
    .where(and(eq(indexedTable.foundFromIndexId, indexed.id), eq(indexedTable.status, "OUTDATED")));
  // Mark this crawl as complete:
  await db
    .update(indexedTable)
    .set({
      status: "DONE",
      indexedAt: new Date(),
    } as Indexed)
    .where(eq(indexedTable.id, indexed.id));
  return {
    newIndexedIds: [],
    success: true,
  };
}
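/*
 * Example usage (a minimal sketch, not part of this module): a worker loop
 * that drains pending ids and follows newly discovered pages. `fetchPendingIds`
 * is a hypothetical helper, not provided by this package.
 *
 *   const queue: number[] = await fetchPendingIds();
 *   while (queue.length > 0) {
 *     const id = queue.shift()!;
 *     const result = await processWithRetry(id, 3);
 *     if (result.success) {
 *       // Enqueue pages discovered while crawling/scraping this item:
 *       queue.push(...result.newIndexedIds);
 *     } else {
 *       logger.warn({ id, error: result.error }, "Giving up on item");
 *     }
 *   }
 */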