Skip to main content
Glama
Huangwh826

Halo MCP Server

by Huangwh826
base.py9.74 kB
"""带重试与错误处理的基础 HTTP 客户端""" from typing import Any, Dict, Optional import httpx from loguru import logger from halo_mcp_server.config import settings from halo_mcp_server.exceptions import ( AuthenticationError, AuthorizationError, NetworkError, ResourceNotFoundError, ) class BaseHTTPClient: """通用功能的基础 HTTP 客户端""" def __init__(self, base_url: str, timeout: int = 30): """ 初始化 HTTP 客户端。 参数: base_url: API 基础地址 timeout: 请求超时时间(秒) """ self.base_url = base_url.rstrip("/") self.timeout = timeout self._client: Optional[httpx.AsyncClient] = None self._headers: Dict[str, str] = { "Content-Type": "application/json", "Accept": "application/json", } async def __aenter__(self): """异步上下文管理器入口。""" await self.connect() return self async def __aexit__(self, exc_type, exc_val, exc_tb): """异步上下文管理器退出。""" await self.close() async def connect(self) -> None: """创建 HTTP 客户端连接。""" if self._client is None: self._client = httpx.AsyncClient( base_url=self.base_url, timeout=self.timeout, limits=httpx.Limits( max_connections=settings.http_pool_size, max_keepalive_connections=settings.http_pool_size // 2, ), headers=self._headers, follow_redirects=True, ) logger.debug(f"HTTP 客户端已连接:{self.base_url}") async def close(self) -> None: """关闭 HTTP 客户端连接。""" if self._client is not None: await self._client.aclose() self._client = None logger.debug("HTTP 客户端已关闭") def set_auth_token(self, token: str) -> None: """ 设置认证令牌。 参数: token: Bearer 令牌 """ self._headers["Authorization"] = f"Bearer {token}" if self._client: self._client.headers.update({"Authorization": f"Bearer {token}"}) logger.debug("认证令牌已设置") def remove_auth_token(self) -> None: """移除认证令牌。""" self._headers.pop("Authorization", None) if self._client: self._client.headers.pop("Authorization", None) logger.debug("认证令牌已移除") async def _request( self, method: str, path: str, params: Optional[Dict[str, Any]] = None, json: Optional[Dict[str, Any]] = None, data: Optional[Dict[str, Any]] = None, files: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, ) -> Dict[str, Any]: """ 发起带重试逻辑的 HTTP 请求。 参数: method: HTTP 方法 path: 请求路径 params: 查询参数 json: JSON 请求体 data: 表单数据 files: 上传文件 headers: 额外请求头 返回: 响应 JSON 异常: AuthenticationError:认证失败 ResourceNotFoundError:资源未找到 NetworkError:网络/HTTP 错误 """ if not self._client: await self.connect() url = path if path.startswith("http") else f"{self.base_url}{path}" request_headers = {**self._headers, **(headers or {})} # Remove Content-Type for multipart/form-data (httpx will set it) if files: request_headers.pop("Content-Type", None) retry_count = 0 last_error = None while retry_count <= settings.max_retries: try: logger.debug(f"API 请求:{method} {url}") response = await self._client.request( method=method, url=url, params=params, json=json, data=data, files=files, headers=request_headers, ) # Handle error status codes if response.status_code == 401: logger.error("认证失败(401)") raise AuthenticationError("认证失败。请检查令牌或凭据。") if response.status_code == 403: logger.error("权限不足(403)") raise AuthorizationError("权限不足。访问受限。") if response.status_code == 404: logger.error(f"资源未找到(404):{url}") raise ResourceNotFoundError("资源", path) if response.status_code >= 500: error_detail = response.text try: error_json = response.json() error_detail = ( error_json.get("detail") or error_json.get("message") or error_detail ) except Exception: pass logger.error(f"HTTP {response.status_code} 错误:{error_detail}") raise NetworkError( f"HTTP {response.status_code} 错误:{error_detail}", status_code=response.status_code, ) if response.status_code >= 400: error_detail = response.text try: error_json = response.json() error_detail = ( error_json.get("detail") or error_json.get("message") or error_detail ) except Exception: pass logger.error(f"HTTP {response.status_code} 错误:{error_detail}") raise NetworkError( f"HTTP {response.status_code} 错误:{error_detail}", status_code=response.status_code, ) # Parse response if response.status_code == 204: return {} try: result = response.json() logger.debug(f"API 响应:{response.status_code}") return result except Exception as e: logger.warning(f"解析 JSON 响应失败:{e}") return {"text": response.text} except (httpx.TimeoutException, httpx.ConnectError, httpx.NetworkError) as e: last_error = e retry_count += 1 if retry_count <= settings.max_retries: logger.warning( f"请求失败,正在重试({retry_count}/{settings.max_retries}):{e}" ) await self._wait_retry() else: logger.error(f"请求在重试 {settings.max_retries} 次后仍失败:{e}") raise NetworkError(f"网络错误:{e}") except (AuthenticationError, ResourceNotFoundError, NetworkError): raise except Exception as e: logger.error(f"请求过程中出现未预期错误:{e}", exc_info=True) raise NetworkError(f"未预期错误:{e}") raise NetworkError(f"请求在重试 {settings.max_retries} 次后仍失败:{last_error}") async def _wait_retry(self) -> None: """重试前等待。""" import asyncio await asyncio.sleep(settings.retry_delay) async def get( self, path: str, params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, ) -> Dict[str, Any]: """GET 请求。""" return await self._request("GET", path, params=params, headers=headers) async def post( self, path: str, json: Optional[Dict[str, Any]] = None, data: Optional[Dict[str, Any]] = None, files: Optional[Dict[str, Any]] = None, params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, ) -> Dict[str, Any]: """POST 请求。""" return await self._request( "POST", path, params=params, json=json, data=data, files=files, headers=headers ) async def put( self, path: str, json: Optional[Dict[str, Any]] = None, params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, ) -> Dict[str, Any]: """PUT 请求。""" return await self._request("PUT", path, params=params, json=json, headers=headers) async def patch( self, path: str, json: Optional[Dict[str, Any]] = None, params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, ) -> Dict[str, Any]: """PATCH 请求。""" return await self._request("PATCH", path, params=params, json=json, headers=headers) async def delete( self, path: str, params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, ) -> Dict[str, Any]: """DELETE 请求。""" return await self._request("DELETE", path, params=params, headers=headers) @property def headers(self): return self._headers

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Huangwh826/halo-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server