"""
Модуль безопасности.
Содержит функции для работы с паролями, токенами и аутентификацией.
"""
from datetime import datetime, timedelta
from typing import Any, Optional, Union
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from app.core.config import settings
from app.models.user import User
from app.schemas.token import TokenPayload
# Настройка контекста для хеширования паролей
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Настройка OAuth2 для получения токена
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl=f"{settings.API_V1_STR}/auth/login",
)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Проверка пароля."""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""Получение хеша пароля."""
return pwd_context.hash(password)
def create_access_token(
subject: Union[str, Any],
expires_delta: Optional[timedelta] = None,
) -> str:
"""
Создание JWT токена доступа.
Args:
subject: Идентификатор пользователя или другие данные
expires_delta: Время жизни токена
Returns:
Закодированный JWT токен
"""
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES,
)
to_encode = {"exp": expire, "sub": str(subject)}
encoded_jwt = jwt.encode(
to_encode,
settings.SECRET_KEY,
algorithm=settings.ALGORITHM,
)
return encoded_jwt
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
"""
Получение текущего пользователя по токену.
Args:
token: JWT токен
Returns:
Объект пользователя
Raises:
HTTPException: Если токен недействителен или пользователь не найден
"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Не удалось проверить учетные данные",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(
token,
settings.SECRET_KEY,
algorithms=[settings.ALGORITHM],
)
user_id: str = payload.get("sub")
if user_id is None:
raise credentials_exception
token_data = TokenPayload(sub=user_id) # noqa: F841
# Для отладки можно вывести: print(f"Token data: {token_data}")
except JWTError as err:
raise credentials_exception from err
# Здесь должен быть код для получения пользователя из БД
# user = get_user_by_id(token_data.sub)
# if user is None:
# raise credentials_exception
# return user
# Временная заглушка
raise NotImplementedError("Функция получения пользователя не реализована")