AWSTemplateFormatVersion: '2010-09-09'
Description: 'Integration test for IAM Policy Autopilot - Lambda with insufficient S3 permissions'
Resources:
# S3 bucket for testing
TestBucket:
Type: AWS::S3::Bucket
Properties:
BucketName: !Sub 'iam-autopilot-test-${AWS::AccountId}'
PublicAccessBlockConfiguration:
BlockPublicAcls: true
BlockPublicPolicy: true
IgnorePublicAcls: true
RestrictPublicBuckets: true
# DynamoDB table for testing
TestDynamoDBTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: !Sub 'iam-autopilot-test-table-${AWS::AccountId}'
AttributeDefinitions:
- AttributeName: id
AttributeType: S
KeySchema:
- AttributeName: id
KeyType: HASH
BillingMode: PAY_PER_REQUEST
DeletionProtectionEnabled: false
Tags:
- Key: Purpose
Value: IAM-Autopilot-Integration-Test
# KMS Key for testing
TestKMSKey:
Type: AWS::KMS::Key
Properties:
Description: Test KMS key for IAM Autopilot integration tests
KeyPolicy:
Version: '2012-10-17'
Statement:
- Sid: Enable IAM User Permissions
Effect: Allow
Principal:
AWS: !Sub 'arn:aws:iam::${AWS::AccountId}:root'
Action: 'kms:*'
Resource: '*'
- Sid: Allow Lambda Role to Describe Key
Effect: Allow
Principal:
AWS: !Sub 'arn:aws:iam::${AWS::AccountId}:role/IamAutopilotTestLambdaRole'
Action:
- 'kms:DescribeKey'
Resource: '*'
Tags:
- Key: Purpose
Value: IAM-Autopilot-Integration-Test
TestKMSKeyAlias:
Type: AWS::KMS::Alias
Properties:
AliasName: !Sub 'alias/iam-autopilot-test-${AWS::AccountId}'
TargetKeyId: !Ref TestKMSKey
# Secrets Manager Secret for testing
TestSecret:
Type: AWS::SecretsManager::Secret
Properties:
Name: !Sub 'iam-autopilot-test-secret-${AWS::AccountId}'
Description: Test secret for IAM Autopilot integration tests
SecretString: !Sub |
{
"username": "testuser",
"password": "testpass123",
"database": "testdb",
"host": "localhost"
}
Tags:
- Key: Purpose
Value: IAM-Autopilot-Integration-Test
# SQS Queue for testing
TestQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: !Sub 'iam-autopilot-test-queue-${AWS::AccountId}'
MessageRetentionPeriod: 300 # 5 minutes
VisibilityTimeout: 30
Tags:
- Key: Purpose
Value: IAM-Autopilot-Integration-Test
# SNS Topic for testing
TestTopic:
Type: AWS::SNS::Topic
Properties:
TopicName: !Sub 'iam-autopilot-test-topic-${AWS::AccountId}'
DisplayName: IAM Autopilot Test Topic
Tags:
- Key: Purpose
Value: IAM-Autopilot-Integration-Test
# Note: IAM role is created once and reused across test runs (CAZ prevents deletion)
# The role must exist before running this template. If it doesn't exist, run the
# setup script or manually create a role named "IamAutopilotTestLambdaRole" with:
# - AssumeRolePolicyDocument allowing lambda.amazonaws.com
# - ManagedPolicy: arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
# Lambda function that attempts S3 operations
TestLambdaFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: IamAutopilotTestFunction
Runtime: python3.12
Handler: index.lambda_handler
Role: !Sub 'arn:aws:iam::${AWS::AccountId}:role/IamAutopilotTestLambdaRole'
Timeout: 30
Environment:
Variables:
TEST_BUCKET: !Ref TestBucket
TEST_TABLE: !Ref TestDynamoDBTable
TEST_KMS_KEY_ID: !Ref TestKMSKey
TEST_SECRET_ARN: !Ref TestSecret
TEST_QUEUE_URL: !Ref TestQueue
TEST_TOPIC_ARN: !Ref TestTopic
Code:
ZipFile: |
import json
import os
import boto3
def test_s3_operations(bucket_name):
"""
Test S3 operations that will fail on first missing permission.
Fails immediately on first AccessDenied - does not catch exceptions.
Tests (in order):
1. s3:PutObject - Write object to bucket (creates test file)
2. s3:GetObject - Read object from bucket
3. s3:ListBucket - List bucket contents
"""
s3_client = boto3.client('s3')
test_key = 'test-object.txt'
# Test 1: s3:PutObject (will fail if permission missing)
print(f"[TEST] Attempting s3:PutObject on {bucket_name}/{test_key}")
s3_client.put_object(
Bucket=bucket_name,
Key=test_key,
Body=b'Test content'
)
print(f"[SUCCESS] s3:PutObject succeeded")
# Test 2: s3:GetObject
print(f"[TEST] Attempting s3:GetObject on {bucket_name}/{test_key}")
s3_client.get_object(Bucket=bucket_name, Key=test_key)
print(f"[SUCCESS] s3:GetObject succeeded")
# Test 3: s3:ListBucket
print(f"[TEST] Attempting s3:ListBucket on {bucket_name}")
s3_client.list_objects_v2(Bucket=bucket_name)
print(f"[SUCCESS] s3:ListBucket succeeded")
print(f"[COMPLETE] All S3 operations succeeded")
def test_dynamodb_operations(table_name):
"""
Test DynamoDB operations that will fail on first missing permission.
Fails immediately on first AccessDenied - does not catch exceptions.
Tests (in order):
1. dynamodb:PutItem - Write data to table
2. dynamodb:GetItem - Read specific item
3. dynamodb:Query - Query table with key condition
"""
dynamodb = boto3.client('dynamodb')
# Test 1: dynamodb:PutItem (will fail if permission missing)
print(f"[TEST] Attempting dynamodb:PutItem on table: {table_name}")
dynamodb.put_item(
TableName=table_name,
Item={
'id': {'S': 'test-id-1'},
'data': {'S': 'test-data'}
}
)
print(f"[SUCCESS] dynamodb:PutItem succeeded")
# Test 2: dynamodb:GetItem
print(f"[TEST] Attempting dynamodb:GetItem on table: {table_name}")
dynamodb.get_item(
TableName=table_name,
Key={'id': {'S': 'test-id-1'}}
)
print(f"[SUCCESS] dynamodb:GetItem succeeded")
# Test 3: dynamodb:Query
print(f"[TEST] Attempting dynamodb:Query on table: {table_name}")
dynamodb.query(
TableName=table_name,
KeyConditionExpression='id = :id',
ExpressionAttributeValues={':id': {'S': 'test-id-1'}}
)
print(f"[SUCCESS] dynamodb:Query succeeded")
print(f"[COMPLETE] All DynamoDB operations succeeded")
def test_kms_operations(key_id):
"""
Test KMS operations that will fail on first missing permission.
Fails immediately on first AccessDenied - does not catch exceptions.
Tests (in order):
1. kms:Encrypt - Encrypt plaintext data
2. kms:GenerateDataKey - Generate data encryption key
"""
kms = boto3.client('kms')
# Test 1: kms:Encrypt (will fail if permission missing)
print(f"[TEST] Attempting kms:Encrypt with key: {key_id}")
kms.encrypt(
KeyId=key_id,
Plaintext=b'test data to encrypt'
)
print(f"[SUCCESS] kms:Encrypt succeeded")
# Test 2: kms:GenerateDataKey
print(f"[TEST] Attempting kms:GenerateDataKey with key: {key_id}")
kms.generate_data_key(
KeyId=key_id,
KeySpec='AES_256'
)
print(f"[SUCCESS] kms:GenerateDataKey succeeded")
print(f"[COMPLETE] All KMS operations succeeded")
def test_secretsmanager_operations(secret_arn):
"""
Test Secrets Manager operations that will fail on first missing permission.
Fails immediately on first AccessDenied - does not catch exceptions.
Tests (in order):
1. secretsmanager:GetSecretValue - Retrieve secret value
2. secretsmanager:DescribeSecret - Get secret metadata
"""
secrets = boto3.client('secretsmanager')
# Test 1: secretsmanager:GetSecretValue (will fail if permission missing)
print(f"[TEST] Attempting secretsmanager:GetSecretValue on: {secret_arn}")
secrets.get_secret_value(SecretId=secret_arn)
print(f"[SUCCESS] secretsmanager:GetSecretValue succeeded")
# Test 2: secretsmanager:DescribeSecret
print(f"[TEST] Attempting secretsmanager:DescribeSecret on: {secret_arn}")
secrets.describe_secret(SecretId=secret_arn)
print(f"[SUCCESS] secretsmanager:DescribeSecret succeeded")
print(f"[COMPLETE] All Secrets Manager operations succeeded")
def test_sqs_operations(queue_url):
"""
Test SQS operations that will fail on first missing permission.
Fails immediately on first AccessDenied - does not catch exceptions.
Tests (in order):
1. sqs:SendMessage - Send message to queue
2. sqs:ReceiveMessage - Receive messages from queue
"""
sqs = boto3.client('sqs')
# Test 1: sqs:SendMessage (will fail if permission missing)
print(f"[TEST] Attempting sqs:SendMessage to: {queue_url}")
sqs.send_message(
QueueUrl=queue_url,
MessageBody='Test message from IAM Autopilot integration test'
)
print(f"[SUCCESS] sqs:SendMessage succeeded")
# Test 2: sqs:ReceiveMessage
print(f"[TEST] Attempting sqs:ReceiveMessage from: {queue_url}")
sqs.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=1
)
print(f"[SUCCESS] sqs:ReceiveMessage succeeded")
print(f"[COMPLETE] All SQS operations succeeded")
def test_sns_operations(topic_arn):
"""
Test SNS operations that will fail on first missing permission.
Fails immediately on first AccessDenied - does not catch exceptions.
Tests (in order):
1. sns:Publish - Publish message to topic
2. sns:GetTopicAttributes - Get topic metadata
"""
sns = boto3.client('sns')
# Test 1: sns:Publish (will fail if permission missing)
print(f"[TEST] Attempting sns:Publish to: {topic_arn}")
sns.publish(
TopicArn=topic_arn,
Message='Test message from IAM Autopilot integration test',
Subject='Test Subject'
)
print(f"[SUCCESS] sns:Publish succeeded")
# Test 2: sns:GetTopicAttributes
print(f"[TEST] Attempting sns:GetTopicAttributes on: {topic_arn}")
sns.get_topic_attributes(TopicArn=topic_arn)
print(f"[SUCCESS] sns:GetTopicAttributes succeeded")
print(f"[COMPLETE] All SNS operations succeeded")
def lambda_handler(event, context):
"""
Main Lambda handler that tests all AWS service operations sequentially.
FAILS IMMEDIATELY on first AccessDenied error (realistic behavior).
Test order:
1. S3 operations (3 operations)
2. DynamoDB operations (3 operations)
3. KMS operations (2 operations)
4. Secrets Manager operations (2 operations)
5. SQS operations (2 operations)
6. SNS operations (2 operations)
Returns success response only if ALL operations succeed.
"""
print(f"[START] Integration test - testing 14 operations across 6 services")
# Test operations sequentially - will fail on first AccessDenied
test_s3_operations(os.environ['TEST_BUCKET'])
test_dynamodb_operations(os.environ['TEST_TABLE'])
test_kms_operations(os.environ['TEST_KMS_KEY_ID'])
test_secretsmanager_operations(os.environ['TEST_SECRET_ARN'])
test_sqs_operations(os.environ['TEST_QUEUE_URL'])
test_sns_operations(os.environ['TEST_TOPIC_ARN'])
# If we reach here, all operations succeeded!
print(f"[SUCCESS] All 14 operations completed successfully!")
return {
'statusCode': 200,
'body': json.dumps({
'message': 'All operations completed successfully',
'services_tested': ['s3', 'dynamodb', 'kms', 'secretsmanager', 'sqs', 'sns'],
'total_operations': 14
})
}
Outputs:
LambdaRoleArn:
Description: ARN of the Lambda execution role
Value: !Sub 'arn:aws:iam::${AWS::AccountId}:role/IamAutopilotTestLambdaRole'
Export:
Name: IamAutopilotTestRoleArn
LambdaFunctionName:
Description: Name of the Lambda function
Value: !Ref TestLambdaFunction
Export:
Name: IamAutopilotTestFunctionName
TestBucketName:
Description: Name of the test S3 bucket
Value: !Ref TestBucket
Export:
Name: IamAutopilotTestBucketName
LambdaRoleName:
Description: Name of the Lambda execution role
Value: 'IamAutopilotTestLambdaRole'
Export:
Name: IamAutopilotTestRoleName
TestTableName:
Description: Name of the test DynamoDB table
Value: !Ref TestDynamoDBTable
Export:
Name: IamAutopilotTestTableName
TestTableArn:
Description: ARN of the test DynamoDB table
Value: !GetAtt TestDynamoDBTable.Arn
Export:
Name: IamAutopilotTestTableArn
TestKMSKeyId:
Description: ID of the test KMS key
Value: !Ref TestKMSKey
Export:
Name: IamAutopilotTestKMSKeyId
TestKMSKeyArn:
Description: ARN of the test KMS key
Value: !GetAtt TestKMSKey.Arn
Export:
Name: IamAutopilotTestKMSKeyArn
TestSecretArn:
Description: ARN of the test secret
Value: !Ref TestSecret
Export:
Name: IamAutopilotTestSecretArn
TestQueueUrl:
Description: URL of the test SQS queue
Value: !Ref TestQueue
Export:
Name: IamAutopilotTestQueueUrl
TestQueueArn:
Description: ARN of the test SQS queue
Value: !GetAtt TestQueue.Arn
Export:
Name: IamAutopilotTestQueueArn
TestTopicArn:
Description: ARN of the test SNS topic
Value: !Ref TestTopic
Export:
Name: IamAutopilotTestTopicArn