Skip to main content
Glama
aws-samples

CFM Tips - Cost Optimization MCP Server

by aws-samples
test_s3_service.py (27.3 kB)
"""
Unit tests for S3Service class with strict no-cost constraint validation.

Tests all S3Service functionality while ensuring no cost-incurring operations
are performed. Includes comprehensive validation of the cost constraint system.
"""

import asyncio
from datetime import datetime, timedelta
from unittest.mock import Mock, AsyncMock, patch, MagicMock

import boto3
import pytest
from botocore.exceptions import ClientError

from services.s3_service import (
    S3Service,
    S3CostConstraintViolationError,
    ALLOWED_S3_OPERATIONS,
    FORBIDDEN_S3_OPERATIONS,
)


@pytest.mark.unit
@pytest.mark.no_cost_validation
class TestS3ServiceCostConstraints:
    """Test cost constraint validation in S3Service."""

    def test_allowed_operations_whitelist(self):
        """Test that allowed operations are properly whitelisted."""
        expected_allowed = {
            'list_buckets',
            'get_bucket_location',
            'get_bucket_lifecycle_configuration',
            'get_bucket_versioning',
            'get_bucket_tagging',
            'list_multipart_uploads',
            'get_bucket_policy',
            'get_bucket_acl',
            'get_bucket_encryption',
            'get_public_access_block',
            'get_bucket_notification_configuration',
            'get_bucket_cors',
            'get_bucket_website',
            'get_bucket_logging',
            'get_bucket_replication',
            'get_bucket_request_payment'
        }

        assert ALLOWED_S3_OPERATIONS == expected_allowed

    def test_forbidden_operations_list(self):
        """Test that forbidden operations are properly identified."""
        expected_forbidden = {
            'list_objects',
            'list_objects_v2',
            'head_object',
            'get_object',
            'get_object_attributes',
            'select_object_content',
            'restore_object'
        }

        assert FORBIDDEN_S3_OPERATIONS == expected_forbidden

    def test_validate_allowed_operation(self, mock_aws_credentials):
        """Test validation of allowed operations."""
        service = S3Service(region="us-east-1")

        # Should not raise exception
        service._validate_s3_operation('list_buckets')
        service._validate_s3_operation('get_bucket_location')
        service._validate_s3_operation('get_bucket_lifecycle_configuration')

        # Check operation tracking
        assert service._operation_call_count['list_buckets'] == 1
        assert service._operation_call_count['get_bucket_location'] == 1

    def test_validate_forbidden_operation(self, mock_aws_credentials):
        """Test validation rejects forbidden operations."""
        service = S3Service(region="us-east-1")

        forbidden_ops = ['list_objects', 'list_objects_v2', 'head_object', 'get_object']

        for op in forbidden_ops:
            with pytest.raises(S3CostConstraintViolationError) as exc_info:
                service._validate_s3_operation(op)

            # Checked outside the `with` block: statements after the raising
            # call inside it would never execute.
            assert f"FORBIDDEN: S3 operation '{op}' would incur costs" in str(exc_info.value)

    def test_validate_unknown_operation_warning(self, mock_aws_credentials, caplog):
        """Test that unknown operations generate warnings but don't fail."""
        service = S3Service(region="us-east-1")

        # Should not raise exception but should log warning
        service._validate_s3_operation('unknown_operation')

        assert "not in whitelist" in caplog.text
        assert service._operation_call_count['unknown_operation'] == 1

    def test_operation_stats_tracking(self, mock_aws_credentials):
        """Test operation statistics tracking."""
        service = S3Service(region="us-east-1")

        # Call operations multiple times
        service._validate_s3_operation('list_buckets')
        service._validate_s3_operation('list_buckets')
        service._validate_s3_operation('get_bucket_location')

        stats = service.get_operation_stats()

        assert stats['list_buckets'] == 2
        assert stats['get_bucket_location'] == 1
        assert len(stats) == 2


