base.pyโข12.9 kB
#!/usr/bin/env python3
"""
Authentication Base Classes
Base classes and interfaces for authentication and authorization.
"""
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Union
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
class UserRole(str, Enum):
    """User roles for authorization."""
    ADMIN = "admin"
    DOC_ADMIN = "doc_admin"
    API_USER = "api_user"
    READONLY = "readonly"
class Permission(str, Enum):
    """System permissions."""
    # System administration
    SYSTEM_ADMIN = "system:admin"
    SYSTEM_CONFIG = "system:config"
    SYSTEM_MONITOR = "system:monitor"
    
    # Documentation management
    DOC_READ = "doc:read"
    DOC_WRITE = "doc:write"
    DOC_DELETE = "doc:delete"
    DOC_CONFIG = "doc:config"
    
    # User management
    USER_READ = "user:read"
    USER_WRITE = "user:write"
    USER_DELETE = "user:delete"
    
    # API access
    API_READ = "api:read"
    API_WRITE = "api:write"
@dataclass
class User:
    """User information."""
    id: str
    username: str
    email: Optional[str] = None
    role: UserRole = UserRole.API_USER
    permissions: List[Permission] = None
    is_active: bool = True
    created_at: Optional[datetime] = None
    last_login: Optional[datetime] = None
    metadata: Dict[str, Any] = None
    
    def __post_init__(self):
        """Initialize default values."""
        if self.permissions is None:
            self.permissions = self._get_default_permissions()
        if self.metadata is None:
            self.metadata = {}
        if self.created_at is None:
            self.created_at = datetime.utcnow()
    
    def _get_default_permissions(self) -> List[Permission]:
        """Get default permissions based on user role."""
        role_permissions = {
            UserRole.ADMIN: [
                Permission.SYSTEM_ADMIN,
                Permission.SYSTEM_CONFIG,
                Permission.SYSTEM_MONITOR,
                Permission.DOC_READ,
                Permission.DOC_WRITE,
                Permission.DOC_DELETE,
                Permission.DOC_CONFIG,
                Permission.USER_READ,
                Permission.USER_WRITE,
                Permission.USER_DELETE,
                Permission.API_READ,
                Permission.API_WRITE,
            ],
            UserRole.DOC_ADMIN: [
                Permission.DOC_READ,
                Permission.DOC_WRITE,
                Permission.DOC_DELETE,
                Permission.DOC_CONFIG,
                Permission.API_READ,
                Permission.API_WRITE,
            ],
            UserRole.API_USER: [
                Permission.DOC_READ,
                Permission.API_READ,
            ],
            UserRole.READONLY: [
                Permission.DOC_READ,
                Permission.API_READ,
            ],
        }
        return role_permissions.get(self.role, [])
    
    def has_permission(self, permission: Permission) -> bool:
        """Check if user has a specific permission."""
        return permission in self.permissions
    
    def has_any_permission(self, permissions: List[Permission]) -> bool:
        """Check if user has any of the specified permissions."""
        return any(perm in self.permissions for perm in permissions)
    
    def has_all_permissions(self, permissions: List[Permission]) -> bool:
        """Check if user has all of the specified permissions."""
        return all(perm in self.permissions for perm in permissions)
    
    def add_permission(self, permission: Permission) -> None:
        """Add a permission to the user."""
        if permission not in self.permissions:
            self.permissions.append(permission)
    
    def remove_permission(self, permission: Permission) -> None:
        """Remove a permission from the user."""
        if permission in self.permissions:
            self.permissions.remove(permission)
    
    def update_last_login(self) -> None:
        """Update the last login timestamp."""
        self.last_login = datetime.utcnow()
@dataclass
class AuthToken:
    """Authentication token information."""
    token: str
    token_type: str = "bearer"
    expires_at: Optional[datetime] = None
    user_id: Optional[str] = None
    scopes: List[str] = None
    metadata: Dict[str, Any] = None
    
    def __post_init__(self):
        """Initialize default values."""
        if self.scopes is None:
            self.scopes = []
        if self.metadata is None:
            self.metadata = {}
    
    def is_expired(self) -> bool:
        """Check if the token is expired."""
        if self.expires_at is None:
            return False
        return datetime.utcnow() > self.expires_at
    
    def is_valid(self) -> bool:
        """Check if the token is valid (not expired and has token value)."""
        return bool(self.token) and not self.is_expired()
    
    def time_until_expiry(self) -> Optional[timedelta]:
        """Get time until token expires."""
        if self.expires_at is None:
            return None
        return self.expires_at - datetime.utcnow()
class AuthenticationError(Exception):
    """Authentication related errors."""
    pass
class AuthorizationError(Exception):
    """Authorization related errors."""
    pass
class TokenExpiredError(AuthenticationError):
    """Token has expired."""
    pass
class InvalidTokenError(AuthenticationError):
    """Invalid token format or signature."""
    pass
class InsufficientPermissionsError(AuthorizationError):
    """User lacks required permissions."""
    pass
