# -*- coding: utf-8 -*-
"""重试和稳定性工具模块
提供自动重试、健康检查等稳定性功能。
"""
import asyncio
import functools
import time
from typing import TypeVar, Callable, Any, Optional, List, Type
import logging
# Module-level logger used by the retry helpers and timing decorator below.
logger = logging.getLogger(__name__)
# Generic return type of the callables wrapped by the helpers in this module.
T = TypeVar('T')
# Default configuration
# Default for `max_retries` in retry_sync / retry_async, where it bounds the
# total number of attempts.
DEFAULT_MAX_RETRIES = 3
DEFAULT_RETRY_DELAY = 1.0 # seconds to wait before the first retry
# Multiplier applied to the wait time after each failed attempt.
DEFAULT_BACKOFF_FACTOR = 2.0
class RetryError(Exception):
    """Raised when an operation has failed on every attempt.

    Attributes:
        message: Caller-supplied description of what failed.
        attempts: Number of attempts reported by the raiser.
        last_error: The exception captured from the most recent attempt.
    """

    def __init__(self, message: str, attempts: int, last_error: Exception):
        # The exception text combines the description, the attempt count and
        # the text of the last underlying error.
        summary = f"{message} (尝试{attempts}次后失败: {last_error})"
        super().__init__(summary)
        self.message, self.attempts, self.last_error = (
            message,
            attempts,
            last_error,
        )
