from __future__ import annotations
from typing import Any, Dict, List
from aws_mcp_audit.checks.findings import Finding, new_finding
def check_cloudtrail_present(snapshot: Dict[str, Any]) -> List[Finding]:
    """Flag missing or weakly configured CloudTrail trails in a snapshot.

    Reads ``snapshot["telemetry"]["cloudtrail"]["trails"]``. Any level that is
    absent or ``None`` is treated as "no trails visible" instead of raising.

    Args:
        snapshot: Collected account snapshot; each trail entry is read with
            ``.get`` for ``is_multi_region_trail``, ``s3_bucket_name`` and
            ``cloudwatch_logs_log_group_arn``.

    Returns:
        A single HIGH finding when no trails are visible. Otherwise up to two
        MEDIUM findings (non-multi-region trails, trails without a visible
        log destination), or an empty list when nothing is flagged.
    """
    out: List[Finding] = []

    # ``or`` (rather than a ``.get`` default) also covers keys that are
    # present but hold None, which would otherwise raise AttributeError on
    # the next lookup.
    telemetry = snapshot.get("telemetry") or {}
    cloudtrail = telemetry.get("cloudtrail") or {}
    trails = cloudtrail.get("trails") or []

    if not trails:
        out.append(
            new_finding(
                severity="HIGH",
                title="CloudTrail not detected (or not visible with current permissions)",
                region=None,
                evidence={"trail_count": 0},
                remediation_hint="Enable at least one organization/account CloudTrail; prefer multi-region; store logs in a secured bucket with retention.",
            )
        )
        # Per-trail checks are meaningless without any trail.
        return out

    # Only an explicit False counts; a missing/unknown flag is not reported.
    non_multi = [t for t in trails if t.get("is_multi_region_trail") is False]
    if non_multi:
        out.append(
            new_finding(
                severity="MEDIUM",
                title="CloudTrail trails not multi-region",
                region=None,
                # Cap the examples so the evidence payload stays small.
                evidence={"count": len(non_multi), "examples": non_multi[:10]},
                remediation_hint="Enable multi-region trails to capture activity across regions (especially for incident response).",
            )
        )

    # A trail is flagged only when neither destination field is set.
    no_dest = [
        t
        for t in trails
        if not t.get("s3_bucket_name") and not t.get("cloudwatch_logs_log_group_arn")
    ]
    if no_dest:
        out.append(
            new_finding(
                severity="MEDIUM",
                title="CloudTrail trails with no visible log destination (best-effort)",
                region=None,
                evidence={"count": len(no_dest), "examples": no_dest[:10]},
                remediation_hint="Ensure CloudTrail is configured to deliver logs to S3 and/or CloudWatch Logs with appropriate retention and access controls.",
            )
        )

    return out
def check_cloudwatch_alarm_signal(snapshot: Dict[str, Any]) -> List[Finding]:
    """Emit a LOW, signal-only finding when the snapshot records zero alarms.

    Reads ``snapshot["telemetry"]["cloudwatch_alarms"]["total_alarm_count"]``.
    Any level that is absent or ``None`` is treated as "count unknown".

    Args:
        snapshot: Collected account snapshot.

    Returns:
        An empty list when the count is unknown or non-zero; otherwise a list
        holding one LOW finding.
    """
    out: List[Finding] = []

    # ``or`` (rather than a ``.get`` default) also covers keys that are
    # present but hold None, which would otherwise raise AttributeError.
    telemetry = snapshot.get("telemetry") or {}
    alarms = telemetry.get("cloudwatch_alarms") or {}
    total = alarms.get("total_alarm_count")

    if total is None:
        # No count was recorded, so nothing can be concluded either way.
        return out

    if total == 0:
        out.append(
            new_finding(
                severity="LOW",
                title="No CloudWatch alarms detected (signal-only)",
                region=None,
                evidence={"alarm_count": 0},
                remediation_hint="Consider baseline alarms for CPU, disk, 5xx, latency, and RDS storage/CPU. This is a signal, not definitive coverage.",
            )
        )

    return out