Skip to main content
Glama
agent-cfg-outbox.worker.ts (7.96 kB)
import { setTimeout as delay } from 'node:timers/promises';
import { resolve } from 'node:path';
import { fileURLToPath } from 'node:url';
import {
  DatabaseConfigService,
  logger,
  getGuardValue,
  DEFAULT_AGENT_CFG_REDIS_CHANNEL,
} from '@snakagent/core';
import { metrics } from '@snakagent/metrics';
import { Postgres } from '../database.js';
import { RedisClient } from '../redis.js';
import { agentCfgOutbox } from '../queries/agent-cfg-outbox/queries.js';

/**
 * Row shape this worker consumes from `agentCfgOutbox.fetchAndMarkBatch`.
 */
interface AgentCfgOutboxRow {
  id: number;
  agent_id: string;
  cfg_version: number;
  event: string;
  claimed_at: Date | null;
  processed_at: Date | null;
}

/**
 * Optional overrides for {@link AgentCfgOutboxWorker}.
 */
export interface AgentCfgOutboxWorkerOptions {
  /**
   * Value passed to `agentCfgOutbox.fetchAndMarkBatch`. Falls back to the
   * `agent_cfg_worker.batch_size` guard value.
   */
  batchSize?: number;
  /**
   * Delay, in milliseconds, applied after an iteration that processed nothing.
   * Falls back to the `agent_cfg_worker.poll_interval_ms` guard value.
   */
  pollIntervalMs?: number;
  /**
   * Redis channel events are published to. Falls back to the
   * `AGENT_CFG_WORKER_REDIS_CHANNEL` environment variable, then to
   * `DEFAULT_AGENT_CFG_REDIS_CHANNEL`.
   */
  redisChannel?: string;
}

/**
 * Polls the agent-config outbox, publishes each fetched row to a Redis
 * channel as JSON, and marks the published rows as processed. Rows that fail
 * to publish, or whose processed-mark fails, are requeued.
 */
export class AgentCfgOutboxWorker {
  private readonly batchSize: number;
  private readonly pollIntervalMs: number;
  private readonly redisChannel: string;
  /** Loop-continue flag: cleared by `stop()` to request that the loop exit. */
  private running = false;
  /**
   * True from the moment a `start()` call is accepted until that call has
   * finished disposing. Used as the re-entrancy guard so overlapping
   * `start()` calls can never run two loops or race with `dispose()`.
   */
  private active = false;
  private initialized = false;
  private initializing: Promise<void> | null = null;
  private disposed = false;
  private readonly redisClient = RedisClient.getInstance();

  /**
   * @param options - Optional overrides; see {@link AgentCfgOutboxWorkerOptions}
   *   for the fallback used by each field.
   */
  constructor(options?: AgentCfgOutboxWorkerOptions) {
    const guardBatchSize = getGuardValue(
      'agent_cfg_worker.batch_size'
    ) as number;
    const guardPollInterval = getGuardValue(
      'agent_cfg_worker.poll_interval_ms'
    ) as number;

    this.batchSize = options?.batchSize ?? guardBatchSize;
    this.pollIntervalMs = options?.pollIntervalMs ?? guardPollInterval;
    this.redisChannel =
      options?.redisChannel ??
      process.env.AGENT_CFG_WORKER_REDIS_CHANNEL ??
      DEFAULT_AGENT_CFG_REDIS_CHANNEL;
  }

  /**
   * Initializes connections and runs the polling loop until `stop()` is
   * called, then disposes the connections. The returned promise resolves only
   * after the loop has exited and disposal has completed.
   *
   * A call made while a previous `start()` is still initializing, looping or
   * disposing logs a warning and returns immediately.
   *
   * @throws Rethrows any error raised while initializing connections.
   */
  public async start(): Promise<void> {
    if (this.active) {
      logger.warn('AgentCfgOutboxWorker already running');
      return;
    }

    // Claim both flags before the first await: a concurrent start() then hits
    // the guard above, and a stop() issued during initialization is honoured
    // (the loop below is simply skipped) instead of being overwritten.
    this.active = true;
    this.running = true;

    try {
      await this.ensureInitialized();
      logger.info('AgentCfgOutboxWorker started');

      while (this.running) {
        let processed = 0;
        try {
          processed = await this.processBatch();
        } catch (error) {
          logger.error('AgentCfgOutboxWorker encountered an unexpected error', {
            error,
          });
          metrics.recordAgentCfgOutboxError('unexpected');
        }

        if (!this.running) {
          break;
        }

        // Only back off when nothing was processed; otherwise keep draining.
        if (processed === 0) {
          await delay(this.pollIntervalMs);
        }
      }

      logger.info('AgentCfgOutboxWorker stopped');
    } finally {
      this.running = false;
      // dispose() is a no-op when initialization never completed.
      await this.dispose();
      // Released last so a new start() cannot overlap with disposal.
      this.active = false;
    }
  }

