Skip to main content
Glama
lock-manager.ts (6.43 kB)
/**
 * Agent Synch MCP Server - Lock Manager
 * Handles file locking and queuing for concurrent agent access.
 */

import * as fs from 'fs/promises';
import * as path from 'path';

/** A lock currently held on a resource. Timestamps are ISO-8601 strings. */
export interface LockInfo {
  resourceId: string;
  agentId: string;
  acquiredAt: string;
  expiresAt: string;
  operation: string;
}

/** A request waiting for a resource that is locked by another agent. */
export interface QueueEntry {
  agentId: string;
  resourceId: string;
  requestedAt: string;
  operation: string;
  /** Settles the waiter's promise: `true` once the lock is granted, `false` on timeout. */
  resolve: (acquired: boolean) => void;
}

/**
 * Internal bookkeeping for a queued request: the timeout the caller asked for
 * and the handle of the timer that expires the request.
 */
interface PendingRequest extends QueueEntry {
  timeoutMs: number;
  timer: ReturnType<typeof setTimeout> | undefined;
}

/**
 * LockManager - Handles concurrent access to shared resources.
 * Uses in-memory locks with file-based persistence for crash recovery.
 *
 * The in-memory map is the source of truth while the process is running; every
 * change to it is made synchronously (never across an `await`) so that two
 * interleaved calls cannot both obtain the same lock. Lock files are written
 * afterwards, in the same order as the in-memory changes.
 */
export class LockManager {
  private locks: Map<string, LockInfo> = new Map();
  private queues: Map<string, PendingRequest[]> = new Map();
  private lockDir: string;
  private defaultTimeoutMs: number;
  /** Tail of the chain that serializes lock-file writes and deletions. */
  private ioTail: Promise<void> = Promise.resolve();

  /**
   * @param baseDir   Directory under which the `.locks` folder is kept.
   * @param timeoutMs Default timeout (ms) used when `acquireLock` is called without one.
   */
  constructor(baseDir: string, timeoutMs: number = 30000) {
    this.lockDir = path.join(baseDir, '.locks');
    this.defaultTimeoutMs = timeoutMs;
  }

  /** Creates the lock directory if needed and reloads unexpired locks from disk. */
  async initialize(): Promise<void> {
    await fs.mkdir(this.lockDir, { recursive: true });
    // Recover any stale locks on startup
    await this.recoverLocks();
  }

  /**
   * Acquire a lock on a resource. Returns immediately if available,
   * otherwise queues the request and waits.
   *
   * `timeoutMs` (default: the constructor's `timeoutMs`) is used both as the
   * lifetime of the granted lock and as the maximum time to wait in the queue.
   *
   * @returns `true` when the lock was acquired or extended, `false` when the
   *          request timed out while queued.
   * @throws  Any error raised while writing the lock file. The in-memory lock
   *          has already been taken at that point and stays in place until it
   *          is released or expires.
   */
  async acquireLock(
    resourceId: string,
    agentId: string,
    operation: string,
    timeoutMs?: number
  ): Promise<boolean> {
    const timeout = timeoutMs ?? this.defaultTimeoutMs;

    // The state is re-read after every await: releasing an expired lock hands
    // the resource to the first queued waiter, so the caller must not assume
    // the resource is free afterwards. Each pass either returns or consumes
    // one expired lock, so the loop terminates.
    for (;;) {
      const held = this.locks.get(resourceId);
      if (!held) {
        return this.doAcquireLock(resourceId, agentId, operation, timeout);
      }
      if (!this.isExpired(held)) {
        return held.agentId === agentId
          ? this.extendLock(resourceId, agentId, timeout) // Same agent, extend lock
          : this.queueForLock(resourceId, agentId, operation, timeout); // Queue this request
      }
      await this.releaseLock(resourceId, held.agentId);
    }
  }

