/**
* 串口服务实现
*/
import { logger } from '@/utils/logger';
import {
ISerialService,
SerialOpenRequest,
SerialOpenResponse,
SerialCloseRequest,
SerialCloseResponse,
SerialWriteRequest,
SerialWriteResponse,
SerialListResponse,
SerialStatusRequest,
SerialStatusResponse,
SerialSubscribeRequest,
SerialSubscribeResponse,
PortConfig,
SerialPortInfo,
ReadResult,
ErrorCode,
SerialError
} from '@/types';
import { SerialEngine } from '@/core/SerialEngine';
import { PlatformAdapter } from '@/adapters/PlatformAdapter';
import { v4 as uuidv4 } from 'uuid';
import { EventType } from '@/types';
/**
* Bookkeeping record for one client's event subscription on one serial port.
*/
interface Subscription {
// Unique subscription id; subscribe() generates it with uuidv4().
id: string;
// Id of the client that created the subscription; unsubscribe() refuses other clients.
clientId: string;
// Port identifier the subscription is bound to (copied from the subscribe request).
port: string;
// Requested event type names; subscribe() only accepts 'data', 'error', 'status' and 'urc'.
types: string[];
// Creation time from Date.now() (epoch milliseconds); used for expiry checks.
createdAt: number;
// Set to true on creation; inactive records are filtered out of the subscription getters.
active: boolean;
}
/**
* Tunable settings for SerialService. The constructor fills in defaults
* (5000, 100, 300000, true, 3) for any field the caller omits.
*/
interface SerialServiceConfig {
// Fallback timeout, in milliseconds, used by writeAndRead when the request has no timeout_ms.
defaultTimeout: number;
// Upper bound on the number of subscriptions held at the same time.
maxSubscriptions: number;
// Maximum subscription age in milliseconds (measured from createdAt) before cleanup removes it.
subscriptionTimeout: number;
// NOTE(review): not read anywhere in this file — presumably consumed elsewhere; confirm.
enableAutoReconnect: boolean;
// NOTE(review): not read anywhere in this file — presumably a retry limit; confirm.
maxRetryAttempts: number;
}
/**
 * Serial port service.
 *
 * Validates incoming requests, delegates port I/O to the SerialEngine, uses
 * the PlatformAdapter for port discovery, and keeps track of per-client event
 * subscriptions.
 */
export class SerialService implements ISerialService {
  private readonly serialEngine: SerialEngine;
  private readonly platformAdapter: PlatformAdapter;
  private config: SerialServiceConfig;
  /** Subscription records keyed by subscription id. */
  private readonly subscriptions: Map<string, Subscription> = new Map();
  /** subscription id -> listener id returned by SerialEngine.subscribe. */
  private readonly eventListeners: Map<string, string> = new Map();

  /**
   * @param serialEngine Engine that performs the actual port operations.
   * @param platformAdapter Adapter used to enumerate and describe ports.
   * @param config Optional overrides; omitted fields fall back to the defaults below.
   */
  constructor(
    serialEngine: SerialEngine,
    platformAdapter: PlatformAdapter,
    config?: Partial<SerialServiceConfig>
  ) {
    this.serialEngine = serialEngine;
    this.platformAdapter = platformAdapter;
    // Defaults first, caller overrides last.
    this.config = {
      defaultTimeout: 5000,
      maxSubscriptions: 100,
      subscriptionTimeout: 300000, // 5 minutes
      enableAutoReconnect: true,
      maxRetryAttempts: 3,
      ...config
    };
    this.setupEventListeners();
    logger.debug('SerialService initialized', this.config);
  }

