/**
* Agent Synch MCP Server - Lock Manager
* Handles file locking and queuing for concurrent agent access.
*/
import * as fs from 'fs/promises';
import * as path from 'path';
/**
* LockManager - Handles concurrent access to shared resources.
* Uses in-memory locks with file-based persistence for crash recovery.
*/
export class LockManager {
  /** Active locks, keyed by resource id. Values are plain lock-info records. */
  locks = new Map();
  /** Pending waiters, keyed by resource id, in FIFO order. */
  queues = new Map();
  /** Directory holding one `<sanitized id>.lock` JSON file per active lock. */
  lockDir;
  /** Lock lifetime / queue wait (ms) used when a caller gives no timeout. */
  defaultTimeoutMs;

  /**
   * @param {string} baseDir - Directory under which `.locks` is created.
   * @param {number} [timeoutMs=30000] - Default timeout in milliseconds.
   */
  constructor(baseDir, timeoutMs = 30000) {
    this.lockDir = path.join(baseDir, '.locks');
    this.defaultTimeoutMs = timeoutMs;
  }

  /**
   * Create the lock directory if needed and reload persisted locks.
   * @returns {Promise<void>}
   */
  async initialize() {
    await fs.mkdir(this.lockDir, { recursive: true });
    // Recover any stale locks on startup
    await this.recoverLocks();
  }

  /**
   * Acquire a lock on a resource. Returns immediately if available,
   * otherwise queues the request and waits.
   *
   * `timeoutMs` serves both as the lifetime of the lock once granted and
   * as the maximum time to wait in the queue.
   *
   * @param {string} resourceId
   * @param {string} agentId
   * @param {*} operation - Stored verbatim on the lock record.
   * @param {number} [timeoutMs] - Falls back to `defaultTimeoutMs` when nullish.
   * @returns {Promise<boolean>} `true` once the lock is held by `agentId`,
   *   `false` if the queue wait timed out. Rejects if the lock file cannot
   *   be written.
   */
  async acquireLock(resourceId, agentId, operation, timeoutMs) {
    const timeout = timeoutMs ?? this.defaultTimeoutMs;
    let holder = this.locks.get(resourceId);
    if (holder && this.isExpired(holder)) {
      await this.releaseLock(resourceId, holder.agentId);
      // Releasing processes the queue, so the lock may already belong to a
      // waiter (or to someone who acquired it while we awaited). Re-read the
      // holder instead of assuming the resource is free.
      holder = this.locks.get(resourceId);
    }
    if (!holder) {
      return this.doAcquireLock(resourceId, agentId, operation, timeout);
    }
    if (holder.agentId === agentId) {
      // Same agent, extend lock
      const extended = this.extendLock(resourceId, agentId, timeout);
      if (extended) {
        // Keep the on-disk copy in step so recovery does not see the old expiry.
        await this.persistLock(this.locks.get(resourceId));
      }
      return extended;
    }
    // Held by another agent: queue this request
    return this.queueForLock(resourceId, agentId, operation, timeout);
  }

  /**
   * Unconditionally record `agentId` as holder of `resourceId` and persist it.
   * Callers are responsible for checking that the resource is free.
   *
   * @returns {Promise<boolean>} Always `true`; rejects if persisting fails,
   *   in which case the in-memory lock is rolled back.
   */
  async doAcquireLock(resourceId, agentId, operation, timeoutMs) {
    const now = new Date();
    const lockInfo = {
      resourceId,
      agentId,
      acquiredAt: now.toISOString(),
      expiresAt: new Date(now.getTime() + timeoutMs).toISOString(),
      operation,
    };
    this.locks.set(resourceId, lockInfo);
    try {
      await this.persistLock(lockInfo);
    } catch (err) {
      // The caller will see a failure, so do not leave the lock held in memory.
      if (this.locks.get(resourceId) === lockInfo) {
        this.locks.delete(resourceId);
      }
      throw err;
    }
    return true;
  }

  /**
   * Append a waiter to the resource's queue.
   *
   * @returns {Promise<boolean>} Resolves `true` when the lock is handed to
   *   this waiter, `false` when `timeoutMs` elapses first. Rejects if the
   *   lock could not be persisted at hand-over time.
   */
  async queueForLock(resourceId, agentId, operation, timeoutMs) {
    return new Promise((resolve, reject) => {
      const entry = {
        agentId,
        resourceId,
        requestedAt: new Date().toISOString(),
        operation,
        timeoutMs,
        resolve,
        reject,
        timer: undefined,
      };
      let queue = this.queues.get(resourceId);
      if (!queue) {
        queue = [];
        this.queues.set(resourceId, queue);
      }
      queue.push(entry);
      // Set timeout for queue entry. The handle is kept on the entry so it
      // can be cancelled when the lock is granted.
      entry.timer = setTimeout(() => {
        const pending = this.queues.get(resourceId);
        // Match by identity: agentId + timestamp is not unique per request.
        const idx = pending ? pending.indexOf(entry) : -1;
        if (idx === -1) {
          return; // already granted or removed
        }
        pending.splice(idx, 1);
        if (pending.length === 0) {
          this.queues.delete(resourceId);
        }
        resolve(false); // Timeout
      }, timeoutMs);
    });
  }

