manager.pyโข14.2 kB
#!/usr/bin/env python3
"""
Authentication Manager
Unified authentication and authorization management.
"""
from typing import Any, Dict, List, Optional, Union
from enum import Enum
import structlog
from .base import (
BaseAuthenticator,
BaseAuthorizer,
User,
AuthToken,
Permission,
UserRole,
AuthenticationError,
AuthorizationError,
InvalidTokenError,
TokenExpiredError,
)
from .api_key import APIKeyAuthenticator, APIKeyAuthorizer
from .jwt_auth import JWTAuthenticator, JWTAuthorizer
from .oauth2 import OAuth2Authenticator, OAuth2Authorizer
logger = structlog.get_logger(__name__)
class AuthMethod(Enum):
"""Authentication methods."""
API_KEY = "api_key"
JWT = "jwt"
OAUTH2 = "oauth2"
class AuthManager:
"""Unified authentication and authorization manager."""
def __init__(
self,
default_method: AuthMethod = AuthMethod.API_KEY,
jwt_secret: Optional[str] = None,
oauth2_redirect_uri: Optional[str] = None,
secret_key: Optional[str] = None,
):
"""Initialize authentication manager.
Args:
default_method: Default authentication method
jwt_secret: JWT secret key (required for JWT auth)
oauth2_redirect_uri: OAuth2 redirect URI
"""
self.default_method = default_method
# Initialize authenticators
self._authenticators: Dict[AuthMethod, BaseAuthenticator] = {
AuthMethod.API_KEY: APIKeyAuthenticator(secret_key=secret_key or "default-secret-key"),
AuthMethod.JWT: JWTAuthenticator(secret_key=jwt_secret) if jwt_secret else None,
AuthMethod.OAUTH2: OAuth2Authenticator(default_redirect_uri=oauth2_redirect_uri) if oauth2_redirect_uri else None,
}
# Initialize authorizers
self._authorizers: Dict[AuthMethod, BaseAuthorizer] = {
AuthMethod.API_KEY: APIKeyAuthorizer(),
AuthMethod.JWT: JWTAuthorizer() if jwt_secret else None,
AuthMethod.OAUTH2: OAuth2Authorizer() if oauth2_redirect_uri else None,
}
# Remove None values
self._authenticators = {k: v for k, v in self._authenticators.items() if v is not None}
self._authorizers = {k: v for k, v in self._authorizers.items() if v is not None}
logger.info(
"Authentication manager initialized",
default_method=default_method.value,
available_methods=[method.value for method in self._authenticators.keys()],
)
def get_authenticator(self, method: AuthMethod) -> BaseAuthenticator:
"""Get authenticator for method.
Args:
method: Authentication method
Returns:
Authenticator instance
Raises:
ValueError: If method not available
"""
if method not in self._authenticators:
raise ValueError(f"Authentication method '{method.value}' not available")
return self._authenticators[method]
def get_authorizer(self, method: AuthMethod) -> BaseAuthorizer:
"""Get authorizer for method.
Args:
method: Authentication method
Returns:
Authorizer instance
Raises:
ValueError: If method not available
"""
if method not in self._authorizers:
raise ValueError(f"Authorization method '{method.value}' not available")
return self._authorizers[method]
async def authenticate(
self,
credentials: Dict[str, Any],
method: Optional[AuthMethod] = None,
) -> tuple[User, AuthMethod]:
"""Authenticate user with credentials.
Args:
credentials: Authentication credentials
method: Specific authentication method to use
Returns:
Tuple of (user, auth_method)
Raises:
AuthenticationError: If authentication fails
"""
method = method or self.default_method
try:
authenticator = self.get_authenticator(method)
user = await authenticator.authenticate(credentials)
logger.info(
"User authentication successful",
user_id=user.id,
method=method.value,
)
return user, method
except Exception as e:
logger.error(
"User authentication failed",
method=method.value,
error=str(e),
)
raise AuthenticationError(f"Authentication failed: {e}")
async def validate_token(
self,
token: str,
method: Optional[AuthMethod] = None,
) -> tuple[User, AuthMethod]:
"""Validate authentication token.
Args:
token: Authentication token
method: Specific authentication method to use
Returns:
Tuple of (user, auth_method)
Raises:
InvalidTokenError: If token is invalid
TokenExpiredError: If token is expired
"""
# If method not specified, try all available methods
methods_to_try = [method] if method else list(self._authenticators.keys())
last_error = None
for auth_method in methods_to_try:
try:
authenticator = self.get_authenticator(auth_method)
user = await authenticator.validate_token(token)
logger.info(
"Token validation successful",
user_id=user.id,
method=auth_method.value,
)
return user, auth_method
except (InvalidTokenError, TokenExpiredError) as e:
last_error = e
continue
except Exception as e:
logger.warning(
"Token validation error",
method=auth_method.value,
error=str(e),
)
last_error = e
continue
# If we get here, all methods failed
if last_error:
raise last_error
else:
raise InvalidTokenError("Token validation failed for all methods")
async def create_token(
self,
user: User,
method: Optional[AuthMethod] = None,
**kwargs,
) -> AuthToken:
"""Create authentication token for user.
Args:
user: User to create token for
method: Authentication method to use
**kwargs: Additional token parameters
Returns:
Authentication token
Raises:
AuthenticationError: If token creation fails
"""
method = method or self.default_method
try:
authenticator = self.get_authenticator(method)
token = await authenticator.create_token(user, **kwargs)
logger.info(
"Token creation successful",
user_id=user.id,
method=method.value,
)
return token
except Exception as e:
logger.error(
"Token creation failed",
user_id=user.id,
method=method.value,
error=str(e),
)
raise AuthenticationError(f"Token creation failed: {e}")
async def revoke_token(
self,
token: str,
method: Optional[AuthMethod] = None,
) -> bool:
"""Revoke authentication token.
Args:
token: Token to revoke
method: Authentication method to use
Returns:
True if revocation successful
"""
# If method not specified, try all available methods
methods_to_try = [method] if method else list(self._authenticators.keys())
for auth_method in methods_to_try:
try:
authenticator = self.get_authenticator(auth_method)
if await authenticator.revoke_token(token):
logger.info(
"Token revocation successful",
method=auth_method.value,
)
return True
except Exception as e:
logger.warning(
"Token revocation error",
method=auth_method.value,
error=str(e),
)
continue
return False
async def check_permission(
self,
user: User,
permission: Permission,
resource: Optional[str] = None,
method: Optional[AuthMethod] = None,
) -> bool:
"""Check if user has permission.
Args:
user: User to check
permission: Permission to check
resource: Optional resource identifier
method: Authorization method to use
Returns:
True if user has permission
"""
method = method or self.default_method
try:
authorizer = self.get_authorizer(method)
has_permission = await authorizer.check_permission(user, permission, resource)
logger.debug(
"Permission check",
user_id=user.id,
permission=permission.value,
resource=resource,
has_permission=has_permission,
method=method.value,
)
return has_permission
except Exception as e:
logger.error(
"Permission check failed",
user_id=user.id,
permission=permission.value,
resource=resource,
method=method.value,
error=str(e),
)
return False
async def require_permission(
self,
user: User,
permission: Permission,
resource: Optional[str] = None,
method: Optional[AuthMethod] = None,
) -> None:
"""Require user to have permission.
Args:
user: User to check
permission: Required permission
resource: Optional resource identifier
method: Authorization method to use
Raises:
AuthorizationError: If user lacks permission
"""
method = method or self.default_method
try:
authorizer = self.get_authorizer(method)
await authorizer.require_permission(user, permission, resource)
logger.debug(
"Permission requirement satisfied",
user_id=user.id,
permission=permission.value,
resource=resource,
method=method.value,
)
except AuthorizationError:
logger.warning(
"Permission requirement failed",
user_id=user.id,
permission=permission.value,
resource=resource,
method=method.value,
)
raise
except Exception as e:
logger.error(
"Permission requirement check failed",
user_id=user.id,
permission=permission.value,
resource=resource,
method=method.value,
error=str(e),
)
raise AuthorizationError(f"Permission check failed: {e}")
def get_available_methods(self) -> List[AuthMethod]:
"""Get list of available authentication methods.
Returns:
List of available authentication methods
"""
return list(self._authenticators.keys())
def is_method_available(self, method: AuthMethod) -> bool:
"""Check if authentication method is available.
Args:
method: Authentication method to check
Returns:
True if method is available
"""
return method in self._authenticators
# Convenience methods for specific authenticators
def get_api_key_authenticator(self) -> Optional[APIKeyAuthenticator]:
"""Get API key authenticator.
Returns:
API key authenticator or None if not available
"""
return self._authenticators.get(AuthMethod.API_KEY)
def get_jwt_authenticator(self) -> Optional[JWTAuthenticator]:
"""Get JWT authenticator.
Returns:
JWT authenticator or None if not available
"""
return self._authenticators.get(AuthMethod.JWT)
def get_oauth2_authenticator(self) -> Optional[OAuth2Authenticator]:
"""Get OAuth2 authenticator.
Returns:
OAuth2 authenticator or None if not available
"""
return self._authenticators.get(AuthMethod.OAUTH2)
def get_api_key_authorizer(self) -> Optional[APIKeyAuthorizer]:
"""Get API key authorizer.
Returns:
API key authorizer or None if not available
"""
return self._authorizers.get(AuthMethod.API_KEY)
def get_jwt_authorizer(self) -> Optional[JWTAuthorizer]:
"""Get JWT authorizer.
Returns:
JWT authorizer or None if not available
"""
return self._authorizers.get(AuthMethod.JWT)
def get_oauth2_authorizer(self) -> Optional[OAuth2Authorizer]:
"""Get OAuth2 authorizer.
Returns:
OAuth2 authorizer or None if not available
"""
return self._authorizers.get(AuthMethod.OAUTH2)