/**
* Agent Synch MCP Server - Lock Manager
* Handles file locking and queuing for concurrent agent access.
*/
import * as fs from 'fs/promises';
import * as path from 'path';
/**
* Describes one held lock. This is the record kept in the in-memory lock table,
* serialized as JSON into the lock file, and returned by the status getters.
*/
export interface LockInfo {
/** Identifier of the locked resource; also the key of the lock table. */
resourceId: string;
/** Identifier of the agent that currently holds the lock. */
agentId: string;
/** ISO-8601 timestamp of when the lock was granted. */
acquiredAt: string;
/** ISO-8601 timestamp after which the lock is treated as expired. */
expiresAt: string;
/** Caller-supplied label for the operation; stored as-is and not interpreted in this file. */
operation: string;
}
/**
* One pending lock request waiting in a resource's queue.
*/
export interface QueueEntry {
/** Identifier of the agent waiting for the lock. */
agentId: string;
/** Identifier of the resource the agent is waiting on. */
resourceId: string;
/** ISO-8601 timestamp of when the request was queued. */
requestedAt: string;
/** Caller-supplied operation label, copied into the lock once it is granted. */
operation: string;
/** Settles the waiting acquire call: `true` once the lock is granted, `false` if the wait times out. */
resolve: (acquired: boolean) => void;
}
/**
 * LockManager - Handles concurrent access to shared resources.
 * Uses in-memory locks with file-based persistence for crash recovery.
 *
 * The in-memory table is authoritative. Every state change that decides who
 * owns a resource (grant, release, hand-off to a queued waiter) happens
 * synchronously, before any file I/O is awaited, so two overlapping calls can
 * never both be told they hold the same lock.
 *
 * Expired locks are reclaimed lazily: only when the resource is next touched
 * by `acquireLock` or `getLockStatus`.
 */
export class LockManager {
  private readonly locks: Map<string, LockInfo> = new Map();
  private readonly queues: Map<string, QueueEntry[]> = new Map();
  /**
   * Bookkeeping for queued requests that is not part of the public
   * `QueueEntry` shape: the pending timeout timer and the lock duration the
   * waiter asked for.
   */
  private readonly waiters: Map<
    QueueEntry,
    { timer: ReturnType<typeof setTimeout>; timeoutMs: number }
  > = new Map();
  private readonly lockDir: string;
  private readonly defaultTimeoutMs: number;

  /**
   * @param baseDir Directory under which the `.locks` folder is kept.
   * @param timeoutMs Default lock duration / queue wait, in milliseconds.
   */
  constructor(baseDir: string, timeoutMs: number = 30000) {
    this.lockDir = path.join(baseDir, '.locks');
    this.defaultTimeoutMs = timeoutMs;
  }

  /**
   * Creates the lock directory if needed and reloads unexpired locks left
   * behind by a previous run.
   */
  async initialize(): Promise<void> {
    await fs.mkdir(this.lockDir, { recursive: true });
    // Recover any stale locks on startup
    await this.recoverLocks();
  }

  /**
   * Acquire a lock on a resource. Returns immediately if available,
   * otherwise queues the request and waits.
   *
   * @param resourceId Resource to lock.
   * @param agentId Agent requesting the lock.
   * @param operation Label stored with the lock.
   * @param timeoutMs Lock duration, and also the maximum time to wait in the
   *   queue; defaults to the manager's default timeout.
   * @returns `true` when the lock is held by `agentId` (newly acquired or
   *   extended), `false` if the queued wait timed out.
   * @throws If the lock file cannot be written.
   */
  async acquireLock(
    resourceId: string,
    agentId: string,
    operation: string,
    timeoutMs?: number
  ): Promise<boolean> {
    const timeout = timeoutMs ?? this.defaultTimeoutMs;

    // Reclaim an expired lock first. Releasing hands the resource to the next
    // queued waiter (if any), so the table must be re-read afterwards instead
    // of assuming the resource is now free.
    let current = this.locks.get(resourceId);
    while (current && Date.parse(current.expiresAt) < Date.now()) {
      await this.releaseLock(resourceId, current.agentId);
      current = this.locks.get(resourceId);
    }

    if (!current) {
      return this.doAcquireLock(resourceId, agentId, operation, timeout);
    }
    if (current.agentId === agentId) {
      // Same agent, extend lock
      return this.extendLock(resourceId, agentId, timeout);
    }
    // Held by someone else: queue this request
    return this.queueForLock(resourceId, agentId, operation, timeout);
  }