  /**
   * Release a lock and process the queue.
   *
   * When another request is waiting, the lock is handed to it before this
   * method first yields, so no other caller can take the resource in between.
   * The new holder's lock lives for the timeout that waiter asked for.
   *
   * @returns `false` when the resource is not locked by `agentId`, otherwise `true`.
   * @throws  Any error raised while writing the next holder's lock file; the
   *          waiter is still granted the lock in that case.
   */
  async releaseLock(resourceId: string, agentId: string): Promise<boolean> {
    const lock = this.locks.get(resourceId);
    if (!lock || lock.agentId !== agentId) {
      return false;
    }

    this.locks.delete(resourceId);

    const next = this.dequeueWaiter(resourceId);
    if (next === undefined) {
      await this.removeLockFile(resourceId);
      return true;
    }

    const handedOver = this.grantLock(next.resourceId, next.agentId, next.operation, next.timeoutMs);
    try {
      // Overwrites the previous holder's file, so no separate unlink is needed.
      await this.persistLock(handedOver);
    } finally {
      // The waiter already owns the lock in memory; never leave it pending.
      next.resolve(true);
    }
    return true;
  }

  /**
   * Get current lock status for a resource.
   *
   * An expired lock is released as a side effect. If that release hands the
   * resource to a queued waiter, the waiter's lock is returned.
   */
  getLockStatus(resourceId: string): LockInfo | null {
    const lock = this.locks.get(resourceId);
    if (!lock) return null;
    if (!this.isExpired(lock)) return lock;

    // The in-memory part of the release happens synchronously inside the call.
    // Only lock-file I/O is still pending, and this synchronous getter has no
    // way to report its failure, so the rejection is deliberately dropped.
    void this.releaseLock(resourceId, lock.agentId).catch(() => undefined);

    const current = this.locks.get(resourceId);
    return current && !this.isExpired(current) ? current : null;
  }

  /**
   * Get queue length for a resource.
   */
  getQueueLength(resourceId: string): number {
    return this.queues.get(resourceId)?.length ?? 0;
  }

  /**
   * Get all active locks.
   */
  getAllLocks(): LockInfo[] {
    const now = Date.now();
    return Array.from(this.locks.values()).filter((lock) => Date.parse(lock.expiresAt) >= now);
  }

  // --- Locking internals ---

  /** True when the lock's expiry time lies in the past. */
  private isExpired(lock: LockInfo): boolean {
    return Date.parse(lock.expiresAt) < Date.now();
  }

  /** Records a new lock in memory (synchronously) and returns it. */
  private grantLock(
    resourceId: string,
    agentId: string,
    operation: string,
    timeoutMs: number
  ): LockInfo {
    const now = Date.now();
    const lockInfo: LockInfo = {
      resourceId,
      agentId,
      acquiredAt: new Date(now).toISOString(),
      expiresAt: new Date(now + timeoutMs).toISOString(),
      operation,
    };
    this.locks.set(resourceId, lockInfo);
    return lockInfo;
  }

  /** Takes the lock in memory, then writes its lock file. */
  private async doAcquireLock(
    resourceId: string,
    agentId: string,
    operation: string,
    timeoutMs: number
  ): Promise<boolean> {
    const lockInfo = this.grantLock(resourceId, agentId, operation, timeoutMs);
    await this.persistLock(lockInfo);
    return true;
  }

  /**
   * Appends a request to the resource's queue. The returned promise resolves
   * `true` when the lock is handed over, or `false` after `timeoutMs` if the
   * request is still waiting.
   */
  private queueForLock(
    resourceId: string,
    agentId: string,
    operation: string,
    timeoutMs: number
  ): Promise<boolean> {
    return new Promise<boolean>((resolve) => {
      const request: PendingRequest = {
        agentId,
        resourceId,
        requestedAt: new Date().toISOString(),
        operation,
        resolve,
        timeoutMs,
        timer: undefined,
      };

      const queue = this.queues.get(resourceId) ?? [];
      queue.push(request);
      this.queues.set(resourceId, queue);

      // Set timeout for queue entry
      request.timer = setTimeout(() => {
        const pending = this.queues.get(resourceId);
        // Match by identity: agent id plus timestamp is ambiguous when one
        // agent queues twice within the same millisecond.
        const idx = pending ? pending.indexOf(request) : -1;
        if (!pending || idx === -1) return;
        pending.splice(idx, 1);
        if (pending.length === 0) this.queues.delete(resourceId);
        resolve(false); // Timeout
      }, timeoutMs);
    });
  }

