Skip to main content
Glama
growley

MCP HTTP TAVILY DATE OAUTH

by growley
auth_middleware.py (11 kB)
#!/usr/bin/env python3
"""
Authentication middleware and dependencies for FastAPI.

This module provides:

* ``AuthMiddleware`` -- an HTTP middleware callable that lets public paths
  through and requires a valid access token (``Authorization: Bearer`` header
  or ``access_token`` cookie) everywhere else.
* FastAPI dependencies (``get_current_user``, ``require_role``,
  ``require_scopes`` and friends) for per-route authentication checks.
* Small helpers that read the authentication state stored on
  ``request.state`` by the middleware.
"""

import json
from functools import wraps
from typing import Callable, List, Optional

from fastapi import Depends, HTTPException, Request, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

try:
    from .auth_models import (
        User,
        UserRole,
        AuthError,
        TokenExpiredError,
        InvalidTokenError,
        InsufficientScopeError,
    )
    from .auth_service import auth_service
    from .logger import get_logger
except ImportError:
    # Fallback for running the module outside of its package.
    from auth_models import (
        User,
        UserRole,
        AuthError,
        TokenExpiredError,
        InvalidTokenError,
        InsufficientScopeError,
    )
    from auth_service import auth_service
    from logger import get_logger

logger = get_logger("auth-middleware")

# Bearer security scheme. ``auto_error=False`` makes the dependency yield
# ``None`` instead of raising when no credentials are sent, so the functions
# below can build their own 401 response.
security = HTTPBearer(auto_error=False)


class AuthMiddleware:
    """Authentication middleware.

    Instances are callables with the ``(request, call_next)`` signature.
    Requests to public paths are forwarded untouched; every other request
    must carry a valid access token. On success the authenticated user is
    stored in ``request.state.user`` and ``request.state.authenticated`` is
    set to ``True``.
    """

    # MCP methods that do NOT require authentication. Kept as a tuple so the
    # membership test compares with ``==`` and never fails on an unhashable
    # ``method`` value coming from the request body.
    PUBLIC_MCP_METHODS = ("initialize", "notifications/initialized", "tools/list")

    def __init__(self):
        self.public_paths = {
            "/",
            "/health",
            "/tools",
            "/docs",
            "/openapi.json",
            "/redoc",
            "/login",
            "/auth/success",
            "/test-page",
            # "/mcp" was removed from this set so that the middleware
            # processes its authentication.
        }
        self.auth_paths = {
            "/auth",
            "/auth/callback",
            "/auth/token",
            "/auth/providers",
        }

    def is_public_path(self, path: str) -> bool:
        """Return ``True`` if ``path`` may be accessed without a token.

        A path is public when it is listed exactly in ``public_paths``, starts
        with any entry of ``auth_paths``, or is a static/favicon resource.
        """
        # Exact public routes.
        if path in self.public_paths:
            return True

        # Authentication routes. This is a plain prefix match, so "/auth"
        # also covers paths such as "/authorize".
        if path.startswith(tuple(self.auth_paths)):
            return True

        # Static resources.
        return path.startswith(("/static/", "/favicon"))

    @staticmethod
    def _extract_token(request: Request) -> Optional[str]:
        """Return the access token sent with ``request``, or ``None``.

        The ``Authorization`` header is tried first; the auth scheme is
        compared case-insensitively and surrounding whitespace is ignored.
        If no bearer token is found there, the ``access_token`` cookie is
        used.
        """
        auth_header = request.headers.get("Authorization")
        if auth_header:
            scheme, _, credentials = auth_header.partition(" ")
            if scheme.lower() == "bearer":
                token = credentials.strip()
                if token:
                    return token
        return request.cookies.get("access_token")

    async def _is_public_mcp_request(self, request: Request) -> bool:
        """Return ``True`` if the MCP request body names a public method.

        The body is read and parsed as JSON. If it cannot be parsed (or is
        not a JSON object), the request is treated as requiring
        authentication.
        """
        # NOTE(review): reading the body inside an HTTP middleware can
        # interfere with downstream body consumption on some Starlette
        # versions -- confirm against the version in use.
        body = await request.body()
        try:
            mcp_request = json.loads(body.decode())
            mcp_method = mcp_request.get("method", "")
            is_public = mcp_method in self.PUBLIC_MCP_METHODS
        except Exception as e:
            # Best effort: if the body cannot be parsed, require
            # authentication to stay on the safe side.
            logger.warning(f"Error parseando petición MCP: {e}")
            return False

        if is_public:
            logger.info(f"Método MCP público: {mcp_method}")
            return True

        # Anything else (e.g. tools/call) goes through authentication.
        logger.info(f"Método MCP que requiere autenticación: {mcp_method}")
        return False

    async def __call__(self, request: Request, call_next):
        """Main middleware entry point.

        Args:
            request: Incoming request.
            call_next: Callable that forwards the request downstream and
                returns the response.

        Returns:
            The downstream response.

        Raises:
            HTTPException: 401 when the token is missing, expired or invalid;
                403 on other ``AuthError`` failures or when the ``mcp:read``
                scope is missing for ``/mcp``; 500 on unexpected errors
                during authentication.
        """
        # NOTE(review): an HTTPException raised from an HTTP middleware may
        # not reach FastAPI's exception handlers; presumably the registration
        # site converts it to a response -- verify against the caller.
        path = request.url.path

        # Allow public routes.
        if self.is_public_path(path):
            return await call_next(request)

        # MCP route: whether authentication is needed depends on the MCP
        # method. The forwarding call is made outside of any parse-error
        # handling so that downstream exceptions are not swallowed.
        if path == "/mcp" and await self._is_public_mcp_request(request):
            return await call_next(request)

        # Verify authentication for protected routes.
        try:
            token = self._extract_token(request)
            if not token:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Token de acceso requerido",
                    headers={"WWW-Authenticate": "Bearer"},
                )

            user = auth_service.verify_access_token(token)

            # Authenticated MCP calls additionally need the "mcp:read" scope.
            if path == "/mcp":
                # Decode the token to inspect its scopes.
                from jose import jwt

                payload = jwt.decode(
                    token,
                    auth_service.secret_key,
                    algorithms=[auth_service.algorithm],
                )
                # ``or []`` also covers a "scopes" claim that is present
                # but null.
                token_scopes = payload.get("scopes") or []
                if "mcp:read" not in token_scopes:
                    logger.warning(f"Scope 'mcp:read' no encontrado en {token_scopes}")
                    raise HTTPException(
                        status_code=status.HTTP_403_FORBIDDEN,
                        detail="Permisos insuficientes. Se requiere scope 'mcp:read'",
                    )
                logger.info(f"Scope 'mcp:read' verificado correctamente para {user.email}")

            # Attach the user to the request context.
            request.state.user = user
            request.state.authenticated = True

            logger.debug(f"Usuario autenticado: {user.email} para {path}")

        except (TokenExpiredError, InvalidTokenError) as e:
            logger.warning(f"Token inválido para {path}: {e.message}")
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=e.message,
                headers={"WWW-Authenticate": "Bearer"},
            ) from e
        except AuthError as e:
            logger.warning(f"Error de autenticación para {path}: {e.message}")
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=e.message,
            ) from e
        except HTTPException:
            # Re-raise HTTPException unchanged.
            raise
        except Exception as e:
            logger.error(f"Error inesperado en autenticación: {e}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Error interno de autenticación",
            ) from e

        return await call_next(request)


