Skip to main content
Glama
user_experience_service.py (11.9 kB)
"""
User experience service module.

Provides unified user-experience helpers (progress tracking, batch execution,
validation, confirmation and an operation history) for other services.
"""

import asyncio
import time
from typing import Dict, Any, Optional, Callable, List, Union
from dataclasses import dataclass
import traceback
import sys
import os

# Put the parent of this file's directory on sys.path before importing the
# project-local `utils` package (presumably where it lives -- confirm).
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from utils import (
    InteractionManager, TaskStatus, TaskInfo, ProgressContext,
    handle_exception, info, success, warning, error, debug
)


@dataclass
class OperationResult:
    """Outcome of a single operation run through the service."""

    # True when the operation returned without a handled exception.
    success: bool
    # Return value of the operation (None on failure).
    data: Any = None
    # String form of the handled exception, if any.
    error: Optional[str] = None
    # Wall-clock duration in seconds, measured with time.time().
    execution_time: float = 0.0
    # Free-form metadata; the service stores "task_id" and "task_name" here.
    metadata: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        # Give every instance its own dict (also when None is passed explicitly).
        if self.metadata is None:
            self.metadata = {}


class UserExperienceService:
    """Runs operations with progress display, user feedback and history."""

    def __init__(self, interaction: Optional[InteractionManager] = None):
        """Create the service.

        Args:
            interaction: Manager used for user-facing messages; a new
                ``InteractionManager`` is created when omitted.
        """
        self.interaction = interaction or InteractionManager()
        self.operation_history: List[OperationResult] = []

    def execute_with_progress(
        self,
        task_id: str,
        task_name: str,
        operation: Callable,
        *args,
        progress_callback: Optional[Callable[[float], None]] = None,
        show_progress: bool = True,
        **kwargs
    ) -> OperationResult:
        """Run ``operation`` inside a progress context and record the outcome.

        Coroutine functions are driven with ``asyncio.run``; anything else is
        called directly. ``RuntimeError`` and ``ValueError`` raised inside the
        progress context are handled and reported as a failed result; other
        exception types propagate to the caller.

        Args:
            task_id: Identifier handed to the progress context.
            task_name: Human-readable name used in messages.
            operation: Callable (sync or ``async def``) to execute.
            *args: Positional arguments forwarded to ``operation``.
            progress_callback: Optional callback receiving progress values.
                When given, ``operation`` is called with a
                ``progress_callback`` keyword argument that updates the
                progress context and then calls this callback. For coroutine
                operations the injected callback is itself a coroutine
                function.
            show_progress: Forwarded to the progress context. Consumed here
                and never forwarded to ``operation``.
            **kwargs: Keyword arguments forwarded to ``operation``.

        Returns:
            An ``OperationResult``; it is also appended to
            ``operation_history``.
        """
        start_time = time.time()
        metadata = {"task_id": task_id, "task_name": task_name}

        with ProgressContext(task_id, task_name, show_progress=show_progress) as progress:
            try:
                is_async = asyncio.iscoroutinefunction(operation)

                # Inject a progress callback only when the caller supplied one.
                if progress_callback:
                    if is_async:
                        async def wrapped_progress_callback(p):
                            progress.update(p, "")
                            progress_callback(p)
                    else:
                        def wrapped_progress_callback(p):
                            progress.update(p, "")
                            progress_callback(p)

                    kwargs['progress_callback'] = wrapped_progress_callback

                if is_async:
                    result = asyncio.run(operation(*args, **kwargs))
                else:
                    result = operation(*args, **kwargs)

                execution_time = time.time() - start_time
                operation_result = OperationResult(
                    success=True,
                    data=result,
                    execution_time=execution_time,
                    metadata=metadata
                )
                self.operation_history.append(operation_result)

                self.interaction.success(
                    "操作完成",
                    f"{task_name} 已完成 (用时: {execution_time:.2f}秒)"
                )
                return operation_result

            except (RuntimeError, ValueError) as e:
                execution_time = time.time() - start_time
                operation_result = OperationResult(
                    success=False,
                    error=str(e),
                    execution_time=execution_time,
                    metadata=metadata
                )
                self.operation_history.append(operation_result)

                # Delegate reporting to the shared exception handler.
                handle_exception(e, f"执行 {task_name}")
                return operation_result

    def execute_batch_operations(
        self,
        operations: List[Dict[str, Any]],
        show_progress: bool = True,
        continue_on_error: bool = True
    ) -> List[OperationResult]:
        """Run a list of operations sequentially under one batch progress context.

        Args:
            operations: One dict per operation. ``"operation"`` (the callable)
                is required; ``"name"``, ``"args"`` and ``"kwargs"`` are
                optional.
            show_progress: Forwarded to the batch-level progress context.
            continue_on_error: When False, stop after the first failed
                operation.

        Returns:
            The results of the operations that were executed, in order.
        """
        batch_id = f"batch_{int(time.time())}"
        results: List[OperationResult] = []

        with ProgressContext(
            batch_id,
            f"批量操作 ({len(operations)} 项)",
            show_progress=show_progress
        ) as batch_progress:
            for i, op in enumerate(operations):
                task_id = f"{batch_id}_{i}"
                task_name = op.get("name", f"操作 {i+1}")
                operation = op["operation"]
                args = op.get("args", [])
                kwargs = op.get("kwargs", {})

                # Report batch progress before starting this item.
                batch_progress.update((i / len(operations)) * 100, f"执行 {task_name}")

                # show_progress is a parameter of execute_with_progress, so it
                # is not forwarded to the operation itself.
                result = self.execute_with_progress(
                    task_id,
                    task_name,
                    operation,
                    *args,
                    show_progress=False,
                    **kwargs
                )
                results.append(result)

                # Abort on failure unless asked to keep going.
                if not result.success and not continue_on_error:
                    self.interaction.warning("批量操作中断", f"在 {task_name} 处出错,停止执行")
                    break

            # Mark the batch progress as finished.
            batch_progress.update(100, "批量操作完成")

        # Summarize the batch.
        success_count = sum(1 for r in results if r.success)
        total_time = sum(r.execution_time for r in results)

        self.interaction.success(
            "批量操作完成",
            f"成功 {success_count}/{len(results)} 项 (总用时: {total_time:.2f}秒)"
        )

        return results

    def validate_operation(
        self,
        operation_name: str,
        validator: Callable[[], bool],
        error_message: str = "验证失败"
    ) -> bool:
        """Run ``validator`` and report the outcome to the user.

        Args:
            operation_name: Name used in the messages.
            validator: Zero-argument callable; a truthy return means valid.
            error_message: Text shown when the validator returns a falsy value.

        Returns:
            True when the validator passes; False when it fails or raises
            ``RuntimeError``/``ValueError`` (other exceptions propagate).
        """
        try:
            if validator():
                self.interaction.info("验证通过", f"{operation_name} 验证成功")
                return True
            else:
                self.interaction.error("验证失败", f"{operation_name}: {error_message}")
                return False
        except (RuntimeError, ValueError) as e:
            handle_exception(e, f"验证 {operation_name}")
            return False

    def confirm_operation(
        self,
        operation_name: str,
        description: str = "",
        details: Optional[Dict[str, Any]] = None,
        default: bool = False
    ) -> bool:
        """Ask the user to confirm an operation.

        Args:
            operation_name: Name of the operation; also passed as the first
                argument to ``interaction.confirm``.
            description: Optional extra line appended to the prompt.
            details: Optional key/value pairs listed in the prompt.
            default: Passed through to ``interaction.confirm``.

        Returns:
            Whatever ``interaction.confirm`` returns.
        """
        message = f"是否执行 {operation_name}?"
        if description:
            message += f"\n{description}"

        if details:
            message += "\n详细信息:"
            for key, value in details.items():
                message += f"\n {key}: {value}"

        return self.interaction.confirm(operation_name, message, default)

    def get_operation_summary(self) -> Dict[str, Any]:
        """Summarize the recorded operation history.

        Returns:
            A dict with totals, success rate (percent), timing figures and the
            five most recent operations; or a short placeholder dict when the
            history is empty.
        """
        if not self.operation_history:
            return {"total_operations": 0, "message": "暂无操作记录"}

        # The history is non-empty from here on, so divisions are safe.
        total_operations = len(self.operation_history)
        successful_operations = sum(1 for op in self.operation_history if op.success)
        failed_operations = total_operations - successful_operations
        total_time = sum(op.execution_time for op in self.operation_history)

        # Most recent operations (up to five).
        recent_summary = [
            {
                "name": op.metadata.get("task_name", "未知操作"),
                "success": op.success,
                "execution_time": op.execution_time,
                "error": op.error
            }
            for op in self.operation_history[-5:]
        ]

        return {
            "total_operations": total_operations,
            "successful_operations": successful_operations,
            "failed_operations": failed_operations,
            "success_rate": (successful_operations / total_operations) * 100,
            "total_execution_time": total_time,
            "average_execution_time": total_time / total_operations,
            "recent_operations": recent_summary
        }

    def clear_history(self) -> None:
        """Empty the operation history and notify the user."""
        self.operation_history.clear()
        self.interaction.info("历史清空", "操作历史已清空")

    def retry_operation(
        self,
        result: OperationResult,
        max_retries: int = 3,
        retry_delay: float = 1.0
    ) -> OperationResult:
        """Placeholder for retrying a failed operation.

        The original callable is not stored with the result, so nothing is
        actually re-executed: for a failed ``result`` and ``max_retries >= 1``
        the first loop iteration returns a failed ``OperationResult``
        explaining the limitation.

        Args:
            result: Result of the operation to retry.
            max_retries: Upper bound of the retry loop.
            retry_delay: Seconds to sleep between attempts.

        Returns:
            ``result`` itself when it already succeeded or when
            ``max_retries`` is not positive; otherwise a failed
            ``OperationResult``.
        """
        if result.success:
            return result

        task_name = result.metadata.get("task_name", "重试操作")

        self.interaction.info("重试操作", f"准备重试 {task_name}")

        for attempt in range(max_retries):
            try:
                # The original operation cannot be re-run because no reference
                # to it is kept; real use would need to store that reference.
                self.interaction.warning("重试限制", "当前实现不支持自动重试,需要手动执行")
                return OperationResult(
                    success=False,
                    error="重试功能需要保存原始操作引用",
                    metadata={"attempt": attempt + 1, "original_error": result.error}
                )

            except (RuntimeError, ValueError) as e:
                if attempt == max_retries - 1:
                    self.interaction.error("重试失败", f"重试 {max_retries} 次后仍然失败: {e}")
                    return OperationResult(success=False, error=str(e))

                # Wait before the next attempt.
                time.sleep(retry_delay)
                self.interaction.info("重试中", f"第 {attempt + 2} 次尝试...")

        return result


# Module-level shared service instance.
global_ux_service = UserExperienceService()


def execute_with_progress(
    task_id: str,
    task_name: str,
    operation: Callable,
    *args,
    **kwargs
) -> OperationResult:
    """Call ``execute_with_progress`` on the shared service instance."""
    return global_ux_service.execute_with_progress(task_id, task_name, operation, *args, **kwargs)


def execute_batch_operations(
    operations: List[Dict[str, Any]],
    show_progress: bool = True,
    continue_on_error: bool = True
) -> List[OperationResult]:
    """Call ``execute_batch_operations`` on the shared service instance."""
    return global_ux_service.execute_batch_operations(operations, show_progress, continue_on_error)


def confirm_operation(
    operation_name: str,
    description: str = "",
    details: Optional[Dict[str, Any]] = None,
    default: bool = False
) -> bool:
    """Call ``confirm_operation`` on the shared service instance."""
    return global_ux_service.confirm_operation(operation_name, description, details, default)


def get_operation_summary() -> Dict[str, Any]:
    """Call ``get_operation_summary`` on the shared service instance."""
    return global_ux_service.get_operation_summary()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kscz0000/Zhiwen-Assistant-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.