class BaseAuthenticator(ABC):
    """Base class for authentication providers."""
    
    @abstractmethod
    async def authenticate(self, credentials: Dict[str, Any]) -> Optional[User]:
        """Authenticate user with provided credentials.
        
        Args:
            credentials: Authentication credentials (varies by provider)
            
        Returns:
            User object if authentication successful, None otherwise
            
        Raises:
            AuthenticationError: If authentication fails
        """
        pass
    
    @abstractmethod
    async def validate_token(self, token: str) -> Optional[AuthToken]:
        """Validate an authentication token.
        
        Args:
            token: Token to validate
            
        Returns:
            AuthToken object if valid, None otherwise
            
        Raises:
            InvalidTokenError: If token format is invalid
            TokenExpiredError: If token has expired
        """
        pass
    
    @abstractmethod
    async def create_token(self, user: User, **kwargs) -> AuthToken:
        """Create an authentication token for a user.
        
        Args:
            user: User to create token for
            **kwargs: Additional token parameters
            
        Returns:
            AuthToken object
            
        Raises:
            AuthenticationError: If token creation fails
        """
        pass
    
    @abstractmethod
    async def revoke_token(self, token: str) -> bool:
        """Revoke an authentication token.
        
        Args:
            token: Token to revoke
            
        Returns:
            True if token was revoked, False if not found
        """
        pass
    
    async def refresh_token(self, token: str) -> Optional[AuthToken]:
        """Refresh an authentication token.
        
        Args:
            token: Token to refresh
            
        Returns:
            New AuthToken if refresh successful, None otherwise
            
        Raises:
            AuthenticationError: If refresh fails
        """
        # Default implementation - subclasses can override
        auth_token = await self.validate_token(token)
        if not auth_token or not auth_token.user_id:
            return None
        
        # Get user and create new token
        user = await self.get_user(auth_token.user_id)
        if not user:
            return None
        
        return await self.create_token(user)
    
    @abstractmethod
    async def get_user(self, user_id: str) -> Optional[User]:
        """Get user by ID.
        
        Args:
            user_id: User identifier
            
        Returns:
            User object if found, None otherwise
        """
        pass
    
    async def get_user_by_token(self, token: str) -> Optional[User]:
        """Get user associated with a token.
        
        Args:
            token: Authentication token
            
        Returns:
            User object if token is valid and user exists, None otherwise
        """
        auth_token = await self.validate_token(token)
        if not auth_token or not auth_token.user_id:
            return None
        
        return await self.get_user(auth_token.user_id)
class BaseAuthorizer(ABC):
    """Base class for authorization providers."""
    
    @abstractmethod
    async def check_permission(self, user: User, permission: Permission, resource: Optional[str] = None) -> bool:
        """Check if user has permission for a resource.
        
        Args:
            user: User to check
            permission: Required permission
            resource: Optional resource identifier
            
        Returns:
            True if user has permission, False otherwise
        """
        pass
    
    async def check_permissions(self, user: User, permissions: List[Permission], resource: Optional[str] = None) -> bool:
        """Check if user has all specified permissions.
        
        Args:
            user: User to check
            permissions: List of required permissions
            resource: Optional resource identifier
            
        Returns:
            True if user has all permissions, False otherwise
        """
        for permission in permissions:
            if not await self.check_permission(user, permission, resource):
                return False
        return True
    
    async def check_any_permission(self, user: User, permissions: List[Permission], resource: Optional[str] = None) -> bool:
        """Check if user has any of the specified permissions.
        
        Args:
            user: User to check
            permissions: List of permissions (user needs at least one)
            resource: Optional resource identifier
            
        Returns:
            True if user has any permission, False otherwise
        """
        for permission in permissions:
            if await self.check_permission(user, permission, resource):
                return True
        return False
    
    async def require_permission(self, user: User, permission: Permission, resource: Optional[str] = None) -> None:
        """Require user to have a specific permission.
        
        Args:
            user: User to check
            permission: Required permission
            resource: Optional resource identifier
            
        Raises:
            InsufficientPermissionsError: If user lacks permission
        """
        if not await self.check_permission(user, permission, resource):
            raise InsufficientPermissionsError(f"User {user.username} lacks permission {permission.value}")
    
    async def require_permissions(self, user: User, permissions: List[Permission], resource: Optional[str] = None) -> None:
        """Require user to have all specified permissions.
        
        Args:
            user: User to check
            permissions: List of required permissions
            resource: Optional resource identifier
            
        Raises:
            InsufficientPermissionsError: If user lacks any permission
        """
        missing_permissions = []
        for permission in permissions:
            if not await self.check_permission(user, permission, resource):
                missing_permissions.append(permission.value)
        
        if missing_permissions:
            raise InsufficientPermissionsError(
                f"User {user.username} lacks permissions: {', '.join(missing_permissions)}"
            )
    
    async def require_any_permission(self, user: User, permissions: List[Permission], resource: Optional[str] = None) -> None:
        """Require user to have at least one of the specified permissions.
        
        Args:
            user: User to check
            permissions: List of permissions (user needs at least one)
            resource: Optional resource identifier
            
        Raises:
            InsufficientPermissionsError: If user lacks all permissions
        """
        if not await self.check_any_permission(user, permissions, resource):
            permission_names = [p.value for p in permissions]
            raise InsufficientPermissionsError(
                f"User {user.username} lacks any of the required permissions: {', '.join(permission_names)}"
            )