/**
* PlatformAdapter 单元测试
*/
import { PlatformAdapter } from '@/adapters/PlatformAdapter';
import { SerialPort } from 'serialport';
import { SerialError } from '@/types';
import { ErrorCode } from '@/utils/error-codes';
import * as os from 'os';
import { exec, ChildProcess } from 'child_process';
// Mock serialport
jest.mock('serialport');
const MockSerialPort = SerialPort as jest.MockedClass<typeof SerialPort>;
// Ensure the list method is properly mocked
MockSerialPort.list = jest.fn();
// Mock os
jest.mock('os');
const mockOs = os as jest.Mocked<typeof os>;
// Mock child_process
jest.mock('child_process');
const mockExec = exec as jest.MockedFunction<typeof exec>;
// Mock ChildProcess for proper typing
const createMockChildProcess = (): jest.Mocked<ChildProcess> => {
return {
on: jest.fn(),
emit: jest.fn(),
listeners: jest.fn(),
once: jest.fn(),
prependListener: jest.fn(),
prependOnceListener: jest.fn(),
removeAllListeners: jest.fn(),
removeListener: jest.fn(),
setMaxListeners: jest.fn(),
getMaxListeners: jest.fn(),
addListener: jest.fn(),
eventNames: jest.fn(),
listenerCount: jest.fn(),
pipe: jest.fn(),
unpipe: jest.fn(),
unref: jest.fn(),
ref: jest.fn(),
send: jest.fn(),
disconnect: jest.fn(),
connected: false,
killed: false,
kill: jest.fn(),
pid: undefined,
signalCode: null,
spawnfile: '',
spawnargs: [],
stdin: {
write: jest.fn(),
end: jest.fn(),
destroy: jest.fn(),
pause: jest.fn(),
resume: jest.fn(),
setEncoding: jest.fn(),
pipe: jest.fn(),
unpipe: jest.fn(),
unshift: jest.fn(),
wrap: jest.fn(),
readable: false,
readableHighWaterMark: 0,
readableLength: 0,
readableObjectMode: false,
destroyed: false,
_read: jest.fn(),
_destroy: jest.fn(),
read: jest.fn(),
setRawMode: jest.fn(),
isPaused: jest.fn(),
push: jest.fn(),
_readableState: null as any,
readableFlowing: null as any,
on: jest.fn(),
once: jest.fn(),
emit: jest.fn(),
listeners: jest.fn(),
removeListener: jest.fn(),
removeAllListeners: jest.fn(),
setMaxListeners: jest.fn(),
getMaxListeners: jest.fn(),
prependListener: jest.fn(),
prependOnceListener: jest.fn(),
eventNames: jest.fn(),
listenerCount: jest.fn()
},
stdout: {
on: jest.fn(),
pipe: jest.fn(),
readable: false,
readableHighWaterMark: 0,
readableLength: 0,
readableObjectMode: false,
destroyed: false,
_read: jest.fn(),
_destroy: jest.fn(),
read: jest.fn(),
setEncoding: jest.fn(),
pause: jest.fn(),
resume: jest.fn(),
unpipe: jest.fn(),
unshift: jest.fn(),
wrap: jest.fn(),
push: jest.fn(),
_readableState: null as any,
readableFlowing: null as any,
once: jest.fn(),
emit: jest.fn(),
listeners: jest.fn(),
removeListener: jest.fn(),
removeAllListeners: jest.fn(),
setMaxListeners: jest.fn(),
getMaxListeners: jest.fn(),
prependListener: jest.fn(),
prependOnceListener: jest.fn(),
eventNames: jest.fn(),
listenerCount: jest.fn(),
addListener: jest.fn()
},
stderr: {
on: jest.fn(),
pipe: jest.fn(),
readable: false,
readableHighWaterMark: 0,
readableLength: 0,
readableObjectMode: false,
destroyed: false,
_read: jest.fn(),
_destroy: jest.fn(),
read: jest.fn(),
setEncoding: jest.fn(),
pause: jest.fn(),
resume: jest.fn(),
unpipe: jest.fn(),
unshift: jest.fn(),
wrap: jest.fn(),
push: jest.fn(),
_readableState: null as any,
readableFlowing: null as any,
once: jest.fn(),
emit: jest.fn(),
listeners: jest.fn(),
removeListener: jest.fn(),
removeAllListeners: jest.fn(),
setMaxListeners: jest.fn(),
getMaxListeners: jest.fn(),
prependListener: jest.fn(),
prependOnceListener: jest.fn(),
eventNames: jest.fn(),
listenerCount: jest.fn(),
addListener: jest.fn()
},
_events: null as any,
_eventsCount: 0,
_maxListeners: undefined,
[Symbol.asyncIterator]: jest.fn()
} as any;
};
describe('PlatformAdapter', () => {
let adapter: PlatformAdapter;
beforeEach(() => {
adapter = new PlatformAdapter();
jest.clearAllMocks();
});
afterEach(() => {
adapter.clearCache();
});
describe('listPorts', () => {
it('should return cached ports when cache is valid', async () => {
// Mock 第一次调用
(MockSerialPort.list as jest.Mock).mockResolvedValueOnce([
{ path: 'COM1', manufacturer: 'Test' }
]);
// 第一次调用
const ports1 = await adapter.listPorts();
expect(ports1).toHaveLength(1);
expect(MockSerialPort.list).toHaveBeenCalledTimes(1);
// 第二次调用应该使用缓存
const ports2 = await adapter.listPorts();
expect(ports2).toEqual(ports1);
expect(MockSerialPort.list).toHaveBeenCalledTimes(1);
});
it('should fetch new ports when cache expires', async () => {
// Mock 第一次调用
(MockSerialPort.list as jest.Mock).mockResolvedValueOnce([
{ path: 'COM1', manufacturer: 'Test' }
]);
// 第一次调用
await adapter.listPorts();
// 设置缓存为过期
adapter.setCacheTTL(-1);
// Mock 第二次调用
(MockSerialPort.list as jest.Mock).mockResolvedValueOnce([
{ path: 'COM2', manufacturer: 'Test2' }
]);
// 第二次调用应该重新获取
const ports = await adapter.listPorts();
expect(ports).toHaveLength(1);
expect(ports[0].path).toBe('COM2');
expect(MockSerialPort.list).toHaveBeenCalledTimes(2);
});
it('should filter invalid ports', async () => {
(MockSerialPort.list as jest.Mock).mockResolvedValueOnce([
{ path: 'COM1', manufacturer: 'Test' },
{ path: '', manufacturer: 'Invalid' },
null as any,
{ manufacturer: 'NoPath' }
]);
const ports = await adapter.listPorts();
expect(ports).toHaveLength(1);
expect(ports[0].path).toBe('COM1');
});
it('should handle errors gracefully', async () => {
(MockSerialPort.list as jest.Mock).mockRejectedValueOnce(new Error('Test error'));
await expect(adapter.listPorts()).rejects.toThrow(SerialError);
});
});
describe('checkPermissions', () => {
beforeEach(() => {
mockOs.platform.mockReturnValue('linux');
});
it('should return cached permission result', async () => {
// Mock exec for groups
mockExec.mockImplementation((command, callback) => {
if (command.includes('groups')) {
(callback as any)(null, 'dialout adm cdrom sudo dip plugdev lpadmin', '');
}
return createMockChildProcess();
});
// 第一次调用
const perm1 = await adapter.checkPermissions();
expect(perm1).toBe(true);
// 第二次调用应该使用缓存
const perm2 = await adapter.checkPermissions();
expect(perm2).toBe(perm1);
expect(mockExec).toHaveBeenCalledTimes(1);
});
it('should check Linux permissions correctly', async () => {
mockExec.mockImplementation((command, callback) => {
if (command.includes('groups')) {
(callback as any)(null, 'dialout adm cdrom sudo dip plugdev lpadmin', '');
}
return createMockChildProcess();
});
const hasPermission = await adapter.checkPermissions();
expect(hasPermission).toBe(true);
});
it('should detect lack of dialout group on Linux', async () => {
mockExec.mockImplementation((command, callback) => {
if (command.includes('groups')) {
(callback as any)(null, 'user adm cdrom sudo dip plugdev lpadmin', '');
}
if (command.includes('sudo')) {
(callback as any)(new Error('sudo: a password is required'), '', '');
}
return createMockChildProcess();
});
const hasPermission = await adapter.checkPermissions();
expect(hasPermission).toBe(false);
});
it('should allow access with sudo on Linux', async () => {
mockExec.mockImplementation((command, callback) => {
if (command.includes('groups')) {
(callback as any)(null, 'user adm cdrom dip plugdev lpadmin', '');
}
if (command.includes('sudo')) {
(callback as any)(null, '', '');
}
return createMockChildProcess();
});
const hasPermission = await adapter.checkPermissions();
expect(hasPermission).toBe(true);
});
it('should handle Windows permissions', async () => {
mockOs.platform.mockReturnValue('win32');
mockExec.mockImplementation((command, callback) => {
if (command.includes('net')) {
(callback as any)(new Error('Access denied'), '', '');
}
return createMockChildProcess();
});
const hasPermission = await adapter.checkPermissions();
expect(hasPermission).toBe(true); // Windows默认允许
});
it('should handle macOS permissions', async () => {
mockOs.platform.mockReturnValue('darwin');
mockExec.mockImplementation((command, callback) => {
if (command.includes('groups')) {
(callback as any)(null, 'admin dialout', '');
}
return createMockChildProcess();
});
const hasPermission = await adapter.checkPermissions();
expect(hasPermission).toBe(true);
});
});
describe('getPlatformInfo', () => {
it('should return correct platform information', () => {
mockOs.platform.mockReturnValue('linux');
mockOs.arch.mockReturnValue('x64');
mockOs.release.mockReturnValue('5.4.0');
const info = adapter.getPlatformInfo();
expect(info.platform).toBe('linux');
expect(info.arch).toBe('x64');
expect(info.version).toBe('5.4.0');
expect(info.supportedFeatures).toContain('TTY Ports');
expect(info.supportedFeatures).toContain('udev Rules');
});
it('should cache platform info', () => {
mockOs.platform.mockReturnValue('win32');
const info1 = adapter.getPlatformInfo();
const info2 = adapter.getPlatformInfo();
expect(info1).toBe(info2);
expect(mockOs.platform).toHaveBeenCalledTimes(1);
});
});
describe('normalizePortPath', () => {
it('should normalize Windows COM ports', () => {
mockOs.platform.mockReturnValue('win32');
expect(adapter.normalizePortPath('com1')).toBe('COM1');
expect(adapter.normalizePortPath('COM10')).toBe('\\.\COM10');
expect(adapter.normalizePortPath('\\.\COM5')).toBe('\\.\COM5');
});
it('should normalize Linux ports', () => {
mockOs.platform.mockReturnValue('linux');
expect(adapter.normalizePortPath('ttyUSB0')).toBe('/dev/ttyUSB0');
expect(adapter.normalizePortPath('/dev/ttyS0')).toBe('/dev/ttyS0');
});
it('should normalize macOS ports', () => {
mockOs.platform.mockReturnValue('darwin');
expect(adapter.normalizePortPath('cu.usbserial')).toBe('/dev/cu.usbserial');
expect(adapter.normalizePortPath('/dev/tty.usbserial')).toBe('/dev/tty.usbserial');
});
});
describe('validatePortPath', () => {
it('should validate Windows COM ports', () => {
mockOs.platform.mockReturnValue('win32');
expect(adapter.validatePortPath('COM1')).toBe(true);
expect(adapter.validatePortPath('COM10')).toBe(true);
expect(adapter.validatePortPath('\\.\COM10')).toBe(true);
expect(adapter.validatePortPath('INVALID')).toBe(false);
});
it('should validate Linux ports', () => {
mockOs.platform.mockReturnValue('linux');
expect(adapter.validatePortPath('/dev/ttyUSB0')).toBe(true);
expect(adapter.validatePortPath('/dev/ttyACM0')).toBe(true);
expect(adapter.validatePortPath('/dev/ttyS0')).toBe(true);
expect(adapter.validatePortPath('/dev/invalid')).toBe(false);
});
it('should validate macOS ports', () => {
mockOs.platform.mockReturnValue('darwin');
expect(adapter.validatePortPath('/dev/cu.usbserial')).toBe(true);
expect(adapter.validatePortPath('/dev/tty.usbserial')).toBe(true);
expect(adapter.validatePortPath('/dev/invalid')).toBe(false);
});
});
describe('getDeviceType', () => {
it('should identify Linux device types', () => {
mockOs.platform.mockReturnValue('linux');
expect(adapter.getDeviceType('/dev/ttyUSB0')).toBe('USB Serial');
expect(adapter.getDeviceType('/dev/ttyACM0')).toBe('USB CDC ACM');
expect(adapter.getDeviceType('/dev/ttyS0')).toBe('Built-in Serial');
expect(adapter.getDeviceType('/dev/unknown')).toBe('Unknown');
});
it('should identify Windows device types', () => {
mockOs.platform.mockReturnValue('win32');
expect(adapter.getDeviceType('COM1')).toBe('COM Port');
});
it('should identify macOS device types', () => {
mockOs.platform.mockReturnValue('darwin');
expect(adapter.getDeviceType('/dev/cu.usbserial')).toBe('USB Serial');
expect(adapter.getDeviceType('/dev/tty.usbserial')).toBe('TTY Device');
});
});
describe('getRecommendedConfig', () => {
it('should return config for USB devices', () => {
mockOs.platform.mockReturnValue('linux');
const config = adapter.getRecommendedConfig('/dev/ttyUSB0');
expect(config.baudrate).toBe(115200);
expect(config.dataBits).toBe(8);
expect(config.parity).toBe('none');
});
it('should return config for built-in serial', () => {
mockOs.platform.mockReturnValue('linux');
const config = adapter.getRecommendedConfig('/dev/ttyS0');
expect(config.baudrate).toBe(9600);
});
});
describe('cache management', () => {
it('should clear cache', () => {
adapter.clearCache();
// 验证缓存已清除
expect(adapter['permissionCache'].size).toBe(0);
});
it('should set cache TTL', () => {
adapter.setCacheTTL(10000);
expect(adapter['portListCache']?.ttl).toBe(10000);
});
});
});