"""
用户体验服务模块
为其他服务提供统一的用户体验增强功能
"""
import asyncio
import time
from typing import Dict, Any, Optional, Callable, List, Union
from dataclasses import dataclass
import traceback
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from utils import (
InteractionManager,
TaskStatus,
TaskInfo,
ProgressContext,
handle_exception,
info,
success,
warning,
error,
debug
)
@dataclass
class OperationResult:
"""操作结果"""
success: bool
data: Any = None
error: Optional[str] = None
execution_time: float = 0.0
metadata: Dict[str, Any] = None
def __post_init__(self):
if self.metadata is None:
self.metadata = {}
class UserExperienceService:
"""用户体验服务"""
def __init__(self, interaction: Optional[InteractionManager] = None):
self.interaction = interaction or InteractionManager()
self.operation_history: List[OperationResult] = []
def execute_with_progress(
self,
task_id: str,
task_name: str,
operation: Callable,
*args,
progress_callback: Optional[Callable[[float], None]] = None,
**kwargs
) -> OperationResult:
"""执行带有进度跟踪的操作"""
start_time = time.time()
with ProgressContext(task_id, task_name, show_progress=True) as progress:
try:
# 如果操作需要进度回调,注入它
if asyncio.iscoroutinefunction(operation):
# 异步操作
if progress_callback:
# 创建包装函数,在进度回调中更新进度
async def wrapped_progress_callback(p):
progress.update(p, "")
if progress_callback:
progress_callback(p)
kwargs['progress_callback'] = wrapped_progress_callback
result = asyncio.run(operation(*args, **kwargs))
else:
result = asyncio.run(operation(*args, **kwargs))
else:
# 同步操作
if progress_callback:
# 创建包装函数,在进度回调中更新进度
def wrapped_progress_callback(p):
progress.update(p, "")
if progress_callback:
progress_callback(p)
kwargs['progress_callback'] = wrapped_progress_callback
result = operation(*args, **kwargs)
else:
result = operation(*args, **kwargs)
execution_time = time.time() - start_time
operation_result = OperationResult(
success=True,
data=result,
execution_time=execution_time,
metadata={"task_id": task_id, "task_name": task_name}
)
self.operation_history.append(operation_result)
self.interaction.success(
"操作完成",
f"{task_name} 已完成 (用时: {execution_time:.2f}秒)"
)
return operation_result
except (RuntimeError, ValueError) as e:
execution_time = time.time() - start_time
operation_result = OperationResult(
success=False,
error=str(e),
execution_time=execution_time,
metadata={"task_id": task_id, "task_name": task_name}
)
self.operation_history.append(operation_result)
# 使用异常处理器处理错误
handle_exception(e, f"执行 {task_name}")
return operation_result
def execute_batch_operations(
self,
operations: List[Dict[str, Any]],
show_progress: bool = True,
continue_on_error: bool = True
) -> List[OperationResult]:
"""批量执行操作"""
batch_id = f"batch_{int(time.time())}"
with ProgressContext(
batch_id,
f"批量操作 ({len(operations)} 项)",
show_progress=show_progress
) as batch_progress:
results = []
for i, op in enumerate(operations):
task_id = f"{batch_id}_{i}"
task_name = op.get("name", f"操作 {i+1}")
operation = op["operation"]
args = op.get("args", [])
kwargs = op.get("kwargs", {})
# 更新批量进度
batch_progress.update((i / len(operations)) * 100, f"执行 {task_name}")
# 执行单个操作
result = self.execute_with_progress(
task_id,
task_name,
operation,
*args,
show_progress=False, # 不显示单个操作的进度条
**kwargs
)
results.append(result)
# 如果出错且不继续执行,则中断
if not result.success and not continue_on_error:
self.interaction.warning("批量操作中断", f"在 {task_name} 处出错,停止执行")
break
# 完成批量进度
batch_progress.update(100, "批量操作完成")
# 汇总结果
success_count = sum(1 for r in results if r.success)
total_time = sum(r.execution_time for r in results)
self.interaction.success(
"批量操作完成",
f"成功 {success_count}/{len(results)} 项 (总用时: {total_time:.2f}秒)"
)
return results
def validate_operation(
self,
operation_name: str,
validator: Callable[[], bool],
error_message: str = "验证失败"
) -> bool:
"""验证操作"""
try:
if validator():
self.interaction.info("验证通过", f"{operation_name} 验证成功")
return True
else:
self.interaction.error("验证失败", f"{operation_name}: {error_message}")
return False
except (RuntimeError, ValueError) as e:
handle_exception(e, f"验证 {operation_name}")
return False
def confirm_operation(
self,
operation_name: str,
description: str = "",
details: Dict[str, Any] = None,
default: bool = False
) -> bool:
"""确认操作"""
message = f"是否执行 {operation_name}?"
if description:
message += f"\n{description}"
if details:
message += "\n详细信息:"
for key, value in details.items():
message += f"\n {key}: {value}"
return self.interaction.confirm(operation_name, message, default)
def get_operation_summary(self) -> Dict[str, Any]:
"""获取操作摘要"""
if not self.operation_history:
return {"total_operations": 0, "message": "暂无操作记录"}
total_operations = len(self.operation_history)
successful_operations = sum(1 for op in self.operation_history if op.success)
failed_operations = total_operations - successful_operations
total_time = sum(op.execution_time for op in self.operation_history)
# 最近操作
recent_operations = self.operation_history[-5:]
recent_summary = []
for op in recent_operations:
recent_summary.append({
"name": op.metadata.get("task_name", "未知操作"),
"success": op.success,
"execution_time": op.execution_time,
"error": op.error
})
return {
"total_operations": total_operations,
"successful_operations": successful_operations,
"failed_operations": failed_operations,
"success_rate": (successful_operations / total_operations) * 100 if total_operations > 0 else 0,
"total_execution_time": total_time,
"average_execution_time": total_time / total_operations if total_operations > 0 else 0,
"recent_operations": recent_summary
}
def clear_history(self) -> None:
"""清空操作历史"""
self.operation_history.clear()
self.interaction.info("历史清空", "操作历史已清空")
def retry_operation(
self,
result: OperationResult,
max_retries: int = 3,
retry_delay: float = 1.0
) -> OperationResult:
"""重试失败的操作"""
if result.success:
return result
task_id = result.metadata.get("task_id", f"retry_{int(time.time())}")
task_name = result.metadata.get("task_name", "重试操作")
self.interaction.info("重试操作", f"准备重试 {task_name}")
for attempt in range(max_retries):
try:
# 这里我们无法重试原始操作,因为我们没有保存函数引用
# 在实际使用中,应该保存原始操作的引用以便重试
self.interaction.warning("重试限制", "当前实现不支持自动重试,需要手动执行")
return OperationResult(
success=False,
error="重试功能需要保存原始操作引用",
metadata={"attempt": attempt + 1, "original_error": result.error}
)
except (RuntimeError, ValueError) as e:
if attempt == max_retries - 1:
self.interaction.error("重试失败", f"重试 {max_retries} 次后仍然失败: {e}")
return OperationResult(success=False, error=str(e))
# 等待后重试
time.sleep(retry_delay)
self.interaction.info("重试中", f"第 {attempt + 2} 次尝试...")
return result
# 全局用户体验服务实例
global_ux_service = UserExperienceService()
def execute_with_progress(
task_id: str,
task_name: str,
operation: Callable,
*args,
**kwargs
) -> OperationResult:
"""全局便捷函数"""
return global_ux_service.execute_with_progress(task_id, task_name, operation, *args, **kwargs)
def execute_batch_operations(
operations: List[Dict[str, Any]],
show_progress: bool = True,
continue_on_error: bool = True
) -> List[OperationResult]:
"""批量操作全局便捷函数"""
return global_ux_service.execute_batch_operations(operations, show_progress, continue_on_error)
def confirm_operation(
operation_name: str,
description: str = "",
details: Dict[str, Any] = None,
default: bool = False
) -> bool:
"""确认操作全局便捷函数"""
return global_ux_service.confirm_operation(operation_name, description, details, default)
def get_operation_summary() -> Dict[str, Any]:
"""获取操作摘要全局便捷函数"""
return global_ux_service.get_operation_summary()