"""
Authentication Router
Handles user authentication endpoints: login, register, logout, token refresh, current-user info
Uses Supabase/PostgreSQL database for persistent user storage
"""
from fastapi import APIRouter, HTTPException, Depends, Header, status
from pydantic import BaseModel, EmailStr, Field
from typing import Optional, Dict, Any
import logging
from datetime import datetime, timedelta
from .jwt_handler import JWTHandler
from .blacklist import TokenBlacklist
from ..services.user_service import UserService
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router on which every endpoint in this module is registered; all routes
# are grouped under the "Authentication" tag.
router = APIRouter(tags=["Authentication"])
# JWT handler and token blacklist instances, created once at import time and
# shared by the dependency and the endpoints in this module.
jwt_handler = JWTHandler()
token_blacklist = TokenBlacklist()
# User service instance used in this module for user lookup, user creation,
# password verification and last-login updates.
user_service = UserService()
# Request/Response Models
# Request body accepted by POST /auth/token.
class LoginRequest(BaseModel):
    # Login identifier; the login endpoint looks it up as an email first and
    # falls back to a username lookup.
    username: str = Field(..., description="Username or email")
    # Password supplied by the client; the login endpoint passes it to
    # user_service.verify_password_hash together with the stored hash and salt.
    password: str = Field(..., description="User password")
# Request body accepted by POST /auth/register.
class RegisterRequest(BaseModel):
    # Email address; EmailStr makes pydantic reject malformed addresses.
    email: EmailStr = Field(..., description="User email address")
    # Username; the registration endpoint rejects values that already exist.
    username: str = Field(..., description="Unique username")
    # The description has always promised an 8-character minimum; min_length
    # makes the model actually enforce it instead of accepting any string.
    password: str = Field(
        ...,
        min_length=8,
        description="User password (min 8 characters)",
    )
    # Display name stored with the account.
    full_name: str = Field(..., description="Full name")
# Request body accepted by POST /auth/refresh.
class TokenRefreshRequest(BaseModel):
    # Refresh token the client wants to exchange for a new token pair.
    refresh_token: str = Field(..., description="Refresh token")
# Response body returned by the login, register and refresh endpoints.
class AuthResponse(BaseModel):
    # Access token; sent back by clients in the "Authorization: Bearer ..." header.
    access_token: str = Field(..., description="JWT access token")
    # Refresh token; submitted to POST /auth/refresh to obtain a new pair.
    refresh_token: str = Field(..., description="Refresh token")
    # Defaults to "bearer"; the endpoints in this module also pass "bearer" explicitly.
    token_type: str = Field(default="bearer", description="Token type")
    # Filled from jwt_handler.get_token_expiry() by the endpoints in this module.
    expires_in: int = Field(..., description="Token expiration time in seconds")
    # Free-form dict; the endpoints in this module populate it with the keys
    # id, email, username, full_name, is_active, created_at and last_login.
    user: Dict[str, Any] = Field(..., description="User information")
# Response body returned by GET /auth/me.
class UserInfo(BaseModel):
    id: int
    email: EmailStr
    username: str
    full_name: str
    is_active: bool
    created_at: datetime
    # Optional: the /auth/me endpoint reads it with .get(), so a user record
    # without this key yields None.
    last_login: Optional[datetime] = None
