Skip to main content
Glama
auth_gateway.py13.3 kB
""" Auth Gateway Service - Obtiene credenciales desde Auth Gateway. Este módulo maneja la comunicación con el Auth Gateway para obtener credenciales de APIs de forma dinámica usando JWT de usuarios. Flujo: 1. Usuario proporciona JWT (obtenido via login en Auth Gateway) 2. MCP llama a GET {AUTH_GATEWAY_URL}{gateway_endpoint} con Bearer token 3. Auth Gateway devuelve {data: {credential_name: value, ...}} 4. Credenciales se cachean en memoria durante la sesión """ from __future__ import annotations import os from dataclasses import dataclass, field from datetime import datetime, timedelta from typing import Any, Optional import httpx import structlog from .config import get_config logger = structlog.get_logger(__name__) # Configuración desde variables de entorno AUTH_GATEWAY_URL = os.getenv("AUTH_GATEWAY_URL", "") AUTH_GATEWAY_API_KEY = os.getenv("AUTH_GATEWAY_API_KEY", "") CREDENTIALS_CACHE_TTL_SECONDS = int(os.getenv("AUTH_CREDENTIALS_CACHE_TTL", "3600")) @dataclass class APICredentials: """Credenciales de API obtenidas del Auth Gateway.""" credentials: dict[str, str] fetched_at: datetime = field(default_factory=datetime.utcnow) ttl_seconds: int = CREDENTIALS_CACHE_TTL_SECONDS @property def expires_at(self) -> datetime: """Momento en que expiran las credenciales cacheadas.""" return self.fetched_at + timedelta(seconds=self.ttl_seconds) @property def is_expired(self) -> bool: """Verifica si las credenciales han expirado.""" return datetime.utcnow() > self.expires_at def to_auth_params(self) -> dict[str, str]: """ Convierte a parámetros de autenticación para la API. Usa la configuración de credentials_format para mapear las credenciales a query params o headers. """ config = get_config() params = {} for mapping in config.auth.credentials_format: if mapping.name in self.credentials: value = self.credentials[mapping.name] if mapping.prefix: value = f"{mapping.prefix}{value}" if mapping.query_param: params[mapping.query_param] = value # Headers se manejan por separado en to_auth_headers() return params def to_auth_headers(self) -> dict[str, str]: """ Convierte a headers de autenticación para la API. """ config = get_config() headers = {} for mapping in config.auth.credentials_format: if mapping.name in self.credentials and mapping.header: value = self.credentials[mapping.name] if mapping.prefix: value = f"{mapping.prefix}{value}" headers[mapping.header] = value return headers @dataclass class AuthGatewayError(Exception): """Error al comunicarse con el Auth Gateway.""" message: str status_code: Optional[int] = None details: Optional[str] = None def __str__(self) -> str: if self.status_code: return f"Auth Gateway Error ({self.status_code}): {self.message}" return f"Auth Gateway Error: {self.message}" class AuthGatewayService: """ Servicio para obtener credenciales desde Auth Gateway. Maneja: - Comunicación HTTP con el Auth Gateway - Cache de credenciales en memoria (por JWT) - Renovación automática cuando expiran """ def __init__( self, gateway_url: str = AUTH_GATEWAY_URL, api_key: str = AUTH_GATEWAY_API_KEY, cache_ttl_seconds: int = CREDENTIALS_CACHE_TTL_SECONDS, timeout: float = 30.0, ): self.gateway_url = gateway_url.rstrip("/") self.api_key = api_key self.cache_ttl_seconds = cache_ttl_seconds self.timeout = timeout self._cache: dict[str, APICredentials] = {} self._current_jwt: Optional[str] = None logger.info( "auth_gateway_service_init", gateway_url=self.gateway_url, has_api_key=bool(api_key), cache_ttl=cache_ttl_seconds, ) @property def is_authenticated(self) -> bool: """Verifica si hay un JWT configurado con credenciales válidas.""" if not self._current_jwt: return False creds = self._cache.get(self._current_jwt) return creds is not None and not creds.is_expired @property def current_credentials(self) -> Optional[APICredentials]: """Obtiene las credenciales actuales (si existen y no han expirado).""" if not self._current_jwt: return None creds = self._cache.get(self._current_jwt) if creds and not creds.is_expired: return creds return None async def authenticate(self, jwt: str) -> APICredentials: """ Autentica con el Auth Gateway y obtiene credenciales. Args: jwt: Token JWT del usuario (obtenido via login en Auth Gateway) Returns: APICredentials con las credenciales de la API Raises: AuthGatewayError: Si falla la autenticación o el Gateway no responde """ # Verificar cache primero cached = self._cache.get(jwt) if cached and not cached.is_expired: logger.debug("auth_cache_hit", jwt_hash=self._hash_jwt(jwt)) self._current_jwt = jwt return cached # Fetch desde el Gateway logger.info("auth_fetching_credentials", gateway_url=self.gateway_url) credentials = await self._fetch_from_gateway(jwt) # Guardar en cache self._cache[jwt] = credentials self._current_jwt = jwt logger.info( "auth_credentials_cached", jwt_hash=self._hash_jwt(jwt), expires_at=credentials.expires_at.isoformat(), ) return credentials async def _fetch_from_gateway(self, jwt: str) -> APICredentials: """ Llama al Auth Gateway para obtener credenciales. Endpoint: GET {gateway_url}{gateway_endpoint} Headers: - Authorization: Bearer <jwt> - X-API-Key: <api_key> Response: {data: {credential_name: value, ...}} """ config = get_config() url = f"{self.gateway_url}{config.auth.gateway_endpoint}" headers = { "Authorization": f"Bearer {jwt}", "X-API-Key": self.api_key, } service_name = config.service.display_name try: async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.get(url, headers=headers) if response.status_code == 401: raise AuthGatewayError( message="JWT inválido o expirado. Por favor, vuelve a hacer login.", status_code=401, ) if response.status_code == 403: raise AuthGatewayError( message=f"No tienes permisos para acceder a las credenciales de {service_name}.", status_code=403, ) if response.status_code == 404: raise AuthGatewayError( message=f"No tienes credenciales de {service_name} configuradas en Auth Gateway. " "Por favor, añade tus credenciales en el panel de Auth Gateway.", status_code=404, ) if response.status_code >= 500: raise AuthGatewayError( message="Auth Gateway no disponible. Inténtalo más tarde.", status_code=response.status_code, details=response.text[:200] if response.text else None, ) response.raise_for_status() data = response.json() # Extraer credenciales del response # Formato esperado: {data: {key: value, ...}} creds_data = data.get("data", data) if not isinstance(creds_data, dict): raise AuthGatewayError( message="Respuesta inválida del Auth Gateway: formato incorrecto", details=f"Tipo recibido: {type(creds_data).__name__}", ) # Verificar que tenemos las credenciales esperadas expected_creds = [m.name for m in config.auth.credentials_format] missing = [c for c in expected_creds if c not in creds_data] if missing: raise AuthGatewayError( message="Respuesta inválida del Auth Gateway: faltan credenciales", details=f"Credenciales faltantes: {missing}", ) return APICredentials( credentials=creds_data, ttl_seconds=self.cache_ttl_seconds, ) except httpx.ConnectError as e: raise AuthGatewayError( message=f"No se pudo conectar con Auth Gateway ({self.gateway_url})", details=str(e), ) except httpx.TimeoutException: raise AuthGatewayError( message="Timeout al conectar con Auth Gateway", details=f"Timeout: {self.timeout}s", ) except httpx.HTTPStatusError as e: raise AuthGatewayError( message="Error HTTP del Auth Gateway", status_code=e.response.status_code, details=e.response.text[:200] if e.response.text else None, ) def get_auth_params(self) -> dict[str, str]: """ Obtiene los parámetros de autenticación para la API. Returns: Dict con parámetros para usar en requests Raises: AuthGatewayError: Si no hay credenciales configuradas """ creds = self.current_credentials if not creds: raise AuthGatewayError( message="No hay credenciales configuradas. " "Usa la herramienta 'set_auth_token' con tu JWT primero." ) return creds.to_auth_params() def get_auth_headers(self) -> dict[str, str]: """ Obtiene los headers de autenticación para la API. Returns: Dict con headers para usar en requests Raises: AuthGatewayError: Si no hay credenciales configuradas """ creds = self.current_credentials if not creds: raise AuthGatewayError( message="No hay credenciales configuradas. " "Usa la herramienta 'set_auth_token' con tu JWT primero." ) return creds.to_auth_headers() def clear_cache(self, jwt: Optional[str] = None) -> None: """ Limpia el cache de credenciales. Args: jwt: JWT específico a limpiar. Si es None, limpia todo el cache. """ if jwt: self._cache.pop(jwt, None) if self._current_jwt == jwt: self._current_jwt = None logger.info("auth_cache_cleared", jwt_hash=self._hash_jwt(jwt)) else: self._cache.clear() self._current_jwt = None logger.info("auth_cache_cleared_all") def logout(self) -> None: """Cierra la sesión actual (limpia el JWT actual del cache).""" if self._current_jwt: self.clear_cache(self._current_jwt) @staticmethod def _hash_jwt(jwt: str) -> str: """Hash parcial del JWT para logging (no exponer el token completo).""" if len(jwt) < 20: return "***" return f"{jwt[:8]}...{jwt[-4:]}" # Instancia global del servicio _auth_service: Optional[AuthGatewayService] = None def get_auth_service() -> AuthGatewayService: """ Obtiene la instancia global del servicio de Auth Gateway. Raises: RuntimeError: Si AUTH_GATEWAY_URL o AUTH_GATEWAY_API_KEY no están configurados """ global _auth_service if _auth_service is None: if not AUTH_GATEWAY_URL: raise RuntimeError( "AUTH_GATEWAY_URL no está configurado.\n\n" "Este MCP requiere autenticación via Auth Gateway.\n" "Configura la variable de entorno AUTH_GATEWAY_URL.\n\n" "Ejemplo: AUTH_GATEWAY_URL=https://auth.example.com" ) if not AUTH_GATEWAY_API_KEY: raise RuntimeError( "AUTH_GATEWAY_API_KEY no está configurado.\n\n" "Este MCP requiere una API Key para comunicarse con el Auth Gateway.\n" "Configura la variable de entorno AUTH_GATEWAY_API_KEY." ) _auth_service = AuthGatewayService( gateway_url=AUTH_GATEWAY_URL, api_key=AUTH_GATEWAY_API_KEY, ) return _auth_service def reset_auth_service() -> None: """Resetea la instancia global (útil para testing).""" global _auth_service _auth_service = None

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jesusperezdeveloper/mcp_openapi_template'

If you have feedback or need assistance with the MCP directory API, please join our Discord server