#!/usr/bin/env python3
"""
Authentication Base Classes
Base classes and interfaces for authentication and authorization.
"""
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Union
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
class UserRole(str, Enum):
"""User roles for authorization."""
ADMIN = "admin"
DOC_ADMIN = "doc_admin"
API_USER = "api_user"
READONLY = "readonly"
class Permission(str, Enum):
"""System permissions."""
# System administration
SYSTEM_ADMIN = "system:admin"
SYSTEM_CONFIG = "system:config"
SYSTEM_MONITOR = "system:monitor"
# Documentation management
DOC_READ = "doc:read"
DOC_WRITE = "doc:write"
DOC_DELETE = "doc:delete"
DOC_CONFIG = "doc:config"
# User management
USER_READ = "user:read"
USER_WRITE = "user:write"
USER_DELETE = "user:delete"
# API access
API_READ = "api:read"
API_WRITE = "api:write"
@dataclass
class User:
"""User information."""
id: str
username: str
email: Optional[str] = None
role: UserRole = UserRole.API_USER
permissions: List[Permission] = None
is_active: bool = True
created_at: Optional[datetime] = None
last_login: Optional[datetime] = None
metadata: Dict[str, Any] = None
def __post_init__(self):
"""Initialize default values."""
if self.permissions is None:
self.permissions = self._get_default_permissions()
if self.metadata is None:
self.metadata = {}
if self.created_at is None:
self.created_at = datetime.utcnow()
def _get_default_permissions(self) -> List[Permission]:
"""Get default permissions based on user role."""
role_permissions = {
UserRole.ADMIN: [
Permission.SYSTEM_ADMIN,
Permission.SYSTEM_CONFIG,
Permission.SYSTEM_MONITOR,
Permission.DOC_READ,
Permission.DOC_WRITE,
Permission.DOC_DELETE,
Permission.DOC_CONFIG,
Permission.USER_READ,
Permission.USER_WRITE,
Permission.USER_DELETE,
Permission.API_READ,
Permission.API_WRITE,
],
UserRole.DOC_ADMIN: [
Permission.DOC_READ,
Permission.DOC_WRITE,
Permission.DOC_DELETE,
Permission.DOC_CONFIG,
Permission.API_READ,
Permission.API_WRITE,
],
UserRole.API_USER: [
Permission.DOC_READ,
Permission.API_READ,
],
UserRole.READONLY: [
Permission.DOC_READ,
Permission.API_READ,
],
}
return role_permissions.get(self.role, [])
def has_permission(self, permission: Permission) -> bool:
"""Check if user has a specific permission."""
return permission in self.permissions
def has_any_permission(self, permissions: List[Permission]) -> bool:
"""Check if user has any of the specified permissions."""
return any(perm in self.permissions for perm in permissions)
def has_all_permissions(self, permissions: List[Permission]) -> bool:
"""Check if user has all of the specified permissions."""
return all(perm in self.permissions for perm in permissions)
def add_permission(self, permission: Permission) -> None:
"""Add a permission to the user."""
if permission not in self.permissions:
self.permissions.append(permission)
def remove_permission(self, permission: Permission) -> None:
"""Remove a permission from the user."""
if permission in self.permissions:
self.permissions.remove(permission)
def update_last_login(self) -> None:
"""Update the last login timestamp."""
self.last_login = datetime.utcnow()
@dataclass
class AuthToken:
"""Authentication token information."""
token: str
token_type: str = "bearer"
expires_at: Optional[datetime] = None
user_id: Optional[str] = None
scopes: List[str] = None
metadata: Dict[str, Any] = None
def __post_init__(self):
"""Initialize default values."""
if self.scopes is None:
self.scopes = []
if self.metadata is None:
self.metadata = {}
def is_expired(self) -> bool:
"""Check if the token is expired."""
if self.expires_at is None:
return False
return datetime.utcnow() > self.expires_at
def is_valid(self) -> bool:
"""Check if the token is valid (not expired and has token value)."""
return bool(self.token) and not self.is_expired()
def time_until_expiry(self) -> Optional[timedelta]:
"""Get time until token expires."""
if self.expires_at is None:
return None
return self.expires_at - datetime.utcnow()
class AuthenticationError(Exception):
"""Authentication related errors."""
pass
class AuthorizationError(Exception):
"""Authorization related errors."""
pass
class TokenExpiredError(AuthenticationError):
"""Token has expired."""
pass
class InvalidTokenError(AuthenticationError):
"""Invalid token format or signature."""
pass
class InsufficientPermissionsError(AuthorizationError):
"""User lacks required permissions."""
pass
class BaseAuthenticator(ABC):
"""Base class for authentication providers."""
@abstractmethod
async def authenticate(self, credentials: Dict[str, Any]) -> Optional[User]:
"""Authenticate user with provided credentials.
Args:
credentials: Authentication credentials (varies by provider)
Returns:
User object if authentication successful, None otherwise
Raises:
AuthenticationError: If authentication fails
"""
pass
@abstractmethod
async def validate_token(self, token: str) -> Optional[AuthToken]:
"""Validate an authentication token.
Args:
token: Token to validate
Returns:
AuthToken object if valid, None otherwise
Raises:
InvalidTokenError: If token format is invalid
TokenExpiredError: If token has expired
"""
pass
@abstractmethod
async def create_token(self, user: User, **kwargs) -> AuthToken:
"""Create an authentication token for a user.
Args:
user: User to create token for
**kwargs: Additional token parameters
Returns:
AuthToken object
Raises:
AuthenticationError: If token creation fails
"""
pass
@abstractmethod
async def revoke_token(self, token: str) -> bool:
"""Revoke an authentication token.
Args:
token: Token to revoke
Returns:
True if token was revoked, False if not found
"""
pass
async def refresh_token(self, token: str) -> Optional[AuthToken]:
"""Refresh an authentication token.
Args:
token: Token to refresh
Returns:
New AuthToken if refresh successful, None otherwise
Raises:
AuthenticationError: If refresh fails
"""
# Default implementation - subclasses can override
auth_token = await self.validate_token(token)
if not auth_token or not auth_token.user_id:
return None
# Get user and create new token
user = await self.get_user(auth_token.user_id)
if not user:
return None
return await self.create_token(user)
@abstractmethod
async def get_user(self, user_id: str) -> Optional[User]:
"""Get user by ID.
Args:
user_id: User identifier
Returns:
User object if found, None otherwise
"""
pass
async def get_user_by_token(self, token: str) -> Optional[User]:
"""Get user associated with a token.
Args:
token: Authentication token
Returns:
User object if token is valid and user exists, None otherwise
"""
auth_token = await self.validate_token(token)
if not auth_token or not auth_token.user_id:
return None
return await self.get_user(auth_token.user_id)
class BaseAuthorizer(ABC):
"""Base class for authorization providers."""
@abstractmethod
async def check_permission(self, user: User, permission: Permission, resource: Optional[str] = None) -> bool:
"""Check if user has permission for a resource.
Args:
user: User to check
permission: Required permission
resource: Optional resource identifier
Returns:
True if user has permission, False otherwise
"""
pass
async def check_permissions(self, user: User, permissions: List[Permission], resource: Optional[str] = None) -> bool:
"""Check if user has all specified permissions.
Args:
user: User to check
permissions: List of required permissions
resource: Optional resource identifier
Returns:
True if user has all permissions, False otherwise
"""
for permission in permissions:
if not await self.check_permission(user, permission, resource):
return False
return True
async def check_any_permission(self, user: User, permissions: List[Permission], resource: Optional[str] = None) -> bool:
"""Check if user has any of the specified permissions.
Args:
user: User to check
permissions: List of permissions (user needs at least one)
resource: Optional resource identifier
Returns:
True if user has any permission, False otherwise
"""
for permission in permissions:
if await self.check_permission(user, permission, resource):
return True
return False
async def require_permission(self, user: User, permission: Permission, resource: Optional[str] = None) -> None:
"""Require user to have a specific permission.
Args:
user: User to check
permission: Required permission
resource: Optional resource identifier
Raises:
InsufficientPermissionsError: If user lacks permission
"""
if not await self.check_permission(user, permission, resource):
raise InsufficientPermissionsError(f"User {user.username} lacks permission {permission.value}")
async def require_permissions(self, user: User, permissions: List[Permission], resource: Optional[str] = None) -> None:
"""Require user to have all specified permissions.
Args:
user: User to check
permissions: List of required permissions
resource: Optional resource identifier
Raises:
InsufficientPermissionsError: If user lacks any permission
"""
missing_permissions = []
for permission in permissions:
if not await self.check_permission(user, permission, resource):
missing_permissions.append(permission.value)
if missing_permissions:
raise InsufficientPermissionsError(
f"User {user.username} lacks permissions: {', '.join(missing_permissions)}"
)
async def require_any_permission(self, user: User, permissions: List[Permission], resource: Optional[str] = None) -> None:
"""Require user to have at least one of the specified permissions.
Args:
user: User to check
permissions: List of permissions (user needs at least one)
resource: Optional resource identifier
Raises:
InsufficientPermissionsError: If user lacks all permissions
"""
if not await self.check_any_permission(user, permissions, resource):
permission_names = [p.value for p in permissions]
raise InsufficientPermissionsError(
f"User {user.username} lacks any of the required permissions: {', '.join(permission_names)}"
)