@pytest.mark.unit
class TestS3ServiceInitialization:
    """Test S3Service initialization and client setup."""

    def test_initialization_with_region(self, mock_aws_credentials):
        """Test service initialization with specific region."""
        service = S3Service(region="us-west-2")

        assert service.region == "us-west-2"
        assert service._operation_call_count == {}
        assert hasattr(service, 's3_client')
        assert hasattr(service, 'cloudwatch_client')
        assert hasattr(service, 'ce_client')

    def test_initialization_without_region(self, mock_aws_credentials):
        """Test service initialization without region."""
        service = S3Service()

        assert service.region is None
        assert hasattr(service, 's3_client')

    def test_initialization_failure(self):
        """Test handling of initialization failures."""
        with patch('boto3.client', side_effect=Exception("AWS client error")):
            with pytest.raises(Exception) as exc_info:
                S3Service(region="us-east-1")

            assert "AWS client error" in str(exc_info.value)


@pytest.mark.unit
class TestS3ServiceSafeApiCall:
    """Test safe API call wrapper functionality."""

    @pytest.mark.asyncio
    async def test_safe_api_call_success(self, mock_aws_credentials):
        """Test successful API call."""
        service = S3Service(region="us-east-1")

        mock_func = Mock(return_value={"Buckets": []})
        mock_func.__name__ = "list_buckets"

        result = await service.safe_api_call(mock_func)

        assert result["status"] == "success"
        assert result["data"] == {"Buckets": []}
        assert result["source"] == "primary"

    @pytest.mark.asyncio
    async def test_safe_api_call_cost_constraint_violation(self, mock_aws_credentials):
        """Test API call with cost constraint violation."""
        service = S3Service(region="us-east-1")

        # Mock S3 client method that would incur costs
        mock_s3_client = Mock()
        mock_s3_client._service_model.service_name = 's3'

        mock_func = Mock()
        mock_func.__name__ = "list_objects_v2"
        mock_func.__self__ = mock_s3_client

        result = await service.safe_api_call(mock_func, fallback_sources=['storage_lens'])

        assert result["status"] == "error"
        assert result["error_code"] == "COST_CONSTRAINT_VIOLATION"
        assert result["fallback_available"] is True
        assert "FORBIDDEN" in result["message"]

    @pytest.mark.asyncio
    async def test_safe_api_call_access_denied(self, mock_aws_credentials):
        """Test API call with access denied error."""
        service = S3Service(region="us-east-1")

        mock_func = Mock()
        mock_func.__name__ = "list_buckets"
        mock_func.side_effect = ClientError(
            error_response={'Error': {'Code': 'AccessDenied', 'Message': 'Access denied'}},
            operation_name='ListBuckets'
        )

        result = await service.safe_api_call(mock_func, fallback_sources=['storage_lens'])

        assert result["status"] == "error"
        assert result["error_code"] == "AccessDenied"
        assert result["fallback_available"] is True
        assert result["suggested_fallback"] == "storage_lens"

    @pytest.mark.asyncio
    async def test_safe_api_call_throttling_with_retry(self, mock_aws_credentials):
        """Test API call with throttling and retry logic."""
        service = S3Service(region="us-east-1")

        mock_func = Mock()
        mock_func.__name__ = "list_buckets"

        # First call fails with throttling, second succeeds
        mock_func.side_effect = [
            ClientError(
                error_response={'Error': {'Code': 'Throttling', 'Message': 'Rate exceeded'}},
                operation_name='ListBuckets'
            ),
            {"Buckets": []}
        ]

        # Patch asyncio.sleep so any backoff wait does not slow the test down.
        with patch('asyncio.sleep', new_callable=AsyncMock):
            result = await service.safe_api_call(mock_func, max_retries=3)

        assert result["status"] == "success"
        assert result["data"] == {"Buckets": []}
        assert mock_func.call_count == 2

    @pytest.mark.asyncio
    async def test_safe_api_call_max_retries_exceeded(self, mock_aws_credentials):
        """Test API call with max retries exceeded."""
        service = S3Service(region="us-east-1")

        mock_func = Mock()
        mock_func.__name__ = "list_buckets"
        mock_func.side_effect = ClientError(
            error_response={'Error': {'Code': 'Throttling', 'Message': 'Rate exceeded'}},
            operation_name='ListBuckets'
        )

        with patch('asyncio.sleep', new_callable=AsyncMock):
            result = await service.safe_api_call(mock_func, max_retries=2)

        assert result["status"] == "error"
        assert result["error_code"] == "Throttling"
        assert "Max retries exceeded" in result["message"]
        assert mock_func.call_count == 3  # Initial + 2 retries

    @pytest.mark.asyncio
    async def test_safe_api_call_resource_not_found(self, mock_aws_credentials):
        """Test API call with resource not found (expected error)."""
        service = S3Service(region="us-east-1")

        mock_func = Mock()
        mock_func.__name__ = "get_bucket_lifecycle_configuration"
        mock_func.side_effect = ClientError(
            error_response={'Error': {'Code': 'NoSuchLifecycleConfiguration', 'Message': 'Not found'}},
            operation_name='GetBucketLifecycleConfiguration'
        )

        result = await service.safe_api_call(mock_func)

        assert result["status"] == "success"  # Not found is treated as success
        assert result["data"] == {}
        assert result["resource_not_found"] is True

    def test_safe_api_call_sync(self, mock_aws_credentials):
        """Test synchronous version of safe API call."""
        service = S3Service(region="us-east-1")

        mock_func = Mock(return_value={"Buckets": []})
        mock_func.__name__ = "list_buckets"

        result = service.safe_api_call_sync(mock_func)

        assert result["status"] == "success"
        assert result["data"] == {"Buckets": []}


