Skip to main content
Glama
server.py14.1 kB
""" Servidor MCP base con soporte para OpenAPI. AUTENTICACIÓN: Este MCP requiere autenticación via Auth Gateway. 1. El usuario debe hacer login en Auth Gateway para obtener un JWT 2. Llamar a set_auth_token(jwt) para configurar la sesión 3. Las credenciales de la API se obtienen automáticamente del Gateway Soporta dos modos de transporte: - stdio: Para uso local con Claude Desktop/Cursor (por defecto) - sse: Para despliegue remoto en VPS/EasyPanel (activar con MCP_TRANSPORT=sse) Requiere que `vendor/` esté en PYTHONPATH para resolver `mcp` (vendorizado). """ from __future__ import annotations import asyncio import os import random import sys from pathlib import Path from typing import Any, Callable, Literal, TypeVar import httpx import structlog from dotenv import load_dotenv from fastapi import HTTPException from mcp.server.fastmcp import FastMCP from .config import get_config from .openapi_tools import register_openapi_tools from .validation import sanitize_string from .tool_policies import log_tool_execution from .auth_gateway import ( get_auth_service, AuthGatewayService, AuthGatewayError, AUTH_GATEWAY_URL, ) # Configurar structlog structlog.configure( processors=[ structlog.stdlib.filter_by_level, structlog.stdlib.add_logger_name, structlog.stdlib.add_log_level, structlog.stdlib.PositionalArgumentsFormatter(), structlog.processors.TimeStamper(fmt="iso"), structlog.processors.StackInfoRenderer(), structlog.processors.format_exc_info, structlog.processors.UnicodeDecoder(), structlog.processors.JSONRenderer() if os.getenv("LOG_FORMAT") == "json" else structlog.dev.ConsoleRenderer(), ], wrapper_class=structlog.stdlib.BoundLogger, context_class=dict, logger_factory=structlog.stdlib.LoggerFactory(), cache_logger_on_first_use=True, ) logger = structlog.get_logger(__name__) T = TypeVar("T") load_dotenv() # Cargar configuración config = get_config() # Configuración de la API BASE_URL = os.getenv("API_BASE_URL", config.api.base_url) SPEC_PATH = os.getenv("OPENAPI_SPEC_PATH", "openapi/spec.json") # Configuración de transporte SSE MCP_TRANSPORT: Literal["stdio", "sse"] = os.getenv("MCP_TRANSPORT", "stdio") # type: ignore MCP_HOST = os.getenv("MCP_HOST", "0.0.0.0") MCP_PORT = int(os.getenv("MCP_PORT", "8000")) def _require_auth() -> AuthGatewayService: """ Verifica que el usuario esté autenticado via Auth Gateway. Returns: AuthGatewayService con credenciales válidas Raises: RuntimeError: Si no hay JWT configurado o las credenciales expiraron """ auth_service = get_auth_service() if not auth_service.is_authenticated: raise RuntimeError( "Autenticación requerida.\n\n" f"Este MCP requiere autenticación via Auth Gateway.\n" "Pasos:\n" " 1. Haz login en Auth Gateway para obtener tu JWT\n" " 2. Llama a la herramienta 'set_auth_token' con tu JWT\n" " 3. Luego podrás usar las demás herramientas\n\n" f"Auth Gateway: {AUTH_GATEWAY_URL}" ) return auth_service def _client() -> httpx.AsyncClient: """Crea un cliente HTTP (las credenciales se añaden en cada request).""" return httpx.AsyncClient(base_url=BASE_URL, timeout=config.api.timeout) def _auth_params() -> dict[str, str]: """Obtiene los parámetros de autenticación desde Auth Gateway.""" auth_service = _require_auth() return auth_service.get_auth_params() def _auth_headers() -> dict[str, str]: """Obtiene los headers de autenticación desde Auth Gateway.""" auth_service = _require_auth() return auth_service.get_auth_headers() def _merge_auth(params: dict[str, Any] | None) -> dict[str, Any]: """Mezcla parámetros con credenciales de autenticación.""" merged = dict(params or {}) auth = _auth_params() for key, value in auth.items(): merged.setdefault(key, value) return merged # Configuración de retry RETRY_MAX_ATTEMPTS = int(os.getenv("RETRY_MAX_ATTEMPTS", "3")) RETRY_BASE_DELAY = float(os.getenv("RETRY_BASE_DELAY", "1.0")) RETRY_MAX_DELAY = float(os.getenv("RETRY_MAX_DELAY", "30.0")) RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504} async def with_retry( func: Callable[[], T], *, max_attempts: int = RETRY_MAX_ATTEMPTS, base_delay: float = RETRY_BASE_DELAY, max_delay: float = RETRY_MAX_DELAY, retryable_status: set[int] = RETRYABLE_STATUS_CODES, ) -> T: """ Ejecuta una función async con retry y backoff exponencial. Args: func: Función async a ejecutar max_attempts: Número máximo de intentos (default: 3) base_delay: Delay base en segundos (default: 1.0) max_delay: Delay máximo en segundos (default: 30.0) retryable_status: Códigos HTTP que disparan retry Returns: El resultado de la función Raises: httpx.HTTPStatusError: Si se agotan los reintentos o el error no es retriable Exception: Cualquier otro error no retriable """ last_exception: Exception | None = None for attempt in range(max_attempts): try: return await func() except httpx.HTTPStatusError as exc: last_exception = exc status_code = exc.response.status_code if status_code not in retryable_status: logger.warning( f"Error no retriable (HTTP {status_code}): {exc.response.text[:200]}" ) raise delay = min(base_delay * (2 ** attempt), max_delay) jitter = random.uniform(0, delay * 0.1) total_delay = delay + jitter logger.warning( f"Error retriable (HTTP {status_code}), intento {attempt + 1}/{max_attempts}. " f"Reintentando en {total_delay:.2f}s..." ) if attempt < max_attempts - 1: await asyncio.sleep(total_delay) except (httpx.ConnectError, httpx.TimeoutException) as exc: last_exception = exc delay = min(base_delay * (2 ** attempt), max_delay) jitter = random.uniform(0, delay * 0.1) total_delay = delay + jitter logger.warning( f"Error de conexión ({type(exc).__name__}), intento {attempt + 1}/{max_attempts}. " f"Reintentando en {total_delay:.2f}s..." ) if attempt < max_attempts - 1: await asyncio.sleep(total_delay) if last_exception: raise last_exception raise RuntimeError("with_retry: no se ejecutó ningún intento") async def api_request( method: str, path: str, *, params: dict[str, Any] | None = None, json: dict[str, Any] | None = None, ) -> Any: """ Ejecuta una request a la API con retry automático. Args: method: Método HTTP (GET, POST, PUT, DELETE) path: Path del endpoint (ej: /resources/123) params: Query parameters json: Body JSON Returns: Respuesta JSON de la API Raises: HTTPException: Con detalles del error RuntimeError: Si no hay autenticación configurada """ _require_auth() async def _do_request() -> httpx.Response: async with _client() as client: headers = _auth_headers() resp = await client.request( method.upper(), path, params=_merge_auth(params), json=json, headers=headers if headers else None, ) resp.raise_for_status() return resp try: resp = await with_retry(_do_request) except AuthGatewayError as exc: raise HTTPException(status_code=401, detail=str(exc)) except httpx.HTTPStatusError as exc: detail = f"Error de API ({exc.response.status_code}): {exc.response.text}" raise HTTPException(status_code=502, detail=detail) except (httpx.ConnectError, httpx.TimeoutException) as exc: raise HTTPException( status_code=502, detail=f"Error de conexión con la API: {exc}" ) except Exception as exc: raise HTTPException( status_code=502, detail=f"Error al conectar con la API: {exc}" ) try: return resp.json() except Exception: return resp.text # Crear instancia del servidor MCP mcp = FastMCP( name=f"{config.service.display_name} MCP", instructions=config.service.description, host=MCP_HOST, port=MCP_PORT, ) # ============================================================================= # Herramientas de Autenticación (siempre disponibles) # ============================================================================= @mcp.tool( description="Configura el token JWT para autenticación con Auth Gateway. " "OBLIGATORIO: Llamar antes de usar cualquier otra herramienta." ) async def set_auth_token(jwt: str) -> dict[str, Any]: """ Configura el JWT para autenticación con Auth Gateway. El JWT se obtiene haciendo login en Auth Gateway (POST /auth/login). Una vez configurado, las credenciales se obtienen automáticamente. Args: jwt: Token JWT obtenido del login en Auth Gateway Returns: { "success": bool, "message": str, "error": str si falla } """ if not jwt or not jwt.strip(): return { "success": False, "error": "JWT vacío. Proporciona un JWT válido obtenido de Auth Gateway.", } jwt = jwt.strip() auth_service = get_auth_service() try: credentials = await auth_service.authenticate(jwt) logger.info( "auth_token_set", expires_at=credentials.expires_at.isoformat(), ) return { "success": True, "message": "Autenticado correctamente via Auth Gateway", "credentials_expire_at": credentials.expires_at.isoformat(), } except AuthGatewayError as e: logger.warning("auth_token_failed", error=str(e)) return { "success": False, "error": str(e), } except Exception as e: logger.error("auth_token_error", error=str(e)) return { "success": False, "error": f"Error inesperado: {str(e)}", } @mcp.tool(description="Cierra la sesión actual y limpia las credenciales cacheadas.") async def logout() -> dict[str, Any]: """Cierra la sesión y limpia las credenciales del cache.""" auth_service = get_auth_service() was_authenticated = auth_service.is_authenticated auth_service.logout() return { "success": True, "message": "Sesión cerrada" if was_authenticated else "No había sesión activa", } @mcp.tool(description="Muestra el estado actual de autenticación.") async def get_auth_status() -> dict[str, Any]: """ Retorna el estado de autenticación actual. Returns: { "authenticated": bool, "auth_gateway_url": str, "credentials_expire_at": str (si autenticado), "message": str } """ auth_service = get_auth_service() creds = auth_service.current_credentials if creds and not creds.is_expired: return { "authenticated": True, "auth_gateway_url": AUTH_GATEWAY_URL, "credentials_expire_at": creds.expires_at.isoformat(), "message": "Autenticado correctamente via Auth Gateway", } return { "authenticated": False, "auth_gateway_url": AUTH_GATEWAY_URL, "message": "No autenticado. Usa 'set_auth_token' con tu JWT para autenticarte.", } # ============================================================================= # Punto de entrada # ============================================================================= def _validate_auth_gateway_config() -> None: """Valida que AUTH_GATEWAY_URL esté configurado al inicio.""" if not AUTH_GATEWAY_URL: logger.error("auth_gateway_not_configured") error_msg = f""" ============================================================ ERROR: AUTH_GATEWAY_URL no configurado ============================================================ Este MCP requiere autenticacion via Auth Gateway. Configura la variable de entorno: AUTH_GATEWAY_URL=https://auth.example.com Flujo de autenticacion: 1. Usuario hace login en Auth Gateway -> obtiene JWT 2. Llama a set_auth_token(jwt) en el MCP 3. MCP obtiene credenciales de la API del Gateway ============================================================ """ print(error_msg, file=sys.stderr) sys.exit(1) logger.info( "auth_gateway_configured", gateway_url=AUTH_GATEWAY_URL, ) def main() -> None: """Punto de entrada principal del servidor MCP.""" # Validar que AUTH_GATEWAY_URL esté configurado _validate_auth_gateway_config() # Registrar herramientas OpenAPI si el spec existe spec_path = Path(SPEC_PATH) if spec_path.exists(): tool_count = register_openapi_tools( mcp, spec_path=spec_path, auth_params=_auth_params, auth_headers=_auth_headers, client_factory=_client, ) logger.info("openapi_tools_registered", count=tool_count, spec=str(spec_path)) else: logger.warning("openapi_spec_not_found", spec=str(spec_path)) # Ejecutar con el transporte configurado (stdio o sse) transport = MCP_TRANSPORT if MCP_TRANSPORT in ("stdio", "sse") else "stdio" if transport == "sse": logger.info( "server_starting", transport=transport, host=MCP_HOST, port=MCP_PORT, endpoints=["/sse", "/messages/"], ) mcp.run(transport=transport) if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jesusperezdeveloper/mcp_openapi_template'

If you have feedback or need assistance with the MCP directory API, please join our Discord server