  /**
   * Release a lock and process the queue.
   *
   * @returns {Promise<boolean>} `false` if `agentId` does not currently hold
   *   the lock, `true` otherwise.
   */
  async releaseLock(resourceId, agentId) {
    const lock = this.locks.get(resourceId);
    if (!lock || lock.agentId !== agentId) {
      return false;
    }
    this.locks.delete(resourceId);
    await this.removeLockFile(resourceId);
    // Process queue
    await this.processQueue(resourceId);
    return true;
  }

  /**
   * Hand the lock to the first waiter in the queue, if the resource is free.
   * A waiter whose lock cannot be persisted is rejected and the next waiter
   * is tried.
   *
   * @returns {Promise<void>}
   */
  async processQueue(resourceId) {
    // Stop as soon as somebody holds the lock: either we just granted it, or
    // another caller acquired it while a release was still awaiting file I/O.
    while (!this.locks.has(resourceId)) {
      const queue = this.queues.get(resourceId);
      if (!queue || queue.length === 0) {
        this.queues.delete(resourceId);
        return;
      }
      const next = queue.shift();
      if (queue.length === 0) {
        this.queues.delete(resourceId);
      }
      // The waiter is being served; its timeout must not linger.
      clearTimeout(next.timer);
      try {
        const acquired = await this.doAcquireLock(
          next.resourceId,
          next.agentId,
          next.operation,
          next.timeoutMs ?? this.defaultTimeoutMs,
        );
        next.resolve(acquired);
        return;
      } catch (err) {
        // Settle the waiter instead of leaving its promise pending forever.
        next.reject(err);
      }
    }
  }

  /**
   * Push the expiry of a lock held by `agentId` to now + `timeoutMs`.
   * Only the in-memory record is updated; the lock file is not rewritten
   * here (`acquireLock` persists after extending).
   *
   * @returns {boolean} `false` if `agentId` does not hold the lock.
   */
  extendLock(resourceId, agentId, timeoutMs) {
    const lock = this.locks.get(resourceId);
    if (!lock || lock.agentId !== agentId) {
      return false;
    }
    lock.expiresAt = new Date(Date.now() + timeoutMs).toISOString();
    this.locks.set(resourceId, lock);
    return true;
  }

  /**
   * Whether a lock record's `expiresAt` lies in the past.
   * @returns {boolean}
   */
  isExpired(lock) {
    return new Date(lock.expiresAt) < new Date();
  }

  /**
   * Get current lock status for a resource.
   * An expired lock is reported as `null` and released in the background.
   *
   * @returns {object|null} The lock record, or `null` if free or expired.
   */
  getLockStatus(resourceId) {
    const lock = this.locks.get(resourceId);
    if (!lock) {
      return null;
    }
    if (this.isExpired(lock)) {
      // This method is synchronous, so the release is fire-and-forget; the
      // handler keeps a failure from becoming an unhandled rejection.
      this.releaseLock(resourceId, lock.agentId).catch(() => {
        // Best-effort cleanup: the lock is already reported as free.
      });
      return null;
    }
    return lock;
  }

  /**
   * Get queue length for a resource.
   * @returns {number}
   */
  getQueueLength(resourceId) {
    return this.queues.get(resourceId)?.length ?? 0;
  }

  /**
   * Get all active locks. Expired records are filtered out but not released.
   * @returns {object[]}
   */
  getAllLocks() {
    const now = new Date();
    return Array.from(this.locks.values()).filter((lock) => new Date(lock.expiresAt) >= now);
  }

  // --- Persistence ---

  /**
   * Path of the lock file for a resource. Every character outside
   * `[a-zA-Z0-9-_]` becomes `_`, so distinct ids that differ only in such
   * characters share one file.
   * @returns {string}
   */
  lockFilePath(resourceId) {
    const sanitized = resourceId.replace(/[^a-zA-Z0-9-_]/g, '_');
    return path.join(this.lockDir, `${sanitized}.lock`);
  }

  /**
   * Write the lock record as pretty-printed JSON to its lock file.
   * @returns {Promise<void>}
   */
  async persistLock(lock) {
    await fs.writeFile(this.lockFilePath(lock.resourceId), JSON.stringify(lock, null, 2));
  }

  /**
   * Delete the resource's lock file. Failures are ignored on purpose so a
   * release always completes in memory.
   * @returns {Promise<void>}
   */
  async removeLockFile(resourceId) {
    try {
      await fs.unlink(this.lockFilePath(resourceId));
    } catch {
      // Ignore if file doesn't exist
    }
  }

  /**
   * Reload unexpired locks from `lockDir` and delete expired lock files.
   * Files that cannot be read or parsed are skipped so one corrupt file does
   * not prevent recovery of the others.
   * @returns {Promise<void>}
   */
  async recoverLocks() {
    let files;
    try {
      files = await fs.readdir(this.lockDir);
    } catch {
      // Lock dir doesn't exist yet
      return;
    }
    for (const file of files) {
      if (!file.endsWith('.lock')) {
        continue;
      }
      const filePath = path.join(this.lockDir, file);
      try {
        const lock = JSON.parse(await fs.readFile(filePath, 'utf-8'));
        if (!(new Date(lock.expiresAt) >= new Date())) {
          // Expired (or carries no usable expiry): drop the file.
          await fs.unlink(filePath);
        } else if (typeof lock.resourceId === 'string') {
          this.locks.set(lock.resourceId, lock);
        }
        // Otherwise the record has no usable resource id; leave it alone.
      } catch {
        // Unreadable or corrupt lock file: skip it and keep recovering.
      }
    }
  }
}