from __future__ import annotations
from typing import Any, Dict, List, Set
from aws_mcp_audit.checks.findings import Finding, new_finding
RISKY_PORTS: Set[int] = {22, 3389, 5432, 3306, 6379, 27017}
def _perm_exposes_world(ip_perm: Dict[str, Any]) -> bool:
from_port = ip_perm.get("FromPort")
to_port = ip_perm.get("ToPort")
if from_port is None or to_port is None:
return False
overlaps = any(from_port <= p <= to_port for p in RISKY_PORTS)
if not overlaps:
return False
for r in ip_perm.get("IpRanges", []):
if r.get("CidrIp") == "0.0.0.0/0":
return True
for r in ip_perm.get("Ipv6Ranges", []):
if r.get("CidrIpv6") == "::/0":
return True
return False
def check_sg_world_open(snapshot: Dict[str, Any]) -> List[Finding]:
out: List[Finding] = []
ec2_by_region: Dict[str, Any] = snapshot.get("ec2_by_region", {})
for region, blob in ec2_by_region.items():
for sg in blob.get("security_groups", []):
for perm in sg.get("ip_permissions", []):
if _perm_exposes_world(perm):
out.append(
new_finding(
severity="HIGH",
title="Security group allows world access on risky ports",
region=region,
evidence={
"group_id": sg.get("group_id"),
"group_name": sg.get("group_name"),
"vpc_id": sg.get("vpc_id"),
"ip_permission": {
"FromPort": perm.get("FromPort"),
"ToPort": perm.get("ToPort"),
"IpProtocol": perm.get("IpProtocol"),
"IpRanges": perm.get("IpRanges", []),
"Ipv6Ranges": perm.get("Ipv6Ranges", []),
},
},
remediation_hint="Restrict ingress to known IP ranges (VPN/bastion) or remove the rule; prefer SSM Session Manager over SSH/RDP when possible.",
)
)
return out
def check_public_instances(snapshot: Dict[str, Any]) -> List[Finding]:
out: List[Finding] = []
for region, blob in snapshot.get("ec2_by_region", {}).items():
public = [i for i in blob.get("instances", []) if i.get("public_ip")]
if public:
out.append(
new_finding(
severity="MEDIUM",
title="Instances with public IPs detected (review if expected)",
region=region,
evidence={"count": len(public), "examples": public[:10]},
remediation_hint="Confirm these are intended. Prefer private subnets + ALB/NLB, or restrict inbound rules and use SSM for admin access.",
)
)
return out
def check_unassociated_eips(snapshot: Dict[str, Any]) -> List[Finding]:
out: List[Finding] = []
for region, blob in snapshot.get("ec2_by_region", {}).items():
eips = blob.get("eips", [])
unassoc = [e for e in eips if not e.get("association_id") and not e.get("instance_id")]
if unassoc:
out.append(
new_finding(
severity="LOW",
title="Unassociated Elastic IPs (waste + exposure risk if reassigned)",
region=region,
evidence={"count": len(unassoc), "examples": unassoc[:10]},
remediation_hint="Release unused EIPs or document why they’re reserved; consider an allowlist process for EIP reservations.",
)
)
return out
def check_unattached_ebs(snapshot: Dict[str, Any]) -> List[Finding]:
out: List[Finding] = []
for region, blob in snapshot.get("ec2_by_region", {}).items():
vols = blob.get("volumes", [])
unattached = [v for v in vols if not v.get("attached_instance_id")]
if unattached:
out.append(
new_finding(
severity="LOW",
title="Unattached EBS volumes (cost + orphan risk)",
region=region,
evidence={"count": len(unattached), "examples": unattached[:10]},
remediation_hint="Delete unused volumes after confirming they’re not needed; consider snapshotting first if unsure.",
)
)
return out