We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/kscz0000/Zhiwen-Assistant-MCP'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
"""
用户体验服务模块
为其他服务提供统一的用户体验增强功能
"""
import asyncio
import time
from typing import Dict, Any, Optional, Callable, List, Union
from dataclasses import dataclass
import traceback
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from utils import (
InteractionManager,
TaskStatus,
TaskInfo,
ProgressContext,
handle_exception,
info,
success,
warning,
error,
debug
)
@dataclass
class OperationResult:
"""操作结果"""
success: bool
data: Any = None
error: Optional[str] = None
execution_time: float = 0.0
metadata: Dict[str, Any] = None
def __post_init__(self):
if self.metadata is None:
self.metadata = {}
class UserExperienceService:
"""用户体验服务"""
def __init__(self, interaction: Optional[InteractionManager] = None):
self.interaction = interaction or InteractionManager()
self.operation_history: List[OperationResult] = []
def execute_with_progress(
self,
task_id: str,
task_name: str,
operation: Callable,
*args,
progress_callback: Optional[Callable[[float], None]] = None,
**kwargs
) -> OperationResult:
"""执行带有进度跟踪的操作"""
start_time = time.time()
with ProgressContext(task_id, task_name, show_progress=True) as progress:
try:
# 如果操作需要进度回调,注入它
if asyncio.iscoroutinefunction(operation):
# 异步操作
if progress_callback:
# 创建包装函数,在进度回调中更新进度
async def wrapped_progress_callback(p):
progress.update(p, "")
if progress_callback:
progress_callback(p)
kwargs['progress_callback'] = wrapped_progress_callback
result = asyncio.run(operation(*args, **kwargs))
else:
result = asyncio.run(operation(*args, **kwargs))
else:
# 同步操作
if progress_callback:
# 创建包装函数,在进度回调中更新进度
def wrapped_progress_callback(p):
progress.update(p, "")
if progress_callback:
progress_callback(p)
kwargs['progress_callback'] = wrapped_progress_callback
result = operation(*args, **kwargs)
else:
result = operation(*args, **kwargs)
execution_time = time.time() - start_time
operation_result = OperationResult(
success=True,
data=result,
execution_time=execution_time,
metadata={"task_id": task_id, "task_name": task_name}
)
self.operation_history.append(operation_result)
self.interaction.success(
"操作完成",
f"{task_name} 已完成 (用时: {execution_time:.2f}秒)"
)
return operation_result
except (RuntimeError, ValueError) as e:
execution_time = time.time() - start_time
operation_result = OperationResult(
success=False,
error=str(e),
execution_time=execution_time,
metadata={"task_id": task_id, "task_name": task_name}
)
self.operation_history.append(operation_result)
# 使用异常处理器处理错误
handle_exception(e, f"执行 {task_name}")
return operation_result
def execute_batch_operations(
self,
operations: List[Dict[str, Any]],
show_progress: bool = True,
continue_on_error: bool = True
) -> List[OperationResult]:
"""批量执行操作"""
batch_id = f"batch_{int(time.time())}"
with ProgressContext(
batch_id,
f"批量操作 ({len(operations)} 项)",
show_progress=show_progress
) as batch_progress:
results = []
for i, op in enumerate(operations):
task_id = f"{batch_id}_{i}"
task_name = op.get("name", f"操作 {i+1}")
operation = op["operation"]
args = op.get("args", [])
kwargs = op.get("kwargs", {})
# 更新批量进度
batch_progress.update((i / len(operations)) * 100, f"执行 {task_name}")
# 执行单个操作
result = self.execute_with_progress(
task_id,
task_name,
operation,
*args,
show_progress=False, # 不显示单个操作的进度条
**kwargs
)
results.append(result)
# 如果出错且不继续执行,则中断
if not result.success and not continue_on_error:
self.interaction.warning("批量操作中断", f"在 {task_name} 处出错,停止执行")
break
# 完成批量进度
batch_progress.update(100, "批量操作完成")
# 汇总结果
success_count = sum(1 for r in results if r.success)
total_time = sum(r.execution_time for r in results)
self.interaction.success(
"批量操作完成",
f"成功 {success_count}/{len(results)} 项 (总用时: {total_time:.2f}秒)"
)
return results
def validate_operation(
self,
operation_name: str,
validator: Callable[[], bool],
error_message: str = "验证失败"
) -> bool:
"""验证操作"""
try:
if validator():
self.interaction.info("验证通过", f"{operation_name} 验证成功")
return True
else:
self.interaction.error("验证失败", f"{operation_name}: {error_message}")
return False
except (RuntimeError, ValueError) as e:
handle_exception(e, f"验证 {operation_name}")
return False
def confirm_operation(
self,
operation_name: str,
description: str = "",
details: Dict[str, Any] = None,
default: bool = False
) -> bool:
"""确认操作"""
message = f"是否执行 {operation_name}?"
if description:
message += f"\n{description}"
if details:
message += "\n详细信息:"
for key, value in details.items():
message += f"\n {key}: {value}"
return self.interaction.confirm(operation_name, message, default)
def get_operation_summary(self) -> Dict[str, Any]:
"""获取操作摘要"""
if not self.operation_history:
return {"total_operations": 0, "message": "暂无操作记录"}
total_operations = len(self.operation_history)
successful_operations = sum(1 for op in self.operation_history if op.success)
failed_operations = total_operations - successful_operations
total_time = sum(op.execution_time for op in self.operation_history)
# 最近操作
recent_operations = self.operation_history[-5:]
recent_summary = []
for op in recent_operations:
recent_summary.append({
"name": op.metadata.get("task_name", "未知操作"),
"success": op.success,
"execution_time": op.execution_time,
"error": op.error
})
return {
"total_operations": total_operations,
"successful_operations": successful_operations,
"failed_operations": failed_operations,
"success_rate": (successful_operations / total_operations) * 100 if total_operations > 0 else 0,
"total_execution_time": total_time,
"average_execution_time": total_time / total_operations if total_operations > 0 else 0,
"recent_operations": recent_summary
}
def clear_history(self) -> None:
"""清空操作历史"""
self.operation_history.clear()
self.interaction.info("历史清空", "操作历史已清空")
def retry_operation(
self,
result: OperationResult,
max_retries: int = 3,
retry_delay: float = 1.0
) -> OperationResult:
"""重试失败的操作"""
if result.success:
return result
task_id = result.metadata.get("task_id", f"retry_{int(time.time())}")
task_name = result.metadata.get("task_name", "重试操作")
self.interaction.info("重试操作", f"准备重试 {task_name}")
for attempt in range(max_retries):
try:
# 这里我们无法重试原始操作,因为我们没有保存函数引用
# 在实际使用中,应该保存原始操作的引用以便重试
self.interaction.warning("重试限制", "当前实现不支持自动重试,需要手动执行")
return OperationResult(
success=False,
error="重试功能需要保存原始操作引用",
metadata={"attempt": attempt + 1, "original_error": result.error}
)
except (RuntimeError, ValueError) as e:
if attempt == max_retries - 1:
self.interaction.error("重试失败", f"重试 {max_retries} 次后仍然失败: {e}")
return OperationResult(success=False, error=str(e))
# 等待后重试
time.sleep(retry_delay)
self.interaction.info("重试中", f"第 {attempt + 2} 次尝试...")
return result
# 全局用户体验服务实例
global_ux_service = UserExperienceService()
def execute_with_progress(
task_id: str,
task_name: str,
operation: Callable,
*args,
**kwargs
) -> OperationResult:
"""全局便捷函数"""
return global_ux_service.execute_with_progress(task_id, task_name, operation, *args, **kwargs)
def execute_batch_operations(
operations: List[Dict[str, Any]],
show_progress: bool = True,
continue_on_error: bool = True
) -> List[OperationResult]:
"""批量操作全局便捷函数"""
return global_ux_service.execute_batch_operations(operations, show_progress, continue_on_error)
def confirm_operation(
operation_name: str,
description: str = "",
details: Dict[str, Any] = None,
default: bool = False
) -> bool:
"""确认操作全局便捷函数"""
return global_ux_service.confirm_operation(operation_name, description, details, default)
def get_operation_summary() -> Dict[str, Any]:
"""获取操作摘要全局便捷函数"""
return global_ux_service.get_operation_summary()