  /**
   * Opens a serial port.
   *
   * The request is validated, the port is looked up in the adapter's port
   * list, and the engine is asked to open it.
   *
   * @returns An 'ok' response with the applied configuration, or an 'error'
   *          response; this method does not reject.
   */
  async openPort(request: SerialOpenRequest): Promise<SerialOpenResponse> {
    const startTime = Date.now();
    try {
      logger.info(`Opening port: ${request.port}`, request);
      this.validateOpenConfig(request);

      // Refuse ports the platform does not report.
      const availablePorts = await this.platformAdapter.listPorts();
      const portExists = availablePorts.some(p => p.path === request.port);
      if (!portExists) {
        throw new SerialError('PORT_NOT_FOUND', `Port not found: ${request.port}`, request.port);
      }

      // Translate the snake_case request into the engine's configuration shape.
      const portConfig: PortConfig = {
        port: request.port,
        baudrate: request.baudrate,
        dataBits: request.data_bits,
        parity: request.parity,
        stopBits: request.stop_bits,
        flowControl: request.flow_control
      };
      const portInfo = await this.serialEngine.openPort(portConfig);

      logger.info(`Port opened successfully: ${request.port}`, {
        duration: Date.now() - startTime
      });
      return {
        status: 'ok',
        port: portInfo.path,
        config: portConfig
      };
    } catch (error) {
      logger.error(`Failed to open port: ${request.port}`, error as Error, {
        duration: Date.now() - startTime
      });
      const { error_code, message } = this.toErrorFields(error);
      return {
        status: 'error',
        error_code,
        message,
        port: request.port
      };
    }
  }

  /**
   * Closes a serial port and drops every subscription bound to it.
   *
   * Subscriptions are only cleaned up when the engine closed the port
   * successfully.
   *
   * @returns An 'ok' or 'error' response; this method does not reject.
   */
  async closePort(request: SerialCloseRequest): Promise<SerialCloseResponse> {
    const startTime = Date.now();
    try {
      logger.info(`Closing port: ${request.port}`);
      await this.serialEngine.closePort(request.port);
      this.cleanupPortSubscriptions(request.port);

      logger.info(`Port closed successfully: ${request.port}`, {
        duration: Date.now() - startTime
      });
      return {
        status: 'ok',
        port: request.port
      };
    } catch (error) {
      logger.error(`Failed to close port: ${request.port}`, error as Error, {
        duration: Date.now() - startTime
      });
      const { error_code, message } = this.toErrorFields(error);
      return {
        status: 'error',
        error_code,
        message,
        port: request.port
      };
    }
  }

  /**
   * Writes hex-encoded data to a port and returns what the engine read back.
   *
   * When `filter_urc` is explicitly `false`, the URC lines the engine filtered
   * out are appended to the response payload instead of being reported
   * separately.
   *
   * @returns The engine result mapped to the wire format, or an 'error'
   *          response; this method does not reject.
   */
  async writeAndRead(request: SerialWriteRequest): Promise<SerialWriteResponse> {
    const startTime = Date.now();
    const sessionId = uuidv4();
    try {
      // Validate before reading request.data so a missing payload is reported
      // as INVALID_CONFIG instead of a TypeError from the log statement below.
      this.validateWriteRequest(request);
      logger.info(`Writing to port: ${request.port}`, {
        sessionId,
        dataSize: request.data.length,
        timeout: request.timeout_ms,
        filterUrc: request.filter_urc
      });

      const data = this.hexToBuffer(request.data);
      // `||` is deliberate: a timeout of 0 passes validation and means "use the default".
      const timeout = request.timeout_ms || this.config.defaultTimeout;
      // URC filtering is on unless the caller explicitly disables it.
      const filterUrc = request.filter_urc !== false;

      const result = await this.serialEngine.writeAndRead(
        request.port,
        data,
        timeout,
        sessionId
      );

      const finalResult = { ...result };
      if (!filterUrc && result.filteredUrc) {
        // Caller wants the URCs inline: append them to both payload forms.
        const urcData = result.filteredUrc.join('\r\n');
        if (urcData) {
          finalResult.rawHex = this.bufferToHex(
            Buffer.concat([this.hexToBuffer(result.rawHex), Buffer.from(urcData)])
          );
          finalResult.text = result.text + urcData;
        }
        finalResult.filteredUrc = undefined;
        finalResult.urcDetected = false;
      }

      logger.info(`Write and read completed: ${request.port}`, {
        sessionId,
        duration: Date.now() - startTime,
        status: finalResult.status,
        responseSize: finalResult.rawHex.length / 2, // two hex digits per byte
        urcDetected: finalResult.urcDetected
      });
      return {
        status: finalResult.status,
        session_id: finalResult.sessionId,
        timeout_ms: finalResult.timeoutMs,
        raw_hex: finalResult.rawHex,
        text: finalResult.text,
        partial: finalResult.partial,
        urc_detected: finalResult.urcDetected,
        filtered_urc: finalResult.filteredUrc
      };
    } catch (error) {
      logger.error(`Failed to write and read: ${request.port}`, error as Error, {
        sessionId,
        duration: Date.now() - startTime
      });
      const { error_code, message } = this.toErrorFields(error);
      return {
        status: 'error',
        session_id: sessionId,
        timeout_ms: request.timeout_ms || this.config.defaultTimeout,
        raw_hex: '',
        text: '',
        partial: false,
        urc_detected: false,
        error_code,
        message
      };
    }
  }