@pytest.mark.unit
class TestS3ServiceBucketOperations:
    """Test S3Service bucket-level operations (all no-cost)."""

    @pytest.mark.asyncio
    async def test_list_buckets_success(self, mock_aws_credentials):
        """Test successful bucket listing with region information."""
        service = S3Service(region="us-east-1")

        # Mock list_buckets response
        mock_buckets_response = {
            "Buckets": [
                {"Name": "test-bucket-1", "CreationDate": datetime.now()},
                {"Name": "test-bucket-2", "CreationDate": datetime.now()}
            ],
            "Owner": {"ID": "owner-id"}
        }

        # Mock get_bucket_location responses
        mock_location_responses = [
            {"LocationConstraint": "us-west-2"},
            {"LocationConstraint": None}  # us-east-1 returns None
        ]

        with patch.object(service, 'safe_api_call') as mock_safe_call:
            # Setup mock responses
            mock_safe_call.side_effect = [
                {"status": "success", "data": mock_buckets_response},  # list_buckets
                {"status": "success", "data": mock_location_responses[0]},  # get_bucket_location 1
                {"status": "success", "data": mock_location_responses[1]}  # get_bucket_location 2
            ]

            result = await service.list_buckets()

        assert result["status"] == "success"
        assert len(result["data"]["Buckets"]) == 2
        assert result["data"]["Buckets"][0]["Region"] == "us-west-2"
        assert result["data"]["Buckets"][1]["Region"] == "us-east-1"  # None becomes us-east-1
        assert "Retrieved 2 buckets with regions" in result["message"]

    @pytest.mark.asyncio
    async def test_list_buckets_region_failure(self, mock_aws_credentials):
        """Test bucket listing with region lookup failures."""
        service = S3Service(region="us-east-1")

        mock_buckets_response = {
            "Buckets": [{"Name": "test-bucket", "CreationDate": datetime.now()}],
            "Owner": {"ID": "owner-id"}
        }

        with patch.object(service, 'safe_api_call') as mock_safe_call:
            mock_safe_call.side_effect = [
                {"status": "success", "data": mock_buckets_response},  # list_buckets
                {"status": "error", "message": "Access denied"}  # get_bucket_location
            ]

            result = await service.list_buckets()

        assert result["status"] == "success"
        assert result["data"]["Buckets"][0]["Region"] == "unknown"

    def test_list_buckets_sync(self, mock_aws_credentials):
        """Test synchronous bucket listing."""
        service = S3Service(region="us-east-1")

        with patch.object(service, 'list_buckets') as mock_async_list:
            mock_async_list.return_value = {"status": "success", "data": {"Buckets": []}}

            result = service.list_buckets_sync()

        assert result["status"] == "success"

    @pytest.mark.asyncio
    async def test_get_bucket_lifecycle(self, mock_aws_credentials):
        """Test getting bucket lifecycle configuration."""
        service = S3Service(region="us-east-1")

        mock_lifecycle = {
            "Rules": [
                {
                    "ID": "rule1",
                    "Status": "Enabled",
                    "Transitions": [{"Days": 30, "StorageClass": "STANDARD_IA"}]
                }
            ]
        }

        with patch.object(service, 'safe_api_call') as mock_safe_call:
            mock_safe_call.return_value = {"status": "success", "data": mock_lifecycle}

            result = await service.get_bucket_lifecycle("test-bucket")

        assert result["status"] == "success"
        assert result["data"] == mock_lifecycle
        assert "Retrieved lifecycle configuration" in result["message"]

    @pytest.mark.asyncio
    async def test_get_bucket_lifecycle_not_found(self, mock_aws_credentials):
        """Test getting lifecycle configuration when none exists."""
        service = S3Service(region="us-east-1")

        with patch.object(service, 'safe_api_call') as mock_safe_call:
            mock_safe_call.return_value = {
                "status": "success",
                "data": {},
                "resource_not_found": True
            }

            result = await service.get_bucket_lifecycle("test-bucket")

        assert result["status"] == "success"
        assert result["data"] == {"Rules": []}
        assert "No lifecycle configuration found" in result["message"]

    @pytest.mark.asyncio
    async def test_get_bucket_versioning(self, mock_aws_credentials):
        """Test getting bucket versioning configuration."""
        service = S3Service(region="us-east-1")

        mock_versioning = {"Status": "Enabled", "MfaDelete": "Disabled"}

        with patch.object(service, 'safe_api_call') as mock_safe_call:
            mock_safe_call.return_value = {"status": "success", "data": mock_versioning}

            result = await service.get_bucket_versioning("test-bucket")

        assert result["status"] == "success"
        assert result["data"] == mock_versioning
        assert "Retrieved versioning configuration" in result["message"]

    @pytest.mark.asyncio
    async def test_get_bucket_tagging(self, mock_aws_credentials):
        """Test getting bucket tagging configuration."""
        service = S3Service(region="us-east-1")

        mock_tags = {
            "TagSet": [
                {"Key": "Environment", "Value": "Production"},
                {"Key": "Owner", "Value": "DataTeam"}
            ]
        }

        with patch.object(service, 'safe_api_call') as mock_safe_call:
            mock_safe_call.return_value = {"status": "success", "data": mock_tags}

            result = await service.get_bucket_tagging("test-bucket")

        assert result["status"] == "success"
        assert result["data"] == mock_tags


