auth_middleware_template.j2โข7.75 kB
{# Jinja2 template for authentication middleware #}
import os
import time
import jwt
from typing import Dict, List, Optional, Any, Union
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.security.api_key import APIKeyHeader
from pydantic import BaseModel
# Configuration
# In a real app, these would be in environment variables or a secure config
API_KEY_NAME = "X-API-Key"
API_KEY = "mock-api-key-{{ random_suffix }}" # For testing purposes
JWT_SECRET_KEY = "mock-jwt-secret-{{ random_suffix }}" # For testing only
JWT_ALGORITHM = "HS256"
JWT_EXPIRATION_SECONDS = 3600 # 1 hour
# Security schemes
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)
token_auth_scheme = HTTPBearer(auto_error=False)
# Mock user database
# In a real application, this would be in a database
mock_users_db = {
"admin": {
"username": "admin",
"full_name": "Admin User",
"email": "admin@example.com",
"disabled": False,
"roles": ["admin"]
},
"user": {
"username": "user",
"full_name": "Regular User",
"email": "user@example.com",
"disabled": False,
"roles": ["user"]
},
"guest": {
"username": "guest",
"full_name": "Guest User",
"email": "guest@example.com",
"disabled": False,
"roles": ["guest"]
}
}
# Models
class Token(BaseModel):
access_token: str
token_type: str
expires_at: int
user_info: Dict[str, Any]
class TokenData(BaseModel):
username: Optional[str] = None
roles: List[str] = []
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
roles: List[str] = []
def verify_api_key(api_key: str = Depends(api_key_header)) -> bool:
"""
Verify API key for authentication.
Args:
api_key: API key from X-API-Key header
Returns:
True if API key is valid
Raises:
HTTPException: If API key is invalid
"""
if api_key == API_KEY:
return True
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid API key",
headers={"WWW-Authenticate": "ApiKey"},
)
def get_user(username: str) -> User:
"""
Get user from mock database.
Args:
username: Username to lookup
Returns:
User object if found
Raises:
HTTPException: If user not found or disabled
"""
if username in mock_users_db:
user_dict = mock_users_db[username]
user = User(**user_dict)
return user
# For mock server, if username not in db but has format test_user_*,
# create a test user on the fly
if username.startswith("test_user_"):
return User(
username=username,
email=f"{username}@example.com",
full_name=f"Test User {username.split('_')[-1]}",
disabled=False,
roles=["test_user"]
)
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User {username} not found",
)
def decode_token(token: str) -> TokenData:
"""
Decode JWT token and extract user data.
Args:
token: JWT token string
Returns:
TokenData containing username and roles
Raises:
HTTPException: If token is invalid or expired
"""
try:
payload = jwt.decode(token, JWT_SECRET_KEY, algorithms=[JWT_ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token credentials",
headers={"WWW-Authenticate": "Bearer"},
)
# Get roles from token payload
roles = payload.get("roles", [])
return TokenData(username=username, roles=roles)
except jwt.PyJWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token credentials",
headers={"WWW-Authenticate": "Bearer"},
)
async def verify_jwt_token(token: HTTPAuthorizationCredentials = Depends(token_auth_scheme)) -> User:
"""
Verify JWT token and return associated user.
Args:
token: Bearer token from Authorization header
Returns:
User associated with the token
Raises:
HTTPException: If token is invalid or user is disabled
"""
if token is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Bearer authentication required",
headers={"WWW-Authenticate": "Bearer"},
)
token_data = decode_token(token.credentials)
user = get_user(token_data.username)
if user.disabled:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Inactive user",
)
return user
def create_access_token(data: Dict[str, Any], expires_delta: Optional[int] = None) -> str:
"""
Create JWT access token.
Args:
data: Data to encode in token
expires_delta: Optional expiration time in seconds
Returns:
Encoded JWT token
"""
to_encode = data.copy()
expire = time.time() + (expires_delta or JWT_EXPIRATION_SECONDS)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM)
return encoded_jwt
def generate_token_response(username: str, password: str) -> Token:
"""
Generate token response for OAuth2 compatible token endpoint.
Args:
username: Username for authentication
password: Password for authentication (ignored in mock)
Returns:
Token response with access token
Raises:
HTTPException: If username is invalid
"""
# For mock API, any valid username works with any password
# In a real app, you would verify the password here
user = get_user(username)
# Create token payload
token_data = {
"sub": user.username,
"roles": user.roles
}
# Create token
expires = JWT_EXPIRATION_SECONDS
access_token = create_access_token(token_data, expires_delta=expires)
# Create token response
token_response = Token(
access_token=access_token,
token_type="bearer",
expires_at=int(time.time() + expires),
user_info={
"username": user.username,
"full_name": user.full_name,
"email": user.email,
"roles": user.roles
}
)
return token_response
def has_role(required_roles: List[str]):
"""
Dependency to check if user has required role.
Args:
required_roles: List of roles, any of which grants access
Returns:
Dependency function checking user roles
"""
async def role_checker(user: User = Depends(verify_jwt_token)):
# Admin role has access to everything
if "admin" in user.roles:
return user
# Check if user has any of the required roles
for role in required_roles:
if role in user.roles:
return user
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Insufficient permissions. Required roles: {required_roles}"
)
return role_checker