/**
* 写入互斥锁实现
*/
import { logger } from '@/utils/logger';
import { IWriteMutex, LockTicket, LockInfo, SerialError } from '@/types';
import { ErrorCode } from '@/utils/error-codes';
import { v4 as uuidv4 } from 'uuid';
/**
* A pending lock request waiting in a port's queue.
*/
interface LockQueueItem {
/** Ticket handed to the requester once the lock is granted. */
ticket: LockTicket;
/** Settles the pending acquire() promise with the granted ticket. */
resolve: (ticket: LockTicket) => void;
/** Rejects the pending acquire() promise (timeout, expiry, or forced release). */
reject: (error: Error) => void;
/** Timer that rejects the request when its wait timeout elapses; cleared whenever the request leaves the queue by another path. */
timeout: NodeJS.Timeout;
/** Date.now() value (epoch milliseconds) captured when the request was enqueued. */
timestamp: number;
}
/**
* Lock bookkeeping for a single port.
*/
interface PortLockState {
/** True while some ticket holds this port's lock. */
isLocked: boolean;
/** Ticket of the current holder; undefined while the lock is free. */
currentHolder?: LockTicket;
/** Waiting requests in arrival order; the lock is granted from the front. */
queue: LockQueueItem[];
/** Number of acquire() calls made for this port. */
totalRequests: number;
/** Sum, in milliseconds, of the wait times of requests that were granted the lock from the queue. */
totalWaitTime: number;
/** Longest wait, in milliseconds, of any request that was granted the lock from the queue. */
maxWaitTime: number;
}
/**
 * Per-port write mutex with FIFO queuing and timeouts.
 *
 * Each port has at most one lock holder at a time. Callers that find the port
 * busy are queued in arrival order and are rejected with
 * `ErrorCode.MUTEX_TIMEOUT` if the lock is not granted within their timeout.
 * A periodic sweep additionally revokes locks that have been held longer than
 * the holder's own `timeoutMs` and rejects overdue waiters with
 * `ErrorCode.LOCK_EXPIRED`.
 */
export class WriteMutex implements IWriteMutex {
  private portLocks: Map<string, PortLockState> = new Map();
  private cleanupInterval: NodeJS.Timeout;
  private readonly defaultTimeout: number = 30000; // default timeout: 30 seconds

  /**
   * Creates the mutex and starts the periodic expiry sweep (every 5 seconds).
   */
  constructor() {
    this.cleanupInterval = setInterval(() => {
      this.cleanupExpiredLocks();
    }, 5000);
    // The sweep is pure housekeeping: it must not keep the process alive on
    // its own when the owner forgets to call dispose(). The optional call
    // tolerates timer shims that do not return a Node.js Timeout object.
    this.cleanupInterval.unref?.();
    logger.debug('WriteMutex initialized');
  }

  /**
   * Acquires the write lock for a port.
   *
   * Resolves immediately when the port is free; otherwise the request is
   * queued behind earlier requests and resolves once the lock is handed over.
   *
   * @param port - Identifier of the port to lock.
   * @param timeoutMs - Maximum time to wait in the queue. The same value is
   *   stored on the ticket and later used by the periodic sweep as the maximum
   *   hold time for the granted lock.
   * @returns The ticket that must be passed to `release()`.
   * @throws SerialError with `ErrorCode.MUTEX_TIMEOUT` (as a rejection) when
   *   the wait exceeds `timeoutMs`, or `ErrorCode.LOCK_EXPIRED` when the
   *   request is dropped by the sweep or by `forceRelease()`.
   */
  async acquire(port: string, timeoutMs: number = this.defaultTimeout): Promise<LockTicket> {
    try {
      logger.debug(`Acquiring lock for port: ${port}`, { timeoutMs });

      // A const, non-optional state keeps its type inside the callbacks below.
      const lockState = this.getOrCreateLockState(port);

      // Every call counts as a request, whether or not it has to wait.
      lockState.totalRequests++;

      const ticket: LockTicket = {
        id: uuidv4(),
        port,
        acquiredAt: 0, // set when the lock is actually granted
        requester: 'requester', // TODO: pass in the real requester identity
        timeoutMs
      };

      // Fast path: the port is free, so take the lock right away.
      if (!lockState.isLocked) {
        lockState.isLocked = true;
        lockState.currentHolder = ticket;
        ticket.acquiredAt = Date.now();
        logger.debug(`Lock acquired immediately for port: ${port}`, {
          ticketId: ticket.id
        });
        return ticket;
      }

      // Slow path: the port is busy, so wait in the queue.
      return new Promise<LockTicket>((resolve, reject) => {
        const startTime = Date.now();

        // Reject the request if it is still waiting when the timeout elapses.
        const timeout = setTimeout(() => {
          // Read lockState.queue at fire time: the array may have been
          // replaced by the sweep or by forceRelease() in the meantime.
          const index = lockState.queue.findIndex(item => item.ticket.id === ticket.id);
          if (index !== -1) {
            lockState.queue.splice(index, 1);
          }
          const waitTime = Date.now() - startTime;
          logger.warn(`Lock acquisition timeout for port: ${port}`, {
            ticketId: ticket.id,
            waitTime
          });
          reject(new SerialError(ErrorCode.MUTEX_TIMEOUT, `Lock acquisition timeout: ${timeoutMs}ms`, port));
        }, timeoutMs);

        const queueItem: LockQueueItem = {
          ticket,
          resolve,
          reject,
          timeout,
          timestamp: startTime
        };
        lockState.queue.push(queueItem);
        logger.debug(`Lock request queued for port: ${port}`, {
          ticketId: ticket.id,
          queuePosition: lockState.queue.length
        });
      });
    } catch (error) {
      const errorObj = error instanceof Error ? error : new Error(String(error));
      logger.error(`Failed to acquire lock for port: ${port}`, errorObj);
      throw errorObj;
    }
  }

