"""Authentication and authorization utilities"""
from datetime import datetime, timedelta, timezone
from typing import Optional

import jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jwt.exceptions import PyJWTError as JWTError
from passlib.context import CryptContext
from sqlalchemy.orm import Session

from .config import settings
from .database import get_db
from .models import User
from .schemas import TokenData
# Password hashing context shared by verify_password() and
# get_password_hash(); bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# HTTP Bearer token scheme. auto_error=False makes the credentials optional,
# so a dependency using this scheme may receive None instead of credentials
# and has to handle that case itself.
security = HTTPBearer(auto_error=False)
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """
    Check a plain-text password against a stored hash

    Args:
        plain_password: Password as supplied by the caller
        hashed_password: Previously stored hash to compare against

    Returns:
        Result of the module-level password context's verification
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password: str) -> str:
    """
    Hash a password with the module-level password context

    Args:
        password: Plain text password

    Returns:
        Hash string produced by the password context

    Raises:
        ValueError: If the UTF-8 encoding of the password exceeds 72 bytes
    """
    # bcrypt has a 72 byte limit; measure encoded bytes, not characters,
    # so multi-byte characters are counted correctly.
    encoded_length = len(password.encode('utf-8'))
    if encoded_length <= 72:
        return pwd_context.hash(password)
    raise ValueError("Password cannot exceed 72 bytes")
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """
    Create a JWT access token

    Args:
        data: Claims to encode in the token; the dict is copied, not mutated
        expires_delta: Optional lifetime of the token. When None, the
            lifetime is settings.access_token_expire_minutes minutes.

    Returns:
        Encoded JWT token
    """
    to_encode = data.copy()
    # Compare against None explicitly: timedelta(0) is falsy, and an
    # explicitly requested zero lifetime must not fall back to the default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.access_token_expire_minutes)
    # Timezone-aware UTC "now"; datetime.utcnow() is deprecated and returns
    # a naive value.
    expire = datetime.now(timezone.utc) + expires_delta
    to_encode["exp"] = expire
    return jwt.encode(to_encode, settings.jwt_secret_key, algorithm=settings.jwt_algorithm)
def verify_token(token: str) -> TokenData:
    """
    Decode a JWT token and extract the identity it carries

    The token is decoded with settings.jwt_secret_key and
    settings.jwt_algorithm, and its "sub" claim is used as the email.

    Args:
        token: Encoded JWT token

    Returns:
        TokenData built from the token's "sub" claim

    Raises:
        HTTPException: 401 if decoding raises a PyJWT error or the "sub"
            claim is missing
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        claims = jwt.decode(
            token,
            settings.jwt_secret_key,
            algorithms=[settings.jwt_algorithm],
        )
        subject = claims.get("sub")
        if subject is not None:
            return TokenData(email=subject)
    except JWTError:
        raise unauthorized
    # Decoded successfully but carried no "sub" claim.
    raise unauthorized
def get_current_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
    db: Session = Depends(get_db)
) -> User:
    """
    Get the current authenticated user from the JWT token

    Args:
        credentials: HTTP Bearer credentials from request; None when the
            request carried no Bearer credentials, because the security
            scheme is created with auto_error=False
        db: Database session

    Returns:
        Current user object

    Raises:
        HTTPException: 401 if no credentials were provided, the token is
            invalid, or no user matches the email in the token
    """
    # The bearer scheme is optional (auto_error=False), so credentials may be
    # None. Reject that explicitly instead of failing on attribute access.
    if credentials is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Authentication required. Please provide a valid Bearer token.",
            headers={"WWW-Authenticate": "Bearer"},
        )
    token_data = verify_token(credentials.credentials)
    user = db.query(User).filter(User.email == token_data.email).first()
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user
def get_optional_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
    db: Session = Depends(get_db)
) -> Optional[User]:
    """
    Resolve the current user only when authentication is enforced.

    When settings.require_auth is false, any supplied credentials are
    ignored and None is returned.

    Args:
        credentials: HTTP Bearer credentials from request (optional)
        db: Database session

    Returns:
        The authenticated user when auth is required, None when auth is
        disabled

    Raises:
        HTTPException: 401 if auth is required and credentials are missing,
            the token is invalid, or no user matches the token's email
    """
    auth_disabled = not settings.require_auth
    if auth_disabled:
        # No user context when authentication is switched off.
        return None

    if credentials is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Authentication required. Please provide a valid Bearer token.",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Validate the token, then look up the user by the email it carries.
    identity = verify_token(credentials.credentials)
    matching_users = db.query(User).filter(User.email == identity.email)
    account = matching_users.first()
    if account is not None:
        return account

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="User not found",
        headers={"WWW-Authenticate": "Bearer"},
    )
def authenticate_user(db: Session, email: str, password: str) -> Optional[User]:
    """
    Authenticate a user with email and password

    Args:
        db: Database session
        email: User email
        password: Plain text password

    Returns:
        User object if authentication successful, None otherwise
    """
    user = db.query(User).filter(User.email == email).first()
    if user is None:
        # Spend time comparable to a real hash verification so that response
        # timing does not reveal whether the email is registered.
        pwd_context.dummy_verify()
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user
def create_user(db: Session, email: str, password: str, full_name: Optional[str] = None) -> User:
    """
    Create a new user

    Args:
        db: Database session
        email: User email
        password: Plain text password
        full_name: Optional full name

    Returns:
        Created user object, refreshed from the database after commit

    Raises:
        ValueError: If a user with this email already exists, or if the
            password exceeds the 72 byte limit enforced by get_password_hash
    """
    # Check if user exists
    existing_user = db.query(User).filter(User.email == email).first()
    if existing_user is not None:
        raise ValueError("User with this email already exists")

    # Hash before touching the session so a rejected password leaves it clean.
    hashed_password = get_password_hash(password)
    user = User(
        email=email,
        hashed_password=hashed_password,
        full_name=full_name,
    )
    db.add(user)
    try:
        db.commit()
    except Exception:
        # The existence check above cannot rule out a concurrent insert, and
        # a failed commit leaves the session needing a rollback before it can
        # be used again. Restore the session, then let the original error
        # propagate unchanged.
        db.rollback()
        raise
    db.refresh(user)
    return user