OpenAI SDK Knowledge MCP Server

by seratch
job-processor.ts (35.1 kB)
import { and, asc, count, eq } from "drizzle-orm";
import { JobQueue, type Job, type WorkItem } from "@/pipeline/job-queue";
import { GitHubCollectorImpl } from "@/pipeline/collectors/github";
import { ForumCollectorImpl } from "@/pipeline/collectors/forum";
import { IssueSummarizerImpl } from "@/pipeline/processors/issue-summarizer";
import { ForumPostSummarizerAgent } from "@/agents/forum-summarizer-agent";
import { CodeSnippetGeneratorAgent } from "@/agents/code-snippet-generator-agent";
import { EmbeddingGeneratorImpl } from "@/pipeline/processors/embeddings";
import { getVectorStore, VectorStore } from "@/storage/vector-store";
import { RateLimiter } from "@/rate-limiter";
import { Logger } from "@/logger";
import { IdUtils } from "@/pipeline/processors/id-utils";
import { TokenCounter } from "@/pipeline/token-counter";
import { Env } from "@/env";
import { getDrizzleDB, type DrizzleDB } from "@/storage/d1-database";
import * as Schema from "@/storage/d1-database/schema";

export class JobProcessor {
  private env: Env;
  private drizzleDb: DrizzleDB;
  private jobQueue: JobQueue;
  private githubCollector: GitHubCollectorImpl;
  private forumCollector: ForumCollectorImpl;
  private issueSummarizer: IssueSummarizerImpl;
  private forumPostSummarizer: ForumPostSummarizerAgent;
  private codeSnippetGenerator: CodeSnippetGeneratorAgent;
  private embeddingGenerator: EmbeddingGeneratorImpl;
  private vectorStore: VectorStore | null = null;
  private openaiRateLimiter: RateLimiter;

  constructor(env: Env) {
    this.env = env;
    const openaiApiKey = env.OPENAI_API_KEY;
    this.drizzleDb = getDrizzleDB(env.DB);
    this.jobQueue = new JobQueue(env.DB, env.JOB_QUEUE);
    this.githubCollector = new GitHubCollectorImpl(env.GITHUB_TOKEN);
    this.forumCollector = new ForumCollectorImpl();
    this.embeddingGenerator = new EmbeddingGeneratorImpl(openaiApiKey);
    this.openaiRateLimiter = new RateLimiter({
      requestsPerMinute: 500,
      retryAttempts: 2,
      baseDelayMs: 500,
    });
    this.issueSummarizer = new IssueSummarizerImpl(env, this.openaiRateLimiter);
    this.forumPostSummarizer = new ForumPostSummarizerAgent(
      env,
      this.openaiRateLimiter,
    );
    this.codeSnippetGenerator = new CodeSnippetGeneratorAgent(
      env,
      this.openaiRateLimiter,
    );
  }

  async processNextJobs(maxJobs: number = 5): Promise<{
    processed: number;
    succeeded: number;
    failed: number;
    errors: string[];
  }> {
    if (!this.vectorStore) {
      this.vectorStore = await getVectorStore(this.env);
    }
    const jobs = await this.jobQueue.getNextJobs(maxJobs);
    if (jobs.length === 0) {
      Logger.lazyDebug(() => "No pending jobs to process");
      return { processed: 0, succeeded: 0, failed: 0, errors: [] };
    }
    Logger.info(
      `Processing ${jobs.length} jobs in parallel with controlled concurrency`,
    );
    let succeeded = 0;
    let failed = 0;
    const errors: string[] = [];
    const results = await Promise.allSettled(
      jobs.map(async (job, index) => {
        try {
          await this.jobQueue.markJobRunning(job.id);
          if (job.jobType === "github_collect" && index > 0) {
            const delay = Math.random() * 1000 + 500;
            Logger.lazyDebug(
              () => `Staggering GitHub collection job ${job.id} by ${delay}ms`,
            );
            await new Promise((resolve) => setTimeout(resolve, delay));
          }
          await this.executeJobLogic(job);
          await this.jobQueue.markJobCompleted(job.id);
          return { success: true, jobId: job.id };
        } catch (error) {
          Logger.error("Error executing job logic", {
            jobId: job.id,
            jobType: job.jobType,
            error: error,
          });
          const errorMessage =
            error instanceof Error ? error.message : "Unknown error";
          if (
            errorMessage.includes("too many SQL variables") ||
            errorMessage.includes("SQLITE_ERROR")
          ) {
            Logger.error(`❌ SQLite Variable Limit Error: ${errorMessage}`, {
              jobId: job.id,
              jobType: job.jobType,
              errorType: "SQL_VARIABLE_LIMIT_EXCEEDED",
              suggestion:
                "This error occurs when SQL queries have more than 999 parameters. Check batch sizes in filterExistingFiles, createWorkItems, and storeEmbeddingsInD1 methods.",
            });
          } else if (
            errorMessage.includes("SQLITE_TOOBIG") ||
            errorMessage.includes("string or blob too big")
          ) {
            Logger.error(`❌ SQLite Content Size Error: ${errorMessage}`, {
              jobId: job.id,
              jobType: job.jobType,
              errorType: "CONTENT_SIZE_EXCEEDED",
              suggestion:
                "This error occurs when content exceeds D1 database limits. Content should be validated and truncated using TokenCounter.validateAndTruncateContent() before storage.",
              maxContentSize: TokenCounter.D1_SAFE_CONTENT_SIZE,
              maxJsonSize: TokenCounter.D1_SAFE_JSON_SIZE,
            });
          }
          await this.jobQueue.markJobFailed(job.id, errorMessage);
          if (job.retryCount < job.maxRetries) {
            Logger.warn(`Job ${job.id} failed, will retry: ${errorMessage}`);
          } else {
            Logger.error(
              `Job ${job.id} failed permanently after ${job.maxRetries} retries: ${errorMessage}`,
            );
          }
          return { success: false, jobId: job.id, error: errorMessage };
        }
      }),
    );
    results.forEach((result, index) => {
      if (result.status === "fulfilled") {
        if (result.value.success) {
          succeeded++;
          Logger.lazyDebug(
            () => `Job ${result.value.jobId} completed successfully`,
          );
        } else {
          failed++;
          const errorMsg = `Job ${result.value.jobId} failed: ${result.value.error}`;
          Logger.error(errorMsg);
          errors.push(errorMsg);
        }
      } else {
        failed++;
        const errorMsg = `Job ${jobs[index].id} failed with exception: ${result.reason}`;
        Logger.error(errorMsg);
        errors.push(errorMsg);
      }
    });
    Logger.info(
      `Job processing summary: ${succeeded} succeeded, ${failed} failed out of ${jobs.length} total`,
    );
    return {
      processed: jobs.length,
      succeeded,
      failed,
      errors,
    };
  }

  async processJob(jobId: number): Promise<void> {
    const job = await this.jobQueue.getJob(jobId);
    if (!job) {
      Logger.error(`Job ${jobId} not found`);
      return;
    }
    await this.jobQueue.markJobRunning(job.id);
    try {
      await this.executeJobLogic(job);
      await this.jobQueue.markJobCompleted(job.id);
    } catch (error) {
      const msg = error instanceof Error ? error.message : "Unknown error";
      await this.jobQueue.markJobFailed(job.id, msg);
      Logger.error(`Job ${job.id} failed`, error);
    }
  }

  private async executeJobLogic(job: Job): Promise<void> {
    if (!this.vectorStore) {
      this.vectorStore = await getVectorStore(this.env);
    }
    Logger.info(`Executing job logic for job`, job);
    switch (job.jobType) {
      case "github_collect":
        await this.processGitHubCollection(job);
        break;
      case "forum_collect":
        await this.processForumCollection(job);
        break;
      case "process_item":
        await this.processWorkItem(job);
        break;
      case "process_github_batch":
        await this.processGitHubBatch(job);
        break;
      case "process_forum_batch":
        await this.processForumBatch(job);
        break;
      case "process_batch_item":
        await this.processBatchItem(job);
        break;
      case "process_pending_work_items":
        await this.processPendingWorkItems(job);
        break;
      default:
        throw new Error(`Unknown job type: ${job.jobType}`);
    }
  }

  private async processGitHubCollection(job: Job): Promise<void> {
    Logger.info("Processing GitHub collection job", {
      jobId: job.id,
      payload: job.payload,
    });
    const payload = JSON.parse(job.payload);
    const { owner, repo, collectionRunId, maxPages = 2 } = payload;
    Logger.info("Extracted GitHub collection parameters", {
      owner,
      repo,
      collectionRunId,
      maxPages,
    });
    if (!owner || !repo) {
      throw new Error(
        `Invalid GitHub collection parameters: owner=${owner}, repo=${repo}`,
      );
    }
    Logger.info(
      `Collecting GitHub data for ${owner}/${repo} with maxPages=${maxPages}`,
    );
    let issues: any[] = [];
    let content: any[] = [];
    try {
      issues = await this.githubCollector.fetchIssues(
        owner,
        repo,
        "all",
        undefined,
        maxPages,
      );
    } catch (error) {
      Logger.error(`Failed to fetch issues for ${owner}/${repo}:`, error);
    }
    Logger.info(`Collecting GitHub issues for ${owner}/${repo} completed`);
    try {
      content = await this.githubCollector.fetchRepositoryContent(owner, repo);
    } catch (error) {
      Logger.error(
        `Failed to fetch repository content for ${owner}/${repo}:`,
        error,
      );
    }
    Logger.info(`Collecting GitHub content for ${owner}/${repo} completed`);
    const eligibleFiles = (content || []).filter(
      (file) =>
        file.type === "file" && file.content && file.content.length > 200,
    );
    const newFiles = await this.filterExistingFiles(eligibleFiles);
    const collectionData = {
      issues: issues || [],
      files: newFiles || [],
      metadata: {
        owner,
        repo,
        collectionRunId,
        collectedAt: new Date().toISOString(),
        totalIssues: (issues || []).length,
        totalFiles: newFiles.length,
        skippedFiles: eligibleFiles.length - newFiles.length,
      },
    };
    const BATCH_SIZE = 5;
    const allItems = [
      ...collectionData.issues.map((issue: any) => ({
        type: "github_issue",
        data: issue,
      })),
      ...collectionData.files.map((file: any) => ({
        type: "github_file",
        data: file,
      })),
    ];
    Logger.info(
      `${allItems.length} batch job items for ${owner}/${repo} created`,
    );
    if (allItems.length === 0) {
      Logger.info(`No items to process for ${owner}/${repo}`);
      return;
    }
    for (let i = 0; i < allItems.length; i += BATCH_SIZE) {
      const batchItems = allItems.slice(i, i + BATCH_SIZE);
      const batchData = {
        issues: batchItems
          .filter((item) => item.type === "github_issue")
          .map((item) => item.data),
        files: batchItems
          .filter((item) => item.type === "github_file")
          .map((item) => item.data),
        metadata: {
          ...collectionData.metadata,
          batchNumber: Math.floor(i / BATCH_SIZE) + 1,
          totalBatches: Math.ceil(allItems.length / BATCH_SIZE),
          batchSize: batchItems.length,
        },
      };
      await this.jobQueue.createJob(
        "process_github_batch",
        {
          collectionRunId,
          batchData,
          chunkSize: 10,
        },
        collectionRunId,
        5,
      );
      Logger.info(
        `"process_github_batch" batch job for ${owner}/${repo} created`,
      );
    }
    Logger.info(
      `Created ${Math.ceil(allItems.length / BATCH_SIZE)} batch processing jobs for ${owner}/${repo} (${(issues || []).length} issues, ${newFiles.length} new files, ${eligibleFiles.length - newFiles.length} files skipped as already existing)`,
    );
  }

  private async processForumCollection(job: Job): Promise<void> {
    const payload = JSON.parse(job.payload);
    const { categories, collectionRunId } = payload;
    Logger.info(
      `Collecting forum data for categories: ${categories?.join(", ") || "default"}`,
    );
    const allCategories = await this.forumCollector.fetchCategories();
    const targetCategories = categories
      ? allCategories.filter((cat) => categories.includes(cat.slug))
      : allCategories.slice(0, 50);
    const collectionData: {
      posts: Array<{ type: string; id: string; data: any }>;
      metadata: {
        categories: string;
        collectionRunId: number;
        collectedAt: string;
        totalPosts: number;
      };
    } = {
      posts: [],
      metadata: {
        categories: categories?.join(", ") || "default",
        collectionRunId,
        collectedAt: new Date().toISOString(),
        totalPosts: 0,
      },
    };
    for (const category of targetCategories) {
      const posts = await this.forumCollector.fetchMultiplePages(
        (page) =>
          this.forumCollector.fetchCategoryPostsWithId(
            category.slug,
            category.id,
            page,
          ),
        3,
        100,
      );
      Logger.lazyDebug(
        () =>
          `Fetched ${posts.length} posts from category ${category.slug} (ID: ${category.id})`,
      );
      const highQualityPosts =
        this.forumCollector.filterHighQualityPosts(posts);
      Logger.info(
        `Filtered to ${highQualityPosts.length} high-quality posts from ${posts.length} total posts in category ${category.slug}`,
      );
      const categoryPosts = highQualityPosts.slice(0, 150).map((post) => ({
        type: "forum_post",
        id: post.id.toString(),
        data: { post, category },
      }));
      collectionData.posts.push(...categoryPosts);
    }
    collectionData.metadata.totalPosts = collectionData.posts.length;
    const BATCH_SIZE = 5;
    if (collectionData.posts.length === 0) {
      Logger.info(
        `No posts to process for categories: ${categories?.join(", ") || "default"}`,
      );
      return;
    }
    for (let i = 0; i < collectionData.posts.length; i += BATCH_SIZE) {
      const batchPosts = collectionData.posts.slice(i, i + BATCH_SIZE);
      const batchData = {
        posts: batchPosts,
        metadata: {
          ...collectionData.metadata,
          batchNumber: Math.floor(i / BATCH_SIZE) + 1,
          totalBatches: Math.ceil(collectionData.posts.length / BATCH_SIZE),
          batchSize: batchPosts.length,
        },
      };
      await this.jobQueue.createJob(
        "process_forum_batch",
        {
          collectionRunId,
          batchData,
          chunkSize: 10,
        },
        collectionRunId,
        5,
      );
    }
    Logger.info(
      `Created ${Math.ceil(collectionData.posts.length / BATCH_SIZE)} forum batch processing jobs with ${collectionData.posts.length} total posts`,
    );
  }

  private async processWorkItem(job: Job): Promise<void> {
    if (!this.vectorStore) {
      this.vectorStore = await getVectorStore(this.env);
    }
    const payload = JSON.parse(job.payload);
    if (!payload.workItemId) {
      throw new Error("workItemId is required in job payload");
    }
    const workItems = await this.drizzleDb
      .select()
      .from(Schema.workItems)
      .where(eq(Schema.workItems.id, payload.workItemId))
      .limit(1);
    const workItem = workItems[0];
    if (!workItem) {
      Logger.lazyDebug(
        () =>
          `Work item with ID ${payload.workItemId} not found or already processed`,
      );
      return;
    }
    Logger.info(`Processing work item ${workItem.id} (${workItem.itemType})`);
    await this.jobQueue.markWorkItemProcessing(workItem.id);
    try {
      const startTime = Date.now();
      let document;
      const sourceData = JSON.parse(workItem.sourceData);
      switch (workItem.itemType) {
        case "github_issue":
          Logger.lazyDebug(
            () => `Processing GitHub issue: ${sourceData.title}`,
          );
          document = await this.processGitHubIssue(sourceData, workItem);
          break;
        case "github_file":
          Logger.lazyDebug(() => `Processing GitHub file: ${sourceData.path}`);
          document = await this.processGitHubFile(sourceData, workItem);
          break;
        case "forum_post":
          Logger.lazyDebug(
            () => `Processing forum post: ${sourceData.post.id}`,
          );
          document = await this.processForumPost(sourceData, workItem);
          break;
        default:
          throw new Error(`Unknown work item type: ${workItem.itemType}`);
      }
      if (document === null) {
        await this.jobQueue.markWorkItemSkipped(
          workItem.id,
          "Item filtered out during processing - no useful content found",
        );
        const duration = Date.now() - startTime;
        Logger.info(
          `Skipped work item ${workItem.id} (${workItem.itemType}) - filtered out in ${duration}ms`,
        );
        return;
      }
      Logger.lazyDebug(
        () => `Generating embeddings for work item ${workItem.id}`,
      );
      const embedded = await this.embeddingGenerator.batchProcess([document]);
      Logger.lazyDebug(() => `Storing embeddings for work item ${workItem.id}`);
      await this.vectorStore.store(embedded);
      await this.jobQueue.markWorkItemCompleted(workItem.id, document);
      const duration = Date.now() - startTime;
      Logger.info(
        `Completed work item ${workItem.id} (${workItem.itemType}) in ${duration}ms`,
      );
    } catch (error) {
      const errorMessage =
        error instanceof Error ? error.message : "Unknown error";
      Logger.error(`Work item ${workItem.id} processing failed:`, {
        error: errorMessage,
        stack: error instanceof Error ? error.stack : undefined,
        workItemType: workItem.itemType,
        workItemId: workItem.id,
      });
      await this.jobQueue.markWorkItemFailed(workItem.id, errorMessage);
      throw error;
    }
  }

  private async processGitHubIssue(issue: any, workItem: WorkItem) {
    const summary = await this.issueSummarizer.summarizeIssue(issue);
    if (!summary) {
      Logger.info(
        `Issue #${issue.number} filtered out: no useful solution or conclusion`,
      );
      return null;
    }
    const issueContent = `${summary.title}\n\n${summary.summary}`;
    const validatedContent =
      TokenCounter.validateAndTruncateContent(issueContent);
    const contentBytes = new TextEncoder().encode(issueContent).length;
    const validatedBytes = new TextEncoder().encode(validatedContent).length;
    if (validatedContent !== issueContent) {
      Logger.warn(
        `GitHub issue #${issue.number} content truncated from ${contentBytes} to ${validatedBytes} bytes (${issueContent.length} to ${validatedContent.length} chars)`,
      );
    } else {
      Logger.lazyDebug(
        () =>
          `GitHub issue #${issue.number} content size: ${contentBytes} bytes (${issueContent.length} chars) - within limits`,
      );
    }
    return {
      id: IdUtils.ensureSafeId(`github_issue_${workItem.itemId}`),
      content: validatedContent,
      metadata: {
        title: issue.title,
        author: issue.author,
        createdAt: issue.created_at,
        updatedAt: issue.updated_at,
        sourceUrl: issue.html_url,
        tags: issue.labels,
      },
      source: "github",
    };
  }

  private async processGitHubFile(file: any, workItem: WorkItem) {
    const codeSnippet = await this.codeSnippetGenerator.generateReusableSnippet(
      file.content,
      file.path,
    );
    const validatedContent = TokenCounter.validateAndTruncateContent(
      codeSnippet.generatedSnippet,
    );
    const contentBytes = new TextEncoder().encode(
      codeSnippet.generatedSnippet,
    ).length;
    const validatedBytes = new TextEncoder().encode(validatedContent).length;
    if (validatedContent !== codeSnippet.generatedSnippet) {
      Logger.warn(
        `GitHub file ${file.path} content truncated from ${contentBytes} to ${validatedBytes} bytes (${codeSnippet.generatedSnippet.length} to ${validatedContent.length} chars)`,
      );
    } else {
      Logger.lazyDebug(
        () =>
          `GitHub file ${file.path} content size: ${contentBytes} bytes (${codeSnippet.generatedSnippet.length} chars) - within limits`,
      );
    }
    return {
      id: IdUtils.ensureSafeId(
        `github_file_${workItem.itemId.replace(/[^a-zA-Z0-9]/g, "_")}`,
      ),
      content: validatedContent,
      metadata: {
        title: file.name,
        sourceUrl: file.download_url,
        language: codeSnippet.language,
        category: codeSnippet.isUnitTest ? "unit-test" : "source-code",
      },
      source: "github",
    };
  }

  private async processForumPost(data: any, workItem: WorkItem) {
    const { post, category } = data;
    const topicDetails = await this.forumCollector.fetchTopicDetails(post.id);
    const summary =
      await this.forumPostSummarizer.summarizeForumPost(topicDetails);
    if (!summary) {
      Logger.info(
        `Forum post #${post.id} filtered out: no useful content or solution`,
      );
      return null;
    }
    const forumContent = `${summary.title}\n\n${summary.summary}`;
    const validatedContent =
      TokenCounter.validateAndTruncateContent(forumContent);
    const contentBytes = new TextEncoder().encode(forumContent).length;
    const validatedBytes = new TextEncoder().encode(validatedContent).length;
    if (validatedContent !== forumContent) {
      Logger.warn(
        `Forum post #${post.id} content truncated from ${contentBytes} to ${validatedBytes} bytes (${forumContent.length} to ${validatedContent.length} chars)`,
      );
    } else {
      Logger.lazyDebug(
        () =>
          `Forum post #${post.id} content size: ${contentBytes} bytes (${forumContent.length} chars) - within limits`,
      );
    }
    return {
      id: IdUtils.ensureSafeId(`forum_${workItem.itemId}`),
      content: validatedContent,
      metadata: {
        title: topicDetails.title,
        author: post.author,
        createdAt: post.created_at,
        updatedAt: topicDetails.last_posted_at || post.created_at,
        sourceUrl: `https://community.openai.com/t/${post.id}`,
        category: category.name,
        tags: topicDetails.tags,
      },
      source: "forum",
    };
  }

  private async processPendingWorkItems(job: Job): Promise<void> {
    const payload = JSON.parse(job.payload);
    const { collectionRunId, batchSize = 5 } = payload;
    Logger.info(
      `Processing pending work items for collection run ${collectionRunId} with batch size ${batchSize}`,
    );
    const pendingWorkItems = await this.drizzleDb
      .select()
      .from(Schema.workItems)
      .where(
        and(
          eq(Schema.workItems.collectionRunId, collectionRunId),
          eq(Schema.workItems.status, "pending"),
        ),
      )
      .orderBy(asc(Schema.workItems.createdAt))
      .limit(batchSize);
    const workItems = pendingWorkItems;
    if (workItems.length === 0) {
      Logger.lazyDebug(
        () =>
          `No pending work items found for collection run ${collectionRunId}`,
      );
      return;
    }
    Logger.info(
      `Processing ${workItems.length} pending work items in parallel`,
    );
    const results = await Promise.allSettled(
      workItems.map(async (workItem) => {
        try {
          Logger.info(
            `Processing work item ${workItem.id} (${workItem.itemType})`,
          );
          await this.jobQueue.markWorkItemProcessing(workItem.id);
          const startTime = Date.now();
          let document;
          const sourceData = JSON.parse(workItem.sourceData);
          switch (workItem.itemType) {
            case "github_issue":
              Logger.lazyDebug(
                () => `Processing GitHub issue: ${sourceData.title}`,
              );
              document = await this.processGitHubIssue(sourceData, workItem);
              break;
            case "github_file":
              Logger.lazyDebug(
                () => `Processing GitHub file: ${sourceData.path}`,
              );
              document = await this.processGitHubFile(sourceData, workItem);
              break;
            case "forum_post":
              Logger.lazyDebug(
                () => `Processing forum post: ${sourceData.post.id}`,
              );
              document = await this.processForumPost(sourceData, workItem);
              break;
            default:
              throw new Error(`Unknown work item type: ${workItem.itemType}`);
          }
          if (document === null) {
            await this.jobQueue.markWorkItemSkipped(
              workItem.id,
              "Item filtered out during processing - no useful content found",
            );
            const duration = Date.now() - startTime;
            Logger.info(
              `Skipped work item ${workItem.id} (${workItem.itemType}) - filtered out in ${duration}ms`,
            );
            return { success: true, workItemId: workItem.id, skipped: true };
          }
          Logger.lazyDebug(
            () => `Generating embeddings for work item ${workItem.id}`,
          );
          const embedded = await this.embeddingGenerator.batchProcess([
            document,
          ]);
          Logger.lazyDebug(
            () => `Storing embeddings for work item ${workItem.id}`,
          );
          const documentContentSize = new TextEncoder().encode(
            document.content,
          ).length;
          Logger.lazyDebug(
            () =>
              `Document content size before storage: ${documentContentSize} bytes (${document.content.length} chars)`,
          );
          if (!this.vectorStore) {
            this.vectorStore = await getVectorStore(this.env);
          }
          await this.vectorStore.store(embedded);
          await this.jobQueue.markWorkItemCompleted(workItem.id, document);
          const duration = Date.now() - startTime;
          Logger.info(
            `Completed work item ${workItem.id} (${workItem.itemType}) in ${duration}ms`,
          );
          return { success: true, workItemId: workItem.id, skipped: false };
        } catch (error) {
          const errorMessage =
            error instanceof Error ? error.message : "Unknown error";
          Logger.error(`Work item ${workItem.id} processing failed:`, {
            error: errorMessage,
            stack: error instanceof Error ? error.stack : undefined,
            workItemType: workItem.itemType,
            workItemId: workItem.id,
          });
          await this.jobQueue.markWorkItemFailed(workItem.id, errorMessage);
          return {
            success: false,
            workItemId: workItem.id,
            error: errorMessage,
          };
        }
      }),
    );
    let succeeded = 0;
    let failed = 0;
    let skipped = 0;
    results.forEach((result) => {
      if (result.status === "fulfilled") {
        if (result.value.success) {
          if (result.value.skipped) {
            skipped++;
          } else {
            succeeded++;
          }
        } else {
          failed++;
        }
      } else {
        failed++;
      }
    });
    Logger.info(
      `Batch processing summary for collection run ${collectionRunId}: ${succeeded} succeeded, ${skipped} skipped, ${failed} failed out of ${workItems.length} total`,
    );
    const remainingWorkItems = await this.drizzleDb
      .select({ count: count() })
      .from(Schema.workItems)
      .where(
        and(
          eq(Schema.workItems.collectionRunId, collectionRunId),
          eq(Schema.workItems.status, "pending"),
        ),
      );
    const remainingCount = remainingWorkItems[0]?.count || 0;
    if (remainingCount > 0) {
      Logger.info(
        `${remainingCount} work items still pending for collection run ${collectionRunId}, creating another batch job`,
      );
      await this.jobQueue.createJob(
        "process_pending_work_items",
        { collectionRunId, batchSize },
        collectionRunId,
        1,
      );
    }
  }

  private async processGitHubBatch(job: Job): Promise<void> {
    const payload = JSON.parse(job.payload);
    const { collectionRunId, batchData, chunkSize = 10 } = payload;
    Logger.info(
      `Processing GitHub batch for collection run ${collectionRunId} with chunk size ${chunkSize}`,
    );
    if (!batchData) {
      Logger.error(
        `GitHub batch processing failed: batchData is undefined for collection run ${collectionRunId}`,
      );
      return;
    }
    const { issues, files } = batchData;
    const allItems = [
      ...issues.map((issue: any) => ({
        type: "github_issue",
        id: issue.number.toString(),
        data: issue,
      })),
      ...files.map((file: any) => ({
        type: "github_file",
        id: file.path,
        data: file,
      })),
    ];
    for (let i = 0; i < allItems.length; i += chunkSize) {
      const chunk = allItems.slice(i, i + chunkSize);
      for (const item of chunk) {
        await this.jobQueue.createJob(
          "process_batch_item",
          {
            collectionRunId,
            itemType: item.type,
            itemId: item.id,
            itemData: item.data,
          },
          collectionRunId,
          3,
        );
      }
      Logger.lazyDebug(
        () =>
          `Created ${chunk.length} processing jobs for chunk ${Math.floor(i / chunkSize) + 1}/${Math.ceil(allItems.length / chunkSize)}`,
      );
    }
    Logger.info(
      `Created ${allItems.length} individual processing jobs from batch (${issues.length} issues, ${files.length} files)`,
    );
  }

  private async processForumBatch(job: Job): Promise<void> {
    const payload = JSON.parse(job.payload);
    const { collectionRunId, batchData, chunkSize = 10 } = payload;
    Logger.info(
      `Processing forum batch for collection run ${collectionRunId} with chunk size ${chunkSize}`,
    );
    if (!batchData) {
      Logger.info(
        `Forum batch processing failed: batchData is undefined for collection run ${collectionRunId}`,
      );
      return;
    }
    const { posts } = batchData;
    for (let i = 0; i < posts.length; i += chunkSize) {
      const chunk = posts.slice(i, i + chunkSize);
      for (const item of chunk) {
        await this.jobQueue.createJob(
          "process_batch_item",
          {
            collectionRunId,
            itemType: item.type,
            itemId: item.id,
            itemData: item.data,
          },
          collectionRunId,
          3,
        );
      }
      Logger.lazyDebug(
        () =>
          `Created ${chunk.length} processing jobs for chunk ${Math.floor(i / chunkSize) + 1}/${Math.ceil(posts.length / chunkSize)}`,
      );
    }
    Logger.info(
      `Created ${posts.length} individual processing jobs from forum batch`,
    );
  }

  private async processBatchItem(job: Job): Promise<void> {
    const payload = JSON.parse(job.payload);
    const { collectionRunId, itemType, itemId, itemData } = payload;
    Logger.info(`Processing batch item ${itemId} (${itemType})`);
    try {
      const startTime = Date.now();
      let document;
      const tempWorkItem = {
        id: 0,
        collectionRunId: collectionRunId,
        itemType: itemType,
        itemId: itemId,
        status: "processing",
        sourceData: JSON.stringify(itemData),
        retryCount: 0,
        createdAt: new Date().toISOString(),
      } as WorkItem;
      switch (itemType) {
        case "github_issue":
          Logger.lazyDebug(() => `Processing GitHub issue: ${itemData.title}`);
          document = await this.processGitHubIssue(itemData, tempWorkItem);
          break;
        case "github_file":
          Logger.lazyDebug(() => `Processing GitHub file: ${itemData.path}`);
          document = await this.processGitHubFile(itemData, tempWorkItem);
          break;
        case "forum_post":
          Logger.lazyDebug(() => `Processing forum post: ${itemData.post.id}`);
          document = await this.processForumPost(itemData, tempWorkItem);
          break;
        default:
          throw new Error(`Unknown batch item type: ${itemType}`);
      }
      if (document === null) {
        const duration = Date.now() - startTime;
        Logger.info(
          `Skipped batch item ${itemId} (${itemType}) - filtered out in ${duration}ms`,
        );
        return;
      }
      Logger.lazyDebug(() => `Generating embeddings for batch item ${itemId}`);
      const embedded = await this.embeddingGenerator.batchProcess([document]);
      Logger.lazyDebug(() => `Storing embeddings for batch item ${itemId}`);
      const documentContentSize = new TextEncoder().encode(
        document.content,
      ).length;
      Logger.lazyDebug(
        () =>
          `Document content size before storage: ${documentContentSize} bytes (${document.content.length} chars)`,
      );
      if (!this.vectorStore) {
        this.vectorStore = await getVectorStore(this.env);
      }
      await this.vectorStore.store(embedded);
      const duration = Date.now() - startTime;
      Logger.info(
        `Completed batch item ${itemId} (${itemType}) in ${duration}ms`,
      );
    } catch (error) {
      const errorMessage =
        error instanceof Error ? error.message : "Unknown error";
      Logger.error(`Batch item ${itemId} processing failed:`, {
        error: errorMessage,
        stack: error instanceof Error ? error.stack : undefined,
        itemType,
        itemId,
      });
      throw error;
    }
  }

  private async filterExistingFiles(files: any[]): Promise<any[]> {
    if (files.length === 0) {
      return files;
    }
    const fileIds = files.map((file) =>
      IdUtils.ensureSafeId(
        `github_file_${file.path.replace(/[^a-zA-Z0-9]/g, "_")}`,
      ),
    );
    Logger.lazyDebug(
      () =>
        `🔍 SQL Debug: Checking existence of ${fileIds.length} files using batched queries (SQLite limit: 250 variables)`,
    );
    const BATCH_SIZE = 100;
    const existingIds = new Set<string>();
    for (let i = 0; i < fileIds.length; i += BATCH_SIZE) {
      const batch = fileIds.slice(i, i + BATCH_SIZE);
      Logger.lazyDebug(
        () =>
          `🔍 SQL Debug: Processing batch ${Math.floor(i / BATCH_SIZE) + 1}/${Math.ceil(fileIds.length / BATCH_SIZE)} with ${batch.length} variables`,
      );
      // TODO: replace this with Vectorize
      const existingDocs = { results: [] };
      // const existingDocs = await this.db
      //   .prepare(
      //     `
      //     SELECT id FROM documents WHERE id IN (${batch.map(() => "?").join(",")})
      //   `,
      //   )
      //   .bind(...batch)
      //   .all();
      (existingDocs.results || []).forEach((doc: any) =>
        existingIds.add(doc.id),
      );
    }
    const newFiles = files.filter((_file, index) => {
      const fileId = fileIds[index];
      return !existingIds.has(fileId);
    });
    Logger.info(
      `📊 File existence check: ${files.length} total files, ${existingIds.size} already exist, ${newFiles.length} new files to process`,
    );
    return newFiles;
  }
}
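
For orientation, here is a minimal sketch of how a JobProcessor like this could be driven from a Cloudflare Worker cron trigger. It only uses the public methods shown above; the module path, the entry-point shape, and the assumption that `Env` declares the `DB`, `JOB_QUEUE`, `OPENAI_API_KEY`, and `GITHUB_TOKEN` bindings are illustrative and may not match the repository's actual entry point.

// Illustrative wiring only — not taken from the repository.
// Assumes the file above is importable as "@/pipeline/job-processor" and that
// Env declares the bindings referenced by JobProcessor's constructor.
import { JobProcessor } from "@/pipeline/job-processor";
import type { Env } from "@/env";

export default {
  // On each cron tick, drain a small batch of pending jobs.
  async scheduled(
    _controller: ScheduledController,
    env: Env,
    ctx: ExecutionContext,
  ): Promise<void> {
    const processor = new JobProcessor(env);
    ctx.waitUntil(
      processor.processNextJobs(5).then((summary) => {
        console.log(
          `jobs processed=${summary.processed} succeeded=${summary.succeeded} failed=${summary.failed}`,
        );
      }),
    );
  },
};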

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/seratch/openai-sdk-knowledge-org'
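
The same lookup can be made from code. A minimal TypeScript fetch sketch (the response shape is not documented on this page, so it is left untyped):

// Same request as the curl example above; adjust error handling as needed.
const res = await fetch(
  "https://glama.ai/api/mcp/v1/servers/seratch/openai-sdk-knowledge-org",
);
if (!res.ok) {
  throw new Error(`Glama MCP API request failed: ${res.status}`);
}
const server: unknown = await res.json();
console.log(server);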

If you have feedback or need assistance with the MCP directory API, please join our Discord server.