#!/usr/bin/env python3
"""
Middleware de autenticación para FastAPI
"""
from typing import Optional, List, Callable
from fastapi import HTTPException, status, Depends, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from functools import wraps
try:
from .auth_models import User, UserRole, AuthError, TokenExpiredError, InvalidTokenError, InsufficientScopeError
from .auth_service import auth_service
from .logger import get_logger
except ImportError:
from auth_models import User, UserRole, AuthError, TokenExpiredError, InvalidTokenError, InsufficientScopeError
from auth_service import auth_service
from logger import get_logger
# Logger used by every function in this module, named "auth-middleware".
logger = get_logger("auth-middleware")
# Bearer security scheme. auto_error=False makes the scheme yield None when
# no usable Authorization header is present instead of rejecting the request
# itself, so the dependencies below build their own 401 response.
security = HTTPBearer(auto_error=False)
class AuthMiddleware:
    """HTTP authentication middleware for the FastAPI application.

    Instances are callables with the ``(request, call_next)`` signature.
    Requests to public paths pass through untouched. Every other request
    must carry an access token, taken from the ``Authorization: Bearer``
    header or, failing that, from the ``access_token`` cookie. On success
    the verified user is stored in ``request.state.user`` and
    ``request.state.authenticated`` is set to ``True``.
    """

    # JSON-RPC methods on /mcp that may be called without a token.
    _PUBLIC_MCP_METHODS = ("initialize", "notifications/initialized", "tools/list")

    def __init__(self):
        # Paths that are public only on an exact match.
        self.public_paths = {
            "/",
            "/health",
            "/tools",
            "/docs",
            "/openapi.json",
            "/redoc",
            "/login",
            "/auth/success",
            "/test-page",
            # "/mcp" is intentionally absent so the middleware authenticates it.
        }
        # Authentication endpoints; these and everything nested under them
        # are public.
        self.auth_paths = {
            "/auth",
            "/auth/callback",
            "/auth/token",
            "/auth/providers",
        }

    def is_public_path(self, path: str) -> bool:
        """Return True when ``path`` can be served without authentication.

        A path is public when it is listed in ``public_paths``, when it is
        one of ``auth_paths`` or nested below one of them, or when it is a
        static asset (``/static/...`` or ``/favicon...``).
        """
        if path in self.public_paths:
            return True
        # Match auth prefixes on a path-segment boundary: a bare
        # startswith("/auth") would also expose unrelated routes such as
        # "/authorized-admin".
        if any(
            path == prefix or path.startswith(prefix + "/")
            for prefix in self.auth_paths
        ):
            return True
        return path.startswith(("/static/", "/favicon"))

    @staticmethod
    def _extract_token(request: Request) -> Optional[str]:
        """Return the access token from the Bearer header, else from the cookie."""
        auth_header = request.headers.get("Authorization")
        if auth_header:
            # Same parsing rule as HTTPBearer: case-insensitive scheme, and
            # everything after the first space is the credential.
            scheme, _, credentials = auth_header.partition(" ")
            token = credentials.strip()
            if scheme.lower() == "bearer" and token:
                return token
        return request.cookies.get("access_token")

    @staticmethod
    def _normalize_scopes(raw_scopes) -> List[str]:
        """Return the ``scopes`` claim as a list of scope names."""
        if raw_scopes is None:
            return []
        if isinstance(raw_scopes, str):
            # Split a space-delimited claim; a substring test on the raw
            # string would let "mcp:readonly" satisfy "mcp:read".
            return raw_scopes.split()
        return list(raw_scopes)

    async def _is_public_mcp_request(self, request: Request) -> bool:
        """Return True when the /mcp body names a method that needs no token.

        Any body that cannot be parsed as a JSON object is treated as
        protected, so malformed requests fail closed.
        """
        import json

        # NOTE(review): reading the body here assumes Starlette keeps it
        # available for the downstream handler - confirm for the installed
        # version.
        body = await request.body()
        try:
            mcp_request = json.loads(body.decode())
            mcp_method = mcp_request.get("method", "")
        except (ValueError, AttributeError, RecursionError) as e:
            logger.warning(f"Error parseando petición MCP: {e}")
            return False

        if mcp_method in self._PUBLIC_MCP_METHODS:
            logger.info(f"Método MCP público: {mcp_method}")
            return True
        logger.info(f"Método MCP que requiere autenticación: {mcp_method}")
        return False

    async def __call__(self, request: Request, call_next):
        """Authenticate ``request`` and forward it to ``call_next``.

        Raises:
            HTTPException: 401 when the token is missing, expired or
                invalid; 403 on other ``AuthError`` failures or when a
                protected ``/mcp`` call lacks the ``mcp:read`` scope; 500 on
                unexpected errors during authentication.

        NOTE(review): exceptions raised from a middleware may bypass the
        application's exception handlers depending on how this callable is
        registered - confirm the 401/403 responses actually reach clients.
        """
        path = request.url.path

        if self.is_public_path(path):
            return await call_next(request)

        # call_next is invoked outside any try block on purpose: errors from
        # downstream handlers must not be mistaken for parse or auth
        # failures, and the request must never be dispatched twice.
        if path == "/mcp" and await self._is_public_mcp_request(request):
            return await call_next(request)

        try:
            token = self._extract_token(request)
            if not token:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Token de acceso requerido",
                    headers={"WWW-Authenticate": "Bearer"},
                )

            user = auth_service.verify_access_token(token)

            # Protected MCP calls additionally require the mcp:read scope.
            if path == "/mcp":
                from jose import jwt

                payload = jwt.decode(
                    token,
                    auth_service.secret_key,
                    algorithms=[auth_service.algorithm],
                )
                token_scopes = self._normalize_scopes(payload.get("scopes"))
                if "mcp:read" not in token_scopes:
                    logger.warning(f"Scope 'mcp:read' no encontrado en {token_scopes}")
                    raise HTTPException(
                        status_code=status.HTTP_403_FORBIDDEN,
                        detail="Permisos insuficientes. Se requiere scope 'mcp:read'",
                    )
                logger.info(f"Scope 'mcp:read' verificado correctamente para {user.email}")

            # Expose the authenticated user to downstream handlers.
            request.state.user = user
            request.state.authenticated = True
            logger.debug(f"Usuario autenticado: {user.email} para {path}")
        except (TokenExpiredError, InvalidTokenError) as e:
            logger.warning(f"Token inválido para {path}: {e.message}")
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=e.message,
                headers={"WWW-Authenticate": "Bearer"},
            ) from e
        except AuthError as e:
            logger.warning(f"Error de autenticación para {path}: {e.message}")
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=e.message,
            ) from e
        except HTTPException:
            # Already the response we want; re-raise unchanged.
            raise
        except Exception as e:
            logger.error(f"Error inesperado en autenticación: {e}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Error interno de autenticación",
            ) from e

        return await call_next(request)
