"""
AES-256-GCM encryption/decryption module using cryptography library.
"""
import os
import base64
import logging
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
from typing import Optional, Tuple
# Module-level logger, named after this module via __name__; every log call
# in this file goes through it.
logger = logging.getLogger(__name__)
class EncryptionManager:
    """AES-256-GCM encryption/decryption manager.

    One 32-byte key is held per instance. It is derived from the
    ``ENCRYPTION_KEY`` environment variable when that is set, otherwise a
    random key is generated (data encrypted with a random key cannot be
    decrypted by another process or after a restart).

    Payload layouts produced by this class:
        * ``encrypt_data``: base64(salt | nonce | tag | ciphertext)
        * ``encrypt_file_data``: nonce | tag | ciphertext
    """

    # Length in bytes of the GCM authentication tag stored in every payload.
    TAG_LENGTH = 16

    # PBKDF2-HMAC-SHA256 iteration count. Changing this value changes every
    # derived key, so previously stored password-protected payloads would no
    # longer decrypt.
    PBKDF2_ITERATIONS = 100000

    # Fixed 16-byte salt used only when stretching ENCRYPTION_KEY into the
    # instance key. It has to be constant: a random salt would yield a
    # different key on every start-up and make stored data unrecoverable.
    _ENV_KEY_SALT = b"EncMgr-env-salt1"

    def __init__(self):
        """Set up length parameters and load or generate the instance key."""
        self.salt_length = 16
        self.nonce_length = 12  # GCM standard nonce length

        # Get encryption key from environment or generate one.
        encryption_key = os.getenv("ENCRYPTION_KEY")
        if encryption_key:
            # _derive_key returns (key, salt); only the key is stored. The
            # salt is the fixed class constant, so the key is reproducible.
            self.key, _ = self._derive_key(
                encryption_key.encode(), self._ENV_KEY_SALT
            )
        else:
            logger.warning("No ENCRYPTION_KEY provided. Using system-generated key.")
            # In production, always use a proper key from environment.
            self.key = os.urandom(32)  # 256 bits

    def _derive_key(
        self, password: bytes, salt: Optional[bytes] = None
    ) -> Tuple[bytes, bytes]:
        """Derive a 32-byte key from a password using PBKDF2-HMAC-SHA256.

        Args:
            password: Password to derive the key from.
            salt: Salt for key derivation; a random one of ``salt_length``
                bytes is generated when None.

        Returns:
            Tuple of (derived key, salt actually used). The salt is returned
            so that callers can store it next to the ciphertext.
        """
        if salt is None:
            salt = os.urandom(self.salt_length)
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,  # AES-256 key length
            salt=salt,
            iterations=self.PBKDF2_ITERATIONS,
            backend=default_backend(),
        )
        return kdf.derive(password), salt

    def _seal(self, key: bytes, plaintext: bytes) -> bytes:
        """Encrypt plaintext with AES-GCM and return nonce | tag | ciphertext."""
        nonce = os.urandom(self.nonce_length)
        encryptor = Cipher(
            algorithms.AES(key),
            modes.GCM(nonce),
            backend=default_backend(),
        ).encryptor()
        ciphertext = encryptor.update(plaintext) + encryptor.finalize()
        # The tag is only available after finalize().
        return nonce + encryptor.tag + ciphertext

    def _open(self, key: bytes, blob: bytes) -> bytes:
        """Decrypt a nonce | tag | ciphertext blob produced by ``_seal``."""
        header_length = self.nonce_length + self.TAG_LENGTH
        if len(blob) < header_length:
            raise ValueError(
                "Encrypted payload is too short to contain a nonce and tag"
            )
        nonce = blob[:self.nonce_length]
        tag = blob[self.nonce_length:header_length]
        ciphertext = blob[header_length:]
        decryptor = Cipher(
            algorithms.AES(key),
            modes.GCM(nonce, tag),
            backend=default_backend(),
        ).decryptor()
        # finalize() verifies the tag and raises if the data was altered.
        return decryptor.update(ciphertext) + decryptor.finalize()

    def encrypt_data(self, data: str, password: Optional[str] = None) -> str:
        """Encrypt text using AES-256-GCM.

        Args:
            data: Plain text data to encrypt.
            password: Optional password for key derivation. When missing or
                empty, the instance key is used instead.

        Returns:
            Base64 encoded salt | nonce | tag | ciphertext.

        Raises:
            ValueError: If any step of the encryption fails.
        """
        try:
            if password:
                # Derive a one-off key; the random salt is stored in the
                # payload so decrypt_data can repeat the derivation.
                key, salt = self._derive_key(password.encode())
            else:
                # Use the pre-configured key. A random salt is still written
                # so both modes share one payload layout.
                key = self.key
                salt = os.urandom(self.salt_length)

            encrypted_data = salt + self._seal(key, data.encode())
            # Encode to base64 for storage.
            encrypted_b64 = base64.b64encode(encrypted_data).decode()
            logger.debug("Data encrypted successfully (length: %d)", len(data))
            return encrypted_b64
        except Exception as e:
            logger.error("Encryption failed: %s", e)
            raise ValueError(f"Failed to encrypt data: {e}") from e

    def decrypt_data(self, encrypted_data: str, password: Optional[str] = None) -> str:
        """Decrypt text produced by ``encrypt_data``.

        Args:
            encrypted_data: Base64 encoded encrypted data.
            password: Optional password for key derivation. When missing or
                empty, the instance key is used instead.

        Returns:
            Decrypted plain text data.

        Raises:
            ValueError: If the payload is malformed, truncated, fails
                authentication, or is not valid UTF-8 after decryption.
        """
        try:
            encrypted_bytes = base64.b64decode(encrypted_data.encode())
            salt = encrypted_bytes[:self.salt_length]
            sealed = encrypted_bytes[self.salt_length:]

            if password:
                # Repeat the derivation with the salt stored in the payload.
                key, _ = self._derive_key(password.encode(), salt)
            else:
                key = self.key

            plaintext = self._open(key, sealed)
            logger.debug("Data decrypted successfully (length: %d)", len(plaintext))
            return plaintext.decode()
        except Exception as e:
            logger.error("Decryption failed: %s", e)
            raise ValueError(f"Failed to decrypt data: {e}") from e

    def encrypt_credentials(self, username: str, password: str) -> Tuple[str, str]:
        """Encrypt a username and a password with the instance key.

        Args:
            username: Username to encrypt.
            password: Password to encrypt (treated as data, not as a key).

        Returns:
            Tuple of (encrypted_username, encrypted_password).

        Raises:
            ValueError: Propagated from ``encrypt_data``.
        """
        try:
            encrypted_username = self.encrypt_data(username)
            encrypted_password = self.encrypt_data(password)
            logger.info("Credentials encrypted successfully")
            return encrypted_username, encrypted_password
        except Exception as e:
            logger.error("Failed to encrypt credentials: %s", e)
            raise

    def decrypt_credentials(self, encrypted_username: str, encrypted_password: str) -> Tuple[str, str]:
        """Decrypt a username and a password with the instance key.

        Args:
            encrypted_username: Encrypted username.
            encrypted_password: Encrypted password.

        Returns:
            Tuple of (username, password).

        Raises:
            ValueError: Propagated from ``decrypt_data``.
        """
        try:
            username = self.decrypt_data(encrypted_username)
            password = self.decrypt_data(encrypted_password)
            logger.info("Credentials decrypted successfully")
            return username, password
        except Exception as e:
            logger.error("Failed to decrypt credentials: %s", e)
            raise

    def encrypt_file_data(self, file_data: bytes) -> bytes:
        """Encrypt raw bytes with the instance key.

        Args:
            file_data: File data to encrypt.

        Returns:
            nonce | tag | ciphertext as bytes.
        """
        try:
            encrypted_file_data = self._seal(self.key, file_data)
            logger.info("File data encrypted (size: %d bytes)", len(file_data))
            return encrypted_file_data
        except Exception as e:
            logger.error("File encryption failed: %s", e)
            raise

    def decrypt_file_data(self, encrypted_file_data: bytes) -> bytes:
        """Decrypt bytes produced by ``encrypt_file_data``.

        Args:
            encrypted_file_data: nonce | tag | ciphertext.

        Returns:
            Decrypted file data as bytes.

        Raises:
            ValueError: If the payload is too short to hold a nonce and tag.
            Exception: Any error raised by the cipher (for example a failed
                tag check) is logged and re-raised unchanged.
        """
        try:
            file_data = self._open(self.key, encrypted_file_data)
            logger.info("File data decrypted (size: %d bytes)", len(file_data))
            return file_data
        except Exception as e:
            logger.error("File decryption failed: %s", e)
            raise

    def generate_secure_token(self, length: int = 32) -> str:
        """Generate a cryptographically secure random token.

        Args:
            length: Number of random bytes (the returned string is longer
                because of base64 encoding).

        Returns:
            Base64 encoded random token.
        """
        token_bytes = os.urandom(length)
        return base64.b64encode(token_bytes).decode()

    def hash_data(self, data: str) -> str:
        """Create the SHA-256 hash of a string.

        Args:
            data: Data to hash (encoded as UTF-8 before hashing).

        Returns:
            Hexadecimal hash string.
        """
        import hashlib

        return hashlib.sha256(data.encode()).hexdigest()