  /**
   * Lists the ports reported by the platform adapter, each enriched with the
   * adapter's device type, in-use flag and recommended configuration.
   *
   * @throws SerialError with code 'SYSTEM_ERROR' when enumeration fails.
   */
  async listPorts(): Promise<SerialListResponse> {
    try {
      logger.debug('Listing available ports');
      const ports = await this.platformAdapter.listPorts();
      const enrichedPorts = await Promise.all(ports.map(async port => ({
        ...port,
        deviceType: this.platformAdapter.getDeviceType(port.path),
        isInUse: await this.platformAdapter.isPortInUse(port.path),
        recommendedConfig: this.platformAdapter.getRecommendedConfig(port.path)
      })));
      logger.info(`Found ${enrichedPorts.length} ports`);
      return {
        status: 'ok',
        ports: enrichedPorts
      };
    } catch (error) {
      logger.error('Failed to list ports', error as Error);
      throw new SerialError('SYSTEM_ERROR', `Failed to list ports: ${this.messageOf(error)}`);
    }
  }

  /**
   * Returns the engine's status for a port; for an open port the control
   * signals are included when they can be read.
   *
   * @throws SerialError with code 'SYSTEM_ERROR' when the status lookup fails.
   */
  async getPortStatus(port: string): Promise<SerialStatusResponse> {
    try {
      logger.debug(`Getting status for port: ${port}`);
      const status = this.serialEngine.getPortStatus(port);

      let signals = undefined;
      if (status.isOpen) {
        try {
          const portDetails = await this.serialEngine.getPortDetails(port);
          signals = portDetails.signals;
        } catch (error) {
          // Signals are optional extra information; report the status without them.
          logger.warn(`Failed to get signals for port: ${port}`, error);
        }
      }

      logger.debug(`Status retrieved for port: ${port}`, {
        isOpen: status.isOpen,
        hasSignals: !!signals
      });
      return {
        status: 'ok',
        port: port,
        isOpen: status.isOpen,
        config: status.config,
        signals,
        lastActivity: status.lastActivity,
        error: status.error
      };
    } catch (error) {
      logger.error(`Failed to get port status: ${port}`, error as Error);
      throw new SerialError('SYSTEM_ERROR', `Failed to get port status: ${this.messageOf(error)}`, port);
    }
  }

  /**
   * Subscribes a client to events of an open port.
   *
   * @throws SerialError 'INVALID_CONFIG' for a malformed request, 'NOT_OPEN'
   *         when the port is closed, 'RESOURCE_EXHAUSTED' when the
   *         subscription limit is reached. Errors thrown by the engine while
   *         registering the listener are rethrown unchanged.
   */
  async subscribe(clientId: string, request: SerialSubscribeRequest): Promise<SerialSubscribeResponse> {
    try {
      logger.info(`Subscribing to port events: ${request.port}`, {
        clientId,
        types: request.types
      });
      this.validateSubscribeRequest(request, clientId);

      const portStatus = this.serialEngine.getPortStatus(request.port);
      if (!portStatus.isOpen) {
        throw new SerialError('NOT_OPEN', `Port is not open: ${request.port}`, request.port);
      }
      if (this.subscriptions.size >= this.config.maxSubscriptions) {
        throw new SerialError('RESOURCE_EXHAUSTED',
          `Maximum subscriptions reached: ${this.config.maxSubscriptions}`,
          request.port);
      }

      const subscriptionId = uuidv4();
      const subscription: Subscription = {
        id: subscriptionId,
        clientId,
        port: request.port,
        types: request.types,
        createdAt: Date.now(),
        active: true
      };

      // Translate request-level type names into engine event types.
      const eventTypes = request.types.map(type => {
        switch (type) {
          case 'data': return EventType.SERIAL_DATA;
          case 'error': return EventType.SERIAL_ERROR;
          case 'status': return EventType.SERIAL_STATUS;
          case 'urc': return EventType.SERIAL_URC;
          default: return EventType.SERIAL_DATA;
        }
      });

      // Store the record before registering so an event delivered during
      // registration can find it; roll it back if registration fails so no
      // orphan record keeps counting against maxSubscriptions.
      this.subscriptions.set(subscriptionId, subscription);
      try {
        const eventId = this.serialEngine.subscribe(
          request.port,
          eventTypes,
          (event: any) => this.handleSubscriptionEvent(subscriptionId, event)
        );
        this.eventListeners.set(subscriptionId, eventId);
      } catch (error) {
        this.subscriptions.delete(subscriptionId);
        throw error;
      }

      logger.info(`Subscription created: ${subscriptionId}`, {
        clientId,
        port: request.port,
        types: request.types
      });
      return {
        status: 'ok',
        subscription_id: subscriptionId,
        port: request.port,
        types: request.types
      };
    } catch (error) {
      logger.error(`Failed to subscribe: ${request.port}`, error as Error, {
        clientId,
        types: request.types
      });
      throw error;
    }
  }