  /**
   * Releases the lock held by `ticket` and hands it to the next waiter, if any.
   *
   * Does nothing (apart from a warning) when the port has no lock state.
   *
   * @param port - Identifier of the locked port.
   * @param ticket - Ticket returned by `acquire()`.
   * @throws SerialError with `ErrorCode.LOCK_NOT_HELD` when `ticket` is not
   *   the current holder, e.g. because the lock was already released, force
   *   released, or revoked by the expiry sweep.
   */
  release(port: string, ticket: LockTicket): void {
    try {
      const lockState = this.portLocks.get(port);
      if (!lockState) {
        logger.warn(`No lock state found for port: ${port}`);
        return;
      }

      // Only the current holder may release the lock.
      if (!lockState.currentHolder || lockState.currentHolder.id !== ticket.id) {
        throw new SerialError(ErrorCode.LOCK_NOT_HELD, 'Lock is not held by this ticket', port);
      }

      logger.debug(`Releasing lock for port: ${port}`, { ticketId: ticket.id });

      lockState.isLocked = false;
      lockState.currentHolder = undefined;

      const holdTime = Date.now() - ticket.acquiredAt;
      logger.debug(`Lock held for: ${holdTime}ms`, { port, ticketId: ticket.id });

      // Hand the lock to the next queued request, if there is one.
      this.processNextRequest(port);
      logger.debug(`Lock released for port: ${port}`, { ticketId: ticket.id });
    } catch (error) {
      const errorObj = error instanceof Error ? error : new Error(String(error));
      logger.error(`Failed to release lock for port: ${port}`, errorObj);
      throw errorObj;
    }
  }

  /**
   * Returns a snapshot of the lock state and wait statistics for a port.
   *
   * `averageWaitTime` is the accumulated wait time of requests granted from
   * the queue divided by the total number of `acquire()` calls for the port.
   *
   * @param port - Identifier of the port.
   * @returns The lock info; an unlocked, all-zero record for unknown ports.
   */
  getLockInfo(port: string): LockInfo {
    const lockState = this.portLocks.get(port);
    if (!lockState) {
      return {
        isLocked: false,
        waitQueueSize: 0,
        maxWaitTime: 0,
        averageWaitTime: 0
      };
    }

    const averageWaitTime = lockState.totalRequests > 0
      ? lockState.totalWaitTime / lockState.totalRequests
      : 0;

    return {
      isLocked: lockState.isLocked,
      currentHolder: lockState.currentHolder?.id,
      waitQueueSize: lockState.queue.length,
      maxWaitTime: lockState.maxWaitTime,
      averageWaitTime
    };
  }

  /**
   * Forcibly releases a port's lock (for emergencies).
   *
   * The current holder is dropped and every queued request is rejected with
   * `ErrorCode.LOCK_EXPIRED`. Errors are logged, never thrown.
   *
   * @param port - Identifier of the port.
   */
  forceRelease(port: string): void {
    try {
      const lockState = this.portLocks.get(port);
      if (!lockState) {
        logger.warn(`No lock state found for port: ${port}`);
        return;
      }

      logger.warn(`Force releasing lock for port: ${port}`, {
        currentHolder: lockState.currentHolder?.id,
        queueSize: lockState.queue.length
      });

      // Detach the queue first so the state is already reset when the
      // waiters are rejected.
      const pending = lockState.queue;
      lockState.isLocked = false;
      lockState.currentHolder = undefined;
      lockState.queue = [];

      for (const item of pending) {
        clearTimeout(item.timeout);
        item.reject(new SerialError(ErrorCode.LOCK_EXPIRED, 'Lock force released', port));
      }

      logger.info(`Lock force released for port: ${port}`);
    } catch (error) {
      const errorObj = error instanceof Error ? error : new Error(String(error));
      logger.error(`Failed to force release lock for port: ${port}`, errorObj);
    }
  }

