index.ts
// =============================================================================
// kivv - Automation Worker with Checkpointed Cron Execution
// =============================================================================
// Daily automation workflow:
// 1. Fetch each user's topics from database
// 2. Search arXiv for papers matching topics (last 24 hours)
// 3. Summarize papers using two-stage AI (Haiku triage + Sonnet summary)
// 4. Store papers and summaries in database
// 5. Use checkpoints to handle failures gracefully
// =============================================================================

import { Env, User, Topic } from '../../shared/types';
import { ArxivClient } from '../../shared/arxiv-client';
import { SummarizationClient } from '../../shared/summarization';
import { formatDate } from '../../shared/utils';

// =============================================================================
// Configuration Constants
// =============================================================================

const BATCH_SIZE = 8; // Papers per run to stay under 30s cron timeout

// =============================================================================
// Types & Interfaces
// =============================================================================

/**
 * Checkpoint structure for resumable cron execution
 * Stored in KV with key: checkpoint:automation:{date}
 */
interface Checkpoint {
  date: string;                      // YYYY-MM-DD
  users_processed: number;           // Count of users completed
  papers_found: number;              // Total papers found from arXiv
  papers_summarized: number;         // Total papers successfully summarized
  papers_skipped: number;            // Papers that were irrelevant
  papers_processed_this_run: number; // Track batch progress within current run
  total_cost: number;                // Total AI cost in USD
  errors: string[];                  // Array of error messages
  last_user_id?: number;             // Last successfully processed user ID (for resume)
  last_paper_arxiv_id?: string;      // For resuming within a user's papers
  completed: boolean;                // True when all users processed
}
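
/*
 * Example checkpoint record (illustrative values only), as it might be stored
 * under the KV key checkpoint:automation:2025-01-15 after a run that hit the
 * batch limit partway through user 3:
 *
 * {
 *   "date": "2025-01-15",
 *   "users_processed": 2,
 *   "papers_found": 37,
 *   "papers_summarized": 5,
 *   "papers_skipped": 3,
 *   "papers_processed_this_run": 8,
 *   "total_cost": 0.1432,
 *   "errors": [],
 *   "last_user_id": 3,
 *   "last_paper_arxiv_id": "2501.01234",
 *   "completed": false
 * }
 */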
/**
 * Result from processing a single user
 */
interface UserProcessingResult {
  papers_found: number;
  papers_summarized: number;
  papers_skipped: number;
  cost: number;
  batch_exhausted: boolean; // True if we hit batch limit
  last_arxiv_id?: string;   // Last paper arxiv_id processed
}

/**
 * Result from running automation
 */
interface AutomationResult {
  batch_complete: boolean; // True if batch limit reached
  total_complete: boolean; // True if all users and papers done
  checkpoint: Checkpoint;
}

// =============================================================================
// Cloudflare Workers Export
// =============================================================================

export default {
  /**
   * Scheduled cron handler - runs daily at 6 AM UTC
   * Triggered automatically by Cloudflare cron
   */
  async scheduled(event: ScheduledEvent, env: Env, ctx: ExecutionContext): Promise<void> {
    console.log('[CRON] Starting daily automation at', new Date().toISOString());

    try {
      await runAutomation(env);
    } catch (error) {
      console.error('[CRON] Fatal error:', error);
      // Re-throw to mark the cron run as failed in Cloudflare dashboard
      throw error;
    }
  },

  /**
   * Manual trigger endpoint (for testing and debugging)
   * Requires authentication via Authorization header
   */
  async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
    const url = new URL(request.url);

    // Health check endpoint
    if (url.pathname === '/health') {
      return new Response(JSON.stringify({
        status: 'ok',
        service: 'kivv-automation',
        timestamp: new Date().toISOString()
      }), {
        status: 200,
        headers: { 'Content-Type': 'application/json' }
      });
    }

    // Status endpoint - check current checkpoint
    if (url.pathname === '/status') {
      const today = formatDate(new Date());
      const checkpointKey = `checkpoint:automation:${today}`;
      const checkpoint = await loadCheckpoint(env, checkpointKey);

      return new Response(JSON.stringify({
        status: 'ok',
        checkpoint: checkpoint || { message: 'No checkpoint for today' },
        timestamp: new Date().toISOString()
      }), {
        status: 200,
        headers: { 'Content-Type': 'application/json' }
      });
    }

    // Manual trigger endpoint
    if (url.pathname === '/run' && request.method === 'POST') {
      // Only allow cron or manual trigger with secret
      const cronHeader = request.headers.get('cf-cron');
      const authHeader = request.headers.get('authorization');
      const cronSecret = env.CRON_SECRET || 'test-secret';

      if (!cronHeader && authHeader !== `Bearer ${cronSecret}`) {
        return new Response(JSON.stringify({
          error: 'Forbidden',
          message: 'Invalid or missing authorization'
        }), {
          status: 403,
          headers: { 'Content-Type': 'application/json' }
        });
      }

      try {
        console.log('[MANUAL] Manual automation run triggered');
        const result = await runAutomation(env);

        return new Response(JSON.stringify({
          success: true,
          message: result.total_complete
            ? 'All papers processed for today'
            : `Batch complete (${BATCH_SIZE} papers)`,
          batch_complete: result.batch_complete,
          total_complete: result.total_complete,
          checkpoint: {
            users_processed: result.checkpoint.users_processed,
            papers_found: result.checkpoint.papers_found,
            papers_summarized: result.checkpoint.papers_summarized,
            papers_skipped: result.checkpoint.papers_skipped,
            papers_processed_this_run: result.checkpoint.papers_processed_this_run,
            total_cost: result.checkpoint.total_cost,
            completed: result.checkpoint.completed
          },
          timestamp: new Date().toISOString()
        }), {
          status: 200,
          headers: { 'Content-Type': 'application/json' }
        });
      } catch (error) {
        console.error('[MANUAL] Error:', error);
        return new Response(JSON.stringify({
          success: false,
          error: error instanceof Error ? error.message : 'Unknown error',
          timestamp: new Date().toISOString()
        }), {
          status: 500,
          headers: { 'Content-Type': 'application/json' }
        });
      }
    }

    // Default response
    return new Response(
      'kivv Automation Worker\n\nEndpoints:\n- GET /health - Health check\n- GET /status - Checkpoint status\n- POST /run - Manual trigger (requires auth)',
      {
        status: 200,
        headers: { 'Content-Type': 'text/plain' }
      }
    );
  }
};
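
// -----------------------------------------------------------------------------
// Example requests against a deployed worker (a sketch; the hostname below is
// a placeholder for your own route, and <CRON_SECRET> stands for the secret
// bound to the worker):
//
//   curl https://kivv-automation.example.workers.dev/health
//   curl https://kivv-automation.example.workers.dev/status
//   curl -X POST https://kivv-automation.example.workers.dev/run \
//     -H 'Authorization: Bearer <CRON_SECRET>'
// -----------------------------------------------------------------------------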
// =============================================================================
// Main Automation Workflow
// =============================================================================

/**
 * Main automation workflow with checkpoint support
 * Processes all active users and their topics
 */
async function runAutomation(env: Env): Promise<AutomationResult> {
  const today = formatDate(new Date());
  const checkpointKey = `checkpoint:automation:${today}`;

  // Load or create checkpoint
  const checkpoint: Checkpoint = await loadCheckpoint(env, checkpointKey) || {
    date: today,
    users_processed: 0,
    papers_found: 0,
    papers_summarized: 0,
    papers_skipped: 0,
    papers_processed_this_run: 0,
    total_cost: 0,
    errors: [],
    completed: false
  };

  // Reset papers_processed_this_run for new execution
  checkpoint.papers_processed_this_run = 0;

  // If already completed today, skip
  if (checkpoint.completed) {
    console.log('[AUTOMATION] Already completed for today, skipping');
    return { batch_complete: true, total_complete: true, checkpoint };
  }

  // Get all active users
  const users = await env.DB
    .prepare('SELECT * FROM users WHERE is_active = 1 ORDER BY id')
    .all<User>();

  if (!users.results || users.results.length === 0) {
    console.log('[AUTOMATION] No active users found');
    checkpoint.completed = true;
    await saveCheckpoint(env, checkpointKey, checkpoint);
    return { batch_complete: true, total_complete: true, checkpoint };
  }

  console.log(`[AUTOMATION] Processing ${users.results.length} users (batch limit: ${BATCH_SIZE} papers)`);

  // Initialize clients
  const arxivClient = new ArxivClient();
  const summarizationClient = new SummarizationClient(env.CLAUDE_API_KEY);

  let batchExhausted = false;
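
  // Illustrative resume scenario (hypothetical IDs): if a previous run saved
  // last_user_id = 3 and last_paper_arxiv_id = "2501.01234", this run skips
  // every user with id < 3 outright, then re-enters user 3's paper list and
  // fast-forwards past arXiv ID 2501.01234 before doing any new work.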
  // Process each user
  for (const user of users.results) {
    // Skip users already processed in an earlier run (checkpoint resume),
    // except the last user we were working on when the run stopped
    if (checkpoint.last_user_id && user.id < checkpoint.last_user_id) {
      console.log(`[USER:${user.username}] Already processed, skipping`);
      continue;
    }

    // If resuming from this user, pick up after the last paper we processed;
    // any other user starts fresh
    const resumeFromPaper = (checkpoint.last_user_id === user.id)
      ? checkpoint.last_paper_arxiv_id
      : undefined;

    try {
      console.log(`[USER:${user.username}] Starting processing...`);
      if (resumeFromPaper) {
        console.log(`[USER:${user.username}] Resuming from paper: ${resumeFromPaper}`);
      }

      const batchRemaining = BATCH_SIZE - checkpoint.papers_processed_this_run;
      const result = await processUser(
        env,
        user,
        arxivClient,
        summarizationClient,
        batchRemaining,
        resumeFromPaper
      );

      // Update checkpoint - only increment users_processed if we finished this user completely
      if (!result.batch_exhausted) {
        checkpoint.users_processed++;
        checkpoint.last_paper_arxiv_id = undefined; // Clear paper tracking when user is done
      } else {
        checkpoint.last_paper_arxiv_id = result.last_arxiv_id;
      }

      checkpoint.papers_found += result.papers_found;
      checkpoint.papers_summarized += result.papers_summarized;
      checkpoint.papers_skipped += result.papers_skipped;
      checkpoint.papers_processed_this_run += (result.papers_summarized + result.papers_skipped);
      checkpoint.total_cost += result.cost;
      checkpoint.last_user_id = user.id;
      await saveCheckpoint(env, checkpointKey, checkpoint);

      console.log(`[USER:${user.username}] Completed: ${result.papers_found} found, ${result.papers_summarized} summarized, ${result.papers_skipped} skipped, $${result.cost.toFixed(4)} cost`);

      // Check if batch is exhausted
      if (result.batch_exhausted) {
        console.log(`[BATCH] Batch limit reached (${BATCH_SIZE} papers processed this run)`);
        batchExhausted = true;
        break;
      }

      // Check budget circuit breaker
      if (checkpoint.total_cost >= 1.0) {
        console.warn('[BUDGET] Daily budget ($1.00) exceeded, stopping automation');
        checkpoint.errors.push(`Budget exceeded at $${checkpoint.total_cost.toFixed(4)}`);
        await saveCheckpoint(env, checkpointKey, checkpoint);
        break;
      }
    } catch (error) {
      const errorMsg = error instanceof Error ? error.message : 'Unknown error';
      console.error(`[USER:${user.username}] Error:`, error);
      checkpoint.errors.push(`${user.username}: ${errorMsg}`);
      checkpoint.last_user_id = user.id;          // Mark as processed even on error
      checkpoint.last_paper_arxiv_id = undefined; // Clear paper tracking on error
      await saveCheckpoint(env, checkpointKey, checkpoint);
      continue; // Continue with next user
    }
  }

  // Mark as completed only if we processed all users and didn't hit batch limit
  if (!batchExhausted) {
    checkpoint.completed = true;
    await saveCheckpoint(env, checkpointKey, checkpoint);
  }

  // Final summary
  console.log('[AUTOMATION] ===== SUMMARY =====');
  console.log(`[AUTOMATION] Users processed: ${checkpoint.users_processed}`);
  console.log(`[AUTOMATION] Papers found: ${checkpoint.papers_found}`);
  console.log(`[AUTOMATION] Papers summarized: ${checkpoint.papers_summarized}`);
  console.log(`[AUTOMATION] Papers skipped: ${checkpoint.papers_skipped}`);
  console.log(`[AUTOMATION] Papers this run: ${checkpoint.papers_processed_this_run}`);
  console.log(`[AUTOMATION] Total cost: $${checkpoint.total_cost.toFixed(4)}`);
  console.log(`[AUTOMATION] Batch exhausted: ${batchExhausted}`);
  console.log(`[AUTOMATION] Completed: ${checkpoint.completed}`);
  console.log(`[AUTOMATION] Errors: ${checkpoint.errors.length}`);
  if (checkpoint.errors.length > 0) {
    console.log('[AUTOMATION] Error details:', checkpoint.errors);
  }
  console.log('[AUTOMATION] ==================');

  // Cleanup old checkpoints (only if fully completed)
  if (checkpoint.completed) {
    await cleanupOldCheckpoints(env);
  }

  return {
    batch_complete: batchExhausted || checkpoint.completed,
    total_complete: checkpoint.completed,
    checkpoint
  };
}

// =============================================================================
// User Processing
// =============================================================================

/**
 * Process a single user: fetch topics, search arXiv, summarize, store
 */
async function processUser(
  env: Env,
  user: User,
  arxivClient: ArxivClient,
  summarizationClient: SummarizationClient,
  batchRemaining: number,
  resumeFromPaperId?: string
): Promise<UserProcessingResult> {
  // Get user's enabled topics
  const topics = await env.DB
    .prepare('SELECT * FROM topics WHERE user_id = ? AND enabled = 1')
    .bind(user.id)
    .all<Topic>();

  if (!topics.results || topics.results.length === 0) {
    console.log(`[USER:${user.username}] No enabled topics configured`);
    return {
      papers_found: 0,
      papers_summarized: 0,
      papers_skipped: 0,
      cost: 0,
      batch_exhausted: false
    };
  }

  console.log(`[USER:${user.username}] Processing ${topics.results.length} topics`);
  // Each topic's arxiv_query is already formatted for the arXiv API, so no
  // extra parsing is needed; collect the topic names for the summarization stage
  const topicNames = topics.results.map(t => t.topic_name);

  // Combine all topic queries with OR
  const combinedQuery = topics.results
    .map(t => `(${t.arxiv_query})`)
    .join(' OR ');

  console.log(`[USER:${user.username}] Search query: ${combinedQuery}`);
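
  // Illustrative example (hypothetical topics): arxiv_query values of
  // cat:cs.LG and all:"mechanistic interpretability" combine into
  //   (cat:cs.LG) OR (all:"mechanistic interpretability")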
  // Search arXiv for papers from last 24 hours
  const papers = await arxivClient.search({
    query: combinedQuery,
    maxResults: 100, // Process up to 100 papers per user per day
    sortBy: 'submittedDate',
    sortOrder: 'descending'
  });

  console.log(`[USER:${user.username}] Found ${papers.length} papers from arXiv`);

  let papersProcessed = 0;
  let papersSummarized = 0;
  let papersSkipped = 0;
  let totalCost = 0;
  let shouldSkip = resumeFromPaperId !== undefined;
  let lastArxivId: string | undefined;

  // Process each paper
  for (const paper of papers) {
    // Skip papers until we find the resume point
    if (shouldSkip) {
      if (paper.arxiv_id === resumeFromPaperId) {
        console.log(`[PAPER:${paper.arxiv_id}] Found resume point, continuing from next paper`);
        shouldSkip = false;
      }
      continue;
    }

    // Check batch limit BEFORE processing each paper
    if (papersProcessed >= batchRemaining) {
      console.log(`[USER:${user.username}] Batch limit reached, stopping at paper ${paper.arxiv_id}`);
      return {
        papers_found: papers.length,
        papers_summarized: papersSummarized,
        papers_skipped: papersSkipped,
        cost: totalCost,
        batch_exhausted: true,
        last_arxiv_id: lastArxivId
      };
    }

    try {
      lastArxivId = paper.arxiv_id;

      // Check if paper already exists in database
      const existing = await env.DB
        .prepare('SELECT id FROM papers WHERE arxiv_id = ?')
        .bind(paper.arxiv_id)
        .first<{ id: number }>();

      if (existing) {
        console.log(`[PAPER:${paper.arxiv_id}] Already exists in database, skipping`);

        // Create user_paper_status entry if it doesn't exist
        const userStatus = await env.DB
          .prepare('SELECT 1 FROM user_paper_status WHERE user_id = ? AND paper_id = ?')
          .bind(user.id, existing.id)
          .first();

        if (!userStatus) {
          await env.DB
            .prepare(`
              INSERT INTO user_paper_status (user_id, paper_id, explored, bookmarked, created_at)
              VALUES (?, ?, 0, 0, ?)
            `)
            .bind(user.id, existing.id, new Date().toISOString())
            .run();
          console.log(`[PAPER:${paper.arxiv_id}] Created user_paper_status entry`);
        }

        // Already existing papers count toward batch limit (they use time)
        papersProcessed++;
        continue;
      }

      // Summarize paper using two-stage AI
      const result = await summarizationClient.summarize(
        paper.title,
        paper.abstract,
        topicNames,
        0.5 // relevance threshold (lowered for testing)
      );

      totalCost += result.total_cost;

      // Skip if irrelevant (failed triage)
      if (!result.summary) {
        console.log(`[PAPER:${paper.arxiv_id}] Skipped (${result.skipped_reason})`);
        papersSkipped++;
        papersProcessed++;
        continue;
      }

      // Store paper in database with summary
      await env.DB
        .prepare(`
          INSERT INTO papers (arxiv_id, title, authors, abstract, categories,
            published_date, pdf_url, summary, summary_generated_at, summary_model,
            relevance_score, content_hash, collected_for_user_id, created_at)
          VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        `)
        .bind(
          paper.arxiv_id,
          paper.title,
          paper.authors,
          paper.abstract,
          paper.categories,
          paper.published_date,
          paper.pdf_url,
          result.summary,
          new Date().toISOString(),
          'claude-3-5-sonnet-20241022',
          result.relevance_score,
          result.content_hash,
          user.id,
          new Date().toISOString()
        )
        .run();

      // Get the inserted paper ID
      const insertedPaper = await env.DB
        .prepare('SELECT id FROM papers WHERE arxiv_id = ?')
        .bind(paper.arxiv_id)
        .first<{ id: number }>();

      if (!insertedPaper) {
        throw new Error(`Failed to retrieve inserted paper ${paper.arxiv_id}`);
      }

      // Create user_paper_status entry
      await env.DB
        .prepare(`
          INSERT INTO user_paper_status (user_id, paper_id, explored, bookmarked, created_at)
          VALUES (?, ?, 0, 0, ?)
        `)
        .bind(user.id, insertedPaper.id, new Date().toISOString())
        .run();

      papersProcessed++;
      papersSummarized++;
      console.log(`[PAPER:${paper.arxiv_id}] Stored with summary (relevance: ${result.relevance_score.toFixed(2)})`);
    } catch (error) {
      console.error(`[PAPER:${paper.arxiv_id}] Error:`, error);
      // Continue processing other papers
      continue;
    }
  }

  return {
    papers_found: papers.length,
    papers_summarized: papersSummarized,
    papers_skipped: papersSkipped,
    cost: totalCost,
    batch_exhausted: false,
    last_arxiv_id: lastArxivId
  };
}

// =============================================================================
// Checkpoint Management
// =============================================================================

/**
 * Load checkpoint from KV storage
 */
async function loadCheckpoint(env: Env, key: string): Promise<Checkpoint | null> {
  try {
    const data = await env.CACHE.get(key);
    if (!data) return null;

    const checkpoint = JSON.parse(data) as Checkpoint;
    console.log(`[CHECKPOINT] Loaded: ${checkpoint.users_processed} users processed, $${checkpoint.total_cost.toFixed(4)} spent`);
    return checkpoint;
  } catch (error) {
    console.error('[CHECKPOINT] Failed to load:', error);
    return null;
  }
}

/**
 * Save checkpoint to KV storage
 */
async function saveCheckpoint(env: Env, key: string, checkpoint: Checkpoint): Promise<void> {
  try {
    await env.CACHE.put(key, JSON.stringify(checkpoint), {
      expirationTtl: 7 * 24 * 60 * 60 // 7 days TTL
    });
    console.log(`[CHECKPOINT] Saved: ${checkpoint.users_processed} users, $${checkpoint.total_cost.toFixed(4)} cost`);
  } catch (error) {
    console.error('[CHECKPOINT] Failed to save:', error);
  }
}
/**
 * Cleanup old checkpoints (older than 7 days)
 * KV automatically expires keys based on expirationTtl, so no manual cleanup
 * is needed for now
 */
async function cleanupOldCheckpoints(env: Env): Promise<void> {
  // Checkpoint keys auto-expire after 7 days (set via expirationTtl in
  // saveCheckpoint), so there is nothing to delete here
  console.log('[CLEANUP] Old checkpoints will auto-expire after 7 days');
}
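
// -----------------------------------------------------------------------------
// Deployment note: the daily 6 AM UTC schedule referenced above lives in the
// worker's configuration, not in this file. A minimal sketch, assuming the
// standard wrangler.toml cron trigger syntax:
//
//   [triggers]
//   crons = ["0 6 * * *"]
// -----------------------------------------------------------------------------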
