api_key.pyโข13.4 kB
#!/usr/bin/env python3
"""
API Key Authentication
Implementation of API key-based authentication.
"""
import secrets
import hashlib
import hmac
from typing import Any, Dict, List, Optional
from datetime import datetime, timedelta
import structlog
from .base import (
BaseAuthenticator,
BaseAuthorizer,
User,
AuthToken,
Permission,
UserRole,
AuthenticationError,
InvalidTokenError,
TokenExpiredError,
)
logger = structlog.get_logger(__name__)
class APIKey:
"""API Key model."""
def __init__(
self,
key_id: str,
key_hash: str,
user_id: str,
name: str,
scopes: List[str] = None,
expires_at: Optional[datetime] = None,
is_active: bool = True,
created_at: Optional[datetime] = None,
last_used: Optional[datetime] = None,
metadata: Dict[str, Any] = None,
):
self.key_id = key_id
self.key_hash = key_hash
self.user_id = user_id
self.name = name
self.scopes = scopes or []
self.expires_at = expires_at
self.is_active = is_active
self.created_at = created_at or datetime.utcnow()
self.last_used = last_used
self.metadata = metadata or {}
def is_expired(self) -> bool:
"""Check if the API key is expired."""
if self.expires_at is None:
return False
return datetime.utcnow() > self.expires_at
def is_valid(self) -> bool:
"""Check if the API key is valid."""
return self.is_active and not self.is_expired()
def update_last_used(self) -> None:
"""Update the last used timestamp."""
self.last_used = datetime.utcnow()
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary representation."""
return {
"key_id": self.key_id,
"user_id": self.user_id,
"name": self.name,
"scopes": self.scopes,
"expires_at": self.expires_at.isoformat() if self.expires_at else None,
"is_active": self.is_active,
"created_at": self.created_at.isoformat(),
"last_used": self.last_used.isoformat() if self.last_used else None,
"metadata": self.metadata,
}
class APIKeyAuthenticator(BaseAuthenticator):
"""API Key-based authenticator."""
def __init__(self, secret_key: str, default_expiry_days: int = 365):
"""Initialize API key authenticator.
Args:
secret_key: Secret key for HMAC signing
default_expiry_days: Default expiry period for new keys
"""
self.secret_key = secret_key.encode()
self.default_expiry_days = default_expiry_days
self._api_keys: Dict[str, APIKey] = {} # key_id -> APIKey
self._users: Dict[str, User] = {} # user_id -> User
logger.info("API Key authenticator initialized",
default_expiry_days=default_expiry_days)
def _generate_key_pair(self) -> tuple[str, str]:
"""Generate a new API key pair (key_id, secret).
Returns:
Tuple of (key_id, secret)
"""
# Generate key ID (public identifier)
key_id = f"ak_{secrets.token_urlsafe(16)}"
# Generate secret (private key)
secret = secrets.token_urlsafe(32)
return key_id, secret
def _hash_secret(self, secret: str) -> str:
"""Hash API key secret using HMAC.
Args:
secret: Raw secret to hash
Returns:
Hashed secret
"""
return hmac.new(
self.secret_key,
secret.encode(),
hashlib.sha256
).hexdigest()
def _verify_secret(self, secret: str, key_hash: str) -> bool:
"""Verify API key secret against hash.
Args:
secret: Raw secret to verify
key_hash: Stored hash to verify against
Returns:
True if secret matches hash
"""
computed_hash = self._hash_secret(secret)
return hmac.compare_digest(computed_hash, key_hash)
async def create_api_key(
self,
user: User,
name: str,
scopes: List[str] = None,
expires_in_days: Optional[int] = None,
metadata: Dict[str, Any] = None,
) -> tuple[APIKey, str]:
"""Create a new API key for a user.
Args:
user: User to create key for
name: Human-readable name for the key
scopes: List of scopes for the key
expires_in_days: Expiry period in days (None for no expiry)
metadata: Additional metadata
Returns:
Tuple of (APIKey object, raw secret)
"""
key_id, secret = self._generate_key_pair()
key_hash = self._hash_secret(secret)
# Calculate expiry
expires_at = None
if expires_in_days is not None:
expires_at = datetime.utcnow() + timedelta(days=expires_in_days)
elif self.default_expiry_days > 0:
expires_at = datetime.utcnow() + timedelta(days=self.default_expiry_days)
# Create API key
api_key = APIKey(
key_id=key_id,
key_hash=key_hash,
user_id=user.id,
name=name,
scopes=scopes or [],
expires_at=expires_at,
metadata=metadata or {},
)
# Store API key and user
self._api_keys[key_id] = api_key
self._users[user.id] = user
logger.info("API key created",
key_id=key_id,
user_id=user.id,
name=name,
expires_at=expires_at)
return api_key, secret
async def authenticate(self, credentials: Dict[str, Any]) -> Optional[User]:
"""Authenticate using API key credentials.
Args:
credentials: Dict with 'api_key' containing the full key
Returns:
User object if authentication successful
"""
api_key = credentials.get("api_key")
if not api_key:
raise AuthenticationError("API key is required")
# Parse API key format: key_id.secret
try:
key_id, secret = api_key.split(".", 1)
except ValueError:
raise AuthenticationError("Invalid API key format")
# Find API key
stored_key = self._api_keys.get(key_id)
if not stored_key:
logger.warning("API key not found", key_id=key_id)
raise AuthenticationError("Invalid API key")
# Verify secret
if not self._verify_secret(secret, stored_key.key_hash):
logger.warning("API key secret verification failed", key_id=key_id)
raise AuthenticationError("Invalid API key")
# Check if key is valid
if not stored_key.is_valid():
logger.warning("API key is invalid",
key_id=key_id,
is_active=stored_key.is_active,
is_expired=stored_key.is_expired())
raise AuthenticationError("API key is expired or inactive")
# Update last used
stored_key.update_last_used()
# Get user
user = self._users.get(stored_key.user_id)
if not user:
logger.error("User not found for API key",
key_id=key_id,
user_id=stored_key.user_id)
raise AuthenticationError("User not found")
# Update user last login
user.update_last_login()
logger.info("API key authentication successful",
key_id=key_id,
user_id=user.id,
username=user.username)
return user
async def validate_token(self, token: str) -> Optional[AuthToken]:
"""Validate API key token.
Args:
token: API key token to validate
Returns:
AuthToken object if valid
"""
try:
# Parse API key format
key_id, secret = token.split(".", 1)
except ValueError:
raise InvalidTokenError("Invalid API key format")
# Find API key
stored_key = self._api_keys.get(key_id)
if not stored_key:
raise InvalidTokenError("API key not found")
# Verify secret
if not self._verify_secret(secret, stored_key.key_hash):
raise InvalidTokenError("Invalid API key secret")
# Check expiry
if stored_key.is_expired():
raise TokenExpiredError("API key has expired")
# Check if active
if not stored_key.is_active:
raise InvalidTokenError("API key is inactive")
# Update last used
stored_key.update_last_used()
return AuthToken(
token=token,
token_type="api_key",
expires_at=stored_key.expires_at,
user_id=stored_key.user_id,
scopes=stored_key.scopes,
metadata={"key_id": key_id, "key_name": stored_key.name},
)
async def create_token(self, user: User, **kwargs) -> AuthToken:
"""Create API key token for user.
Args:
user: User to create token for
**kwargs: Additional parameters (name, scopes, expires_in_days)
Returns:
AuthToken object
"""
name = kwargs.get("name", f"Token for {user.username}")
scopes = kwargs.get("scopes", [])
expires_in_days = kwargs.get("expires_in_days")
api_key, secret = await self.create_api_key(
user=user,
name=name,
scopes=scopes,
expires_in_days=expires_in_days,
)
# Combine key_id and secret
full_key = f"{api_key.key_id}.{secret}"
return AuthToken(
token=full_key,
token_type="api_key",
expires_at=api_key.expires_at,
user_id=user.id,
scopes=api_key.scopes,
metadata={"key_id": api_key.key_id, "key_name": api_key.name},
)
async def revoke_token(self, token: str) -> bool:
"""Revoke API key token.
Args:
token: API key token to revoke
Returns:
True if token was revoked
"""
try:
key_id, _ = token.split(".", 1)
except ValueError:
return False
api_key = self._api_keys.get(key_id)
if not api_key:
return False
api_key.is_active = False
logger.info("API key revoked", key_id=key_id, user_id=api_key.user_id)
return True
async def get_user(self, user_id: str) -> Optional[User]:
"""Get user by ID.
Args:
user_id: User identifier
Returns:
User object if found
"""
return self._users.get(user_id)
async def list_api_keys(self, user_id: str) -> List[APIKey]:
"""List API keys for a user.
Args:
user_id: User identifier
Returns:
List of API keys for the user
"""
return [key for key in self._api_keys.values() if key.user_id == user_id]
async def get_api_key(self, key_id: str) -> Optional[APIKey]:
"""Get API key by ID.
Args:
key_id: API key identifier
Returns:
APIKey object if found
"""
return self._api_keys.get(key_id)
async def delete_api_key(self, key_id: str) -> bool:
"""Delete API key.
Args:
key_id: API key identifier
Returns:
True if key was deleted
"""
if key_id in self._api_keys:
api_key = self._api_keys[key_id]
del self._api_keys[key_id]
logger.info("API key deleted", key_id=key_id, user_id=api_key.user_id)
return True
return False
async def add_user(self, user: User) -> None:
"""Add a user to the authenticator.
Args:
user: User to add
"""
self._users[user.id] = user
logger.info("User added to API key authenticator",
user_id=user.id,
username=user.username)
class APIKeyAuthorizer(BaseAuthorizer):
"""Simple role-based authorizer for API keys."""
async def check_permission(self, user: User, permission: Permission, resource: Optional[str] = None) -> bool:
"""Check if user has permission.
Args:
user: User to check
permission: Required permission
resource: Optional resource identifier (not used in simple implementation)
Returns:
True if user has permission
"""
return user.has_permission(permission)