# Dependencias de FastAPI para autenticación
async def get_current_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)
) -> User:
    """Resolve the request's bearer credentials into a ``User``.

    The token is checked with ``auth_service.verify_access_token``.

    Raises:
        HTTPException: 401 when no credentials were supplied or the token
            is expired or invalid; 403 for any other ``AuthError``.
    """
    bearer_challenge = {"WWW-Authenticate": "Bearer"}

    if not credentials:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token de acceso requerido",
            headers=bearer_challenge,
        )

    try:
        return auth_service.verify_access_token(credentials.credentials)
    except (TokenExpiredError, InvalidTokenError) as token_error:
        # Expiry is tested first, mirroring the precedence of separate
        # except clauses.
        if isinstance(token_error, TokenExpiredError):
            reason = "Token expirado"
        else:
            reason = "Token inválido"
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=reason,
            headers=bearer_challenge,
        )
    except AuthError as auth_error:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=auth_error.message,
        )
async def get_current_active_user(
    current_user: User = Depends(get_current_user)
) -> User:
    """Return the authenticated user, rejecting deactivated accounts.

    Raises:
        HTTPException: 403 when ``current_user.is_active`` is falsy.
    """
    if current_user.is_active:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Usuario desactivado",
    )
def require_role(required_role: UserRole):
    """Build a dependency that only admits users holding ``required_role``.

    Users whose role is ``UserRole.ADMIN`` are always admitted. The returned
    callable yields the active user, or raises a 403 ``HTTPException`` that
    names the required role.
    """
    def role_checker(current_user: User = Depends(get_current_active_user)) -> User:
        if current_user.role == required_role or current_user.role == UserRole.ADMIN:
            return current_user

        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Rol requerido: {required_role.value}",
        )

    return role_checker