def retry_sync(
    max_retries: int = DEFAULT_MAX_RETRIES,
    delay: float = DEFAULT_RETRY_DELAY,
    backoff: float = DEFAULT_BACKOFF_FACTOR,
    exceptions: tuple = (Exception,),
    on_retry: Optional[Callable[[int, Exception], None]] = None
):
    """Build a decorator that retries a synchronous function on failure.

    Args:
        max_retries: Total number of attempts, counting the first call.
            With a value below 1 the wrapped function is never called and
            ``RetryError`` is raised immediately with ``last_error=None``.
        delay: Seconds to sleep before the first retry.
        backoff: Factor the sleep time is multiplied by after each failed
            attempt.
        exceptions: Exception types that trigger a retry; anything else
            propagates to the caller unchanged.
        on_retry: Optional callback invoked as ``on_retry(attempt, error)``
            before each sleep. It is not invoked after the final attempt.

    Returns:
        A decorator whose wrapper returns the wrapped function's result.

    Raises:
        RetryError: When every attempt failed. The last underlying error is
            available as ``last_error`` and is chained as ``__cause__``.
    """
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> T:
            last_exception = None
            current_delay = delay
            for attempt in range(1, max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e
                    if attempt < max_retries:
                        if on_retry:
                            on_retry(attempt, e)
                        logger.warning(f"操作失败,{current_delay:.1f}秒后重试 ({attempt}/{max_retries}): {e}")
                        time.sleep(current_delay)
                        current_delay *= backoff
                    else:
                        logger.error(f"操作失败,已达最大重试次数: {e}")
            # Chain the last failure explicitly so its traceback is kept as
            # __cause__; this raise runs outside any except block, so nothing
            # would be attached implicitly.
            raise RetryError(
                f"操作在{max_retries}次尝试后仍然失败",
                max_retries,
                last_exception
            ) from last_exception
        return wrapper
    return decorator
async def retry_async(
    func: Callable[..., T],
    *args,
    max_retries: int = DEFAULT_MAX_RETRIES,
    delay: float = DEFAULT_RETRY_DELAY,
    backoff: float = DEFAULT_BACKOFF_FACTOR,
    exceptions: tuple = (Exception,),
    **kwargs
) -> T:
    """Call ``func(*args, **kwargs)`` with retries, sleeping without blocking.

    Coroutine functions are awaited; any other callable is called directly
    and its return value is passed back as-is.

    Args:
        func: The function to execute (coroutine function or plain callable).
        *args: Positional arguments forwarded to ``func``.
        max_retries: Total number of attempts, counting the first call.
            With a value below 1 ``func`` is never called and ``RetryError``
            is raised immediately with ``last_error=None``.
        delay: Seconds to wait before the first retry.
        backoff: Factor the wait time is multiplied by after each failed
            attempt.
        exceptions: Exception types that trigger a retry; anything else
            propagates to the caller unchanged.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever ``func`` returns on the first successful attempt.

    Raises:
        RetryError: When every attempt failed. The last underlying error is
            available as ``last_error`` and is chained as ``__cause__``.
    """
    last_exception = None
    current_delay = delay
    # The kind of callable does not change between attempts, so decide once.
    is_coroutine_func = asyncio.iscoroutinefunction(func)
    for attempt in range(1, max_retries + 1):
        try:
            if is_coroutine_func:
                return await func(*args, **kwargs)
            return func(*args, **kwargs)
        except exceptions as e:
            last_exception = e
            if attempt < max_retries:
                logger.warning(f"操作失败,{current_delay:.1f}秒后重试 ({attempt}/{max_retries}): {e}")
                await asyncio.sleep(current_delay)
                current_delay *= backoff
            else:
                logger.error(f"操作失败,已达最大重试次数: {e}")
    # Chain the last failure explicitly so its traceback is kept as
    # __cause__; this raise runs outside any except block, so nothing would
    # be attached implicitly.
    raise RetryError(
        f"操作在{max_retries}次尝试后仍然失败",
        max_retries,
        last_exception
    ) from last_exception
class OperationResult:
    """Uniform container for the outcome of an operation.

    Holds the success flag, an optional payload, an optional error text,
    an optional duration in milliseconds, a retry count and a list of
    suggestions, and can render itself as a dict or a short message.
    """

    def __init__(
        self,
        success: bool,
        data: Any = None,
        error: Optional[str] = None,
        duration_ms: Optional[float] = None,
        retries: int = 0,
        suggestions: Optional[List[str]] = None
    ):
        self.success = success
        self.data = data
        self.error = error
        self.duration_ms = duration_ms
        self.retries = retries
        # A missing or empty suggestions argument becomes a fresh empty list.
        self.suggestions = suggestions if suggestions else []

    def to_dict(self) -> dict:
        """Return the result as a dict.

        ``success`` and ``data`` are always present; the other keys are
        added only when they carry information.
        """
        payload = {"success": self.success, "data": self.data}
        if self.error:
            payload.update(error=self.error)
        if self.duration_ms is not None:
            # Durations are reported with two decimal places.
            payload.update(duration_ms=round(self.duration_ms, 2))
        if self.retries > 0:
            payload.update(retries=self.retries)
        if self.suggestions:
            payload.update(suggestions=self.suggestions)
        return payload

    def to_message(self) -> str:
        """Return a short user-facing message describing the result."""
        if not self.success:
            text = f"❌ 操作失败: {self.error}"
            if self.suggestions:
                text += "\n💡 建议: " + "; ".join(self.suggestions)
            return text

        # Success path: append the optional parts only when they are truthy.
        pieces = ["✅ 操作成功"]
        if self.data:
            pieces.append(f": {self.data}")
        if self.duration_ms:
            pieces.append(f" ({self.duration_ms:.0f}ms)")
        return "".join(pieces)

    @classmethod
    def ok(cls, data: Any = None, duration_ms: float = None) -> 'OperationResult':
        """Create a successful result."""
        return cls(success=True, data=data, duration_ms=duration_ms)

    @classmethod
    def fail(cls, error: str, suggestions: List[str] = None) -> 'OperationResult':
        """Create a failed result."""
        return cls(success=False, error=error, suggestions=suggestions)
class HealthChecker:
    """Health checker for a browser manager object.

    Inspects the manager's ``browser`` and ``current_tab`` attributes and
    reads the tab's ``title`` to verify the connection. The result of a
    check is cached for a short interval.
    """

    def __init__(self, browser_manager):
        self.browser_manager = browser_manager
        self._last_check_time = 0
        self._last_check_result = False
        self._check_interval = 5  # seconds a cached result stays valid

    def is_healthy(self, force_check: bool = False) -> bool:
        """Report whether the browser connection looks usable.

        Args:
            force_check: When true, ignore the cached result and probe again.

        Returns:
            True only if the manager, its browser and its current tab are
            all truthy and reading the tab title does not raise.
        """
        current_time = time.time()
        # Serve the cached result while it is still fresh.
        if not force_check and (current_time - self._last_check_time) < self._check_interval:
            return self._last_check_result

        try:
            manager = self.browser_manager
            if manager and manager.browser and manager.current_tab:
                # Reading the tab title is the actual connectivity probe.
                _ = manager.current_tab.title
                self._last_check_result = True
            else:
                self._last_check_result = False
        except Exception:
            # Any failure while probing counts as unhealthy.
            self._last_check_result = False

        self._last_check_time = current_time
        return self._last_check_result

    def get_status(self) -> dict:
        """Return detailed status information.

        Always performs a fresh health check. The returned dict has the
        keys ``healthy``, ``browser_connected``, ``tab_available``,
        ``current_url`` and ``current_title``.
        """
        is_healthy = self.is_healthy(force_check=True)
        status = {
            "healthy": is_healthy,
            "browser_connected": False,
            "tab_available": False,
            "current_url": None,
            "current_title": None
        }
        manager = self.browser_manager
        if manager:
            status["browser_connected"] = manager.browser is not None
            status["tab_available"] = manager.current_tab is not None
            if manager.current_tab:
                try:
                    status["current_url"] = manager.current_tab.url
                    status["current_title"] = manager.current_tab.title
                except Exception:
                    # Best effort: keep whatever was collected so far. Only
                    # ordinary errors are swallowed, so KeyboardInterrupt and
                    # SystemExit still propagate.
                    pass
        return status
def measure_time(func: Callable[..., T]) -> Callable[..., T]:
    """Decorator that logs how long each call of ``func`` takes.

    The duration is logged at DEBUG level in milliseconds after the call
    returns; the wrapped function's return value is passed through.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs) -> T:
        began = time.perf_counter()
        outcome = func(*args, **kwargs)
        finished = time.perf_counter()
        # perf_counter reports seconds; scale to milliseconds.
        elapsed_ms = (finished - began) * 1000
        logger.debug(f"{func.__name__} 执行耗时: {elapsed_ms:.2f}ms")
        return outcome
    return wrapper
async def measure_time_async(func: Callable[..., T], *args, **kwargs) -> tuple:
    """Await ``func(*args, **kwargs)`` and measure how long it took.

    Returns:
        tuple: ``(result, elapsed_ms)`` where ``elapsed_ms`` is the elapsed
        time in milliseconds.
    """
    began = time.perf_counter()
    outcome = await func(*args, **kwargs)
    finished = time.perf_counter()
    # perf_counter reports seconds; scale to milliseconds.
    return outcome, (finished - began) * 1000