  /**
   * Records a fresh lock in the in-memory table and returns it. Synchronous
   * on purpose: ownership must be settled before any I/O is awaited.
   */
  private installLock(
    resourceId: string,
    agentId: string,
    operation: string,
    timeoutMs: number
  ): LockInfo {
    const now = Date.now();
    const lockInfo: LockInfo = {
      resourceId,
      agentId,
      acquiredAt: new Date(now).toISOString(),
      expiresAt: new Date(now + timeoutMs).toISOString(),
      operation,
    };
    this.locks.set(resourceId, lockInfo);
    return lockInfo;
  }

  /** Grants a free resource to `agentId` and persists the lock. */
  private async doAcquireLock(
    resourceId: string,
    agentId: string,
    operation: string,
    timeoutMs: number
  ): Promise<boolean> {
    const lockInfo = this.installLock(resourceId, agentId, operation, timeoutMs);
    try {
      await this.persistLock(lockInfo);
    } catch (err: unknown) {
      // The caller sees a rejection and will assume it holds nothing, so do
      // not leave the in-memory lock behind to block everyone until expiry.
      if (this.locks.get(resourceId) === lockInfo) {
        try {
          await this.releaseLock(resourceId, agentId);
        } catch {
          // The original persistence failure is the error worth reporting.
        }
      }
      throw err;
    }
    return true;
  }

  /**
   * Appends a request to the resource's queue. The returned promise settles
   * with `true` when the lock is handed over, or `false` after `timeoutMs`.
   */
  private queueForLock(
    resourceId: string,
    agentId: string,
    operation: string,
    timeoutMs: number
  ): Promise<boolean> {
    return new Promise<boolean>((resolve) => {
      const entry: QueueEntry = {
        agentId,
        resourceId,
        requestedAt: new Date().toISOString(),
        operation,
        resolve,
      };
      const queue = this.queues.get(resourceId);
      if (queue) {
        queue.push(entry);
      } else {
        this.queues.set(resourceId, [entry]);
      }

      // Give up after the timeout. The entry is located by identity, so two
      // requests from one agent in the same millisecond cannot be confused.
      const timer = setTimeout(() => {
        this.waiters.delete(entry);
        const pending = this.queues.get(resourceId);
        const idx = pending ? pending.indexOf(entry) : -1;
        if (pending && idx !== -1) {
          pending.splice(idx, 1);
          if (pending.length === 0) this.queues.delete(resourceId);
          resolve(false); // Timeout
        }
      }, timeoutMs);
      this.waiters.set(entry, { timer, timeoutMs });
    });
  }

  /**
   * Removes and returns the oldest queued request for a resource, cancelling
   * its timeout timer. Returns `undefined` when nobody is waiting.
   */
  private takeNextWaiter(
    resourceId: string
  ): { entry: QueueEntry; timeoutMs: number } | undefined {
    const queue = this.queues.get(resourceId);
    const entry = queue?.shift();
    if (queue && queue.length === 0) this.queues.delete(resourceId);
    if (!entry) return undefined;

    const meta = this.waiters.get(entry);
    if (meta) {
      clearTimeout(meta.timer);
      this.waiters.delete(entry);
    }
    return { entry, timeoutMs: meta?.timeoutMs ?? this.defaultTimeoutMs };
  }

  /**
   * Release a lock and process the queue.
   *
   * @returns `false` if the resource is not locked by `agentId`, otherwise
   *   `true`.
   * @throws If the lock file for a waiter that was handed the lock cannot be
   *   written (the waiter still receives the in-memory lock).
   */
  async releaseLock(resourceId: string, agentId: string): Promise<boolean> {
    const lock = this.locks.get(resourceId);
    if (!lock || lock.agentId !== agentId) {
      return false;
    }
    this.locks.delete(resourceId);

    // Hand off before awaiting anything, so a concurrent acquireLock cannot
    // slip in between the release and the grant to the queued waiter.
    const waiter = this.takeNextWaiter(resourceId);
    if (!waiter) {
      await this.removeLockFile(resourceId);
      return true;
    }

    const granted = this.installLock(
      resourceId,
      waiter.entry.agentId,
      waiter.entry.operation,
      waiter.timeoutMs
    );
    try {
      // Same file path as the previous holder; writeFile overwrites it.
      await this.persistLock(granted);
    } finally {
      // Always settle the waiter: it has been dequeued and its timer is
      // cancelled, so nothing else would ever resolve its promise.
      waiter.entry.resolve(true);
    }
    return true;
  }