# Utility Functions
def _unauthorized_error(detail: str) -> HTTPException:
    """Build the 401 error (with a Bearer challenge header) used by get_current_user."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )


async def get_current_user(authorization: Optional[str] = Header(None)) -> Dict[str, Any]:
    """
    Resolve the authenticated user from the ``Authorization`` header.

    Intended for use as a FastAPI dependency. The header must start with
    ``"Bearer "``; the token that follows is verified, must carry
    ``type == "access"`` and a ``user_id``, must not be blacklisted, and must
    belong to an existing, active user.

    Args:
        authorization: Raw value of the ``Authorization`` request header.

    Returns:
        The user record returned by ``user_service.get_user_by_id``.

    Raises:
        HTTPException: 401 for every rejection path, including any unexpected
            error raised while verifying the token or loading the user.
    """
    if not (authorization and authorization.startswith("Bearer ")):
        raise _unauthorized_error("Authorization header missing or invalid")

    bearer_token = authorization.split(" ")[1]

    try:
        claims = jwt_handler.verify_token(bearer_token)

        # Only access tokens may authenticate a request.
        if claims.get("type") != "access":
            raise _unauthorized_error("Invalid token type")

        subject_id = claims.get("user_id")
        if not subject_id:
            raise _unauthorized_error("Invalid token payload")

        # A blacklisted token is treated as revoked.
        if await token_blacklist.check_token(bearer_token):
            raise _unauthorized_error("Token has been revoked")

        account = await user_service.get_user_by_id(subject_id)
        if not (account and account.get("is_active")):
            raise _unauthorized_error("User not found or inactive")

        return account
    except HTTPException:
        # Deliberate rejections above pass through untouched.
        raise
    except Exception as e:
        # Anything else is logged and reported as an invalid token.
        logger.error(f"Token verification error: {e}")
        raise _unauthorized_error("Could not validate token")
# Authentication Endpoints
@router.post("/auth/token", response_model=AuthResponse, status_code=status.HTTP_200_OK)
async def login_user(login_data: LoginRequest):
    """
    Authenticate a user and issue an access/refresh token pair.

    The ``username`` field is looked up as an email address first and as a
    username second. The ``last_login`` value in the response comes from the
    user record loaded before the last-login timestamp is updated.

    Args:
        login_data: Login identifier (email or username) and password.

    Returns:
        AuthResponse containing the token pair, the access-token lifetime and
        a summary of the user record.

    Raises:
        HTTPException: 401 when the user is unknown, the password does not
            match, or the account is inactive; 500 on any unexpected error.
    """
    try:
        # Try the identifier as an email first, then as a username.
        user = await user_service.get_user_by_email(login_data.username)
        if not user:
            user = await user_service.get_user_by_username(login_data.username)

        # Unknown user and wrong password share one message on purpose, so
        # the response does not reveal which accounts exist.
        if not user or not user_service.verify_password_hash(
            login_data.password, user["password_hash"], user["salt"]
        ):
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid credentials",
                headers={"WWW-Authenticate": "Bearer"},
            )

        # The token-authenticated paths in this module reject inactive users,
        # so do not issue such an account tokens in the first place. Checked
        # after the password so only the account holder learns the state.
        if not user.get("is_active"):
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="User not found or inactive",
                headers={"WWW-Authenticate": "Bearer"},
            )

        await user_service.update_last_login(user["id"])

        token_pair = jwt_handler.create_token_pair({
            "id": user["id"],
            "email": user["email"],
            "username": user["username"],
            "full_name": user["full_name"],
        })
        # The refresh token is deliberately NOT added to token_blacklist:
        # POST /auth/refresh rejects blacklisted refresh tokens, so
        # blacklisting at issue time would make the token unusable.

        user_info = {
            "id": user["id"],
            "email": user["email"],
            "username": user["username"],
            "full_name": user["full_name"],
            "is_active": user["is_active"],
            "created_at": user["created_at"],
            "last_login": user.get("last_login"),
        }
        logger.info("User logged in successfully: %s", user["username"])
        return AuthResponse(
            access_token=token_pair["access_token"],
            refresh_token=token_pair["refresh_token"],
            token_type="bearer",
            expires_in=jwt_handler.get_token_expiry(),
            user=user_info,
        )
    except HTTPException:
        raise
    except Exception as e:
        # logger.exception keeps the traceback for the unexpected-failure path.
        logger.exception("Login error: %s", e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Internal server error during login",
        )
@router.post("/auth/register", response_model=AuthResponse, status_code=status.HTTP_201_CREATED)
async def register_user(register_data: RegisterRequest):
    """
    Create a new user account and issue its first token pair.

    Args:
        register_data: Email, username, password and full name of the new user.

    Returns:
        AuthResponse containing the token pair, the access-token lifetime and
        a summary of the newly created user record.

    Raises:
        HTTPException: 400 when the email or username is already in use;
            500 when the created user cannot be loaded or on any unexpected
            error.
    """
    try:
        if await user_service.get_user_by_email(register_data.email):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Email already registered",
            )
        if await user_service.get_user_by_username(register_data.username):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Username already taken",
            )

        user_id = await user_service.create_user(
            email=register_data.email,
            username=register_data.username,
            password=register_data.password,
            full_name=register_data.full_name,
        )

        # Re-read the stored record so the response reflects what was saved.
        user = await user_service.get_user_by_id(user_id)
        if not user:
            # Fail with an explicit log line rather than an opaque TypeError
            # from subscripting None further down.
            logger.error(
                "Registration error: created user %s could not be loaded", user_id
            )
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Internal server error during registration",
            )

        token_pair = jwt_handler.create_token_pair({
            "id": user["id"],
            "email": user["email"],
            "username": user["username"],
            "full_name": user["full_name"],
        })
        # The refresh token is deliberately NOT added to token_blacklist:
        # POST /auth/refresh rejects blacklisted refresh tokens, so
        # blacklisting at issue time would make the token unusable.

        user_info = {
            "id": user["id"],
            "email": user["email"],
            "username": user["username"],
            "full_name": user["full_name"],
            "is_active": user["is_active"],
            "created_at": user["created_at"],
            "last_login": user.get("last_login"),
        }
        logger.info("User registered successfully: %s", user["username"])
        return AuthResponse(
            access_token=token_pair["access_token"],
            refresh_token=token_pair["refresh_token"],
            token_type="bearer",
            expires_in=jwt_handler.get_token_expiry(),
            user=user_info,
        )
    except HTTPException:
        raise
    except Exception as e:
        # logger.exception keeps the traceback for the unexpected-failure path.
        logger.exception("Registration error: %s", e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Internal server error during registration",
        )
@router.post("/auth/refresh", response_model=AuthResponse, status_code=status.HTTP_200_OK)
async def refresh_token(refresh_data: TokenRefreshRequest):
    """
    Exchange a valid refresh token for a new access/refresh token pair.

    The presented refresh token is blacklisted once the new pair has been
    created, so each refresh token can be used only once.

    Args:
        refresh_data: Request body carrying the refresh token.

    Returns:
        AuthResponse containing the new token pair, the access-token lifetime
        and a summary of the user record.

    Raises:
        HTTPException: 401 when the token fails verification, carries no
            ``user_id``, is blacklisted, or belongs to a missing or inactive
            user; 500 on any other unexpected error.
    """
    presented_token = refresh_data.refresh_token
    try:
        # A token that fails verification is a client error, not a server
        # error: report 401 instead of letting it reach the 500 handler.
        # NOTE(review): assumes verify_refresh_token signals bad tokens by
        # raising — confirm against JWTHandler.
        try:
            payload = jwt_handler.verify_refresh_token(presented_token)
        except HTTPException:
            raise
        except Exception as e:
            logger.warning("Refresh token verification failed: %s", e)
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid refresh token",
                headers={"WWW-Authenticate": "Bearer"},
            ) from e

        user_id = payload.get("user_id") if payload else None
        if not user_id:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid refresh token",
                headers={"WWW-Authenticate": "Bearer"},
            )

        # A blacklisted refresh token has already been used or revoked.
        if await token_blacklist.check_token(presented_token):
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Refresh token has been revoked",
                headers={"WWW-Authenticate": "Bearer"},
            )

        user = await user_service.get_user_by_id(user_id)
        if not user or not user.get("is_active"):
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="User not found or inactive",
                headers={"WWW-Authenticate": "Bearer"},
            )

        token_pair = jwt_handler.create_token_pair({
            "id": user["id"],
            "email": user["email"],
            "username": user["username"],
            "full_name": user["full_name"],
        })

        # Revoke the token that was just used by ADDING it to the blacklist
        # (removing it would do the opposite). The new refresh token stays
        # off the blacklist, otherwise the check above would reject it.
        await token_blacklist.add_token(presented_token, str(user["id"]))

        user_info = {
            "id": user["id"],
            "email": user["email"],
            "username": user["username"],
            "full_name": user["full_name"],
            "is_active": user["is_active"],
            "created_at": user["created_at"],
            "last_login": user.get("last_login"),
        }
        logger.info("Token refreshed for user: %s", user["username"])
        return AuthResponse(
            access_token=token_pair["access_token"],
            refresh_token=token_pair["refresh_token"],
            token_type="bearer",
            expires_in=jwt_handler.get_token_expiry(),
            user=user_info,
        )
    except HTTPException:
        raise
    except Exception as e:
        # logger.exception keeps the traceback for the unexpected-failure path.
        logger.exception("Token refresh error: %s", e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Internal server error during token refresh",
        )
@router.post("/auth/logout", status_code=status.HTTP_200_OK)
async def logout_user(
    current_user: Dict[str, Any] = Depends(get_current_user),
    authorization: Optional[str] = Header(None),
):
    """
    Log the current user out by blacklisting the presented access token.

    Args:
        current_user: User record resolved by the ``get_current_user``
            dependency.
        authorization: Raw ``Authorization`` header; the same header the
            dependency validated, received here so the token can be revoked.

    Returns:
        A dict with a confirmation message.

    Raises:
        HTTPException: 500 when blacklisting the token fails or on any other
            unexpected error.
    """
    try:
        # Through FastAPI this is the header string. A direct Python call
        # that omits the argument receives the Header(...) default object
        # instead, hence the isinstance guard.
        if isinstance(authorization, str) and authorization.startswith("Bearer "):
            # Extract the token with the same expression get_current_user
            # uses, so the blacklisted string is exactly the one it checks.
            access_token = authorization.split(" ")[1]
            await token_blacklist.add_token(access_token, str(current_user["id"]))

        logger.info(f"User logged out: {current_user['username']}")
        return {"message": "Logged out successfully"}
    except Exception as e:
        # logger.exception keeps the traceback for the unexpected-failure path.
        logger.exception("Logout error: %s", e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Internal server error during logout",
        )
@router.get("/auth/me", response_model=UserInfo, status_code=status.HTTP_200_OK)
async def get_current_user_info(current_user: Dict[str, Any] = Depends(get_current_user)):
    """
    Return the authenticated user's profile as a ``UserInfo`` model.

    Args:
        current_user: User record resolved by the ``get_current_user``
            dependency.

    Returns:
        UserInfo built from the user record; ``last_login`` is ``None`` when
        the record has no such key.

    Raises:
        HTTPException: 500 when the record lacks a required key or the model
            cannot be built.
    """
    mandatory_keys = ("id", "email", "username", "full_name", "is_active", "created_at")
    try:
        # Required keys are subscripted so a missing one raises; last_login
        # is optional and read with .get().
        profile = {key: current_user[key] for key in mandatory_keys}
        profile["last_login"] = current_user.get("last_login")
        return UserInfo(**profile)
    except Exception as e:
        logger.error(f"Get user info error: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Internal server error",
        )