  /**
   * Returns the lock info of every port that has lock state, keyed by port.
   */
  getAllLockInfo(): Record<string, LockInfo> {
    const result: Record<string, LockInfo> = {};
    for (const [port] of this.portLocks) {
      result[port] = this.getLockInfo(port);
    }
    return result;
  }

  /**
   * Returns the lock state for a port, creating an unlocked, empty state on
   * first use.
   */
  private getOrCreateLockState(port: string): PortLockState {
    const existing = this.portLocks.get(port);
    if (existing) {
      return existing;
    }
    const created: PortLockState = {
      isLocked: false,
      queue: [],
      totalRequests: 0,
      totalWaitTime: 0,
      maxWaitTime: 0
    };
    this.portLocks.set(port, created);
    return created;
  }

  /**
   * Grants the lock to the request at the front of the port's queue, if any,
   * and updates the wait statistics. Callers must have freed the lock first.
   */
  private processNextRequest(port: string): void {
    const lockState = this.portLocks.get(port);
    const queueItem = lockState?.queue.shift();
    if (!lockState || !queueItem) {
      return;
    }

    const { ticket, resolve, timeout } = queueItem;

    // The request is being served, so its timeout must no longer fire.
    clearTimeout(timeout);

    const waitTime = Date.now() - queueItem.timestamp;
    lockState.totalWaitTime += waitTime;
    lockState.maxWaitTime = Math.max(lockState.maxWaitTime, waitTime);

    lockState.isLocked = true;
    lockState.currentHolder = ticket;
    ticket.acquiredAt = Date.now();

    logger.debug(`Lock granted to next requester: ${port}`, {
      ticketId: ticket.id,
      waitTime
    });

    resolve(ticket);
  }

  /**
   * Periodic sweep: revokes locks held longer than the holder's `timeoutMs`
   * (handing the lock to the next waiter) and rejects queued requests that
   * have waited longer than their own `timeoutMs`.
   */
  private cleanupExpiredLocks(): void {
    const now = Date.now();

    for (const [port, lockState] of this.portLocks) {
      // Revoke an overdue holder. A falsy timeoutMs (e.g. 0) never expires.
      const holder = lockState.currentHolder;
      if (holder && holder.timeoutMs) {
        const holdTime = now - holder.acquiredAt;
        if (holdTime > holder.timeoutMs) {
          logger.warn(`Current lock expired for port: ${port}`, {
            ticketId: holder.id,
            holdTime,
            timeout: holder.timeoutMs
          });
          lockState.isLocked = false;
          lockState.currentHolder = undefined;
          this.processNextRequest(port);
        }
      }

      // Split the remaining queue into overdue and still-valid requests.
      const expiredItems: LockQueueItem[] = [];
      const activeItems: LockQueueItem[] = [];
      for (const item of lockState.queue) {
        const waitTime = now - item.timestamp;
        if (waitTime > item.ticket.timeoutMs) {
          expiredItems.push(item);
        } else {
          activeItems.push(item);
        }
      }

      for (const item of expiredItems) {
        clearTimeout(item.timeout);
        item.reject(new SerialError(ErrorCode.LOCK_EXPIRED, 'Lock request expired', port));
      }

      lockState.queue = activeItems;

      if (expiredItems.length > 0) {
        logger.debug(`Cleaned up ${expiredItems.length} expired lock requests for port: ${port}`);
      }
    }
  }

  /**
   * Reports ports that look deadlocked: the lock has been held for more than
   * five minutes while at least one request is waiting.
   *
   * @returns The identifiers of the suspicious ports (possibly empty).
   */
  detectDeadlock(): string[] {
    const deadlockedPorts: string[] = [];
    const now = Date.now();

    for (const [port, lockState] of this.portLocks) {
      if (lockState.isLocked && lockState.currentHolder) {
        const holdTime = now - lockState.currentHolder.acquiredAt;
        if (holdTime > 5 * 60 * 1000 && lockState.queue.length > 0) {
          deadlockedPorts.push(port);
        }
      }
    }

    if (deadlockedPorts.length > 0) {
      logger.warn('Potential deadlock detected', { ports: deadlockedPorts });
    }
    return deadlockedPorts;
  }

  /**
   * Shuts the mutex down: stops the periodic sweep, force releases every
   * port (rejecting all waiters) and clears all state. Errors are logged,
   * never thrown.
   */
  dispose(): void {
    try {
      clearInterval(this.cleanupInterval);

      for (const port of this.portLocks.keys()) {
        this.forceRelease(port);
      }

      this.portLocks.clear();
      logger.info('WriteMutex disposed');
    } catch (error) {
      const errorObj = error instanceof Error ? error : new Error(String(error));
      logger.error('Failed to dispose WriteMutex', errorObj);
    }
  }
}