from __future__ import annotations
from typing import Any, Dict, List
from aws_mcp_audit.checks.findings import Finding, new_finding
def check_unencrypted_ebs(snapshot: Dict[str, Any]) -> List[Finding]:
    """Emit one MEDIUM finding per region that lists unencrypted EBS volumes.

    Args:
        snapshot: Mapping read via ``snapshot["ec2_by_region"]``; each value
            under that key is queried for a ``"volumes"`` list whose items
            are queried for an ``"encrypted"`` flag. Missing keys are
            treated as empty.

    Returns:
        A list with at most one finding per region. The finding's evidence
        carries the total count of unencrypted volumes and up to the first
        ten volume records as examples.
    """
    findings: List[Finding] = []
    ec2_regions = snapshot.get("ec2_by_region", {})

    for region_name, region_data in ec2_regions.items():
        plaintext_volumes = []
        for volume in region_data.get("volumes", []):
            # Only an explicit ``False`` counts; a missing or ``None`` flag
            # is not reported as unencrypted.
            if volume.get("encrypted") is False:
                plaintext_volumes.append(volume)

        if not plaintext_volumes:
            continue

        evidence = {
            "count": len(plaintext_volumes),
            # Cap the examples so a large region does not bloat the finding.
            "examples": plaintext_volumes[:10],
        }
        findings.append(
            new_finding(
                severity="MEDIUM",
                title="Unencrypted EBS volumes detected",
                region=region_name,
                evidence=evidence,
                remediation_hint="Enable EBS encryption by default and migrate volumes via snapshot/copy to encrypted volumes.",
            )
        )

    return findings
def check_rds_public_or_low_backup(snapshot: Dict[str, Any], min_backup_days: int = 7) -> List[Finding]:
    """Flag RDS instances that are publicly accessible or keep backups too briefly.

    Args:
        snapshot: Mapping read via ``snapshot["rds_by_region"]``; each value
            under that key is queried for an ``"instances"`` list. Missing
            keys are treated as empty.
        min_backup_days: Retention threshold; an instance whose integer
            ``backup_retention_period`` is strictly below this is reported.

    Returns:
        Findings in instance order. A single instance can produce both a
        HIGH (public access) and a MEDIUM (short retention) finding, with
        the public-access finding appended first.
    """
    findings: List[Finding] = []

    for region_name, region_data in snapshot.get("rds_by_region", {}).items():
        for instance in region_data.get("instances", []):
            # Strict identity check: only a literal ``True`` is reported.
            exposed = instance.get("publicly_accessible") is True
            if exposed:
                exposure_evidence = {
                    "db_instance_identifier": instance.get("db_instance_identifier"),
                    "engine": instance.get("engine"),
                }
                findings.append(
                    new_finding(
                        severity="HIGH",
                        title="RDS instance publicly accessible",
                        region=region_name,
                        evidence=exposure_evidence,
                        remediation_hint="Move DB to private subnets and restrict access via security groups; expose only app tiers publicly.",
                    )
                )

            retention = instance.get("backup_retention_period")
            # Non-integer (e.g. missing) retention values are skipped rather
            # than compared against the threshold.
            retention_too_short = isinstance(retention, int) and retention < min_backup_days
            if retention_too_short:
                retention_evidence = {
                    "db_instance_identifier": instance.get("db_instance_identifier"),
                    "backup_retention_period": retention,
                }
                findings.append(
                    new_finding(
                        severity="MEDIUM",
                        title=f"RDS backup retention below {min_backup_days} days",
                        region=region_name,
                        evidence=retention_evidence,
                        remediation_hint="Increase automated backup retention; confirm point-in-time restore meets RPO needs.",
                    )
                )

    return findings