def require_scopes(required_scopes: List[str]):
    """Build a dependency that admits only tokens carrying every required scope.

    The returned callable decodes the bearer token, reads its ``scopes``
    claim and returns the active user when all of ``required_scopes`` are
    present.

    Raises (from the returned dependency):
        HTTPException: 403 when a required scope is missing; 500 when the
            token cannot be decoded or the claim cannot be inspected.
    """
    def scope_checker(
        current_user: User = Depends(get_current_active_user),
        credentials: HTTPAuthorizationCredentials = Depends(security)
    ) -> User:
        try:
            from jose import jwt

            payload = jwt.decode(
                credentials.credentials,
                auth_service.secret_key,
                algorithms=[auth_service.algorithm]
            )

            raw_scopes = payload.get("scopes")
            if raw_scopes is None:
                # A missing or null claim grants nothing (403), rather than
                # failing the membership test with a TypeError (500).
                token_scopes = []
            elif isinstance(raw_scopes, str):
                # Split a space-delimited claim; a substring test on the raw
                # string would let "mcp:readonly" satisfy "mcp:read".
                token_scopes = raw_scopes.split()
            else:
                token_scopes = raw_scopes

            for scope in required_scopes:
                if scope not in token_scopes:
                    raise InsufficientScopeError(scope)
            return current_user
        except InsufficientScopeError as e:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=e.message
            )
        except Exception as e:
            logger.error(f"Error verificando scopes: {e}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Error verificando permisos"
            )

    return scope_checker
# Dependencias específicas para diferentes niveles de acceso
async def get_admin_user(current_user: User = Depends(require_role(UserRole.ADMIN))) -> User:
    """Return the current user once the ``UserRole.ADMIN`` role check has passed."""
    return current_user
async def get_user_with_mcp_read(current_user: User = Depends(require_scopes(["mcp:read"]))) -> User:
    """Return the current user once the ``mcp:read`` scope check has passed."""
    return current_user
async def get_user_with_mcp_read_from_request(request: Request) -> User:
    """Return the user stored on ``request.state.user``.

    No token or scope is checked here; the function only reads what is
    already on the request state.

    Raises:
        HTTPException: 401 when the request state holds no user.
    """
    state_user = getattr(request.state, "user", None)
    if state_user:
        return state_user

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Token de acceso requerido",
        headers={"WWW-Authenticate": "Bearer"},
    )
async def get_user_with_mcp_write(current_user: User = Depends(require_scopes(["mcp:write"]))) -> User:
    """Return the current user once the ``mcp:write`` scope check has passed."""
    return current_user
# Funciones auxiliares
def get_user_from_request(request: Request) -> Optional[User]:
    """Return the user stored on the request state, or ``None`` if there is none."""
    state = request.state
    try:
        return state.user
    except AttributeError:
        return None
def is_authenticated(request: Request) -> bool:
    """Return the ``authenticated`` flag from the request state, defaulting to ``False``."""
    state = request.state
    return getattr(state, "authenticated", False)
# Module-level middleware instance; it is invoked as
# ``auth_middleware(request, call_next)``.
auth_middleware = AuthMiddleware()