import logging
from datetime import datetime, timedelta, timezone
from typing import Optional, Union

from fastapi import HTTPException, status
from jose import JWTError, jwt
from sqlalchemy.orm import Session

from app.models import User
# Module-level logger named after this module. Nothing is configured here:
# handlers, levels and formatting are left to whatever logging setup the
# application installs.
logger = logging.getLogger(__name__)
class TokenManager:
    """Issue and validate JWTs and resolve them to ``User`` records.

    Tokens are signed and verified with ``secret_key`` using ``algorithm``.
    Failures are reported to callers as ``fastapi.HTTPException`` so the
    methods can be used directly from request handlers.
    """

    def __init__(
        self,
        secret_key: str,
        algorithm: str = "HS256",
        access_token_expire_minutes: int = 30,
        refresh_token_expire_days: int = 30,
    ):
        """Store the signing configuration and default token lifetimes.

        Args:
            secret_key: Key passed to ``jwt.encode`` / ``jwt.decode``.
            algorithm: JWT signing algorithm name.
            access_token_expire_minutes: Default lifetime, in minutes, of a
                token created without ``remember`` or ``expires_delta``.
            refresh_token_expire_days: Lifetime, in days, of a token created
                with ``remember=True``.
        """
        self.secret_key = secret_key
        self.algorithm = algorithm
        self.access_token_expire_minutes = access_token_expire_minutes
        self.refresh_token_expire_days = refresh_token_expire_days

    def create_access_token(
        self,
        data: dict,
        remember: bool = False,
        expires_delta: Optional[timedelta] = None
    ) -> str:
        """Create a signed JWT carrying ``data`` plus an ``exp`` claim.

        The lifetime is chosen in this order: an explicit ``expires_delta``
        (including a zero delta), then ``refresh_token_expire_days`` when
        ``remember`` is true, otherwise ``access_token_expire_minutes``.

        Args:
            data: Claims to embed in the token. The mapping is copied, not
                mutated.
            remember: Use the long ("remember me") lifetime when no explicit
                ``expires_delta`` is given.
            expires_delta: Explicit lifetime that overrides ``remember``.

        Returns:
            The encoded JWT string.

        Raises:
            HTTPException: 500 if the token cannot be built or encoded.
        """
        try:
            # Compare against None explicitly: timedelta(0) is falsy and would
            # otherwise be silently replaced by the default lifetime.
            if expires_delta is not None:
                lifetime = expires_delta
            elif remember:
                lifetime = timedelta(days=self.refresh_token_expire_days)
            else:
                lifetime = timedelta(minutes=self.access_token_expire_minutes)

            to_encode = dict(data)
            # Timezone-aware "now"; datetime.utcnow() is deprecated.
            to_encode["exp"] = datetime.now(timezone.utc) + lifetime

            encoded_jwt = jwt.encode(
                to_encode, self.secret_key, algorithm=self.algorithm
            )
            logger.info(
                "Access token created successfully for user %s",
                to_encode.get("sub"),
            )
            return encoded_jwt
        except Exception as e:
            logger.error("Error creating access token: %s", e)
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Error creating access token"
            ) from e

    def verify_token(self, token: str) -> dict:
        """Verify a JWT's signature and claims and return its payload.

        Args:
            token: The encoded JWT string.

        Returns:
            The decoded claims dictionary.

        Raises:
            HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header if
                decoding raises ``JWTError``.
        """
        try:
            return jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
        except JWTError as e:
            logger.error("Token verification failed: %s", e)
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Could not validate credentials",
                headers={"WWW-Authenticate": "Bearer"},
            ) from e

    def get_current_user(self, token: str, db: Session) -> User:
        """Resolve a token to the active ``User`` it identifies.

        The token's ``sub`` claim is matched against ``User.email``.

        Args:
            token: The encoded JWT string.
            db: Database session used to look the user up.

        Returns:
            The matching, active ``User``.

        Raises:
            HTTPException: 401 if the token is invalid, has no ``sub`` claim,
                or an unexpected error occurs during lookup; 404 if no user
                has that email; 400 if the user is not active.
        """
        try:
            payload = self.verify_token(token)
            username: Optional[str] = payload.get("sub")
            if username is None:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Could not validate credentials",
                    headers={"WWW-Authenticate": "Bearer"},
                )

            user = db.query(User).filter(User.email == username).first()
            if user is None:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="User not found"
                )
            if not user.is_active:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail="Inactive user"
                )
            return user
        except HTTPException:
            # Deliberate responses (401/404/400 above, or from verify_token)
            # must reach the caller unchanged rather than being flattened
            # into a generic 401 by the catch-all below.
            raise
        except Exception as e:
            logger.error("Error getting current user: %s", e)
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Could not validate credentials",
                headers={"WWW-Authenticate": "Bearer"},
            ) from e