  /**
   * Requests that the polling loop exit. Resolves immediately; it does not
   * wait for the loop to finish. The loop notices the request after its
   * current batch or poll delay completes — await the promise returned by
   * `start()` to know when shutdown is done.
   */
  public async stop(): Promise<void> {
    this.running = false;
  }

  /**
   * Runs `initialize()` at most once at a time: concurrent callers share the
   * in-flight promise, and later callers return immediately once initialized.
   */
  private async ensureInitialized(): Promise<void> {
    if (this.initialized) {
      return;
    }

    if (this.initializing) {
      return this.initializing;
    }

    this.initializing = this.initialize();
    try {
      await this.initializing;
      this.initialized = true;
    } finally {
      this.initializing = null;
    }
  }

  /**
   * Initializes the database config service if needed, then connects Postgres
   * and Redis.
   *
   * @throws Logs and rethrows any connection/initialization error.
   */
  private async initialize(): Promise<void> {
    try {
      if (!DatabaseConfigService.getInstance().isInitialized()) {
        DatabaseConfigService.getInstance().initialize();
      }

      const credentials = DatabaseConfigService.getInstance().getCredentials();
      await Postgres.connect(credentials);
      await this.redisClient.connect();
      this.disposed = false;
      logger.info('AgentCfgOutboxWorker initialized connections');
    } catch (error) {
      logger.error('Failed to initialize AgentCfgOutboxWorker', { error });
      throw error;
    }
  }

  /**
   * Fetches one batch, publishes each row to Redis, and marks the published
   * rows as processed.
   *
   * @returns The number of rows published AND successfully marked processed.
   *   Returns 0 when the batch was empty, nothing could be published, or the
   *   processed-mark failed (those rows are requeued), so the caller applies
   *   its poll delay instead of immediately re-publishing the same rows.
   */
  private async processBatch(): Promise<number> {
    const rows = await this.fetchAndMarkBatch();
    if (rows.length === 0) {
      return 0;
    }

    const redis = this.redisClient.getClient();
    const processedPerEvent = new Map<string, number>();
    const publishedRows: Array<{ id: number; event: string }> = [];

    for (const row of rows) {
      try {
        // Fall back to "now" when the row carries no usable processed_at.
        const processedAtDate =
          this.parseTimestamp(row.processed_at) ?? new Date();
        const payload = JSON.stringify({
          outbox_id: row.id,
          agent_id: row.agent_id,
          cfg_version: row.cfg_version,
          event: row.event,
          processed_at: processedAtDate.toISOString(),
        });

        await redis.publish(this.redisChannel, payload);
        processedPerEvent.set(
          row.event,
          (processedPerEvent.get(row.event) ?? 0) + 1
        );
        publishedRows.push({ id: row.id, event: row.event });
      } catch (error) {
        logger.error(
          `Failed to publish agent_cfg_outbox event ${row.id} to Redis`,
          { error }
        );
        metrics.recordAgentCfgOutboxError(row.event);
        await this.requeue(row.id, row.event);
      }
    }

    if (publishedRows.length === 0) {
      return 0;
    }

    try {
      await agentCfgOutbox.markProcessed(publishedRows.map((row) => row.id));
    } catch (error) {
      logger.error('Failed to mark agent_cfg_outbox rows as processed', {
        error,
        rows: publishedRows,
      });
      for (const row of publishedRows) {
        await this.requeue(row.id, row.event);
      }
      // Nothing was durably processed: report 0 so the caller backs off
      // rather than spinning on a persistently failing markProcessed.
      return 0;
    }

    for (const [event, count] of processedPerEvent.entries()) {
      metrics.recordAgentCfgOutboxProcessed(count, event);
    }

    return publishedRows.length;
  }