# Global encryption manager instance, created once when this module is
# imported and shared by the module-level helper functions below.
# Construction reads the ENCRYPTION_KEY environment variable.
encryption_manager = EncryptionManager()
def encrypt_data(data: str, password: Optional[str] = None) -> str:
    """Encrypt text through the shared module-level manager.

    Args:
        data: Plain text to encrypt.
        password: Optional password, forwarded unchanged to the manager.

    Returns:
        The value returned by ``encryption_manager.encrypt_data``.
    """
    manager = encryption_manager
    encrypted = manager.encrypt_data(data, password)
    return encrypted
def decrypt_data(encrypted_data: str, password: Optional[str] = None) -> str:
    """Decrypt text through the shared module-level manager.

    Args:
        encrypted_data: Encrypted payload to decrypt.
        password: Optional password, forwarded unchanged to the manager.

    Returns:
        The value returned by ``encryption_manager.decrypt_data``.
    """
    manager = encryption_manager
    plaintext = manager.decrypt_data(encrypted_data, password)
    return plaintext
def encrypt_credentials(username: str, password: str) -> Tuple[str, str]:
    """Encrypt a username/password pair through the shared manager.

    Args:
        username: Username to encrypt.
        password: Password to encrypt.

    Returns:
        The tuple returned by ``encryption_manager.encrypt_credentials``.
    """
    manager = encryption_manager
    encrypted_pair = manager.encrypt_credentials(username, password)
    return encrypted_pair
def decrypt_credentials(encrypted_username: str, encrypted_password: str) -> Tuple[str, str]:
    """Decrypt a username/password pair through the shared manager.

    Args:
        encrypted_username: Encrypted username.
        encrypted_password: Encrypted password.

    Returns:
        The tuple returned by ``encryption_manager.decrypt_credentials``.
    """
    manager = encryption_manager
    decrypted_pair = manager.decrypt_credentials(encrypted_username, encrypted_password)
    return decrypted_pair