"""串口管理器模块
提供串口的枚举、打开、关闭、配置等核心功能。
"""
import logging
import threading
import time
from typing import Any
import serial
import serial.tools.list_ports
from serial import SerialException
from .config import get_blacklist_manager, get_config_manager
from .errors import (
InvalidParamError,
PermissionDeniedError,
PortBlacklistedError,
PortBusyError,
PortClosedError,
PortNotFoundError,
PortOpenFailedError,
WriteFailedError,
)
from .types import (
SUPPORTED_BAUDRATES,
SUPPORTED_BYTESIZES,
FlowControl,
Parity,
PortInfo,
PortStatus,
SerialConfig,
StopBits,
)
logger = logging.getLogger(__name__)
# 重连检测间隔(秒)
RECONNECT_CHECK_INTERVAL = 2.0
# 重连尝试间隔(秒)
RECONNECT_RETRY_INTERVAL = 3.0
class ManagedPort:
"""管理的串口连接
封装 pyserial 的 Serial 对象,添加配置和状态管理。
Attributes:
port: 串口路径
serial: pyserial Serial 对象
config: 串口配置
reconnecting: 是否正在重连
auto_reconnect: 是否启用自动重连
"""
def __init__(
self,
port: str,
serial_obj: serial.Serial,
config: SerialConfig,
auto_reconnect: bool = True,
) -> None:
self.port = port
self.serial = serial_obj
self.config = config
self.reconnecting = False
self.auto_reconnect = auto_reconnect
self._lock = threading.Lock()
@property
def is_connected(self) -> bool:
"""检查物理连接状态"""
try:
# 尝试读取 DSR 状态来检测连接
# 如果串口已断开,这会抛出异常
if self.serial.is_open:
# 某些串口可能不支持 DSR,所以我们用 in_waiting 检查
_ = self.serial.in_waiting
return True
except (SerialException, OSError):
pass
return False
class SerialManager:
"""串口管理器
提供串口设备的枚举、打开、关闭、配置等功能。
支持多串口同时连接、配置热更新和自动重连。
Attributes:
_ports: 已打开的串口字典,键为串口路径
_lock: 线程锁
_reconnect_thread: 重连检测线程
_running: 管理器运行状态
"""
def __init__(self, enable_auto_reconnect: bool = True) -> None:
"""初始化串口管理器
Args:
enable_auto_reconnect: 是否启用自动重连功能
"""
self._ports: dict[str, ManagedPort] = {}
self._lock = threading.RLock()
self._running = True
self._enable_auto_reconnect = enable_auto_reconnect
self._reconnect_thread: threading.Thread | None = None
if enable_auto_reconnect:
self._start_reconnect_thread()
def _start_reconnect_thread(self) -> None:
"""启动重连检测线程"""
self._reconnect_thread = threading.Thread(
target=self._reconnect_loop, daemon=True, name="serial-reconnect"
)
self._reconnect_thread.start()
logger.debug("重连检测线程已启动")
def _reconnect_loop(self) -> None:
"""重连检测循环"""
while self._running:
try:
self._check_and_reconnect()
except Exception as e:
logger.error("重连检测异常:%s", e)
time.sleep(RECONNECT_CHECK_INTERVAL)
def _check_and_reconnect(self) -> None:
"""检查并重连断开的串口"""
with self._lock:
ports_to_reconnect: list[tuple[str, SerialConfig]] = []
for port_path, managed in list(self._ports.items()):
if not managed.auto_reconnect:
continue
if not managed.is_connected and not managed.reconnecting:
logger.info("检测到串口断开:%s", port_path)
managed.reconnecting = True
ports_to_reconnect.append((port_path, managed.config))
# 在锁外执行重连操作
for port_path, config in ports_to_reconnect:
self._try_reconnect(port_path, config)
def _try_reconnect(self, port: str, config: SerialConfig) -> None:
"""尝试重连串口
Args:
port: 串口路径
config: 串口配置
"""
logger.info("尝试重连串口:%s", port)
try:
serial_obj = self._create_serial(port, config)
with self._lock:
if port in self._ports:
old_managed = self._ports[port]
try:
old_managed.serial.close()
except Exception:
pass
# 保留原始的 auto_reconnect 设置
self._ports[port] = ManagedPort(
port,
serial_obj,
config,
auto_reconnect=old_managed.auto_reconnect,
)
logger.info("串口重连成功:%s", port)
else:
# 端口在重连期间被关闭,释放新创建的串口资源
serial_obj.close()
logger.info("串口在重连期间被关闭,已释放资源:%s", port)
except Exception as e:
logger.warning("串口重连失败:%s - %s", port, e)
with self._lock:
if port in self._ports:
self._ports[port].reconnecting = False
def _create_serial(self, port: str, config: SerialConfig) -> serial.Serial:
"""创建 pyserial Serial 对象
Args:
port: 串口路径
config: 串口配置
Returns:
配置好的 Serial 对象
Raises:
PortNotFoundError: 串口不存在
PortBusyError: 串口被占用
PortOpenFailedError: 打开失败
"""
# 转换配置
parity_map = {
Parity.NONE: serial.PARITY_NONE,
Parity.EVEN: serial.PARITY_EVEN,
Parity.ODD: serial.PARITY_ODD,
Parity.MARK: serial.PARITY_MARK,
Parity.SPACE: serial.PARITY_SPACE,
}
stopbits_map = {
StopBits.ONE: serial.STOPBITS_ONE,
StopBits.ONE_POINT_FIVE: serial.STOPBITS_ONE_POINT_FIVE,
StopBits.TWO: serial.STOPBITS_TWO,
}
try:
serial_obj = serial.Serial(
port=port,
baudrate=config.baudrate,
bytesize=config.bytesize,
parity=parity_map[config.parity],
stopbits=stopbits_map[config.stopbits],
timeout=config.read_timeout_ms / 1000.0,
write_timeout=config.write_timeout_ms / 1000.0,
xonxoff=config.flow_control == FlowControl.SOFTWARE,
rtscts=config.flow_control == FlowControl.HARDWARE,
)
return serial_obj
except serial.SerialException as e:
error_msg = str(e).lower()
if "no such file" in error_msg or "not found" in error_msg:
raise PortNotFoundError(port) from e
if "permission" in error_msg or "access" in error_msg:
raise PermissionDeniedError(port) from e
if "busy" in error_msg or "in use" in error_msg:
raise PortBusyError(port) from e
raise PortOpenFailedError(port, str(e)) from e
def list_ports(self) -> list[PortInfo]:
"""列出所有可用串口
返回系统中所有可用的串口,已过滤黑名单中的串口。
Returns:
串口信息列表
"""
blacklist = get_blacklist_manager()
ports: list[PortInfo] = []
for port_info in serial.tools.list_ports.comports():
if blacklist.is_blacklisted(port_info.device):
logger.debug("串口在黑名单中,已过滤:%s", port_info.device)
continue
ports.append(
PortInfo(
port=port_info.device,
description=port_info.description or "",
hwid=port_info.hwid or "",
)
)
return ports
def open_port(
self,
port: str,
baudrate: int | None = None,
bytesize: int | None = None,
parity: str | None = None,
stopbits: float | None = None,
flow_control: str | None = None,
read_timeout_ms: int | None = None,
write_timeout_ms: int | None = None,
auto_reconnect: bool | None = None,
) -> PortStatus:
"""打开串口
Args:
port: 串口路径
baudrate: 波特率(None 时使用配置默认值)
bytesize: 数据位(None 时使用配置默认值)
parity: 校验位(None 时使用配置默认值)
stopbits: 停止位(None 时使用配置默认值)
flow_control: 流控制(None 时使用配置默认值)
read_timeout_ms: 读取超时(毫秒,None 时使用配置默认值)
write_timeout_ms: 写入超时(毫秒,None 时使用配置默认值)
auto_reconnect: 是否启用自动重连(None 时使用配置默认值)
Returns:
串口状态
Raises:
PortBlacklistedError: 串口在黑名单中
PortNotFoundError: 串口不存在
PortBusyError: 串口被占用
InvalidParamError: 参数无效
"""
# 检查黑名单
blacklist = get_blacklist_manager()
if blacklist.is_blacklisted(port):
raise PortBlacklistedError(port)
# 获取全局配置作为默认值
global_config = get_config_manager().config
# 使用用户参数或配置默认值
final_baudrate = baudrate if baudrate is not None else global_config.baudrate
final_bytesize = bytesize if bytesize is not None else global_config.bytesize
final_parity = parity if parity is not None else global_config.parity
final_stopbits = stopbits if stopbits is not None else global_config.stopbits
# 流控:显式传入时优先使用传入值
if flow_control is None:
final_flow_control = (
"software" if global_config.xonxoff else
"hardware" if global_config.rtscts else
"none"
)
else:
final_flow_control = flow_control
cc = global_config
rt = read_timeout_ms
wt = write_timeout_ms
ar = auto_reconnect
final_read_timeout = cc.read_timeout if rt is None else rt
final_write_timeout = cc.write_timeout if wt is None else wt
final_auto_reconnect = cc.auto_reconnect if ar is None else ar
# 验证参数
config = self._validate_and_create_config(
final_baudrate,
final_bytesize,
final_parity,
final_stopbits,
final_flow_control,
final_read_timeout,
final_write_timeout,
)
with self._lock:
# 检查是否已打开(幂等操作)
if port in self._ports:
managed = self._ports[port]
logger.info("串口已打开,返回当前状态:%s", port)
return PortStatus(
port=port,
is_open=True,
config=managed.config,
connected=managed.is_connected,
reconnecting=managed.reconnecting,
)
# 打开串口
serial_obj = self._create_serial(port, config)
managed = ManagedPort(port, serial_obj, config, final_auto_reconnect)
self._ports[port] = managed
logger.info("串口打开成功:%s", port)
return PortStatus(
port=port,
is_open=True,
config=config,
connected=managed.is_connected,
reconnecting=False,
)
def _validate_and_create_config(
self,
baudrate: int,
bytesize: int,
parity: str,
stopbits: float,
flow_control: str,
read_timeout_ms: int,
write_timeout_ms: int,
) -> SerialConfig:
"""验证参数并创建配置对象
Raises:
InvalidParamError: 参数无效
"""
# 验证波特率
if baudrate not in SUPPORTED_BAUDRATES:
raise InvalidParamError(
"baudrate", baudrate, f"支持的值:{SUPPORTED_BAUDRATES}"
)
# 验证数据位
if bytesize not in SUPPORTED_BYTESIZES:
raise InvalidParamError(
"bytesize", bytesize, f"支持的值:{SUPPORTED_BYTESIZES}"
)
# 验证校验位
try:
parity_enum = Parity(parity)
except ValueError:
raise InvalidParamError(
"parity", parity, f"支持的值:{[p.value for p in Parity]}"
)
# 验证停止位
try:
stopbits_enum = StopBits(stopbits)
except ValueError:
raise InvalidParamError(
"stopbits", stopbits, f"支持的值:{[s.value for s in StopBits]}"
)
# 验证流控制
try:
flow_enum = FlowControl(flow_control)
except ValueError:
valid_values = [f.value for f in FlowControl]
raise InvalidParamError(
"flow_control", flow_control, f"支持的值:{valid_values}"
)
# 验证超时
if read_timeout_ms < 0 or read_timeout_ms > 60000:
raise InvalidParamError("read_timeout_ms", read_timeout_ms, "范围:0-60000")
if write_timeout_ms < 0 or write_timeout_ms > 60000:
raise InvalidParamError(
"write_timeout_ms", write_timeout_ms, "范围:0-60000"
)
return SerialConfig(
baudrate=baudrate,
bytesize=bytesize,
parity=parity_enum,
stopbits=stopbits_enum,
flow_control=flow_enum,
read_timeout_ms=read_timeout_ms,
write_timeout_ms=write_timeout_ms,
)
def close_port(self, port: str) -> dict[str, Any]:
"""关闭串口
Args:
port: 串口路径
Returns:
操作结果
Raises:
PortClosedError: 串口未打开
"""
with self._lock:
if port not in self._ports:
raise PortClosedError(port)
managed = self._ports.pop(port)
try:
managed.serial.close()
except Exception as e:
logger.warning("关闭串口时发生异常:%s - %s", port, e)
logger.info("串口关闭成功:%s", port)
return {"success": True, "port": port}
def set_config(
self,
port: str,
baudrate: int | None = None,
bytesize: int | None = None,
parity: str | None = None,
stopbits: float | None = None,
flow_control: str | None = None,
read_timeout_ms: int | None = None,
write_timeout_ms: int | None = None,
) -> PortStatus:
"""修改串口配置(热更新)
Args:
port: 串口路径
baudrate: 波特率(可选)
bytesize: 数据位(可选)
parity: 校验位(可选)
stopbits: 停止位(可选)
flow_control: 流控制(可选)
read_timeout_ms: 读取超时(可选)
write_timeout_ms: 写入超时(可选)
Returns:
更新后的串口状态
Raises:
PortClosedError: 串口未打开
InvalidParamError: 参数无效
"""
with self._lock:
if port not in self._ports:
raise PortClosedError(port)
managed = self._ports[port]
current_config = managed.config
# 构建新配置,未指定的参数使用当前值
cc = current_config # 简化引用
new_baudrate = baudrate if baudrate is not None else cc.baudrate
new_bytesize = bytesize if bytesize is not None else cc.bytesize
new_parity = parity if parity is not None else cc.parity.value
new_stopbits = stopbits if stopbits is not None else cc.stopbits.value
new_flow = (
flow_control if flow_control is not None else cc.flow_control.value
)
new_read_timeout = (
read_timeout_ms if read_timeout_ms is not None else cc.read_timeout_ms
)
new_write_timeout = (
write_timeout_ms
if write_timeout_ms is not None
else cc.write_timeout_ms
)
new_config = self._validate_and_create_config(
new_baudrate,
new_bytesize,
new_parity,
new_stopbits,
new_flow,
new_read_timeout,
new_write_timeout,
)
# 应用配置(热更新)
self._apply_config(managed, new_config)
managed.config = new_config
logger.info("串口配置更新成功:%s", port)
return PortStatus(
port=port,
is_open=True,
config=new_config,
connected=managed.is_connected,
reconnecting=managed.reconnecting,
)
def _apply_config(self, managed: ManagedPort, config: SerialConfig) -> None:
"""应用配置到已打开的串口
Args:
managed: 管理的串口对象
config: 新配置
"""
ser = managed.serial
# 转换校验位
parity_map = {
Parity.NONE: serial.PARITY_NONE,
Parity.EVEN: serial.PARITY_EVEN,
Parity.ODD: serial.PARITY_ODD,
Parity.MARK: serial.PARITY_MARK,
Parity.SPACE: serial.PARITY_SPACE,
}
# 转换停止位
stopbits_map = {
StopBits.ONE: serial.STOPBITS_ONE,
StopBits.ONE_POINT_FIVE: serial.STOPBITS_ONE_POINT_FIVE,
StopBits.TWO: serial.STOPBITS_TWO,
}
# 使用 apply_settings 进行热更新
ser.apply_settings(
{
"baudrate": config.baudrate,
"bytesize": config.bytesize,
"parity": parity_map[config.parity],
"stopbits": stopbits_map[config.stopbits],
"xonxoff": config.flow_control == FlowControl.SOFTWARE,
"rtscts": config.flow_control == FlowControl.HARDWARE,
}
)
# 更新超时设置
ser.timeout = config.read_timeout_ms / 1000.0
ser.write_timeout = config.write_timeout_ms / 1000.0
def get_status(self, port: str) -> PortStatus:
"""获取串口状态
Args:
port: 串口路径
Returns:
串口状态
Raises:
PortClosedError: 串口未打开
"""
with self._lock:
if port not in self._ports:
raise PortClosedError(port)
managed = self._ports[port]
return PortStatus(
port=port,
is_open=True,
config=managed.config,
connected=managed.is_connected,
reconnecting=managed.reconnecting,
)
def get_all_status(self) -> list[PortStatus]:
"""获取所有已打开串口的状态
Returns:
串口状态列表
"""
with self._lock:
return [
PortStatus(
port=port,
is_open=True,
config=managed.config,
connected=managed.is_connected,
reconnecting=managed.reconnecting,
)
for port, managed in self._ports.items()
]
def send_data(self, port: str, data: bytes) -> int:
"""发送原始字节数据
Args:
port: 串口路径
data: 要发送的字节数据
Returns:
发送的字节数
Raises:
PortClosedError: 串口未打开
WriteFailedError: 写入失败
"""
with self._lock:
if port not in self._ports:
raise PortClosedError(port)
managed = self._ports[port]
try:
result = managed.serial.write(data)
bytes_written: int = result if result is not None else 0
logger.debug("发送数据到串口 %s:%d 字节", port, bytes_written)
return bytes_written
except SerialException as e:
logger.error("串口写入失败:%s - %s", port, e)
raise WriteFailedError(port, str(e)) from e
def read_data(
self, port: str, size: int | None = None, timeout_ms: int | None = None
) -> bytes:
"""读取原始字节数据
Args:
port: 串口路径
size: 读取字节数,None 表示读取所有可用数据
timeout_ms: 读取超时(毫秒),None 使用串口配置的超时
Returns:
读取的字节数据
Raises:
PortClosedError: 串口未打开
"""
with self._lock:
if port not in self._ports:
raise PortClosedError(port)
managed = self._ports[port]
ser = managed.serial
# 保存原始超时设置
original_timeout = ser.timeout
try:
# 如果指定了超时,临时修改
if timeout_ms is not None:
ser.timeout = timeout_ms / 1000.0
data: bytes
if size is not None:
# 读取指定字节数
data = ser.read(size)
else:
# 读取所有可用数据
available: int = ser.in_waiting
if available > 0:
data = ser.read(available)
else:
# 没有可用数据,尝试读取一次(会等待超时)
data = ser.read(1)
if data:
# 如果读到数据,继续读取剩余的
remaining: int = ser.in_waiting
if remaining > 0:
data += ser.read(remaining)
logger.debug("从串口 %s 读取数据:%d 字节", port, len(data))
return data
except SerialException as e:
logger.error("串口读取失败:%s - %s", port, e)
raise PortClosedError(port) from e
finally:
# 恢复原始超时设置
if timeout_ms is not None:
ser.timeout = original_timeout
def shutdown(self) -> None:
"""关闭管理器
停止重连线程并关闭所有串口。
"""
self._running = False
# 等待重连线程结束
if self._reconnect_thread and self._reconnect_thread.is_alive():
self._reconnect_thread.join(timeout=5.0)
# 关闭所有串口
with self._lock:
for port, managed in list(self._ports.items()):
try:
managed.serial.close()
logger.debug("关闭串口:%s", port)
except Exception as e:
logger.warning("关闭串口失败:%s - %s", port, e)
self._ports.clear()
logger.info("串口管理器已关闭")
# 全局串口管理器实例
_serial_manager: SerialManager | None = None
def get_serial_manager() -> SerialManager:
"""获取串口管理器单例
Returns:
串口管理器实例
"""
global _serial_manager
if _serial_manager is None:
_serial_manager = SerialManager()
return _serial_manager