  /**
   * Cancels a subscription. An unknown subscription id is logged and ignored.
   *
   * @throws SerialError 'SESSION_ERROR' when the subscription belongs to a
   *         different client.
   */
  async unsubscribe(clientId: string, subscriptionId: string): Promise<void> {
    try {
      logger.debug(`Unsubscribing: ${subscriptionId}`, { clientId });
      const subscription = this.subscriptions.get(subscriptionId);
      if (!subscription) {
        logger.warn(`Subscription not found: ${subscriptionId}`);
        return;
      }
      if (subscription.clientId !== clientId) {
        throw new SerialError('SESSION_ERROR',
          `Subscription does not belong to client: ${clientId}`,
          subscription.port);
      }
      this.removeSubscription(subscriptionId);
      logger.info(`Subscription cancelled: ${subscriptionId}`, {
        clientId,
        port: subscription.port,
        activeTime: Date.now() - subscription.createdAt
      });
    } catch (error) {
      logger.error(`Failed to unsubscribe: ${subscriptionId}`, error as Error, {
        clientId
      });
      throw error;
    }
  }

  /** Returns the active subscriptions owned by the given client. */
  getClientSubscriptions(clientId: string): Subscription[] {
    return Array.from(this.subscriptions.values())
      .filter(sub => sub.clientId === clientId && sub.active);
  }

  /** Returns every active subscription. */
  getAllSubscriptions(): Subscription[] {
    return Array.from(this.subscriptions.values())
      .filter(sub => sub.active);
  }

  /**
   * Removes subscriptions whose age (time since creation) exceeds
   * `config.subscriptionTimeout`.
   */
  cleanupExpiredSubscriptions(): void {
    const now = Date.now();
    // Snapshot first so removal does not interleave with iteration.
    const expired = [...this.subscriptions].filter(
      ([, subscription]) => now - subscription.createdAt > this.config.subscriptionTimeout
    );
    for (const [id, subscription] of expired) {
      this.removeSubscription(id);
      logger.info(`Expired subscription cleaned up: ${id}`, {
        clientId: subscription.clientId,
        port: subscription.port,
        age: now - subscription.createdAt
      });
    }
    if (expired.length > 0) {
      logger.debug(`Cleaned up ${expired.length} expired subscriptions`);
    }
  }

  /**
   * Validates an open request.
   *
   * @throws SerialError 'INVALID_CONFIG' naming the first invalid field.
   */
  private validateOpenConfig(request: SerialOpenRequest): void {
    if (!request.port) {
      throw new SerialError('INVALID_CONFIG', 'Port is required', '');
    }
    if (!request.baudrate || request.baudrate <= 0) {
      throw new SerialError('INVALID_CONFIG', 'Invalid baud rate', request.port);
    }
    const validDataBits = [5, 6, 7, 8];
    if (!validDataBits.includes(request.data_bits)) {
      throw new SerialError('INVALID_CONFIG', 'Invalid data bits', request.port);
    }
    const validParity = ['none', 'even', 'odd'];
    if (!validParity.includes(request.parity)) {
      throw new SerialError('INVALID_CONFIG', 'Invalid parity', request.port);
    }
    const validStopBits = [1, 1.5, 2];
    if (!validStopBits.includes(request.stop_bits)) {
      throw new SerialError('INVALID_CONFIG', 'Invalid stop bits', request.port);
    }
    const validFlowControl = ['none', 'rts_cts', 'xon_xoff'];
    if (!validFlowControl.includes(request.flow_control)) {
      throw new SerialError('INVALID_CONFIG', 'Invalid flow control', request.port);
    }
  }

