Skip to main content
Glama
aws-samples

CFM Tips - Cost Optimization MCP Server

by aws-samples
conftest.py (16.8 kB)
""" Pytest configuration and fixtures for S3 optimization testing. This module provides common fixtures and configuration for all tests, including mocked AWS services, test data factories, and performance monitoring. """ import pytest import asyncio import logging from unittest.mock import Mock, MagicMock, patch from datetime import datetime, timedelta from typing import Dict, List, Any, Optional import boto3 from moto import mock_aws import json # Configure logging for tests logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) @pytest.fixture(scope="session") def event_loop(): """Create an instance of the default event loop for the test session.""" loop = asyncio.get_event_loop_policy().new_event_loop() yield loop loop.close() @pytest.fixture def mock_aws_credentials(): """Mock AWS credentials for testing.""" with patch.dict('os.environ', { 'AWS_ACCESS_KEY_ID': 'testing', 'AWS_SECRET_ACCESS_KEY': 'testing', 'AWS_SECURITY_TOKEN': 'testing', 'AWS_SESSION_TOKEN': 'testing', 'AWS_DEFAULT_REGION': 'us-east-1' }): yield @pytest.fixture def mock_s3_client(mock_aws_credentials): """Mock S3 client with moto.""" with mock_aws(): yield boto3.client('s3', region_name='us-east-1') @pytest.fixture def mock_cloudwatch_client(mock_aws_credentials): """Mock CloudWatch client with moto.""" with mock_aws(): yield boto3.client('cloudwatch', region_name='us-east-1') @pytest.fixture def mock_ce_client(mock_aws_credentials): """Mock Cost Explorer client with moto.""" with mock_aws(): yield boto3.client('ce', region_name='us-east-1') @pytest.fixture def mock_s3control_client(mock_aws_credentials): """Mock S3 Control client with moto.""" with mock_aws(): yield boto3.client('s3control', region_name='us-east-1') @pytest.fixture def sample_buckets(): """Sample bucket data for testing.""" return [ { "Name": "test-bucket-1", "CreationDate": datetime.now() - timedelta(days=30), "Region": "us-east-1" }, { "Name": "test-bucket-2", "CreationDate": datetime.now() - 
timedelta(days=60), "Region": "us-west-2" }, { "Name": "test-bucket-3", "CreationDate": datetime.now() - timedelta(days=90), "Region": "eu-west-1" } ] @pytest.fixture def sample_cost_explorer_data(): """Sample Cost Explorer response data.""" return { "ResultsByTime": [ { "TimePeriod": { "Start": "2024-01-01", "End": "2024-01-02" }, "Groups": [ { "Keys": ["S3-Storage-Standard"], "Metrics": { "UnblendedCost": {"Amount": "10.50", "Unit": "USD"}, "UsageQuantity": {"Amount": "1000", "Unit": "GB-Mo"} } }, { "Keys": ["S3-Storage-StandardIA"], "Metrics": { "UnblendedCost": {"Amount": "5.25", "Unit": "USD"}, "UsageQuantity": {"Amount": "500", "Unit": "GB-Mo"} } } ] } ] } @pytest.fixture def sample_storage_lens_config(): """Sample Storage Lens configuration.""" return { "StorageLensConfiguration": { "Id": "test-config", "AccountLevel": { "StorageMetrics": {"IsEnabled": True}, "BucketLevel": { "ActivityMetrics": {"IsEnabled": True}, "CostOptimizationMetrics": {"IsEnabled": True}, "DetailedStatusCodesMetrics": {"IsEnabled": False}, "AdvancedCostOptimizationMetrics": {"IsEnabled": False}, "AdvancedDataProtectionMetrics": {"IsEnabled": False} } }, "IsEnabled": True, "DataExport": { "S3BucketDestination": { "Bucket": "storage-lens-export-bucket", "Prefix": "exports/", "Format": "CSV" } } } } @pytest.fixture def mock_s3_service(): """Mock S3Service instance.""" service = Mock() service.region = "us-east-1" service._operation_call_count = {} # Mock async methods async def mock_list_buckets(): return { "status": "success", "data": { "Buckets": [ {"Name": "test-bucket-1", "CreationDate": datetime.now(), "Region": "us-east-1"}, {"Name": "test-bucket-2", "CreationDate": datetime.now(), "Region": "us-west-2"} ] } } async def mock_get_bucket_metrics(bucket_name, days=30): return { "status": "success", "data": { "SizeBytes": 1000000000, "SizeGB": 1.0, "ObjectCount": 1000, "AllRequests": 5000, "GetRequests": 4000, "PutRequests": 1000 } } service.list_buckets = mock_list_buckets 
service.get_bucket_metrics = mock_get_bucket_metrics service.get_operation_stats = Mock(return_value={"list_buckets": 1, "get_bucket_location": 2}) return service @pytest.fixture def mock_storage_lens_service(): """Mock StorageLensService instance.""" service = Mock() service.account_id = "123456789012" service.region = "us-east-1" # Mock async methods async def mock_get_storage_metrics(config_id="default-account-dashboard"): return { "status": "success", "data": { "ConfigurationId": config_id, "AccountId": "123456789012", "IsEnabled": True, "CostOptimizationMetrics": True, "DetailedStatusCodesMetrics": False } } async def mock_get_cost_optimization_metrics(config_id="default-account-dashboard"): return { "status": "success", "data": { "ConfigurationId": config_id, "CostOptimizationMetricsEnabled": True, "MultipartUploadTrackingAvailable": True } } service.get_storage_metrics = mock_get_storage_metrics service.get_cost_optimization_metrics = mock_get_cost_optimization_metrics return service @pytest.fixture def mock_pricing_service(): """Mock S3Pricing service instance.""" service = Mock() service.region = "us-east-1" def mock_get_storage_pricing(): return { "status": "success", "storage_pricing": { "STANDARD": 0.023, "STANDARD_IA": 0.0125, "ONEZONE_IA": 0.01, "GLACIER": 0.004, "DEEP_ARCHIVE": 0.00099 } } def mock_estimate_request_costs(requests): total_cost = sum(count * 0.0004 for count in requests.values()) return { "status": "success", "total_cost": total_cost, "breakdown": {req_type: count * 0.0004 for req_type, count in requests.items()} } service.get_storage_pricing = mock_get_storage_pricing service.estimate_request_costs = mock_estimate_request_costs return service @pytest.fixture def mock_performance_monitor(): """Mock performance monitor.""" monitor = Mock() monitor.start_analysis_monitoring = Mock(return_value="test_session_123") monitor.end_analysis_monitoring = Mock() monitor.record_metric = Mock() monitor.record_cache_hit = Mock() 
monitor.record_cache_miss = Mock() return monitor @pytest.fixture def mock_memory_manager(): """Mock memory manager.""" manager = Mock() manager.start_memory_tracking = Mock(return_value="test_tracker_123") manager.stop_memory_tracking = Mock(return_value={"peak_memory_mb": 50.0, "avg_memory_mb": 30.0}) manager.register_large_object = Mock() manager.add_cache_reference = Mock() manager.set_performance_monitor = Mock() return manager @pytest.fixture def mock_timeout_handler(): """Mock timeout handler.""" handler = Mock() handler.get_timeout_for_analysis = Mock(return_value=60.0) handler.record_execution_time = Mock() handler.get_complexity_level = Mock(return_value="medium") handler.set_performance_monitor = Mock() return handler @pytest.fixture def mock_cache(): """Mock cache instance.""" cache = Mock() cache.get = Mock(return_value=None) # Cache miss by default cache.put = Mock() cache.invalidate = Mock() cache.set_performance_monitor = Mock() return cache @pytest.fixture def mock_service_orchestrator(): """Mock ServiceOrchestrator instance.""" orchestrator = Mock() orchestrator.session_id = "test_session_123" orchestrator.execute_parallel_analysis = Mock(return_value={ "status": "success", "successful": 5, "total_tasks": 6, "results": { "general_spend": {"status": "success", "data": {}}, "storage_class": {"status": "success", "data": {}}, "archive_optimization": {"status": "success", "data": {}}, "api_cost": {"status": "success", "data": {}}, "multipart_cleanup": {"status": "success", "data": {}} }, "stored_tables": ["general_spend_results", "storage_class_results"] }) orchestrator.query_session_data = Mock(return_value=[]) orchestrator.get_stored_tables = Mock(return_value=["test_table_1", "test_table_2"]) return orchestrator @pytest.fixture def cost_constraint_validator(): """Fixture for validating no-cost constraints.""" class CostConstraintValidator: def __init__(self): self.forbidden_operations = { 'list_objects', 'list_objects_v2', 'head_object', 
'get_object', 'get_object_attributes', 'select_object_content' } self.allowed_operations = { 'list_buckets', 'get_bucket_location', 'get_bucket_lifecycle_configuration', 'get_bucket_versioning', 'get_bucket_tagging', 'list_multipart_uploads' } self.operation_calls = [] def validate_operation(self, operation_name: str): """Validate that an operation doesn't incur costs.""" self.operation_calls.append(operation_name) if operation_name in self.forbidden_operations: raise ValueError(f"FORBIDDEN: Operation {operation_name} would incur costs") return operation_name in self.allowed_operations def get_operation_summary(self): """Get summary of operations called.""" return { "total_operations": len(self.operation_calls), "unique_operations": len(set(self.operation_calls)), "operations": list(set(self.operation_calls)), "forbidden_called": [op for op in self.operation_calls if op in self.forbidden_operations], "allowed_called": [op for op in self.operation_calls if op in self.allowed_operations] } return CostConstraintValidator() @pytest.fixture def performance_test_data(): """Test data for performance testing.""" return { "small_dataset": { "bucket_count": 5, "object_count_per_bucket": 100, "expected_max_time": 10.0 }, "medium_dataset": { "bucket_count": 50, "object_count_per_bucket": 1000, "expected_max_time": 30.0 }, "large_dataset": { "bucket_count": 500, "object_count_per_bucket": 10000, "expected_max_time": 120.0 } } @pytest.fixture def timeout_test_scenarios(): """Test scenarios for timeout handling.""" return { "quick_analysis": { "analysis_type": "general_spend", "timeout_seconds": 5.0, "expected_behavior": "complete" }, "medium_analysis": { "analysis_type": "comprehensive", "timeout_seconds": 30.0, "expected_behavior": "complete" }, "timeout_scenario": { "analysis_type": "comprehensive", "timeout_seconds": 1.0, "expected_behavior": "timeout" } } class TestDataFactory: """Factory for creating test data.""" @staticmethod def create_bucket_data(count: int = 3) -> 
List[Dict[str, Any]]: """Create sample bucket data.""" buckets = [] for i in range(count): buckets.append({ "Name": f"test-bucket-{i+1}", "CreationDate": datetime.now() - timedelta(days=30*(i+1)), "Region": ["us-east-1", "us-west-2", "eu-west-1"][i % 3] }) return buckets @staticmethod def create_cost_data(days: int = 30) -> Dict[str, Any]: """Create sample cost data.""" results = [] for i in range(days): date = (datetime.now() - timedelta(days=i)).strftime('%Y-%m-%d') results.append({ "TimePeriod": {"Start": date, "End": date}, "Groups": [ { "Keys": ["S3-Storage-Standard"], "Metrics": { "UnblendedCost": {"Amount": str(10.0 + i * 0.5), "Unit": "USD"}, "UsageQuantity": {"Amount": str(1000 + i * 10), "Unit": "GB-Mo"} } } ] }) return {"ResultsByTime": results} @staticmethod def create_analysis_result(analysis_type: str, status: str = "success") -> Dict[str, Any]: """Create sample analysis result.""" return { "status": status, "analysis_type": analysis_type, "data": { "total_cost": 100.0, "optimization_opportunities": 3, "potential_savings": 25.0 }, "recommendations": [ { "type": "cost_optimization", "priority": "high", "title": f"Optimize {analysis_type}", "potential_savings": 25.0 } ], "execution_time": 5.0, "timestamp": datetime.now().isoformat() } @pytest.fixture def test_data_factory(): """Test data factory fixture.""" return TestDataFactory() # Performance testing utilities class PerformanceTracker: """Track performance metrics during tests.""" def __init__(self): self.metrics = {} self.start_times = {} def start_timer(self, name: str): """Start timing an operation.""" self.start_times[name] = datetime.now() def end_timer(self, name: str) -> float: """End timing and return duration.""" if name in self.start_times: duration = (datetime.now() - self.start_times[name]).total_seconds() self.metrics[name] = duration return duration return 0.0 def get_metrics(self) -> Dict[str, float]: """Get all recorded metrics.""" return self.metrics.copy() def 
assert_performance(self, name: str, max_time: float): """Assert that an operation completed within time limit.""" if name in self.metrics: assert self.metrics[name] <= max_time, f"Operation {name} took {self.metrics[name]:.2f}s, expected <= {max_time}s" else: raise ValueError(f"No timing data for operation: {name}") @pytest.fixture def performance_tracker(): """Performance tracker fixture.""" return PerformanceTracker() # Async test utilities def async_test(coro): """Decorator to run async tests.""" def wrapper(*args, **kwargs): loop = asyncio.get_event_loop() return loop.run_until_complete(coro(*args, **kwargs)) return wrapper # Test markers pytest.mark.unit = pytest.mark.unit pytest.mark.integration = pytest.mark.integration pytest.mark.performance = pytest.mark.performance pytest.mark.no_cost_validation = pytest.mark.no_cost_validation

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aws-samples/sample-cfm-tips-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.