import dataclasses
import os
from lib import jsonutils, loggerutils
from lib.aws.firehose_utils.exceptions import FirehoseConfigurationError
from lib.enums import AwsProfilesList
logger = loggerutils.getLogger('analytics')
@dataclasses.dataclass
class IrsaCredentials:
assume_role_arn: str
assume_role_name: str
@dataclasses.dataclass
class DirectKeyCredentials:
aws_access_key_id: str
aws_secret_access_key: str
aws_session_token: str | None = None
@dataclasses.dataclass
class AWSProfileConfig:
name: AwsProfilesList
region: str
bucket: str
creds: IrsaCredentials | DirectKeyCredentials
@property
def uses_irsa(self) -> bool:
return isinstance(self.creds, IrsaCredentials)
def init_aws_profiles(path: str) -> None:
if not os.path.exists(path):
raise FirehoseConfigurationError(f'File {path} not found')
with open(path, 'r') as f:
try:
data = jsonutils.loads(f.read())
except Exception:
raise FirehoseConfigurationError(f'Can`t parse file {path}')
for config in data:
# Check if profile name exists in enum
profile_name_str = config.get('name', 'UNKNOWN')
try:
profile_name = AwsProfilesList.get_member(profile_name_str)
except AttributeError:
logger.error(
'Skip',
data='AWS profile not found in AwsProfilesList enum',
extra_data={
'profile': profile_name_str,
'available_profiles': [p.value for p in AwsProfilesList],
},
)
continue
try:
region = config['region']
bucket = config['bucket']
creds: IrsaCredentials | DirectKeyCredentials
# If no secret_key in config → IRSA (assume role from env or config)
if 'secret_key' not in config or not config['secret_key']:
# Get role ARN: either from config directly or from env var
role_arn = config.get('role_arn')
if not role_arn:
# Try to read from env var
role_arn_env_var = config.get('role_arn_env_var')
if not role_arn_env_var:
raise FirehoseConfigurationError(
f'Missing role_arn or role_arn_env_var in IRSA config '
f'for {profile_name.value}'
)
role_arn = os.environ.get(role_arn_env_var)
if not role_arn:
raise FirehoseConfigurationError(
f'IRSA role ARN not found in environment: {role_arn_env_var}'
)
# Get session name: config field > env var > default
default_session_name = f'analytics-{profile_name.value}'
role_session_name = config.get('session_name')
if not role_session_name:
session_name_env_var = config.get('session_name_env_var')
if session_name_env_var:
role_session_name = os.environ.get(
session_name_env_var, default_session_name
)
else:
role_session_name = default_session_name
creds = IrsaCredentials(
assume_role_arn=role_arn,
assume_role_name=role_session_name,
)
else:
# Legacy: static credentials from config
logger.info(
'DEPRECATED: Static AWS credentials',
data='Migrate to IRSA for improved security',
extra_data={
'profile': profile_name.value,
},
)
creds = DirectKeyCredentials(
aws_access_key_id=config['access_key'],
aws_secret_access_key=config['secret_key'],
aws_session_token=config.get('session_token'),
)
_PROFILES[profile_name] = AWSProfileConfig(
name=profile_name,
region=region,
bucket=bucket,
creds=creds,
)
except KeyError as e:
logger.error(f'Missing required field {e} in AWS profile config for {profile_name_str}')
_PROFILES: dict[AwsProfilesList, AWSProfileConfig] = {}
def register_profile(profile: AWSProfileConfig) -> None:
_PROFILES[profile.name] = profile
def get_aws_profile_config(profile_name: AwsProfilesList) -> AWSProfileConfig:
config = _PROFILES.get(profile_name)
if not config:
raise FirehoseConfigurationError(f'AWS profile {profile_name} not found')
return config
def get_available_profile_names() -> list[AwsProfilesList]:
return list(_PROFILES.keys())