  /**
   * Validates a write request: port and data are required, data must be an
   * even-length string of hex digits, and a non-zero timeout must lie in
   * (0, 60000] milliseconds. A timeout of 0 is treated as "not set".
   *
   * @throws SerialError 'INVALID_CONFIG' naming the first violated rule.
   */
  private validateWriteRequest(request: SerialWriteRequest): void {
    if (!request.port) {
      throw new SerialError('INVALID_CONFIG', 'Port is required', '');
    }
    if (!request.data) {
      throw new SerialError('INVALID_CONFIG', 'Data is required', request.port);
    }
    if (!/^[0-9A-Fa-f]*$/.test(request.data)) {
      throw new SerialError('INVALID_CONFIG', 'Invalid hex data', request.port);
    }
    // Every byte needs two digits; reject here so the caller gets
    // INVALID_CONFIG rather than a generic failure from hexToBuffer.
    if (request.data.length % 2 !== 0) {
      throw new SerialError('INVALID_CONFIG', 'Hex data must contain an even number of digits', request.port);
    }
    const timeout = request.timeout_ms;
    if (timeout) {
      if (timeout <= 0) {
        throw new SerialError('INVALID_CONFIG', 'Timeout must be positive', request.port);
      }
      if (timeout > 60000) {
        throw new SerialError('INVALID_CONFIG', 'Timeout cannot exceed 60 seconds', request.port);
      }
    }
  }

  /**
   * Validates a subscribe request: port, client id and at least one known
   * event type ('data', 'error', 'status', 'urc') are required.
   *
   * @throws SerialError 'INVALID_CONFIG' naming the first violated rule.
   */
  private validateSubscribeRequest(request: SerialSubscribeRequest, clientId: string): void {
    if (!request.port) {
      throw new SerialError('INVALID_CONFIG', 'Port is required', '');
    }
    // Ownership checks in unsubscribe() rely on a meaningful client id.
    if (!clientId) {
      throw new SerialError('INVALID_CONFIG', 'Client ID is required', request.port);
    }
    if (!request.types || request.types.length === 0) {
      throw new SerialError('INVALID_CONFIG', 'At least one event type is required', request.port);
    }
    const validTypes = ['data', 'error', 'status', 'urc'];
    for (const type of request.types) {
      if (!validTypes.includes(type)) {
        throw new SerialError('INVALID_CONFIG', `Invalid event type: ${type}`, request.port);
      }
    }
  }

  /**
   * Engine callback for one subscription. Builds the 'serial.report'
   * notification for matching events; delivery to the client is not
   * implemented yet. Never throws: failures are logged.
   */
  private handleSubscriptionEvent(subscriptionId: string, event: any): void {
    try {
      const subscription = this.subscriptions.get(subscriptionId);
      if (!subscription || !subscription.active) {
        return;
      }
      // NOTE(review): subscription.types holds the request-level names
      // ('data', 'error', ...) while the engine was subscribed with EventType
      // values. If event.type carries an EventType value that differs from
      // those names, every event is dropped here — confirm against SerialEngine.
      if (!subscription.types.includes(event.type)) {
        return;
      }
      const notification = {
        method: 'serial.report',
        params: {
          port: subscription.port,
          timestamp: event.timestamp,
          type: event.type,
          data: event.data || event.message || '',
          session_id: event.sessionId
        }
      };
      // TODO: send `notification` to the client; requires integration with the MCP layer.
      logger.debug('Subscription event', {
        subscriptionId,
        port: subscription.port,
        type: event.type
      });
    } catch (error) {
      logger.error(`Failed to handle subscription event: ${subscriptionId}`, error as Error);
    }
  }

