Skip to main content
Glama

MCP AI Service Platform

by dkb12138ggg
error_handling.py10.9 kB
"""错误处理和重试机制""" import asyncio import traceback from typing import Any, Dict, Optional, Union, Callable, Type from enum import Enum from dataclasses import dataclass from functools import wraps from tenacity import ( retry, stop_after_attempt, wait_exponential, retry_if_exception_type, retry_if_result, RetryError ) from circuitbreaker import circuit, CircuitBreakerError from src.utils.logging import get_structured_logger from src.utils.metrics import metrics logger = get_structured_logger(__name__) class ErrorType(Enum): """错误类型枚举""" CONNECTION_ERROR = "connection_error" TIMEOUT_ERROR = "timeout_error" AUTHENTICATION_ERROR = "authentication_error" RATE_LIMIT_ERROR = "rate_limit_error" VALIDATION_ERROR = "validation_error" TOOL_EXECUTION_ERROR = "tool_execution_error" OPENAI_API_ERROR = "openai_api_error" INTERNAL_ERROR = "internal_error" CIRCUIT_BREAKER_ERROR = "circuit_breaker_error" @dataclass class ErrorInfo: """错误信息""" error_type: ErrorType message: str details: Optional[Dict[str, Any]] = None traceback: Optional[str] = None component: str = "unknown" recoverable: bool = True class MCPException(Exception): """MCP自定义异常基类""" def __init__(self, error_info: ErrorInfo): self.error_info = error_info super().__init__(error_info.message) class ConnectionException(MCPException): """连接异常""" pass class TimeoutException(MCPException): """超时异常""" pass class AuthenticationException(MCPException): """认证异常""" pass class RateLimitException(MCPException): """限流异常""" pass class ValidationException(MCPException): """验证异常""" pass class ToolExecutionException(MCPException): """工具执行异常""" pass class OpenAIAPIException(MCPException): """OpenAI API异常""" pass def is_retriable_error(exception: Exception) -> bool: """判断异常是否可重试""" if isinstance(exception, MCPException): return exception.error_info.recoverable # 特定异常类型的重试逻辑 retriable_types = ( ConnectionError, TimeoutError, asyncio.TimeoutError, ConnectionException, TimeoutException, ) return isinstance(exception, retriable_types) def create_retry_decorator( max_attempts: int = 3, min_wait: float = 1.0, max_wait: float = 10.0, component: str = "unknown" ): """创建重试装饰器""" def decorator(func: Callable): @retry( stop=stop_after_attempt(max_attempts), wait=wait_exponential(multiplier=1, min=min_wait, max=max_wait), retry=retry_if_exception_type(( ConnectionError, TimeoutError, asyncio.TimeoutError, ConnectionException, TimeoutException, )), reraise=True ) @wraps(func) async def wrapper(*args, **kwargs): try: return await func(*args, **kwargs) except RetryError as e: # 记录重试失败 original_error = e.last_attempt.exception() logger.error( "重试失败", component=component, function=func.__name__, attempts=max_attempts, error=str(original_error) ) metrics.record_error("retry_failed", component) raise original_error except Exception as e: logger.error( "函数执行失败", component=component, function=func.__name__, error=str(e) ) metrics.record_error(type(e).__name__, component) raise return wrapper return decorator def create_circuit_breaker( failure_threshold: int = 5, recovery_timeout: int = 30, expected_exception: Type[Exception] = Exception, component: str = "unknown" ): """创建断路器装饰器""" def decorator(func: Callable): @circuit( failure_threshold=failure_threshold, recovery_timeout=recovery_timeout, expected_exception=expected_exception ) @wraps(func) async def wrapper(*args, **kwargs): try: return await func(*args, **kwargs) except CircuitBreakerError as e: logger.warning( "断路器打开", component=component, function=func.__name__, reason=str(e) ) metrics.record_error("circuit_breaker_open", component) raise MCPException(ErrorInfo( error_type=ErrorType.CIRCUIT_BREAKER_ERROR, message=f"服务暂时不可用: {str(e)}", component=component, recoverable=False )) except Exception as e: logger.error( "断路器包装函数失败", component=component, function=func.__name__, error=str(e) ) raise return wrapper return decorator async def handle_error( error: Exception, component: str, context: Optional[Dict[str, Any]] = None, user_friendly: bool = True ) -> ErrorInfo: """统一错误处理""" # 获取错误追踪信息 error_traceback = traceback.format_exc() # 分类错误 if isinstance(error, MCPException): error_info = error.error_info elif isinstance(error, (ConnectionError, ConnectionException)): error_info = ErrorInfo( error_type=ErrorType.CONNECTION_ERROR, message="连接服务器失败,请稍后重试", component=component, recoverable=True ) elif isinstance(error, (TimeoutError, asyncio.TimeoutError, TimeoutException)): error_info = ErrorInfo( error_type=ErrorType.TIMEOUT_ERROR, message="请求超时,请稍后重试", component=component, recoverable=True ) elif isinstance(error, (AuthenticationException,)): error_info = ErrorInfo( error_type=ErrorType.AUTHENTICATION_ERROR, message="认证失败,请检查API密钥", component=component, recoverable=False ) elif isinstance(error, (RateLimitException,)): error_info = ErrorInfo( error_type=ErrorType.RATE_LIMIT_ERROR, message="请求频率过高,请稍后重试", component=component, recoverable=True ) elif isinstance(error, (ValidationException,)): error_info = ErrorInfo( error_type=ErrorType.VALIDATION_ERROR, message="请求参数无效", component=component, recoverable=False ) else: error_info = ErrorInfo( error_type=ErrorType.INTERNAL_ERROR, message="内部服务器错误" if user_friendly else str(error), component=component, recoverable=False ) # 添加详细信息 error_info.details = context or {} error_info.traceback = error_traceback # 记录错误 logger.error( "处理错误", error_type=error_info.error_type.value, component=component, message=error_info.message, recoverable=error_info.recoverable, context=context ) # 记录指标 metrics.record_error(error_info.error_type.value, component) return error_info class ErrorHandler: """错误处理器""" def __init__(self, component: str): self.component = component async def handle_connection_error(self, error: Exception, server_name: str) -> ErrorInfo: """处理连接错误""" context = {"server_name": server_name} return await handle_error(error, self.component, context) async def handle_tool_execution_error( self, error: Exception, tool_name: str, server_name: str, arguments: Dict[str, Any] ) -> ErrorInfo: """处理工具执行错误""" context = { "tool_name": tool_name, "server_name": server_name, "arguments": arguments } return await handle_error(error, self.component, context) async def handle_openai_error(self, error: Exception, model: str) -> ErrorInfo: """处理OpenAI API错误""" context = {"model": model} # 特殊处理OpenAI错误 error_message = str(error) if "rate limit" in error_message.lower(): error_info = ErrorInfo( error_type=ErrorType.RATE_LIMIT_ERROR, message="OpenAI API请求频率限制", component=self.component, recoverable=True, details=context ) elif "authentication" in error_message.lower() or "unauthorized" in error_message.lower(): error_info = ErrorInfo( error_type=ErrorType.AUTHENTICATION_ERROR, message="OpenAI API认证失败", component=self.component, recoverable=False, details=context ) else: error_info = ErrorInfo( error_type=ErrorType.OPENAI_API_ERROR, message=f"OpenAI API错误: {error_message}", component=self.component, recoverable=True, details=context ) # 记录错误 logger.error( "OpenAI API错误", error_type=error_info.error_type.value, model=model, message=error_message ) metrics.record_error(error_info.error_type.value, self.component) return error_info # 常用装饰器 connection_retry = create_retry_decorator(max_attempts=3, component="connection") tool_execution_retry = create_retry_decorator(max_attempts=2, component="tool_execution") openai_retry = create_retry_decorator(max_attempts=3, component="openai") connection_circuit_breaker = create_circuit_breaker( failure_threshold=5, recovery_timeout=30, component="connection" ) tool_circuit_breaker = create_circuit_breaker( failure_threshold=3, recovery_timeout=60, component="tool_execution" )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dkb12138ggg/python-rag-mcp-client'

If you have feedback or need assistance with the MCP directory API, please join our Discord server