@pytest.mark.unit
class TestS3ServiceMetrics:
    """Test S3Service metrics operations (no-cost CloudWatch only)."""

    @pytest.mark.asyncio
    async def test_get_bucket_metrics_success(self, mock_aws_credentials):
        """Test successful bucket metrics retrieval."""
        service = S3Service(region="us-east-1")

        # Mock the individual metric methods
        with patch.object(service, 'get_bucket_size_metrics') as mock_size, \
             patch.object(service, 'get_request_metrics') as mock_requests, \
             patch.object(service, 'get_storage_class_distribution_from_storage_lens') as mock_storage_class:

            mock_size.return_value = {
                "status": "success",
                "data": {"SizeBytes": 1000000, "ObjectCount": 100}
            }
            mock_requests.return_value = {
                "status": "success",
                "data": {"AllRequests": 5000, "GetRequests": 4000}
            }
            mock_storage_class.return_value = {
                "status": "error",
                "suggested_fallback": "storage_lens_service"
            }

            result = await service.get_bucket_metrics("test-bucket", days=30)

        assert result["status"] == "success"
        assert result["data"]["SizeBytes"] == 1000000
        assert result["data"]["AllRequests"] == 5000
        assert result["data"]["StorageClassDistribution"]["fallback_required"] is True
        assert result["cost_constraint_applied"] is True

    @pytest.mark.asyncio
    async def test_get_bucket_size_metrics(self, mock_aws_credentials):
        """Test bucket size metrics from CloudWatch."""
        service = S3Service(region="us-east-1")

        mock_size_response = {
            "Datapoints": [
                {"Timestamp": datetime.now(), "Average": 1000000000}  # 1GB
            ]
        }
        mock_objects_response = {
            "Datapoints": [
                {"Timestamp": datetime.now(), "Average": 1000}
            ]
        }

        with patch.object(service, 'safe_api_call') as mock_safe_call:
            mock_safe_call.side_effect = [
                {"status": "success", "data": mock_size_response},
                {"status": "success", "data": mock_objects_response}
            ]

            result = await service.get_bucket_size_metrics("test-bucket", days=30)

        assert result["status"] == "success"
        assert result["data"]["SizeBytes"] == 1000000000
        assert result["data"]["SizeGB"] == 1.0
        assert result["data"]["ObjectCount"] == 1000
        assert result["source"] == "cloudwatch"

    @pytest.mark.asyncio
    async def test_get_request_metrics(self, mock_aws_credentials):
        """Test request metrics from CloudWatch."""
        service = S3Service(region="us-east-1")

        # Mock responses for different request types
        mock_responses = [
            {"status": "success", "data": {"Datapoints": [{"Sum": 5000}]}},  # AllRequests
            {"status": "success", "data": {"Datapoints": [{"Sum": 4000}]}},  # GetRequests
            {"status": "success", "data": {"Datapoints": [{"Sum": 1000}]}},  # PutRequests
            {"status": "success", "data": {"Datapoints": [{"Sum": 100}]}},  # DeleteRequests
            {"status": "success", "data": {"Datapoints": [{"Sum": 500}]}},  # HeadRequests
            {"status": "success", "data": {"Datapoints": [{"Sum": 200}]}},  # PostRequests
            {"status": "success", "data": {"Datapoints": [{"Sum": 50}]}}  # ListRequests
        ]

        with patch.object(service, 'safe_api_call') as mock_safe_call:
            mock_safe_call.side_effect = mock_responses

            result = await service.get_request_metrics("test-bucket", days=30)

        assert result["status"] == "success"
        assert result["data"]["AllRequests"] == 5000
        assert result["data"]["GetRequests"] == 4000
        assert result["data"]["PutRequests"] == 1000
        assert result["source"] == "cloudwatch"

    @pytest.mark.asyncio
    async def test_get_storage_class_distribution_cost_constraint(self, mock_aws_credentials):
        """Test that storage class distribution enforces cost constraints."""
        service = S3Service(region="us-east-1")

        result = await service.get_storage_class_distribution_from_storage_lens("test-bucket")

        assert result["status"] == "error"
        assert result["error_code"] == "REQUIRES_STORAGE_LENS"
        assert result["cost_constraint"] == "FORBIDDEN: list_objects_v2 would incur costs"
        assert result["suggested_fallback"] == "storage_lens_service"
        assert result["fallback_available"] is True