# FastAPI dependencies for authentication


async def get_current_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)
) -> User:
    """Return the current user resolved from the bearer token.

    Raises:
        HTTPException: 401 when no credentials were sent or the token is
            expired/invalid; 403 on any other ``AuthError``.
    """
    if not credentials:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token de acceso requerido",
            headers={"WWW-Authenticate": "Bearer"},
        )

    try:
        return auth_service.verify_access_token(credentials.credentials)
    except TokenExpiredError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token expirado",
            headers={"WWW-Authenticate": "Bearer"},
        ) from e
    except InvalidTokenError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token inválido",
            headers={"WWW-Authenticate": "Bearer"},
        ) from e
    except AuthError as e:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=e.message,
        ) from e


async def get_current_active_user(
    current_user: User = Depends(get_current_user)
) -> User:
    """Return the current user, rejecting deactivated accounts with a 403."""
    if not current_user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Usuario desactivado",
        )
    return current_user


def require_role(required_role: UserRole):
    """Build a dependency that requires a specific role.

    The returned callable yields the active user when their role equals
    ``required_role`` or ``UserRole.ADMIN``, and raises a 403 otherwise.
    """

    def role_checker(current_user: User = Depends(get_current_active_user)) -> User:
        if current_user.role not in (required_role, UserRole.ADMIN):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Rol requerido: {required_role.value}",
            )
        return current_user

    return role_checker


def require_scopes(required_scopes: List[str]):
    """Build a dependency that requires specific token scopes.

    The returned callable decodes the bearer token and yields the active user
    when every entry of ``required_scopes`` is present in the token's
    ``scopes`` claim. It raises a 403 for the first missing scope and a 500
    if the check itself fails unexpectedly.
    """

    def scope_checker(
        current_user: User = Depends(get_current_active_user),
        credentials: HTTPAuthorizationCredentials = Depends(security),
    ) -> User:
        try:
            # Check the scopes carried by the token.
            from jose import jwt

            payload = jwt.decode(
                credentials.credentials,
                auth_service.secret_key,
                algorithms=[auth_service.algorithm],
            )
            # ``or []`` also covers a "scopes" claim that is present but null.
            token_scopes = payload.get("scopes") or []

            for scope in required_scopes:
                if scope not in token_scopes:
                    raise InsufficientScopeError(scope)

            return current_user

        except InsufficientScopeError as e:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=e.message,
            ) from e
        except Exception as e:
            logger.error(f"Error verificando scopes: {e}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Error verificando permisos",
            ) from e

    return scope_checker


# Specific dependencies for the different access levels


async def get_admin_user(
    current_user: User = Depends(require_role(UserRole.ADMIN))
) -> User:
    """Return the current user, requiring the administrator role."""
    return current_user


async def get_user_with_mcp_read(
    current_user: User = Depends(require_scopes(["mcp:read"]))
) -> User:
    """Return the current user, requiring the ``mcp:read`` scope."""
    return current_user


async def get_user_with_mcp_read_from_request(request: Request) -> User:
    """Return the user stored on ``request.state``, or raise a 401.

    This dependency performs no token validation of its own: authentication
    and scope verification are left to the middleware, which stores the user
    on ``request.state``.
    """
    user = getattr(request.state, 'user', None)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token de acceso requerido",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user


async def get_user_with_mcp_write(
    current_user: User = Depends(require_scopes(["mcp:write"]))
) -> User:
    """Return the current user, requiring the ``mcp:write`` scope."""
    return current_user


# Helper functions


def get_user_from_request(request: Request) -> Optional[User]:
    """Return the user stored in the request context, or ``None``."""
    return getattr(request.state, 'user', None)


def is_authenticated(request: Request) -> bool:
    """Return whether the request has been marked as authenticated."""
    return getattr(request.state, 'authenticated', False)


# Middleware instance
auth_middleware = AuthMiddleware()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/growley/MCP_TAVILI_DATE_OAUT'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.