#!/usr/bin/env python3
"""
Middleware de autenticación para FastAPI
"""
from typing import Optional, List, Callable
from fastapi import HTTPException, status, Depends, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from functools import wraps
try:
from .auth_models import User, UserRole, AuthError, TokenExpiredError, InvalidTokenError, InsufficientScopeError
from .auth_service import auth_service
from .logger import get_logger
except ImportError:
from auth_models import User, UserRole, AuthError, TokenExpiredError, InvalidTokenError, InsufficientScopeError
from auth_service import auth_service
from logger import get_logger
logger = get_logger("auth-middleware")
# Esquema de seguridad Bearer
security = HTTPBearer(auto_error=False)
class AuthMiddleware:
"""Middleware de autenticación"""
def __init__(self):
self.public_paths = {
"/",
"/health",
"/tools",
"/docs",
"/openapi.json",
"/redoc",
"/login",
"/auth/success",
"/test-page",
"/mcp" # Permitir MCP sin autenticación automática
}
self.auth_paths = {
"/auth",
"/auth/callback",
"/auth/token",
"/auth/providers"
}
def is_public_path(self, path: str) -> bool:
"""Verificar si una ruta es pública"""
# Rutas exactas públicas
if path in self.public_paths:
return True
# Rutas de autenticación
if any(path.startswith(auth_path) for auth_path in self.auth_paths):
return True
# Rutas estáticas
if path.startswith("/static/") or path.startswith("/favicon"):
return True
return False
async def __call__(self, request: Request, call_next):
"""Middleware principal"""
path = request.url.path
# Permitir rutas públicas
if self.is_public_path(path):
return await call_next(request)
# Verificar autenticación para rutas protegidas
try:
token = None
# 1. Intentar obtener token del header Authorization
auth_header = request.headers.get("Authorization")
if auth_header and auth_header.startswith("Bearer "):
token = auth_header.split(" ")[1]
# 2. Si no hay token en header, intentar obtener de cookies
if not token:
token = request.cookies.get("access_token")
if not token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token de acceso requerido",
headers={"WWW-Authenticate": "Bearer"}
)
user = auth_service.verify_access_token(token)
# Agregar usuario al contexto de la petición
request.state.user = user
request.state.authenticated = True
logger.debug(f"Usuario autenticado: {user.email} para {path}")
except (TokenExpiredError, InvalidTokenError) as e:
logger.warning(f"Token inválido para {path}: {e.message}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=e.message,
headers={"WWW-Authenticate": "Bearer"}
)
except AuthError as e:
logger.warning(f"Error de autenticación para {path}: {e.message}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=e.message
)
except HTTPException:
# Re-lanzar HTTPException sin modificar
raise
except Exception as e:
logger.error(f"Error inesperado en autenticación: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno de autenticación"
)
return await call_next(request)
# Dependencias de FastAPI para autenticación
async def get_current_user(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)
) -> User:
"""Obtener usuario actual desde el token"""
if not credentials:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token de acceso requerido",
headers={"WWW-Authenticate": "Bearer"}
)
try:
user = auth_service.verify_access_token(credentials.credentials)
return user
except TokenExpiredError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token expirado",
headers={"WWW-Authenticate": "Bearer"}
)
except InvalidTokenError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token inválido",
headers={"WWW-Authenticate": "Bearer"}
)
except AuthError as e:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=e.message
)
async def get_current_active_user(
current_user: User = Depends(get_current_user)
) -> User:
"""Obtener usuario activo actual"""
if not current_user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Usuario desactivado"
)
return current_user
def require_role(required_role: UserRole):
"""Decorador para requerir un rol específico"""
def role_checker(current_user: User = Depends(get_current_active_user)) -> User:
if current_user.role != required_role and current_user.role != UserRole.ADMIN:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Rol requerido: {required_role.value}"
)
return current_user
return role_checker
def require_scopes(required_scopes: List[str]):
"""Decorador para requerir scopes específicos"""
def scope_checker(
current_user: User = Depends(get_current_active_user),
credentials: HTTPAuthorizationCredentials = Depends(security)
) -> User:
try:
# Verificar scopes en el token
from jose import jwt
payload = jwt.decode(
credentials.credentials,
auth_service.secret_key,
algorithms=[auth_service.algorithm]
)
token_scopes = payload.get("scopes", [])
for scope in required_scopes:
if scope not in token_scopes:
raise InsufficientScopeError(scope)
return current_user
except InsufficientScopeError as e:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=e.message
)
except Exception as e:
logger.error(f"Error verificando scopes: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error verificando permisos"
)
return scope_checker
# Dependencias específicas para diferentes niveles de acceso
async def get_admin_user(
current_user: User = Depends(require_role(UserRole.ADMIN))
) -> User:
"""Obtener usuario administrador"""
return current_user
async def get_user_with_mcp_read(
current_user: User = Depends(require_scopes(["mcp:read"]))
) -> User:
"""Obtener usuario con permisos de lectura MCP"""
return current_user
async def get_user_with_mcp_read_from_request(request: Request) -> User:
"""Obtener usuario con permisos de lectura MCP desde Request"""
# Verificar si hay usuario en el contexto (ya autenticado por middleware)
user = getattr(request.state, 'user', None)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token de acceso requerido",
headers={"WWW-Authenticate": "Bearer"}
)
# Verificar scopes
if not hasattr(user, 'scopes') or "mcp:read" not in user.scopes:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Permisos insuficientes. Se requiere scope 'mcp:read'"
)
return user
async def get_user_with_mcp_write(
current_user: User = Depends(require_scopes(["mcp:write"]))
) -> User:
"""Obtener usuario con permisos de escritura MCP"""
return current_user
# Funciones auxiliares
def get_user_from_request(request: Request) -> Optional[User]:
"""Obtener usuario desde el contexto de la petición"""
return getattr(request.state, 'user', None)
def is_authenticated(request: Request) -> bool:
"""Verificar si la petición está autenticada"""
return getattr(request.state, 'authenticated', False)
# Instancia del middleware
auth_middleware = AuthMiddleware()