# -*- coding: utf-8 -*-
"""超时工具模块
提供通用的超时控制功能,防止操作卡死。
"""
import asyncio
import concurrent.futures
import functools
from typing import TypeVar, Callable, Any, Optional
# 默认超时配置
DEFAULT_TIMEOUT = 30 # 秒
T = TypeVar('T')
async def run_with_timeout(
func: Callable[..., T],
*args,
timeout: int = DEFAULT_TIMEOUT,
timeout_message: str = "操作超时",
**kwargs
) -> T:
"""在线程池中执行同步函数,并添加超时控制
将同步的阻塞操作放入线程池执行,避免阻塞事件循环,
同时添加超时控制防止操作无限等待。
Args:
func: 要执行的同步函数
*args: 函数的位置参数
timeout: 超时时间(秒)
timeout_message: 超时时的错误消息
**kwargs: 函数的关键字参数
Returns:
函数的返回值
Raises:
TimeoutError: 操作超时
Exception: 函数执行过程中的其他异常
"""
loop = asyncio.get_event_loop()
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
try:
# 使用functools.partial处理带参数的函数
if args or kwargs:
partial_func = functools.partial(func, *args, **kwargs)
future = loop.run_in_executor(executor, partial_func)
else:
future = loop.run_in_executor(executor, func)
result = await asyncio.wait_for(future, timeout=timeout)
return result
except asyncio.TimeoutError:
raise TimeoutError(f"{timeout_message}({timeout}秒)")
def timeout_decorator(timeout: int = DEFAULT_TIMEOUT, message: str = "操作超时"):
"""超时装饰器
将同步方法转换为带超时控制的异步方法。
Args:
timeout: 超时时间(秒)
message: 超时错误消息
Returns:
装饰器函数
Example:
@timeout_decorator(timeout=30, message="截图超时")
def capture_screenshot(self):
...
"""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
@functools.wraps(func)
async def wrapper(*args, **kwargs) -> T:
return await run_with_timeout(
func, *args,
timeout=timeout,
timeout_message=message,
**kwargs
)
return wrapper
return decorator
class TimeoutContext:
"""超时上下文管理器
用于在特定代码块中添加超时控制。
Example:
async with TimeoutContext(timeout=30, message="操作超时") as ctx:
result = await ctx.run(some_sync_function, arg1, arg2)
"""
def __init__(self, timeout: int = DEFAULT_TIMEOUT, message: str = "操作超时"):
self.timeout = timeout
self.message = message
self.executor = None
async def __aenter__(self):
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.executor:
self.executor.shutdown(wait=False)
return False
async def run(self, func: Callable[..., T], *args, **kwargs) -> T:
"""在超时控制下执行函数"""
return await run_with_timeout(
func, *args,
timeout=self.timeout,
timeout_message=self.message,
**kwargs
)
class SafeOperationExecutor:
"""安全操作执行器
提供统一的超时控制和错误处理机制。
"""
def __init__(self, default_timeout: int = DEFAULT_TIMEOUT):
self.default_timeout = default_timeout
async def execute(
self,
func: Callable[..., T],
*args,
timeout: Optional[int] = None,
error_handler: Optional[Callable[[Exception], T]] = None,
**kwargs
) -> T:
"""安全执行操作
Args:
func: 要执行的函数
*args: 函数参数
timeout: 超时时间,None则使用默认值
error_handler: 错误处理函数,接收异常并返回默认值
**kwargs: 函数关键字参数
Returns:
函数返回值或错误处理函数的返回值
"""
actual_timeout = timeout if timeout is not None else self.default_timeout
try:
return await run_with_timeout(
func, *args,
timeout=actual_timeout,
**kwargs
)
except TimeoutError as e:
if error_handler:
return error_handler(e)
raise
except Exception as e:
if error_handler:
return error_handler(e)
raise
# 全局安全执行器实例
safe_executor = SafeOperationExecutor()