from __future__ import annotations
from typing import Any, Dict, List
import boto3
from botocore.exceptions import ClientError
def collect_rds_region(session: boto3.Session, region: str) -> Dict[str, Any]:
    """Summarise the RDS DB instances and DB clusters found in one region.

    Args:
        session: Session used to create the ``"rds"`` client.
        region: Region name; passed to the client as ``region_name`` and
            echoed back under the ``"region"`` key of the result.

    Returns:
        On success, ``{"region", "instances", "clusters"}`` where each list
        holds one summary dict per record returned by the
        ``describe_db_instances`` / ``describe_db_clusters`` paginators.
        Fields absent from a record are reported as ``None``.

        If a ``ClientError`` is raised while paginating, the result is
        ``{"region", "error", "instances", "clusters"}`` with ``"error"`` set
        to ``str(exc)`` and both lists empty; anything gathered before the
        failure is discarded.
    """
    # Output key -> key in the describe_db_instances record.
    instance_fields = {
        "db_instance_identifier": "DBInstanceIdentifier",
        "engine": "Engine",
        "engine_version": "EngineVersion",
        "class": "DBInstanceClass",
        "publicly_accessible": "PubliclyAccessible",
        "storage_encrypted": "StorageEncrypted",
        "backup_retention_period": "BackupRetentionPeriod",
        "multi_az": "MultiAZ",
        "availability_zone": "AvailabilityZone",
    }
    # Output key -> key in the describe_db_clusters record.
    cluster_fields = {
        "db_cluster_identifier": "DBClusterIdentifier",
        "engine": "Engine",
        "engine_version": "EngineVersion",
        "storage_encrypted": "StorageEncrypted",
        "backup_retention_period": "BackupRetentionPeriod",
        "multi_az": "MultiAZ",
    }

    client = session.client("rds", region_name=region)
    found_instances: List[Dict[str, Any]] = []
    found_clusters: List[Dict[str, Any]] = []

    try:
        instance_pages = client.get_paginator("describe_db_instances").paginate()
        for instance_page in instance_pages:
            found_instances.extend(
                {out_key: record.get(api_key) for out_key, api_key in instance_fields.items()}
                for record in instance_page.get("DBInstances", [])
            )

        # Clusters are only queried once every instance page has been read.
        cluster_pages = client.get_paginator("describe_db_clusters").paginate()
        for cluster_page in cluster_pages:
            found_clusters.extend(
                {out_key: record.get(api_key) for out_key, api_key in cluster_fields.items()}
                for record in cluster_page.get("DBClusters", [])
            )
    except ClientError as exc:
        # Report the failure with empty lists rather than partial results.
        return {"region": region, "error": str(exc), "instances": [], "clusters": []}

    return {"region": region, "instances": found_instances, "clusters": found_clusters}