/**
* 串口引擎实现
*/
import { logger } from '@/utils/logger';
import { EventBus, globalEventBus } from './EventBus';
import { DataProcessor } from './DataProcessor';
import { WriteMutex } from './WriteMutex';
import { PortSession } from './PortSession';
import { URCDetector } from './URCDetector';
import { ISerialEngine, PortConfig, SerialPortInfo, PortStatus, ReadResult, EventType } from '@/types';
import { SerialPortAdapter } from '@/adapters/SerialPortAdapter';
import { PlatformAdapter } from '@/adapters/PlatformAdapter';
import { SerialError } from '@/types';
/**
* 串口实例信息
*/
interface SerialPortInstance {
adapter: SerialPortAdapter;
config: PortConfig;
status: PortStatus;
createdAt: Date;
lastActivity: Date;
stats: {
bytesWritten: number;
bytesRead: number;
writeCount: number;
readCount: number;
errorCount: number;
urcCount: number;
};
}
/**
* 引擎配置
*/
interface EngineConfig {
maxPorts: number;
defaultTimeout: number;
enableURCDetection: boolean;
enableSessionIsolation: boolean;
enableWriteMutex: boolean;
urcConfigPath?: string;
}
/**
* 串口引擎实现类
*/
export class SerialEngine implements ISerialEngine {
private ports: Map<string, SerialPortInstance> = new Map();
private eventBus: EventBus;
private dataProcessor: DataProcessor;
private writeMutex: WriteMutex;
private portSession: PortSession;
private urcDetector: URCDetector;
private platformAdapter: PlatformAdapter;
private isInitialized: boolean = false;
private config: EngineConfig;
private healthCheckInterval?: NodeJS.Timeout;
constructor(
eventBus?: EventBus,
dataProcessor?: DataProcessor,
writeMutex?: WriteMutex,
portSession?: PortSession,
urcDetector?: URCDetector,
platformAdapter?: PlatformAdapter,
config?: Partial<EngineConfig>
) {
this.eventBus = eventBus || globalEventBus;
this.dataProcessor = dataProcessor || new DataProcessor();
this.writeMutex = writeMutex || new WriteMutex();
this.portSession = portSession || new PortSession(urcDetector);
this.urcDetector = urcDetector || new URCDetector();
this.platformAdapter = platformAdapter || new PlatformAdapter();
// 设置默认配置
this.config = {
maxPorts: 50,
defaultTimeout: 5000,
enableURCDetection: true,
enableSessionIsolation: true,
enableWriteMutex: true,
urcConfigPath: 'config/urc-patterns.yaml',
...config
};
// 设置URC检测器到会话管理器
this.portSession.setURCDetector(this.urcDetector);
logger.debug('SerialEngine initialized with dependencies', {
maxPorts: this.config.maxPorts,
enableURCDetection: this.config.enableURCDetection,
enableSessionIsolation: this.config.enableSessionIsolation,
enableWriteMutex: this.config.enableWriteMutex
});
}
/**
* 初始化引擎
*/
async initialize(): Promise<void> {
if (this.isInitialized) {
logger.warn('SerialEngine already initialized');
return;
}
try {
logger.info('Initializing SerialEngine...');
// 初始化URC检测器
if (this.config.enableURCDetection && this.config.urcConfigPath) {
try {
await this.urcDetector.loadConfig(this.config.urcConfigPath);
logger.info('URC detector initialized');
} catch (error) {
logger.warn('Failed to load URC configuration, URC detection disabled', error);
this.config.enableURCDetection = false;
}
}
// 启动健康检查
this.startHealthCheck();
// 注册引擎级别的事件监听器
this.setupEngineEventListeners();
this.isInitialized = true;
logger.info('SerialEngine initialized successfully', {
enableURCDetection: this.config.enableURCDetection,
enableSessionIsolation: this.config.enableSessionIsolation,
enableWriteMutex: this.config.enableWriteMutex,
maxPorts: this.config.maxPorts
});
} catch (error) {
logger.error('Failed to initialize SerialEngine', error instanceof Error ? error : new Error(String(error)));
throw error;
}
}
/**
* 启动健康检查
*/
private startHealthCheck(): void {
this.healthCheckInterval = setInterval(() => {
this.performHealthCheck();
}, 30000); // 每30秒检查一次
logger.debug('Health check started');
}
/**
* 执行健康检查
*/
private performHealthCheck(): void {
try {
const now = Date.now();
const stalePorts: string[] = [];
for (const [port, instance] of this.ports) {
// 检查端口是否长时间无活动
const inactiveTime = now - instance.lastActivity.getTime();
if (inactiveTime > 5 * 60 * 1000) { // 5分钟无活动
stalePorts.push(port);
}
// 检查错误率
const errorRate = instance.stats.errorCount / Math.max(instance.stats.writeCount, 1);
if (errorRate > 0.1) { // 错误率超过10%
logger.warn(`High error rate detected on port: ${port}`, {
errorRate,
errors: instance.stats.errorCount,
total: instance.stats.writeCount
});
}
}
// 清理长时间无活动的端口
for (const port of stalePorts) {
logger.info(`Cleaning up stale port: ${port}`);
this.closePort(port).catch(error => {
logger.error(`Failed to close stale port: ${port}`, error instanceof Error ? error : new Error(String(error)));
});
}
// 发布健康状态
this.eventBus.publish(EventType.HEALTH_CHECK, {
id: `health-${Date.now()}`,
timestamp: new Date().toISOString(),
source: 'SerialEngine',
type: EventType.HEALTH_CHECK,
status: 'healthy',
metrics: {
uptime: process.uptime(),
memoryUsage: process.memoryUsage(),
activePorts: this.getActivePorts().length,
activeSessions: this.portSession.getActiveSessionCount(),
totalRequests: this.getTotalRequests(),
errorCount: this.getTotalErrors(),
averageResponseTime: this.getAverageResponseTime()
}
});
} catch (error) {
logger.error('Health check failed', error instanceof Error ? error : new Error(String(error)));
}
}
/**
* 设置引擎事件监听器
*/
private setupEngineEventListeners(): void {
// 监听URC事件
this.eventBus.subscribe(EventType.SERIAL_URC, (event) => {
this.handleURCEvent(event);
});
// 监听系统错误
this.eventBus.subscribe(EventType.SYSTEM_ERROR, (event) => {
this.handleSystemError(event);
});
logger.debug('Engine event listeners setup');
}
/**
* 处理URC事件
*/
private handleURCEvent(event: any): void {
try {
// 更新端口统计
const portInstance = this.ports.get(event.port);
if (portInstance) {
portInstance.stats.urcCount++;
}
logger.debug(`URC event handled: ${event.port}`, {
data: event.data,
patternId: event.patternId
});
} catch (error) {
logger.error(`Failed to handle URC event: ${event.port}`, error instanceof Error ? error : new Error(String(error)));
}
}
/**
* 处理系统错误
*/
private handleSystemError(event: any): void {
try {
logger.error('System error received', event instanceof Error ? event : new Error(String(event)));
// 根据错误类型采取相应措施
if (event.component === 'SerialEngine') {
// 引擎错误,可能需要重启或清理
this.handleEngineError(event.error);
}
} catch (error) {
logger.error('Failed to handle system error', error instanceof Error ? error : new Error(String(error)));
}
}
/**
* 处理引擎错误
*/
private handleEngineError(error: Error): void {
try {
logger.error('Engine error detected', error instanceof Error ? error : new Error(String(error)));
// 尝试恢复
if (error.message.includes('resource exhausted')) {
// 资源耗尽,清理一些端口
this.cleanupOldPorts();
}
} catch (recoveryError) {
logger.error('Failed to recover from engine error', recoveryError instanceof Error ? recoveryError : new Error(String(recoveryError)));
}
}
/**
* 清理旧端口
*/
private cleanupOldPorts(): void {
const ports = Array.from(this.ports.entries())
.sort((a, b) => a[1].createdAt.getTime() - b[1].createdAt.getTime());
// 关闭最旧的一半端口
const portsToClose = ports.slice(0, Math.floor(ports.length / 2));
for (const [port] of portsToClose) {
logger.warn(`Cleaning up old port due to resource pressure: ${port}`);
this.closePort(port).catch(error => {
logger.error(`Failed to cleanup old port: ${port}`, error instanceof Error ? error : new Error(String(error)));
});
}
}
/**
* 打开串口
*/
async openPort(config: PortConfig): Promise<SerialPortInfo> {
const startTime = Date.now();
try {
// 检查端口数量限制
if (this.ports.size >= this.config.maxPorts) {
throw new SerialError(
'RESOURCE_EXHAUSTED',
`Maximum port limit reached: ${this.config.maxPorts}`,
config.port
);
}
// 检查端口是否已打开
if (this.ports.has(config.port)) {
throw new SerialError('ALREADY_OPEN', 'Port is already open', config.port);
}
// 验证端口路径
const normalizedPath = this.platformAdapter.normalizePortPath(config.port);
if (!this.platformAdapter.validatePortPath(normalizedPath)) {
throw new SerialError('INVALID_CONFIG', `Invalid port path: ${config.port}`, config.port);
}
logger.info(`Opening port: ${normalizedPath}`, config);
// 获取端口详细信息
const portDetails = await this.platformAdapter.getPortDetails(normalizedPath);
// 创建适配器(注入平台适配器)
const adapter = new SerialPortAdapter(this.platformAdapter);
// 打开端口
await adapter.open(config);
// 创建端口实例
const now = new Date();
const portInstance: SerialPortInstance = {
adapter,
config: { ...config, port: normalizedPath },
status: {
port: normalizedPath,
isOpen: true,
baudrate: config.baudrate,
config: config,
lastActivity: now.toISOString()
},
createdAt: now,
lastActivity: now,
stats: {
bytesWritten: 0,
bytesRead: 0,
writeCount: 0,
readCount: 0,
errorCount: 0,
urcCount: 0
}
};
// 保存端口实例
this.ports.set(normalizedPath, portInstance);
// 注册事件监听器
this.setupPortEventListeners(normalizedPath, adapter);
// 获取信号状态
try {
const signals = await adapter.getSignals();
portInstance.status.signals = signals;
} catch (error) {
logger.warn(`Failed to get signals for port: ${normalizedPath}`, error instanceof Error ? error : new Error(String(error)));
}
// 发布端口打开事件
this.eventBus.publish(EventType.PORT_OPENED, {
id: `port-${normalizedPath}-${Date.now()}`,
timestamp: new Date().toISOString(),
source: 'SerialEngine',
type: EventType.PORT_OPENED,
port: normalizedPath,
config: config,
deviceType: portDetails.deviceType,
friendlyName: portDetails.friendlyName
});
const duration = Date.now() - startTime;
logger.info(`Port opened successfully: ${normalizedPath}`, {
duration,
deviceType: portDetails.deviceType,
friendlyName: portDetails.friendlyName
});
// 返回端口信息
return {
path: normalizedPath,
manufacturer: portDetails.manufacturer,
serialNumber: portDetails.serialNumber,
pnpId: portDetails.pnpId,
locationId: portDetails.locationId,
vendorId: portDetails.vendorId,
productId: portDetails.productId
};
} catch (error) {
const duration = Date.now() - startTime;
const errorObj = error instanceof Error ? error : new Error(String(error));
logger.error(`Failed to open port: ${config.port}`, errorObj, {
duration
});
// 清理资源
if (this.ports.has(config.port)) {
const instance = this.ports.get(config.port)!;
try {
await instance.adapter.destroy();
} catch (cleanupError) {
logger.error(`Failed to cleanup adapter for port: ${config.port}`, cleanupError instanceof Error ? cleanupError : new Error(String(cleanupError)));
}
this.ports.delete(config.port);
}
throw error;
}
}
/**
* 关闭串口
*/
async closePort(port: string): Promise<void> {
try {
const portInstance = this.ports.get(port);
if (!portInstance) {
throw new SerialError('NOT_OPEN', 'Port is not open', port);
}
logger.info(`Closing port: ${port}`);
// 关闭适配器
await portInstance.adapter.close();
// 更新状态
portInstance.status.isOpen = false;
// 移除端口实例
this.ports.delete(port);
// 发布端口关闭事件
this.eventBus.publish(EventType.PORT_CLOSED, {
id: `port-${port}-${Date.now()}`,
timestamp: new Date().toISOString(),
source: 'SerialEngine',
type: EventType.PORT_CLOSED,
port: port,
reason: 'user_request'
});
logger.info(`Port closed successfully: ${port}`);
} catch (error) {
logger.error(`Failed to close port: ${port}`, error instanceof Error ? error : new Error(String(error)));
throw error;
}
}
/**
* 写入数据并读取响应
*/
async writeAndRead(
port: string,
data: Buffer,
timeout: number,
sessionId: string
): Promise<ReadResult> {
const startTime = Date.now();
let lockTicket: any = null;
let sessionCreated = false;
try {
const portInstance = this.ports.get(port);
if (!portInstance) {
throw new SerialError('NOT_OPEN', 'Port is not open', port);
}
logger.debug(`Starting write and read: ${port}`, {
sessionId,
dataSize: data.length,
timeout,
enableSessionIsolation: this.config.enableSessionIsolation,
enableWriteMutex: this.config.enableWriteMutex
});
// 1. 获取写入锁(如果启用)
if (this.config.enableWriteMutex) {
lockTicket = await this.writeMutex.acquire(port, timeout + 1000);
logger.debug(`Write lock acquired: ${port}`, {
ticketId: lockTicket.id,
waitTime: Date.now() - startTime
});
}
// 2. 创建会话(如果启用)
if (this.config.enableSessionIsolation) {
this.portSession.createSession(port, {
timeoutMs: timeout,
filterURC: this.config.enableURCDetection
});
sessionCreated = true;
logger.debug(`Session created: ${port}`, { sessionId });
}
// 3. 写入数据
const writeStartTime = Date.now();
await portInstance.adapter.write(data);
await portInstance.adapter.drain();
const writeDuration = Date.now() - writeStartTime;
logger.debug(`Data written: ${port}`, {
sessionId,
dataSize: data.length,
writeDuration
});
// 4. 等待响应数据
const responseBuffer = await this.collectResponseData(port, sessionId, timeout);
// 5. 处理响应数据
const result = await this.processResponse(port, sessionId, responseBuffer, timeout);
// 6. 更新统计信息
portInstance.stats.bytesWritten += data.length;
portInstance.stats.writeCount++;
if (responseBuffer.length > 0) {
portInstance.stats.bytesRead += responseBuffer.length;
portInstance.stats.readCount++;
}
// 7. 更新活动时间
portInstance.lastActivity = new Date();
portInstance.status.lastActivity = portInstance.lastActivity.toISOString();
const totalDuration = Date.now() - startTime;
logger.info(`Write and read completed: ${port}`, {
sessionId,
totalDuration,
writeDuration,
responseSize: responseBuffer.length,
status: result.status,
urcDetected: result.urcDetected
});
return result;
} catch (error) {
// 更新错误统计
const portInstance = this.ports.get(port);
if (portInstance) {
portInstance.stats.errorCount++;
}
const duration = Date.now() - startTime;
logger.error(`Write and read failed: ${port}`, error instanceof Error ? error : new Error(String(error)), {
sessionId,
duration
});
// 转换错误类型
if (error instanceof SerialError) {
throw error;
} else {
const errorObj = error instanceof Error ? error : new Error(String(error));
throw new SerialError('WRITE_FAILED', `Write and read failed: ${errorObj.message}`, port, error);
}
} finally {
// 清理资源
try {
// 释放锁
if (lockTicket && this.config.enableWriteMutex) {
this.writeMutex.release(port, lockTicket);
logger.debug(`Write lock released: ${port}`, { ticketId: lockTicket.id });
}
// 结束会话
if (sessionCreated && this.config.enableSessionIsolation) {
this.portSession.endSession(sessionId);
logger.debug(`Session ended: ${port}`, { sessionId });
}
} catch (cleanupError) {
logger.error(`Cleanup failed: ${port}`, cleanupError instanceof Error ? cleanupError : new Error(String(cleanupError)));
}
}
}
/**
* 收集响应数据
*/
private async collectResponseData(port: string, sessionId: string, timeout: number): Promise<Buffer> {
return new Promise((resolve, reject) => {
const timeoutId = setTimeout(() => {
reject(new SerialError('TIMEOUT', `Response timeout: ${timeout}ms`, port));
}, timeout);
let buffer = Buffer.alloc(0);
let dataHandler: (data: Buffer) => void;
// 订阅数据事件
const subscriptionId = this.eventBus.subscribe(EventType.SERIAL_DATA, (event: any) => {
if (event.port === port) {
if (this.config.enableSessionIsolation) {
// 使用会话过滤数据
const sessionData = this.portSession.getSessionData(sessionId);
buffer = Buffer.concat([buffer, sessionData]);
} else {
buffer = Buffer.concat([buffer, event.data]);
}
}
});
// 检查是否有数据到达
const checkInterval = setInterval(() => {
if (this.config.enableSessionIsolation) {
const sessionData = this.portSession.getSessionData(sessionId);
if (sessionData.length > 0) {
buffer = Buffer.concat([buffer, sessionData]);
}
}
// 如果有数据或超时,结束收集
if (buffer.length > 0 || Date.now() - Date.now() > timeout) {
clearTimeout(timeoutId);
clearInterval(checkInterval);
this.eventBus.unsubscribe(subscriptionId);
resolve(buffer);
}
}, 10);
// 设置超时
setTimeout(() => {
clearTimeout(timeoutId);
clearInterval(checkInterval);
this.eventBus.unsubscribe(subscriptionId);
resolve(buffer); // 返回已收集的数据,即使为空
}, timeout);
});
}
/**
* 处理响应数据
*/
private async processResponse(
port: string,
sessionId: string,
responseBuffer: Buffer,
timeout: number
): Promise<ReadResult> {
let urcDetected = false;
let filteredUrc: string[] = [];
let processedBuffer = responseBuffer;
// URC检测和过滤
if (this.config.enableURCDetection && responseBuffer.length > 0) {
const { urc, data } = this.urcDetector.extract(responseBuffer, port);
if (urc.length > 0) {
urcDetected = true;
filteredUrc = urc;
processedBuffer = data;
// 发布URC事件
for (const urcLine of urc) {
this.eventBus.publish(EventType.SERIAL_URC, {
id: `urc-${port}-${Date.now()}`,
timestamp: new Date().toISOString(),
source: 'SerialEngine',
type: EventType.SERIAL_URC,
port,
data: urcLine,
sessionId
});
}
}
}
// 数据处理
const hexString = this.dataProcessor.bufferToHex(processedBuffer);
const textString = this.dataProcessor.bufferToText(processedBuffer);
// 判断是否为部分响应
const isPartial = await this.isPartialResponse(processedBuffer, port);
const result: ReadResult = {
status: processedBuffer.length > 0 ? 'ok' : 'timeout',
sessionId,
timeoutMs: timeout,
rawHex: hexString,
text: textString,
partial: isPartial,
urcDetected,
filteredUrc: urcDetected ? filteredUrc : undefined
};
return result;
}
/**
* 判断是否为部分响应
*/
private async isPartialResponse(data: Buffer, port: string): Promise<boolean> {
// TODO: 实现更智能的部分响应检测
// 例如:基于协议特定的结束符、校验和等
// 简单实现:检查是否以换行符结束
if (data.length === 0) {
return true;
}
const lastByte = data[data.length - 1];
return lastByte !== 0x0A && lastByte !== 0x0D; // 不是LF或CR
}
/**
* 获取端口状态
*/
getPortStatus(port: string): PortStatus {
const portInstance = this.ports.get(port);
if (!portInstance) {
return {
port,
isOpen: false,
error: 'Port not found'
};
}
// TODO: 获取实时信号状态
// const signals = await portInstance.adapter.get();
return {
...portInstance.status,
isOpen: portInstance.adapter.isOpenPort()
};
}
/**
* 订阅端口事件
*/
subscribe(port: string, types: EventType[], listener: Function): string {
// TODO: 实现端口特定的事件订阅
const subscriptionId = this.eventBus.subscribe(EventType.SERIAL_DATA, listener as any);
logger.debug(`Event subscription created for port: ${port}`, { subscriptionId, types });
return subscriptionId;
}
/**
* 取消订阅
*/
unsubscribe(subscriptionId: string): void {
this.eventBus.unsubscribe(subscriptionId);
logger.debug(`Event subscription removed: ${subscriptionId}`);
}
/**
* 获取所有端口状态
*/
getAllPortStatus(): Record<string, PortStatus> {
const statuses: Record<string, PortStatus> = {};
for (const [port, instance] of this.ports) {
statuses[port] = this.getPortStatus(port);
}
return statuses;
}
/**
* 获取活动端口列表
*/
getActivePorts(): string[] {
return Array.from(this.ports.keys()).filter(port =>
this.ports.get(port)?.adapter.isOpenPort()
);
}
/**
* 关闭所有端口
*/
async closeAllPorts(): Promise<void> {
const ports = Array.from(this.ports.keys());
logger.info(`Closing all ports: ${ports.join(', ')}`);
const closePromises = ports.map(port =>
this.closePort(port).catch(error =>
logger.error(`Failed to close port: ${port}`, error instanceof Error ? error : new Error(String(error)))
)
);
await Promise.all(closePromises);
logger.info('All ports closed');
}
/**
* 设置端口事件监听器
*/
private setupPortEventListeners(port: string, adapter: SerialPortAdapter): void {
// 数据接收事件
adapter.on('data', (data: Buffer) => {
this.handlePortData(port, data);
});
// 错误事件
adapter.on('error', (error: Error) => {
this.handlePortError(port, error);
});
// 关闭事件
adapter.on('close', () => {
this.handlePortClose(port);
});
logger.debug(`Event listeners setup for port: ${port}`);
}
/**
* 处理端口数据
*/
private handlePortData(port: string, data: Buffer): void {
try {
const portInstance = this.ports.get(port);
if (!portInstance) {
logger.warn(`Received data for unknown port: ${port}`);
return;
}
// 更新活动时间和统计
portInstance.lastActivity = new Date();
portInstance.status.lastActivity = portInstance.lastActivity.toISOString();
portInstance.stats.bytesRead += data.length;
portInstance.stats.readCount++;
// URC检测(如果有活跃会话)
const activeSessionId = this.portSession.getActiveSession(port);
if (activeSessionId && this.config.enableSessionIsolation) {
// 添加数据到会话
this.portSession.addSessionData(activeSessionId, data);
// 获取会话中的URC
const urcList = this.portSession.getSessionURC(activeSessionId);
for (const urc of urcList) {
this.eventBus.publish(EventType.SERIAL_URC, {
id: `urc-${port}-${Date.now()}`,
timestamp: new Date().toISOString(),
source: 'SerialEngine',
type: EventType.SERIAL_URC,
port,
data: urc,
sessionId: activeSessionId
});
}
} else if (this.config.enableURCDetection) {
// 没有会话,直接检测URC
const { urc, data: cleanData } = this.urcDetector.extract(data, port);
// 发布URC事件
for (const urcLine of urc) {
this.eventBus.publish(EventType.SERIAL_URC, {
id: `urc-${port}-${Date.now()}`,
timestamp: new Date().toISOString(),
source: 'SerialEngine',
type: EventType.SERIAL_URC,
port,
data: urcLine
});
// 更新URC统计
portInstance.stats.urcCount++;
}
// 如果有非URC数据,发布数据事件
if (cleanData.length > 0) {
this.eventBus.publish(EventType.SERIAL_DATA, {
id: `data-${port}-${Date.now()}`,
timestamp: new Date().toISOString(),
source: 'SerialEngine',
type: EventType.SERIAL_DATA,
port,
data: cleanData
});
}
} else {
// 不启用URC检测,直接发布数据
this.eventBus.publish(EventType.SERIAL_DATA, {
id: `data-${port}-${Date.now()}`,
timestamp: new Date().toISOString(),
source: 'SerialEngine',
type: EventType.SERIAL_DATA,
port,
data
});
}
logger.debug(`Data processed: ${port}`, {
size: data.length,
sessionId: activeSessionId,
hasURC: activeSessionId ? this.portSession.getSessionURC(activeSessionId).length > 0 : false
});
} catch (error) {
logger.error(`Failed to handle port data: ${port}`, error instanceof Error ? error : new Error(String(error)));
// 发布错误事件
this.eventBus.publish(EventType.SERIAL_ERROR, {
id: `error-${port}-${Date.now()}`,
timestamp: new Date().toISOString(),
source: 'SerialEngine',
type: EventType.SERIAL_ERROR,
port,
error: error instanceof Error ? error : new Error(String(error))
});
}
}
/**
* 处理端口错误
*/
private handlePortError(port: string, error: Error): void {
try {
// 更新端口状态
const portInstance = this.ports.get(port);
if (portInstance) {
portInstance.status.error = error.message;
}
// 发布错误事件
this.eventBus.publish(EventType.PORT_ERROR, {
id: `error-${port}-${Date.now()}`,
timestamp: new Date().toISOString(),
source: 'SerialEngine',
type: EventType.PORT_ERROR,
port,
error,
recoverable: true
});
logger.error(`Port error: ${port}`, error instanceof Error ? error : new Error(String(error)));
} catch (err) {
logger.error(`Failed to handle port error: ${port}`, err instanceof Error ? err : new Error(String(err)));
}
}
/**
* 处理端口关闭
*/
private handlePortClose(port: string): void {
try {
// 移除端口实例
this.ports.delete(port);
// 发布关闭事件
this.eventBus.publish(EventType.PORT_CLOSED, {
id: `close-${port}-${Date.now()}`,
timestamp: new Date().toISOString(),
source: 'SerialEngine',
type: EventType.PORT_CLOSED,
port,
reason: 'device_disconnected'
});
logger.info(`Port closed unexpectedly: ${port}`);
} catch (error) {
logger.error(`Failed to handle port close: ${port}`, error instanceof Error ? error : new Error(String(error)));
}
}
/**
* 获取引擎统计信息
*/
getEngineStats(): any {
const portStats: Record<string, any> = {};
for (const [port, instance] of this.ports) {
portStats[port] = {
...instance.stats,
isOpen: instance.adapter.isOpenPort(),
uptime: Date.now() - instance.createdAt.getTime(),
lastActivity: instance.lastActivity.toISOString(),
config: instance.config
};
}
return {
isInitialized: this.isInitialized,
totalPorts: this.ports.size,
activePorts: this.getActivePorts().length,
portStats,
config: this.config,
components: {
eventBus: {
subscriptionCount: this.eventBus.getAllSubscriptions().length
},
writeMutex: {
lockInfo: this.getActivePorts().reduce((acc, port) => {
acc[port] = this.writeMutex.getLockInfo(port);
return acc;
}, {} as Record<string, any>)
},
portSession: {
activeSessions: this.portSession.getActiveSessionCount(),
sessionStats: this.portSession.getAllSessionStats()
},
urcDetector: {
enabled: this.config.enableURCDetection,
patternStats: this.urcDetector.getPatternStats()
}
}
};
}
/**
* 获取总请求数
*/
private getTotalRequests(): number {
return Array.from(this.ports.values())
.reduce((total, instance) => total + instance.stats.writeCount, 0);
}
/**
* 获取总错误数
*/
private getTotalErrors(): number {
return Array.from(this.ports.values())
.reduce((total, instance) => total + instance.stats.errorCount, 0);
}
/**
* 获取平均响应时间(模拟)
*/
private getAverageResponseTime(): number {
// TODO: 实现真实的响应时间统计
return 100; // 模拟值
}
/**
* 重置所有统计信息
*/
resetAllStats(): void {
for (const instance of this.ports.values()) {
instance.stats = {
bytesWritten: 0,
bytesRead: 0,
writeCount: 0,
readCount: 0,
errorCount: 0,
urcCount: 0
};
}
logger.info('All port statistics reset');
}
/**
* 获取端口详细信息
*/
async getPortDetails(port: string): Promise<any> {
const portInstance = this.ports.get(port);
if (!portInstance) {
throw new SerialError('NOT_OPEN', 'Port is not open', port);
}
try {
const [signals, adapterStats] = await Promise.all([
portInstance.adapter.getSignals(),
Promise.resolve(portInstance.adapter.getStats())
]);
return {
...this.getPortStatus(port),
signals,
stats: portInstance.stats,
adapterStats,
sessionInfo: this.portSession.getActiveSession(port) ?
this.portSession.getSessionStats(this.portSession.getActiveSession(port)!) : null
};
} catch (error) {
logger.error(`Failed to get port details: ${port}`, error instanceof Error ? error : new Error(String(error)));
throw error;
}
}
/**
* 批量操作端口
*/
async batchOperation(ports: string[], operation: 'open' | 'close' | 'reset'): Promise<Record<string, any>> {
const results: Record<string, any> = {};
const operations = ports.map(async (port) => {
try {
switch (operation) {
case 'open':
// 需要提供配置,这里只是示例
throw new Error('Batch open requires configuration');
case 'close':
await this.closePort(port);
results[port] = { success: true };
break;
case 'reset':
await this.resetPort(port);
results[port] = { success: true };
break;
}
} catch (error) {
const errorObj = error instanceof Error ? error : new Error(String(error));
results[port] = { success: false, error: errorObj.message };
}
});
await Promise.all(operations);
return results;
}
/**
* 重置端口
*/
async resetPort(port: string): Promise<void> {
const portInstance = this.ports.get(port);
if (!portInstance) {
throw new SerialError('NOT_OPEN', 'Port is not open', port);
}
logger.info(`Resetting port: ${port}`);
try {
// 结束所有活动会话
this.portSession.endPortSessions(port);
// 强制释放锁
this.writeMutex.forceRelease(port);
// 刷新缓冲区
await portInstance.adapter.flush();
// 重置统计
portInstance.stats = {
bytesWritten: 0,
bytesRead: 0,
writeCount: 0,
readCount: 0,
errorCount: 0,
urcCount: 0
};
logger.info(`Port reset successfully: ${port}`);
} catch (error) {
logger.error(`Failed to reset port: ${port}`, error instanceof Error ? error : new Error(String(error)));
throw error;
}
}
/**
* 更新配置
*/
updateConfig(newConfig: Partial<EngineConfig>): void {
this.config = { ...this.config, ...newConfig };
// 更新URC检测器配置
if (newConfig.enableURCDetection !== undefined) {
this.urcDetector.setEnabled(newConfig.enableURCDetection);
}
logger.info('Engine configuration updated', newConfig);
}
/**
* 导出配置
*/
exportConfig(): EngineConfig {
return { ...this.config };
}
/**
* 销毁引擎
*/
async dispose(): Promise<void> {
try {
logger.info('Disposing SerialEngine...');
// 停止健康检查
if (this.healthCheckInterval) {
clearInterval(this.healthCheckInterval);
}
// 关闭所有端口
await this.closeAllPorts();
// 销毁组件
this.urcDetector.dispose();
this.portSession.dispose();
this.writeMutex.dispose();
this.eventBus.clear();
// 清理资源
this.ports.clear();
this.isInitialized = false;
logger.info('SerialEngine disposed successfully');
} catch (error) {
logger.error('Failed to dispose SerialEngine', error instanceof Error ? error : new Error(String(error)));
}
}
}