  /**
   * Pushes the expiry of a lock already held by `agentId` to now + timeoutMs
   * and persists the new deadline so crash recovery sees it.
   */
  private async extendLock(
    resourceId: string,
    agentId: string,
    timeoutMs: number
  ): Promise<boolean> {
    const lock = this.locks.get(resourceId);
    if (!lock || lock.agentId !== agentId) return false;
    lock.expiresAt = new Date(Date.now() + timeoutMs).toISOString();
    await this.persistLock(lock);
    return true;
  }

  /**
   * Get current lock status for a resource.
   *
   * If the recorded lock has expired it is released; when that release hands
   * the resource to a queued waiter, the waiter's new lock is returned.
   *
   * @returns The current lock, or `null` if the resource is free.
   */
  getLockStatus(resourceId: string): LockInfo | null {
    const lock = this.locks.get(resourceId);
    if (!lock) return null;
    if (Date.parse(lock.expiresAt) < Date.now()) {
      // The ownership change inside releaseLock is synchronous; only file I/O
      // is deferred. A synchronous getter cannot surface an I/O failure, so it
      // is dropped here rather than becoming an unhandled rejection.
      void this.releaseLock(resourceId, lock.agentId).catch(() => undefined);
      return this.locks.get(resourceId) ?? null;
    }
    return lock;
  }

  /**
   * Get queue length for a resource.
   */
  getQueueLength(resourceId: string): number {
    return this.queues.get(resourceId)?.length ?? 0;
  }

  /**
   * Get all active locks.
   */
  getAllLocks(): LockInfo[] {
    const now = Date.now();
    return Array.from(this.locks.values()).filter(
      (lock) => Date.parse(lock.expiresAt) >= now
    );
  }

  // --- Persistence ---

  /**
   * Maps a resource id to its lock file path. Characters outside
   * `[a-zA-Z0-9-_]` are replaced by `_`, so distinct ids that differ only in
   * such characters share one file.
   */
  private lockFilePath(resourceId: string): string {
    const sanitized = resourceId.replace(/[^a-zA-Z0-9-_]/g, '_');
    return path.join(this.lockDir, `${sanitized}.lock`);
  }

  /** Writes the lock as pretty-printed JSON to its lock file. */
  private async persistLock(lock: LockInfo): Promise<void> {
    await fs.writeFile(this.lockFilePath(lock.resourceId), JSON.stringify(lock, null, 2));
  }

  /** Deletes a resource's lock file; best-effort, failures are ignored. */
  private async removeLockFile(resourceId: string): Promise<void> {
    await this.unlinkQuietly(this.lockFilePath(resourceId));
  }

  /** Deletes a file, ignoring any error (typically: it does not exist). */
  private async unlinkQuietly(filePath: string): Promise<void> {
    try {
      await fs.unlink(filePath);
    } catch {
      // Ignore if file doesn't exist
    }
  }

  /** Runtime check that a parsed lock file has every `LockInfo` field as a string. */
  private static isLockInfo(value: unknown): value is LockInfo {
    if (typeof value !== 'object' || value === null) return false;
    const record = value as Record<string, unknown>;
    return ['resourceId', 'agentId', 'acquiredAt', 'expiresAt', 'operation'].every(
      (key) => typeof record[key] === 'string'
    );
  }

  /**
   * Loads every `*.lock` file from the lock directory. Unexpired, well-formed
   * locks are restored into memory; expired or malformed files are deleted.
   * Each file is handled independently so one bad file cannot stop the rest
   * from being recovered.
   */
  private async recoverLocks(): Promise<void> {
    let files: string[];
    try {
      files = await fs.readdir(this.lockDir);
    } catch {
      // Lock dir doesn't exist yet
      return;
    }

    for (const file of files) {
      if (!file.endsWith('.lock')) continue;
      const filePath = path.join(this.lockDir, file);

      let raw: string;
      try {
        raw = await fs.readFile(filePath, 'utf-8');
      } catch {
        // Unreadable right now; leave the file alone and move on.
        continue;
      }

      let parsed: unknown;
      try {
        parsed = JSON.parse(raw);
      } catch {
        parsed = undefined; // Corrupt JSON: treated like an invalid lock below.
      }

      if (LockManager.isLockInfo(parsed) && Date.parse(parsed.expiresAt) >= Date.now()) {
        this.locks.set(parsed.resourceId, parsed);
      } else {
        await this.unlinkQuietly(filePath);
      }
    }
  }
}