@pytest.mark.unit
@pytest.mark.no_cost_validation
class TestS3ServiceCostConstraintIntegration:
    """Integration tests for cost constraint validation."""

    @pytest.mark.asyncio
    async def test_no_forbidden_operations_in_bucket_listing(self, mock_aws_credentials,
                                                             cost_constraint_validator):
        """Test that bucket listing uses only allowed operations."""
        service = S3Service(region="us-east-1")

        # Patch the validation method to track operations
        original_validate = service._validate_s3_operation

        def tracking_validate(op_name):
            cost_constraint_validator.validate_operation(op_name)
            return original_validate(op_name)

        service._validate_s3_operation = tracking_validate

        with patch.object(service, 'safe_api_call') as mock_safe_call:
            mock_safe_call.side_effect = [
                {"status": "success", "data": {"Buckets": [{"Name": "test", "CreationDate": datetime.now()}]}},
                {"status": "success", "data": {"LocationConstraint": "us-west-2"}}
            ]

            await service.list_buckets()

        summary = cost_constraint_validator.get_operation_summary()
        assert len(summary["forbidden_called"]) == 0
        assert "list_buckets" in summary["allowed_called"]
        assert "get_bucket_location" in summary["allowed_called"]

    @pytest.mark.asyncio
    async def test_no_forbidden_operations_in_metrics(self, mock_aws_credentials,
                                                      cost_constraint_validator):
        """Test that metrics collection uses only allowed operations."""
        service = S3Service(region="us-east-1")

        # Track all operations
        original_validate = service._validate_s3_operation

        def tracking_validate(op_name):
            cost_constraint_validator.validate_operation(op_name)
            return original_validate(op_name)

        service._validate_s3_operation = tracking_validate

        with patch.object(service, 'safe_api_call') as mock_safe_call:
            mock_safe_call.return_value = {"status": "success", "data": {"Datapoints": []}}

            await service.get_bucket_size_metrics("test-bucket")
            await service.get_request_metrics("test-bucket")

        summary = cost_constraint_validator.get_operation_summary()
        assert len(summary["forbidden_called"]) == 0
        # CloudWatch operations are not S3 operations, so no S3 operations should be tracked
        assert len(summary["operations"]) == 0

    def test_cost_constraint_error_details(self, mock_aws_credentials):
        """Test detailed error information for cost constraint violations."""
        service = S3Service(region="us-east-1")

        with pytest.raises(S3CostConstraintViolationError) as exc_info:
            service._validate_s3_operation('list_objects_v2')

        error_msg = str(exc_info.value)
        assert "FORBIDDEN" in error_msg
        assert "list_objects_v2" in error_msg
        assert "would incur costs" in error_msg
        assert "no-cost operations" in error_msg
        # Should list allowed operations
        assert "list_buckets" in error_msg