  /**
   * Fetches the next batch through `agentCfgOutbox.fetchAndMarkBatch`, timed
   * via `metrics.dbResponseTime`.
   *
   * @returns The fetched rows, or an empty array when the fetch fails (the
   *   failure is logged and counted, not rethrown).
   */
  private async fetchAndMarkBatch(): Promise<AgentCfgOutboxRow[]> {
    try {
      return await metrics.dbResponseTime(
        'agent_cfg_outbox_fetch',
        async () => {
          return agentCfgOutbox.fetchAndMarkBatch(this.batchSize);
        }
      );
    } catch (error) {
      logger.error('Failed to fetch agent_cfg_outbox batch', {
        error: error instanceof Error ? error.message : String(error),
        stack: error instanceof Error ? error.stack : undefined,
        errorType: error?.constructor?.name || typeof error,
      });
      metrics.recordAgentCfgOutboxError('fetch');
      return [];
    }
  }

  /**
   * Requeues a single outbox row. Best-effort: a failure is logged and
   * counted but never rethrown.
   *
   * @param id - Outbox row id to requeue.
   * @param event - Event name, used only as the metrics label.
   */
  private async requeue(id: number, event: string): Promise<void> {
    try {
      await metrics.dbResponseTime('agent_cfg_outbox_requeue', async () => {
        await agentCfgOutbox.requeue(id);
        return true;
      });
      metrics.recordAgentCfgOutboxRequeued(event);
    } catch (error) {
      logger.error(`Failed to requeue agent_cfg_outbox event ${id}`, {
        error,
      });
      metrics.recordAgentCfgOutboxError('requeue');
    }
  }

  /**
   * Shuts down the Redis client and the Postgres pool. Each shutdown is
   * attempted independently and failures are only logged. No-op when already
   * disposed or never initialized.
   */
  private async dispose(): Promise<void> {
    if (this.disposed || !this.initialized) {
      return;
    }

    this.disposed = true;
    this.initialized = false;

    try {
      await this.redisClient.shutdown();
    } catch (error) {
      logger.error('Failed to shutdown Redis client', { error });
    }

    try {
      await Postgres.shutdown();
    } catch (error) {
      logger.error('Failed to shutdown Postgres pool', { error });
    }
  }

  /**
   * Normalizes a timestamp-like value.
   *
   * @param value - A `Date`, a date string, or anything else.
   * @returns The `Date` itself, the parsed string when it yields a valid
   *   date, or `null` for nullish, unparseable or unsupported input.
   */
  private parseTimestamp(value: unknown): Date | null {
    if (value == null) {
      return null;
    }

    if (value instanceof Date) {
      return value;
    }

    if (typeof value === 'string') {
      const parsed = new Date(value);
      return Number.isNaN(parsed.getTime()) ? null : parsed;
    }

    return null;
  }
}

/**
 * Process entry point: creates a worker, wires SIGINT/SIGTERM to a graceful
 * stop followed by `process.exit(0)`, and runs the worker until it stops.
 */
async function bootstrap(): Promise<void> {
  const worker = new AgentCfgOutboxWorker();
  let runningPromise: Promise<void> | null = null;

  const shutdown = async () => {
    await worker.stop();
    // Wait for the loop to exit and connections to be disposed before exiting.
    if (runningPromise) {
      await runningPromise;
    }
    process.exit(0);
  };

  process.once('SIGINT', shutdown);
  process.once('SIGTERM', shutdown);

  runningPromise = worker.start();
  await runningPromise;
}

// Only bootstrap when this module is the script Node was launched with,
// not when it is imported by another module.
const isDirectRun =
  typeof process !== 'undefined' &&
  typeof process.argv[1] === 'string' &&
  fileURLToPath(import.meta.url) === resolve(process.argv[1]);

if (isDirectRun) {
  bootstrap().catch(async (error) => {
    logger.error('AgentCfgOutboxWorker failed to start', { error });
    metrics.recordAgentCfgOutboxError('startup');
    process.exit(1);
  });
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/KasarLabs/snak'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.