/**
* 串口适配器实现
*/
import { SerialPort } from 'serialport';
import { logger } from '@/utils/logger';
import { ISerialPortAdapter, PortConfig, SerialError } from '@/types';
import { ErrorCode } from '@/utils/error-codes';
import { PlatformAdapter } from './PlatformAdapter';
/**
* 写入队列项
*/
interface WriteQueueItem {
data: Buffer;
resolve: () => void;
reject: (error: Error) => void;
timestamp: number;
}
/**
* 串口统计信息
*/
interface SerialStats {
bytesWritten: number;
bytesRead: number;
writeCount: number;
readCount: number;
errorCount: number;
lastActivity?: Date;
}
/**
* 串口适配器实现类
*/
export class SerialPortAdapter implements ISerialPortAdapter {
private serialPort?: SerialPort;
private config?: PortConfig;
private _isOpen: boolean = false;
private platformAdapter: PlatformAdapter;
private writeQueue: WriteQueueItem[] = [];
private isWriting: boolean = false;
private eventHandlers: Map<string, Function[]> = new Map();
private stats: SerialStats;
private writeTimeout: number = 5000;
private readTimeout: number = 5000;
constructor(platformAdapter?: PlatformAdapter) {
this.platformAdapter = platformAdapter || new PlatformAdapter();
this.stats = {
bytesWritten: 0,
bytesRead: 0,
writeCount: 0,
readCount: 0,
errorCount: 0
};
logger.debug('SerialPortAdapter initialized');
}
/**
* 打开串口
*/
async open(config: PortConfig): Promise<void> {
try {
if (this._isOpen) {
throw new SerialError(ErrorCode.ALREADY_OPEN, 'Port is already open', config.port);
}
// 验证配置
this.validateConfig(config);
// 标准化端口路径
const normalizedPath = this.platformAdapter.normalizePortPath(config.port);
// 验证端口路径
if (!this.platformAdapter.validatePortPath(normalizedPath)) {
throw new SerialError(ErrorCode.INVALID_CONFIG, `Invalid port path: ${config.port}`, config.port);
}
// 检查权限
const hasPermission = await this.platformAdapter.checkPermissions();
if (!hasPermission) {
throw new SerialError(ErrorCode.PERMISSION_DENIED, 'Insufficient permissions to access port', config.port);
}
// 检查端口是否被占用
const isInUse = await this.platformAdapter.isPortInUse(normalizedPath);
if (isInUse) {
logger.warn(`Port may be in use: ${normalizedPath}`);
}
logger.info(`Opening serial port: ${normalizedPath}`, config);
// 创建配置对象
const serialConfig = {
path: normalizedPath,
baudRate: config.baudrate,
dataBits: config.dataBits,
parity: config.parity,
stopBits: config.stopBits,
flowControl: config.flowControl === 'none' ? false : config.flowControl,
autoOpen: false,
// 添加平台特定配置
...this.getPlatformSpecificConfig(normalizedPath)
};
this.serialPort = new SerialPort(serialConfig);
// 注册事件监听器
this.setupEventListeners();
// 使用Promise包装打开操作
await new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new SerialError(ErrorCode.TIMEOUT, 'Port open timeout', normalizedPath));
}, this.writeTimeout);
this.serialPort!.open((error) => {
clearTimeout(timeout);
if (error) {
// 转换错误码
const errorCode = this.mapOpenError(error);
reject(new SerialError(errorCode, `Failed to open port: ${error.message}`, normalizedPath, error));
} else {
resolve();
}
});
});
// 保存配置和更新状态
this.config = { ...config, port: normalizedPath };
this._isOpen = true;
this.updateLastActivity();
logger.info(`Serial port opened successfully: ${normalizedPath}`, {
baudRate: config.baudrate,
dataBits: config.dataBits,
parity: config.parity,
stopBits: config.stopBits,
flowControl: config.flowControl
});
} catch (error) {
const errorObj = error instanceof Error ? error : new Error(String(error));
logger.error(`Failed to open serial port: ${config.port}`, errorObj);
// 清理资源
if (this.serialPort) {
this.serialPort.destroy();
this.serialPort = undefined;
}
throw error;
}
}
/**
* 验证配置
*/
private validateConfig(config: PortConfig): void {
// 验证波特率
const validBaudRates = this.platformAdapter.getStandardBaudRates();
if (!validBaudRates.includes(config.baudrate)) {
throw new SerialError(ErrorCode.INVALID_CONFIG, `Invalid baud rate: ${config.baudrate}`, config.port);
}
// 验证数据位
const validDataBits = this.platformAdapter.getSupportedDataBits();
if (!validDataBits.includes(config.dataBits)) {
throw new SerialError(ErrorCode.INVALID_CONFIG, `Invalid data bits: ${config.dataBits}`, config.port);
}
// 验证停止位
const validStopBits = this.platformAdapter.getSupportedStopBits();
if (!validStopBits.includes(config.stopBits)) {
throw new SerialError(ErrorCode.INVALID_CONFIG, `Invalid stop bits: ${config.stopBits}`, config.port);
}
// 验证校验位
const validParity = this.platformAdapter.getSupportedParity();
if (!validParity.includes(config.parity)) {
throw new SerialError(ErrorCode.INVALID_CONFIG, `Invalid parity: ${config.parity}`, config.port);
}
// 验证流控制
const validFlowControl = this.platformAdapter.getSupportedFlowControl();
if (!validFlowControl.includes(config.flowControl)) {
throw new SerialError(ErrorCode.INVALID_CONFIG, `Invalid flow control: ${config.flowControl}`, config.port);
}
}
/**
* 获取平台特定配置
*/
private getPlatformSpecificConfig(portPath: string): any {
const platform = require('os').platform();
const config: any = {};
if (platform === 'linux') {
// Linux特定配置
config.lock = false; // 禁用文件锁,避免权限问题
} else if (platform === 'win32') {
// Windows特定配置
config.highWaterMark = 64 * 1024; // 64KB缓冲区
}
return config;
}
/**
* 映射打开错误
*/
private mapOpenError(error: any): ErrorCode {
const message = error.message.toLowerCase();
if (message.includes('busy') || message.includes('access denied')) {
return ErrorCode.PORT_BUSY;
}
if (message.includes('access denied') || message.includes('permission')) {
return ErrorCode.PERMISSION_DENIED;
}
if (message.includes('not found') || message.includes('no such file')) {
return ErrorCode.PORT_NOT_FOUND;
}
if (message.includes('invalid') || message.includes('parameter')) {
return ErrorCode.INVALID_CONFIG;
}
return ErrorCode.SYSTEM_ERROR;
}
/**
* 设置事件监听器
*/
private setupEventListeners(): void {
if (!this.serialPort) return;
// 数据接收事件
this.serialPort.on('data', (data: Buffer) => {
this.handleDataReceived(data);
});
// 错误事件
this.serialPort.on('error', (error: Error) => {
this.handleError(error);
});
// 关闭事件
this.serialPort.on('close', () => {
this.handleClose();
});
// 打开事件
this.serialPort.on('open', () => {
this.handleOpen();
});
logger.debug('Event listeners setup for serial port');
}
/**
* 关闭串口
*/
async close(): Promise<void> {
try {
if (!this.serialPort || !this._isOpen) {
logger.warn('Port is not open, cannot close');
return;
}
const port = this.config?.port || 'unknown';
logger.info(`Closing serial port: ${port}`);
// 使用Promise包装关闭操作
await new Promise<void>((resolve, reject) => {
this.serialPort!.close((error) => {
if (error) {
reject(new SerialError(ErrorCode.WRITE_FAILED, `Failed to close port: ${error.message}`, port, error));
} else {
resolve();
}
});
});
this._isOpen = false;
this.serialPort = undefined;
this.config = undefined;
logger.info(`Serial port closed successfully: ${port}`);
} catch (error) {
const errorObj = error instanceof Error ? error : new Error(String(error));
logger.error('Failed to close serial port', errorObj);
throw error;
}
}
/**
* 写入数据
*/
async write(data: Buffer): Promise<void> {
try {
if (!this.serialPort || !this._isOpen) {
throw new SerialError(ErrorCode.NOT_OPEN, 'Port is not open for writing');
}
if (!Buffer.isBuffer(data)) {
throw new SerialError(ErrorCode.INVALID_CONFIG, 'Data must be a Buffer', this.config?.port);
}
if (data.length === 0) {
logger.debug('Empty data buffer, skipping write');
return;
}
logger.debug(`Writing data to port: ${this.config?.port}`, {
size: data.length,
data: data.toString('hex').substring(0, 100) + (data.length > 50 ? '...' : '')
});
// 使用队列管理写入
await this.queueWrite(data);
// 更新统计
this.stats.bytesWritten += data.length;
this.stats.writeCount++;
this.updateLastActivity();
logger.debug(`Data queued for writing: ${this.config?.port}`, {
queueSize: this.writeQueue.length,
totalBytes: this.stats.bytesWritten
});
} catch (error) {
this.stats.errorCount++;
const errorObj = error instanceof Error ? error : new Error(String(error));
logger.error(`Failed to write data to port: ${this.config?.port}`, errorObj);
throw error;
}
}
/**
* 队列化写入操作
*/
private async queueWrite(data: Buffer): Promise<void> {
return new Promise<void>((resolve, reject) => {
const queueItem: WriteQueueItem = {
data,
resolve,
reject,
timestamp: Date.now()
};
// 检查队列大小,防止内存溢出
if (this.writeQueue.length >= 1000) {
reject(new SerialError(ErrorCode.RESOURCE_EXHAUSTED, 'Write queue is full', this.config?.port));
return;
}
// 添加到队列
this.writeQueue.push(queueItem);
// 如果没有正在写入,开始处理队列
if (!this.isWriting) {
this.processWriteQueue();
}
});
}
/**
* 处理写入队列
*/
private async processWriteQueue(): Promise<void> {
if (this.isWriting || this.writeQueue.length === 0) {
return;
}
this.isWriting = true;
try {
while (this.writeQueue.length > 0) {
const item = this.writeQueue.shift()!;
try {
await this.performWrite(item.data);
item.resolve();
} catch (error) {
item.reject(error as Error);
// 清空队列,避免后续写入
this.writeQueue.length = 0;
break;
}
}
} finally {
this.isWriting = false;
}
}
/**
* 执行实际写入操作
*/
private async performWrite(data: Buffer): Promise<void> {
return new Promise<void>((resolve, reject) => {
if (!this.serialPort || !this._isOpen) {
reject(new SerialError(ErrorCode.NOT_OPEN, 'Port is not open', this.config?.port));
return;
}
// 设置超时
const timeout = setTimeout(() => {
reject(new SerialError(ErrorCode.TIMEOUT, 'Write timeout', this.config?.port));
}, this.writeTimeout);
this.serialPort.write(data, (error) => {
clearTimeout(timeout);
if (error) {
reject(new SerialError(ErrorCode.WRITE_FAILED, `Write failed: ${error.message}`, this.config?.port, error));
} else {
// 等待数据发送完成
this.drain().then(resolve).catch(reject);
}
});
});
}
/**
* 批量写入数据
*/
async writeBatch(buffers: Buffer[]): Promise<void> {
if (!Array.isArray(buffers) || buffers.length === 0) {
throw new SerialError(ErrorCode.INVALID_CONFIG, 'Buffers must be a non-empty array', this.config?.port);
}
const totalSize = buffers.reduce((sum, buf) => sum + buf.length, 0);
// 检查总大小限制
if (totalSize > 1024 * 1024) { // 1MB限制
throw new SerialError(ErrorCode.INVALID_CONFIG, 'Batch size exceeds limit (1MB)', this.config?.port);
}
logger.debug(`Writing batch data: ${this.config?.port}`, {
bufferCount: buffers.length,
totalSize
});
// 合并所有缓冲区并一次性写入
const combinedBuffer = Buffer.concat(buffers);
await this.write(combinedBuffer);
}
/**
* 带确认的写入
*/
async writeWithConfirmation(data: Buffer, expectedResponse?: Buffer, timeout: number = this.readTimeout): Promise<Buffer> {
// TODO: 实现写入并等待特定响应的逻辑
// 这需要与读取操作配合,暂时先实现基础写入
await this.write(data);
// 如果不需要等待响应,返回空Buffer
if (!expectedResponse) {
return Buffer.alloc(0);
}
// 临时实现:等待一段时间后返回
await new Promise(resolve => setTimeout(resolve, 100));
return Buffer.alloc(0);
}
/**
* 等待数据发送完成
*/
async drain(): Promise<void> {
try {
if (!this.serialPort || !this._isOpen) {
return;
}
await new Promise<void>((resolve, reject) => {
this.serialPort!.drain((error) => {
if (error) {
reject(new SerialError(ErrorCode.WRITE_FAILED, `Drain failed: ${error.message}`, this.config?.port, error));
} else {
resolve();
}
});
});
} catch (error) {
const errorObj = error instanceof Error ? error : new Error(String(error));
logger.error(`Failed to drain port: ${this.config?.port}`, errorObj);
throw error;
}
}
/**
* 清空缓冲区
*/
async flush(): Promise<void> {
try {
if (!this.serialPort || !this._isOpen) {
return;
}
await new Promise<void>((resolve, reject) => {
this.serialPort!.flush((error) => {
if (error) {
reject(new SerialError(ErrorCode.WRITE_FAILED, `Flush failed: ${error.message}`, this.config?.port, error));
} else {
resolve();
}
});
});
} catch (error) {
const errorObj = error instanceof Error ? error : new Error(String(error));
logger.error(`Failed to flush port: ${this.config?.port}`, errorObj);
throw error;
}
}
/**
* 设置串口参数
*/
async set(options: any): Promise<void> {
try {
if (!this.serialPort || !this._isOpen) {
throw new SerialError(ErrorCode.NOT_OPEN, 'Port is not open');
}
await new Promise<void>((resolve, reject) => {
this.serialPort!.set(options, (error) => {
if (error) {
reject(new SerialError(ErrorCode.INVALID_CONFIG, `Set failed: ${error.message}`, this.config?.port, error));
} else {
resolve();
}
});
});
logger.debug(`Port settings updated: ${this.config?.port}`, options);
} catch (error) {
const errorObj = error instanceof Error ? error : new Error(String(error));
logger.error(`Failed to set port options: ${this.config?.port}`, errorObj);
throw error;
}
}
/**
* 获取串口参数
*/
async get(): Promise<any> {
try {
if (!this.serialPort || !this._isOpen) {
throw new SerialError(ErrorCode.NOT_OPEN, 'Port is not open');
}
return new Promise((resolve, reject) => {
this.serialPort!.get((error, options) => {
if (error) {
reject(new SerialError(ErrorCode.WRITE_FAILED, `Get failed: ${error.message}`, this.config?.port, error));
} else {
resolve(options);
}
});
});
} catch (error) {
const errorObj = error instanceof Error ? error : new Error(String(error));
logger.error(`Failed to get port options: ${this.config?.port}`, errorObj);
throw error;
}
}
/**
* 注册事件监听器
*/
on(event: string, handler: Function): void {
// 添加到内部处理器列表
if (!this.eventHandlers.has(event)) {
this.eventHandlers.set(event, []);
}
this.eventHandlers.get(event)!.push(handler);
// 如果端口已打开,直接注册
if (this.serialPort) {
this.serialPort.on(event, handler as (...args: any[]) => void);
}
logger.debug(`Event listener registered: ${event}`, {
port: this.config?.port,
handlerCount: this.eventHandlers.get(event)?.length
});
}
/**
* 移除事件监听器
*/
off(event: string, handler: Function): void {
const handlers = this.eventHandlers.get(event);
if (handlers) {
const index = handlers.indexOf(handler);
if (index !== -1) {
handlers.splice(index, 1);
}
}
// 如果端口已打开,直接移除
if (this.serialPort) {
this.serialPort.off(event, handler as (...args: any[]) => void);
}
logger.debug(`Event listener removed: ${event}`, {
port: this.config?.port,
remainingHandlers: this.eventHandlers.get(event)?.length
});
}
/**
* 触发事件
*/
private emit(event: string, ...args: any[]): void {
const handlers = this.eventHandlers.get(event);
if (handlers) {
for (const handler of handlers) {
try {
handler(...args);
} catch (error) {
const errorObj = error instanceof Error ? error : new Error(String(error));
logger.error(`Event handler error: ${event}`, errorObj);
}
}
}
}
/**
* 处理数据接收事件
*/
private handleDataReceived(data: Buffer): void {
try {
this.stats.bytesRead += data.length;
this.stats.readCount++;
this.updateLastActivity();
logger.debug(`Data received from port: ${this.config?.port}`, {
size: data.length,
totalBytes: this.stats.bytesRead,
data: data.toString('hex').substring(0, 100) + (data.length > 50 ? '...' : '')
});
// 触发data事件
this.emit('data', data);
} catch (error) {
const errorObj = error instanceof Error ? error : new Error(String(error));
logger.error(`Error handling received data: ${this.config?.port}`, errorObj);
}
}
/**
* 处理错误事件
*/
private handleError(error: Error): void {
try {
this.stats.errorCount++;
const errorObj = error instanceof Error ? error : new Error(String(error));
logger.error(`Serial port error: ${this.config?.port}`, errorObj);
// 根据错误类型决定是否关闭端口
const isFatal = this.isFatalError(error);
if (isFatal) {
logger.warn(`Fatal error detected, closing port: ${this.config?.port}`);
this.close().catch(closeError => {
const closeErrorObj = closeError instanceof Error ? closeError : new Error(String(closeError));
logger.error(`Error closing port after fatal error: ${this.config?.port}`, closeErrorObj);
});
}
// 触发error事件
this.emit('error', error, isFatal);
} catch (handlerError) {
const handlerErrorObj = handlerError instanceof Error ? handlerError : new Error(String(handlerError));
logger.error(`Error in error handler: ${this.config?.port}`, handlerErrorObj);
}
}
/**
* 处理关闭事件
*/
private handleClose(): void {
try {
logger.info(`Serial port closed: ${this.config?.port}`);
this._isOpen = false;
this.isWriting = false;
this.writeQueue.length = 0;
// 触发close事件
this.emit('close');
} catch (error) {
const errorObj = error instanceof Error ? error : new Error(String(error));
logger.error(`Error handling close event: ${this.config?.port}`, errorObj);
}
}
/**
* 处理打开事件
*/
private handleOpen(): void {
try {
logger.info(`Serial port opened: ${this.config?.port}`);
this._isOpen = true;
// 触发open事件
this.emit('open');
} catch (error) {
const errorObj = error instanceof Error ? error : new Error(String(error));
logger.error(`Error handling open event: ${this.config?.port}`, errorObj);
}
}
/**
* 判断是否为致命错误
*/
private isFatalError(error: Error): boolean {
const message = error.message.toLowerCase();
// 系统级错误
if (message.includes('access denied') ||
message.includes('permission denied') ||
message.includes('device disconnected') ||
message.includes('no such device')) {
return true;
}
// 配置错误
if (message.includes('invalid argument') ||
message.includes('bad file descriptor')) {
return true;
}
return false;
}
/**
* 更新最后活动时间
*/
private updateLastActivity(): void {
this.stats.lastActivity = new Date();
}
/**
* 获取统计信息
*/
getStats(): SerialStats {
return { ...this.stats };
}
/**
* 重置统计信息
*/
resetStats(): void {
this.stats = {
bytesWritten: 0,
bytesRead: 0,
writeCount: 0,
readCount: 0,
errorCount: 0
};
logger.debug(`Stats reset for port: ${this.config?.port}`);
}
/**
* 检查端口是否打开
*/
isOpen(): boolean {
return this._isOpen && this.serialPort?.isOpen === true;
}
/**
* 检查端口是否打开
*/
isOpenPort(): boolean {
return this._isOpen && this.serialPort?.isOpen === true;
}
/**
* 获取端口配置
*/
getConfig(): PortConfig | undefined {
return this.config ? { ...this.config } : undefined;
}
/**
* 获取端口路径
*/
getPortPath(): string | undefined {
return this.config?.port;
}
/**
* 获取信号状态
*/
async getSignals(): Promise<any> {
try {
if (!this.serialPort || !this._isOpen) {
throw new SerialError(ErrorCode.NOT_OPEN, 'Port is not open', this.config?.port);
}
return new Promise((resolve, reject) => {
this.serialPort!.get((error, signals) => {
if (error) {
reject(new SerialError(ErrorCode.WRITE_FAILED, `Failed to get signals: ${error.message}`, this.config?.port, error));
} else {
resolve(signals);
}
});
});
} catch (error) {
const errorObj = error instanceof Error ? error : new Error(String(error));
logger.error(`Failed to get signals: ${this.config?.port}`, errorObj);
throw error;
}
}
/**
* 设置信号状态
*/
async setSignals(signals: any): Promise<void> {
try {
if (!this.serialPort || !this._isOpen) {
throw new SerialError(ErrorCode.NOT_OPEN, 'Port is not open', this.config?.port);
}
await new Promise<void>((resolve, reject) => {
this.serialPort!.set(signals, (error) => {
if (error) {
reject(new SerialError(ErrorCode.WRITE_FAILED, `Failed to set signals: ${error.message}`, this.config?.port, error));
} else {
resolve();
}
});
});
logger.debug(`Signals set: ${this.config?.port}`, signals);
} catch (error) {
const errorObj = error instanceof Error ? error : new Error(String(error));
logger.error(`Failed to set signals: ${this.config?.port}`, errorObj);
throw error;
}
}
/**
* 强制刷新写入队列
*/
async flushWriteQueue(): Promise<void> {
try {
if (this.writeQueue.length > 0) {
logger.info(`Flushing write queue: ${this.config?.port}`, {
queueSize: this.writeQueue.length
});
// 等待队列处理完成
while (this.isWriting || this.writeQueue.length > 0) {
await new Promise(resolve => setTimeout(resolve, 10));
}
logger.info(`Write queue flushed: ${this.config?.port}`);
}
} catch (error) {
const errorObj = error instanceof Error ? error : new Error(String(error));
logger.error(`Failed to flush write queue: ${this.config?.port}`, errorObj);
throw error;
}
}
/**
* 销毁适配器
*/
async destroy(): Promise<void> {
try {
logger.info(`Destroying serial port adapter: ${this.config?.port}`);
// 清空队列
this.writeQueue.length = 0;
// 关闭端口
if (this._isOpen) {
await this.close();
}
// 销毁串口对象
if (this.serialPort) {
this.serialPort.destroy();
this.serialPort = undefined;
}
// 清理事件处理器
this.eventHandlers.clear();
logger.info(`Serial port adapter destroyed: ${this.config?.port}`);
} catch (error) {
const errorObj = error instanceof Error ? error : new Error(String(error));
logger.error(`Failed to destroy adapter: ${this.config?.port}`, errorObj);
}
}
}