@pytest.mark.unit
class TestS3ServicePerformance:
    """Performance tests for S3Service operations."""

    @pytest.mark.asyncio
    async def test_parallel_bucket_region_lookup(self, mock_aws_credentials, performance_tracker):
        """Test that bucket region lookups are performed in parallel."""
        service = S3Service(region="us-east-1")

        # Mock 5 buckets
        mock_buckets = [
            {"Name": f"bucket-{i}", "CreationDate": datetime.now()}
            for i in range(5)
        ]

        calls_seen = 0

        async def fake_safe_api_call(*args, **kwargs):
            """Return the bucket list first, then a delayed region per call."""
            nonlocal calls_seen
            calls_seen += 1
            if calls_seen == 1:
                return {"status": "success", "data": {"Buckets": mock_buckets}}  # list_buckets
            await asyncio.sleep(0.1)  # Simulate API delay
            return {"status": "success", "data": {"LocationConstraint": "us-west-2"}}

        with patch.object(service, 'safe_api_call') as mock_safe_call:
            # The side effect must be a coroutine *function*. A list of
            # already-created coroutine objects would be handed back by the
            # mock without being awaited, so the simulated delay would never
            # run and the timing assertion below would pass vacuously.
            # NOTE(review): assumes patch.object yields an AsyncMock here,
            # i.e. that safe_api_call is a coroutine function (it is awaited
            # directly in the tests above) - confirm against S3Service.
            mock_safe_call.side_effect = fake_safe_api_call

            performance_tracker.start_timer("bucket_listing")
            result = await service.list_buckets()
            execution_time = performance_tracker.end_timer("bucket_listing")

        assert result["status"] == "success"
        assert len(result["data"]["Buckets"]) == 5
        # Every delayed lookup must actually have been consumed by the service.
        assert all(bucket["Region"] == "us-west-2" for bucket in result["data"]["Buckets"])

        # With parallel execution, 5 operations taking 0.1s each should complete in ~0.1s, not 0.5s
        assert execution_time < 0.3  # Allow some overhead

    @pytest.mark.asyncio
    async def test_parallel_metrics_collection(self, mock_aws_credentials, performance_tracker):
        """Test that metrics are collected in parallel."""
        service = S3Service(region="us-east-1")

        async def slow_metric_call(*args, **kwargs):
            await asyncio.sleep(0.05)  # Small delay per metric
            return {"status": "success", "data": {"Datapoints": []}}

        with patch.object(service, 'get_bucket_size_metrics', side_effect=slow_metric_call), \
             patch.object(service, 'get_request_metrics', side_effect=slow_metric_call), \
             patch.object(service, 'get_storage_class_distribution_from_storage_lens',
                          side_effect=slow_metric_call):

            performance_tracker.start_timer("metrics_collection")
            result = await service.get_bucket_metrics("test-bucket")
            execution_time = performance_tracker.end_timer("metrics_collection")

        assert result["status"] == "success"
        # 3 parallel operations at 0.05s each should complete in ~0.05s, not 0.15s
        assert execution_time < 0.1

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aws-samples/sample-cfm-tips-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.