Skip to main content
Glama

IT-MCP

by acampkin95
commandQueue.ts12.9 kB
import Database from "better-sqlite3"; import { join } from "node:path"; import { logger } from "../utils/logger.js"; export interface QueuedCommand { readonly jobId: string; readonly toolName: string; readonly params: Record<string, unknown>; readonly requestedCapabilities: readonly string[]; readonly targetAgentId?: string; readonly status: "queued" | "picked" | "executing" | "completed" | "failed" | "timeout"; readonly priority: "low" | "normal" | "high" | "urgent"; readonly createdAt: string; readonly pickedAt?: string; readonly completedAt?: string; readonly result?: Record<string, unknown>; readonly error?: string; readonly retryCount: number; readonly maxRetries: number; } export interface QueueStats { readonly totalQueued: number; readonly totalPicked: number; readonly totalExecuting: number; readonly totalCompleted: number; readonly totalFailed: number; readonly oldestQueued?: string; readonly averageWaitTimeMs?: number; } /** * CommandQueueService manages local command queue with SQLite persistence. * * **Purpose**: * - Offline resilience: Queue commands when PostgreSQL/network unavailable * - Retry logic: Automatic retry with exponential backoff * - Priority handling: Urgent commands processed first * - Audit trail: Track command lifecycle from queue → completion * * **Schema**: * ```sql * CREATE TABLE command_queue ( * job_id TEXT PRIMARY KEY, * tool_name TEXT NOT NULL, * params TEXT NOT NULL, -- JSON * requested_capabilities TEXT NOT NULL, -- JSON array * target_agent_id TEXT, * status TEXT NOT NULL, * priority TEXT NOT NULL, * created_at TEXT NOT NULL, * picked_at TEXT, * completed_at TEXT, * result TEXT, -- JSON * error TEXT, * retry_count INTEGER DEFAULT 0, * max_retries INTEGER DEFAULT 3 * ); * ``` */ export class CommandQueueService { private readonly db: Database.Database; private readonly dbPath: string; public constructor(dbPath: string = join(process.cwd(), "mcp_command_queue.db")) { this.dbPath = dbPath; this.db = new Database(dbPath); this.db.pragma("journal_mode = WAL"); this.initialize(); } private initialize(): void { const schema = ` CREATE TABLE IF NOT EXISTS command_queue ( job_id TEXT PRIMARY KEY, tool_name TEXT NOT NULL, params TEXT NOT NULL, requested_capabilities TEXT NOT NULL, target_agent_id TEXT, status TEXT NOT NULL CHECK(status IN ('queued', 'picked', 'executing', 'completed', 'failed', 'timeout')), priority TEXT NOT NULL CHECK(priority IN ('low', 'normal', 'high', 'urgent')) DEFAULT 'normal', created_at TEXT NOT NULL, picked_at TEXT, completed_at TEXT, result TEXT, error TEXT, retry_count INTEGER DEFAULT 0, max_retries INTEGER DEFAULT 3 ); CREATE INDEX IF NOT EXISTS idx_queue_status ON command_queue(status, priority DESC, created_at ASC); CREATE INDEX IF NOT EXISTS idx_queue_agent ON command_queue(target_agent_id, status); CREATE INDEX IF NOT EXISTS idx_queue_created ON command_queue(created_at); `; this.db.exec(schema); logger.info("CommandQueueService initialized", { dbPath: this.dbPath }); } /** * Enqueue a new command for execution */ public enqueue(command: { readonly jobId: string; readonly toolName: string; readonly params: Record<string, unknown>; readonly requestedCapabilities: readonly string[]; readonly targetAgentId?: string; readonly priority?: "low" | "normal" | "high" | "urgent"; readonly maxRetries?: number; }): QueuedCommand { const queuedCommand: QueuedCommand = { jobId: command.jobId, toolName: command.toolName, params: command.params, requestedCapabilities: command.requestedCapabilities, targetAgentId: command.targetAgentId, status: "queued", priority: command.priority ?? "normal", createdAt: new Date().toISOString(), retryCount: 0, maxRetries: command.maxRetries ?? 3, }; const stmt = this.db.prepare(` INSERT INTO command_queue ( job_id, tool_name, params, requested_capabilities, target_agent_id, status, priority, created_at, retry_count, max_retries ) VALUES ( @jobId, @toolName, @params, @requestedCapabilities, @targetAgentId, @status, @priority, @createdAt, @retryCount, @maxRetries ) `); stmt.run({ jobId: queuedCommand.jobId, toolName: queuedCommand.toolName, params: JSON.stringify(queuedCommand.params), requestedCapabilities: JSON.stringify(queuedCommand.requestedCapabilities), targetAgentId: queuedCommand.targetAgentId ?? null, status: queuedCommand.status, priority: queuedCommand.priority, createdAt: queuedCommand.createdAt, retryCount: queuedCommand.retryCount, maxRetries: queuedCommand.maxRetries, }); logger.debug("Command enqueued", { jobId: queuedCommand.jobId, toolName: queuedCommand.toolName, priority: queuedCommand.priority, }); return queuedCommand; } /** * Dequeue next command for processing (priority-ordered) */ public dequeue(agentId?: string): QueuedCommand | null { const stmt = this.db.prepare(` SELECT * FROM command_queue WHERE status = 'queued' AND (target_agent_id IS NULL OR target_agent_id = ?) ORDER BY CASE priority WHEN 'urgent' THEN 1 WHEN 'high' THEN 2 WHEN 'normal' THEN 3 WHEN 'low' THEN 4 END, created_at ASC LIMIT 1 `); const row = stmt.get(agentId ?? null) as { job_id: string; tool_name: string; params: string; requested_capabilities: string; target_agent_id: string | null; status: string; priority: string; created_at: string; picked_at: string | null; completed_at: string | null; result: string | null; error: string | null; retry_count: number; max_retries: number; } | undefined; if (!row) { return null; } // Mark as picked const updateStmt = this.db.prepare(` UPDATE command_queue SET status = 'picked', picked_at = ? WHERE job_id = ? `); updateStmt.run(new Date().toISOString(), row.job_id); return this.rowToCommand(row); } /** * Update command status */ public updateStatus( jobId: string, status: "executing" | "completed" | "failed" | "timeout", result?: Record<string, unknown>, error?: string, ): void { const completedAt = status === "completed" || status === "failed" || status === "timeout" ? new Date().toISOString() : null; const stmt = this.db.prepare(` UPDATE command_queue SET status = ?, completed_at = ?, result = ?, error = ? WHERE job_id = ? `); stmt.run( status, completedAt, result ? JSON.stringify(result) : null, error ?? null, jobId, ); logger.debug("Command status updated", { jobId, status }); } /** * Retry failed command (increment retry count) */ public retryCommand(jobId: string): boolean { const command = this.getCommand(jobId); if (!command) { logger.warn("Cannot retry: command not found", { jobId }); return false; } if (command.retryCount >= command.maxRetries) { logger.warn("Cannot retry: max retries exceeded", { jobId, retryCount: command.retryCount, maxRetries: command.maxRetries, }); return false; } const stmt = this.db.prepare(` UPDATE command_queue SET status = 'queued', picked_at = NULL, completed_at = NULL, error = NULL, retry_count = retry_count + 1 WHERE job_id = ? `); stmt.run(jobId); logger.info("Command requeued for retry", { jobId, retryCount: command.retryCount + 1, }); return true; } /** * Get command by ID */ public getCommand(jobId: string): QueuedCommand | null { const stmt = this.db.prepare(` SELECT * FROM command_queue WHERE job_id = ? `); const row = stmt.get(jobId) as { job_id: string; tool_name: string; params: string; requested_capabilities: string; target_agent_id: string | null; status: string; priority: string; created_at: string; picked_at: string | null; completed_at: string | null; result: string | null; error: string | null; retry_count: number; max_retries: number; } | undefined; return row ? this.rowToCommand(row) : null; } /** * Submit a new command (alias for enqueue with auto job ID) */ public async submitCommand(command: { readonly toolName: string; readonly params: Record<string, unknown>; readonly requestedCapabilities: readonly string[]; readonly targetAgentId?: string; readonly priority?: "low" | "normal" | "high" | "urgent"; readonly maxRetries?: number; }): Promise<string> { const jobId = crypto.randomUUID(); this.enqueue({ jobId, ...command, }); return jobId; } /** * Get command by ID (alias for getCommand for consistency) */ public async getCommandById(jobId: string): Promise<QueuedCommand | null> { return this.getCommand(jobId); } /** * Mark command as failed with error message */ public async markCommandFailed(jobId: string, error: string): Promise<void> { this.updateStatus(jobId, "failed", undefined, error); } /** * Get queue statistics (async version for consistency) */ public async getQueueStats(): Promise<QueueStats> { return this.getStats(); } /** * Get queue statistics */ public getStats(): QueueStats { const stmt = this.db.prepare(` SELECT status, COUNT(*) as count, MIN(created_at) as oldest FROM command_queue GROUP BY status `); const rows = stmt.all() as Array<{ status: string; count: number; oldest: string }>; let totalQueued = 0; let totalPicked = 0; let totalExecuting = 0; let totalCompleted = 0; let totalFailed = 0; let oldestQueued: string | undefined; for (const row of rows) { switch (row.status) { case "queued": totalQueued = row.count; oldestQueued = row.oldest; break; case "picked": totalPicked = row.count; break; case "executing": totalExecuting = row.count; break; case "completed": totalCompleted = row.count; break; case "failed": case "timeout": totalFailed += row.count; break; } } return { totalQueued, totalPicked, totalExecuting, totalCompleted, totalFailed, oldestQueued, }; } /** * Purge completed/failed commands older than N days */ public purgeOldCommands(daysOld: number): number { const cutoffDate = new Date(); cutoffDate.setDate(cutoffDate.getDate() - daysOld); const stmt = this.db.prepare(` DELETE FROM command_queue WHERE status IN ('completed', 'failed', 'timeout') AND completed_at < ? `); const result = stmt.run(cutoffDate.toISOString()); logger.info("Purged old commands", { deleted: result.changes, daysOld }); return result.changes; } /** * Convert database row to QueuedCommand */ private rowToCommand(row: { job_id: string; tool_name: string; params: string; requested_capabilities: string; target_agent_id: string | null; status: string; priority: string; created_at: string; picked_at: string | null; completed_at: string | null; result: string | null; error: string | null; retry_count: number; max_retries: number; }): QueuedCommand { return { jobId: row.job_id, toolName: row.tool_name, params: JSON.parse(row.params), requestedCapabilities: JSON.parse(row.requested_capabilities), targetAgentId: row.target_agent_id ?? undefined, status: row.status as QueuedCommand["status"], priority: row.priority as QueuedCommand["priority"], createdAt: row.created_at, pickedAt: row.picked_at ?? undefined, completedAt: row.completed_at ?? undefined, result: row.result ? JSON.parse(row.result) : undefined, error: row.error ?? undefined, retryCount: row.retry_count, maxRetries: row.max_retries, }; } /** * Close database connection */ public close(): void { this.db.close(); logger.info("CommandQueueService closed"); } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/acampkin95/MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server