  /**
   * Placeholder for service-wide listeners (e.g. reacting to port-close
   * events to clean up subscriptions).
   */
  private setupEventListeners(): void {
    // TODO: a global event listening mechanism is needed before this can do real work.
    logger.debug('Event listeners setup for SerialService');
  }

  /** Removes every subscription bound to the given port. */
  private cleanupPortSubscriptions(port: string): void {
    // Snapshot first so removal does not interleave with iteration.
    const toRemove = [...this.subscriptions].filter(
      ([, subscription]) => subscription.port === port
    );
    for (const [id, subscription] of toRemove) {
      this.removeSubscription(id);
      logger.info(`Port subscription cleaned up: ${id}`, {
        clientId: subscription.clientId,
        port: port
      });
    }
    if (toRemove.length > 0) {
      logger.info(`Cleaned up ${toRemove.length} subscriptions for closed port: ${port}`);
    }
  }

  /**
   * Detaches the engine listener of a subscription (when one is registered)
   * and deletes the subscription record. If the engine throws, the record is
   * left in place and the error propagates.
   */
  private removeSubscription(subscriptionId: string): void {
    const eventId = this.eventListeners.get(subscriptionId);
    if (eventId) {
      this.serialEngine.unsubscribe(eventId);
      this.eventListeners.delete(subscriptionId);
    }
    this.subscriptions.delete(subscriptionId);
  }

  /**
   * Maps a caught value to the `error_code` / `message` pair of an error
   * response: SerialError keeps its own code, anything else is 'SYSTEM_ERROR'.
   */
  // The code is typed `any` because the response and error types live outside
  // this file; this keeps the original cast in a single place.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private toErrorFields(error: unknown): { error_code: any; message: string } {
    if (error instanceof SerialError) {
      return { error_code: error.code, message: error.message };
    }
    return {
      error_code: 'SYSTEM_ERROR',
      message: `Unexpected error: ${this.messageOf(error)}`
    };
  }

  /** Extracts a readable message from a caught value, including non-Error throws. */
  private messageOf(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }

  /**
   * Converts a hex string (whitespace allowed) to a Buffer.
   *
   * @throws Error when the cleaned string has an odd number of digits.
   */
  private hexToBuffer(hex: string): Buffer {
    const cleanHex = hex.replace(/\s+/g, '');
    if (cleanHex.length % 2 !== 0) {
      throw new Error('Invalid hex string length');
    }
    return Buffer.from(cleanHex, 'hex');
  }

  /** Converts a Buffer to an upper-case hex string. */
  private bufferToHex(buffer: Buffer): string {
    return buffer.toString('hex').toUpperCase();
  }

  /**
   * Returns subscription counts (total, per client, per port) together with
   * the current configuration.
   */
  getServiceStats(): any {
    const subscriptionsByClient = new Map<string, number>();
    const subscriptionsByPort = new Map<string, number>();
    for (const subscription of this.subscriptions.values()) {
      subscriptionsByClient.set(
        subscription.clientId,
        (subscriptionsByClient.get(subscription.clientId) || 0) + 1
      );
      subscriptionsByPort.set(
        subscription.port,
        (subscriptionsByPort.get(subscription.port) || 0) + 1
      );
    }
    return {
      totalSubscriptions: this.subscriptions.size,
      maxSubscriptions: this.config.maxSubscriptions,
      subscriptionsByClient: Object.fromEntries(subscriptionsByClient),
      subscriptionsByPort: Object.fromEntries(subscriptionsByPort),
      config: this.config
    };
  }

  /** Merges the given fields into the current configuration. */
  updateConfig(newConfig: Partial<SerialServiceConfig>): void {
    this.config = { ...this.config, ...newConfig };
    logger.info('SerialService configuration updated', newConfig);
  }

  /**
   * Detaches every engine listener and forgets all subscriptions. Never
   * throws: a listener that fails to detach is logged and the remaining ones
   * are still processed, and the bookkeeping is cleared regardless.
   */
  dispose(): void {
    logger.info('Disposing SerialService...');
    for (const eventId of this.eventListeners.values()) {
      try {
        this.serialEngine.unsubscribe(eventId);
      } catch (error) {
        logger.error('Failed to dispose SerialService', error as Error);
      }
    }
    this.eventListeners.clear();
    this.subscriptions.clear();
    logger.info('SerialService disposed');
  }
}