  /**
   * Removes and returns the longest-waiting request for a resource, cancelling
   * its timeout timer so it does not linger after the lock is granted.
   */
  private dequeueWaiter(resourceId: string): PendingRequest | undefined {
    const queue = this.queues.get(resourceId);
    const next = queue?.shift();
    if (queue && queue.length === 0) this.queues.delete(resourceId);
    if (next?.timer !== undefined) clearTimeout(next.timer);
    return next;
  }

  /**
   * Pushes the expiry of a lock held by `agentId` to now + `timeoutMs` and
   * rewrites the lock file so crash recovery sees the new expiry.
   */
  private async extendLock(
    resourceId: string,
    agentId: string,
    timeoutMs: number
  ): Promise<boolean> {
    const lock = this.locks.get(resourceId);
    if (!lock || lock.agentId !== agentId) return false;

    lock.expiresAt = new Date(Date.now() + timeoutMs).toISOString();
    await this.persistLock(lock);
    return true;
  }

  // --- Persistence ---

  /**
   * Maps a resource id to its lock file. Characters outside `[a-zA-Z0-9-_]`
   * are replaced with `_`, so ids that differ only in such characters share
   * one file.
   */
  private lockFilePath(resourceId: string): string {
    const sanitized = resourceId.replace(/[^a-zA-Z0-9-_]/g, '_');
    return path.join(this.lockDir, `${sanitized}.lock`);
  }

  /**
   * Runs lock-file operations one at a time, in call order, so that a pending
   * unlink can never land after a newer write for the same resource.
   */
  private enqueueIo(task: () => Promise<void>): Promise<void> {
    const run = this.ioTail.then(task);
    // A failed operation must not block the ones queued behind it.
    this.ioTail = run.catch(() => undefined);
    return run;
  }

  /** Writes the lock to its file; the content is snapshotted at call time. */
  private persistLock(lock: LockInfo): Promise<void> {
    const file = this.lockFilePath(lock.resourceId);
    const payload = JSON.stringify(lock, null, 2);
    return this.enqueueIo(() => fs.writeFile(file, payload));
  }

  /** Deletes the resource's lock file; failures are ignored. */
  private async removeLockFile(resourceId: string): Promise<void> {
    const file = this.lockFilePath(resourceId);
    try {
      await this.enqueueIo(() => fs.unlink(file));
    } catch {
      // Ignore if file doesn't exist
    }
  }

  /** Runtime check that parsed JSON has the string fields of a `LockInfo`. */
  private static isLockInfo(value: unknown): value is LockInfo {
    if (typeof value !== 'object' || value === null) return false;
    const candidate = value as Record<string, unknown>;
    return ['resourceId', 'agentId', 'acquiredAt', 'expiresAt', 'operation'].every(
      (key) => typeof candidate[key] === 'string'
    );
  }

  /**
   * Reloads unexpired locks from the lock directory. Expired or malformed lock
   * files are deleted; a file that cannot be read is skipped. One bad file
   * never prevents the remaining files from being recovered.
   */
  private async recoverLocks(): Promise<void> {
    let files: string[];
    try {
      files = await fs.readdir(this.lockDir);
    } catch {
      // Lock dir doesn't exist yet
      return;
    }

    const now = Date.now();
    for (const file of files) {
      if (!file.endsWith('.lock')) continue;
      const filePath = path.join(this.lockDir, file);

      let raw: string;
      try {
        raw = await fs.readFile(filePath, 'utf-8');
      } catch {
        continue; // Unreadable right now; leave the file alone.
      }

      let lock: LockInfo | undefined;
      try {
        const parsed: unknown = JSON.parse(raw);
        if (LockManager.isLockInfo(parsed)) lock = parsed;
      } catch {
        // Malformed JSON (e.g. a write interrupted by a crash): treat as stale.
      }

      if (lock && Date.parse(lock.expiresAt) >= now) {
        this.locks.set(lock.resourceId, lock);
      } else {
        try {
          await fs.unlink(filePath);
        } catch {
          // Best effort: the file may already be gone.
        }
      }
    }
  }
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Mnehmos/mnehmos.synch.mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.