Skip to main content
Glama
aws-samples

CFM Tips - Cost Optimization MCP Server

by aws-samples
test_cloudwatch_orchestrator_pagination.py20.1 kB
""" Integration Tests for CloudWatch Orchestrator Pagination These tests verify that the CloudWatch orchestrator correctly integrates result processing and pagination without incurring additional AWS costs. """ import pytest import asyncio import unittest.mock as mock from unittest.mock import MagicMock, patch, AsyncMock from playbooks.cloudwatch.optimization_orchestrator import CloudWatchOptimizationOrchestrator class TestCloudWatchOrchestratorPagination: """Integration tests for CloudWatch orchestrator pagination.""" def setup_method(self): """Set up test fixtures.""" self.region = 'us-east-1' self.session_id = 'test-session-123' @pytest.mark.asyncio async def test_execute_analysis_with_pagination_no_additional_api_calls(self): """Test that execute_analysis with pagination doesn't make additional API calls.""" # Mock the analysis engine to return sample data mock_analysis_result = { 'status': 'success', 'data': { 'configuration_analysis': { 'log_groups': { 'log_groups': [ {'logGroupName': 'group1', 'storedBytes': 2147483648}, # 2 GB {'logGroupName': 'group2', 'storedBytes': 1073741824} # 1 GB ] }, 'metrics': { 'metrics': [ {'MetricName': 'CustomMetric', 'Namespace': 'MyApp'}, {'MetricName': 'CPUUtilization', 'Namespace': 'AWS/EC2'} ] } } }, 'recommendations': [ {'type': 'optimization', 'potential_monthly_savings': 15.0}, {'type': 'cleanup', 'potential_monthly_savings': 5.0} ] } with patch('playbooks.cloudwatch.optimization_orchestrator.CloudWatchAnalysisEngine') as mock_engine_class, \ patch('boto3.client') as mock_boto3: # Mock the analysis engine mock_engine = AsyncMock() mock_engine.run_analysis.return_value = mock_analysis_result mock_engine_class.return_value = mock_engine # Create orchestrator orchestrator = CloudWatchOptimizationOrchestrator( region=self.region, session_id=self.session_id ) # Execute analysis with pagination result = await orchestrator.execute_analysis( 'general_spend', page=1, allow_cost_explorer=False, allow_minimal_cost_metrics=False ) # Verify no additional boto3 clients were created during result processing # (The orchestrator itself may create clients during initialization) initial_call_count = mock_boto3.call_count # Process the result again to verify no additional calls if result.get('status') == 'success': # The result should already be processed and paginated assert 'pagination_applied' in result assert result['current_page'] == 1 # Verify no additional boto3 calls were made during result processing assert mock_boto3.call_count == initial_call_count @pytest.mark.asyncio async def test_result_processing_sorts_by_cost_descending(self): """Test that result processing correctly sorts items by cost in descending order.""" mock_analysis_result = { 'status': 'success', 'data': { 'configuration_analysis': { 'log_groups': { 'log_groups': [ {'logGroupName': 'small-group', 'storedBytes': 1073741824}, # 1 GB = $0.03 {'logGroupName': 'large-group', 'storedBytes': 10737418240}, # 10 GB = $0.30 {'logGroupName': 'medium-group', 'storedBytes': 5368709120} # 5 GB = $0.15 ] } } } } with patch('playbooks.cloudwatch.optimization_orchestrator.CloudWatchAnalysisEngine') as mock_engine_class: mock_engine = AsyncMock() mock_engine.run_analysis.return_value = mock_analysis_result mock_engine_class.return_value = mock_engine orchestrator = CloudWatchOptimizationOrchestrator( region=self.region, session_id=self.session_id ) result = await orchestrator.execute_analysis('general_spend', page=1) # Verify result is processed and sorted if result.get('status') == 'success': log_groups_data = result['data']['configuration_analysis']['log_groups'] # Should have pagination metadata assert 'pagination' in log_groups_data assert log_groups_data['pagination']['current_page'] == 1 # Should be sorted by cost descending items = log_groups_data['items'] assert len(items) == 3 # Verify cost estimates are added and sorted correctly assert items[0]['logGroupName'] == 'large-group' assert items[0]['estimated_monthly_cost'] == 0.30 assert items[1]['logGroupName'] == 'medium-group' assert items[1]['estimated_monthly_cost'] == 0.15 assert items[2]['logGroupName'] == 'small-group' assert items[2]['estimated_monthly_cost'] == 0.03 @pytest.mark.asyncio async def test_pagination_metadata_accuracy(self): """Test that pagination metadata is accurate for different page scenarios.""" # Create a large dataset to test pagination large_log_groups = [ {'logGroupName': f'group-{i}', 'storedBytes': i * 1073741824} # i GB each for i in range(25) # 25 log groups = 3 pages (10 items per page) ] mock_analysis_result = { 'status': 'success', 'data': { 'configuration_analysis': { 'log_groups': { 'log_groups': large_log_groups } } } } with patch('playbooks.cloudwatch.optimization_orchestrator.CloudWatchAnalysisEngine') as mock_engine_class: mock_engine = AsyncMock() mock_engine.run_analysis.return_value = mock_analysis_result mock_engine_class.return_value = mock_engine orchestrator = CloudWatchOptimizationOrchestrator( region=self.region, session_id=self.session_id ) # Test page 1 result_page1 = await orchestrator.execute_analysis('general_spend', page=1) log_groups_page1 = result_page1['data']['configuration_analysis']['log_groups'] assert log_groups_page1['pagination']['current_page'] == 1 assert log_groups_page1['pagination']['total_items'] == 25 assert log_groups_page1['pagination']['total_pages'] == 3 assert log_groups_page1['pagination']['has_next_page'] is True assert log_groups_page1['pagination']['has_previous_page'] is False assert len(log_groups_page1['items']) == 10 # Test page 2 result_page2 = await orchestrator.execute_analysis('general_spend', page=2) log_groups_page2 = result_page2['data']['configuration_analysis']['log_groups'] assert log_groups_page2['pagination']['current_page'] == 2 assert log_groups_page2['pagination']['has_next_page'] is True assert log_groups_page2['pagination']['has_previous_page'] is True assert len(log_groups_page2['items']) == 10 # Test page 3 (last page with partial results) result_page3 = await orchestrator.execute_analysis('general_spend', page=3) log_groups_page3 = result_page3['data']['configuration_analysis']['log_groups'] assert log_groups_page3['pagination']['current_page'] == 3 assert log_groups_page3['pagination']['has_next_page'] is False assert log_groups_page3['pagination']['has_previous_page'] is True assert len(log_groups_page3['items']) == 5 # Last page has 5 items @pytest.mark.asyncio async def test_comprehensive_analysis_pagination(self): """Test that comprehensive analysis also applies pagination correctly.""" mock_analysis_result = { 'status': 'success', 'data': { 'configuration_analysis': { 'log_groups': { 'log_groups': [ {'logGroupName': 'group1', 'storedBytes': 1073741824}, {'logGroupName': 'group2', 'storedBytes': 2147483648} ] } } }, 'recommendations': [ {'type': 'optimization', 'potential_monthly_savings': 10.0} ] } with patch('playbooks.cloudwatch.optimization_orchestrator.CloudWatchAnalysisEngine') as mock_engine_class: mock_engine = AsyncMock() mock_engine.run_analysis.return_value = mock_analysis_result mock_engine_class.return_value = mock_engine orchestrator = CloudWatchOptimizationOrchestrator( region=self.region, session_id=self.session_id ) result = await orchestrator.execute_comprehensive_analysis(page=1) # Verify comprehensive analysis applies pagination to all analysis types if result.get('status') == 'success': # Each analysis type should have paginated results for analysis_type in ['general_spend', 'logs_optimization', 'metrics_optimization', 'alarms_and_dashboards']: if analysis_type in result.get('results', {}): analysis_result = result['results'][analysis_type] if analysis_result.get('status') == 'success': assert 'pagination_applied' in analysis_result assert analysis_result['current_page'] == 1 @pytest.mark.asyncio async def test_error_handling_during_result_processing(self): """Test that errors during result processing don't break the analysis.""" # Mock analysis result with invalid data structure mock_analysis_result = { 'status': 'success', 'data': { 'configuration_analysis': { 'log_groups': 'invalid_structure' # Should be a dict with 'log_groups' key } } } with patch('playbooks.cloudwatch.optimization_orchestrator.CloudWatchAnalysisEngine') as mock_engine_class: mock_engine = AsyncMock() mock_engine.run_analysis.return_value = mock_analysis_result mock_engine_class.return_value = mock_engine orchestrator = CloudWatchOptimizationOrchestrator( region=self.region, session_id=self.session_id ) # Should not raise an exception, should return original result result = await orchestrator.execute_analysis('general_spend', page=1) # Should still return a successful result (original data preserved) assert result.get('status') == 'success' # Original data should be preserved if processing fails assert 'data' in result @pytest.mark.asyncio async def test_page_parameter_validation(self): """Test that invalid page parameters are handled correctly.""" mock_analysis_result = { 'status': 'success', 'data': { 'configuration_analysis': { 'log_groups': { 'log_groups': [ {'logGroupName': 'group1', 'storedBytes': 1073741824} ] } } } } with patch('playbooks.cloudwatch.optimization_orchestrator.CloudWatchAnalysisEngine') as mock_engine_class: mock_engine = AsyncMock() mock_engine.run_analysis.return_value = mock_analysis_result mock_engine_class.return_value = mock_engine orchestrator = CloudWatchOptimizationOrchestrator( region=self.region, session_id=self.session_id ) # Test with page 0 (should default to page 1) result = await orchestrator.execute_analysis('general_spend', page=0) if result.get('status') == 'success': log_groups_data = result['data']['configuration_analysis']['log_groups'] assert log_groups_data['pagination']['current_page'] == 1 # Test with negative page (should default to page 1) result = await orchestrator.execute_analysis('general_spend', page=-5) if result.get('status') == 'success': log_groups_data = result['data']['configuration_analysis']['log_groups'] assert log_groups_data['pagination']['current_page'] == 1 def test_orchestrator_initialization_no_additional_costs(self): """Test that orchestrator initialization doesn't incur additional costs.""" with patch('boto3.client') as mock_boto3: # Track initial boto3 calls (orchestrator may create some clients during init) initial_call_count = mock_boto3.call_count # Create orchestrator orchestrator = CloudWatchOptimizationOrchestrator( region=self.region, session_id=self.session_id ) # Verify result processor is initialized assert hasattr(orchestrator, 'result_processor') assert orchestrator.result_processor is not None # The result processor itself should not create additional AWS clients # (any clients created should be from the orchestrator's other components) processor_specific_calls = mock_boto3.call_count - initial_call_count # Verify result processor doesn't have AWS clients assert not hasattr(orchestrator.result_processor, '_aws_clients') class TestZeroCostIntegration: """Integration tests specifically for zero-cost guarantee.""" @pytest.mark.asyncio async def test_end_to_end_no_additional_api_calls(self): """Test complete end-to-end flow without additional API calls.""" # Mock all AWS services to track API calls with patch('boto3.client') as mock_boto3, \ patch('boto3.resource') as mock_resource: # Create sample analysis result mock_analysis_result = { 'status': 'success', 'data': { 'configuration_analysis': { 'log_groups': { 'log_groups': [ {'logGroupName': 'test-group', 'storedBytes': 1073741824} ] } } } } with patch('playbooks.cloudwatch.optimization_orchestrator.CloudWatchAnalysisEngine') as mock_engine_class: mock_engine = AsyncMock() mock_engine.run_analysis.return_value = mock_analysis_result mock_engine_class.return_value = mock_engine # Track API calls before result processing calls_before_processing = mock_boto3.call_count orchestrator = CloudWatchOptimizationOrchestrator(region='us-east-1') result = await orchestrator.execute_analysis('general_spend', page=1) # Track API calls after result processing calls_after_processing = mock_boto3.call_count # Verify result processing didn't add API calls # (orchestrator initialization may create clients, but result processing should not) processing_calls = calls_after_processing - calls_before_processing # The result processing itself should not make additional API calls # Any calls should be from orchestrator initialization, not from result processing if result.get('status') == 'success' and 'pagination_applied' in result: # If pagination was applied, verify it was done without additional API calls # by checking that the processing phase specifically didn't add calls # We can't easily separate orchestrator init calls from processing calls # in this test, but we can verify the result processor methods don't call boto3 processor = orchestrator.result_processor # Test direct processor methods don't make API calls test_items = [{'test': 'data', 'estimated_monthly_cost': 1.0}] calls_before_direct = mock_boto3.call_count processor.sort_by_cost_descending(test_items) processor.paginate_results(test_items) calls_after_direct = mock_boto3.call_count # Direct processor methods should make no API calls assert calls_after_direct == calls_before_direct def test_memory_usage_efficiency(self): """Test that pagination doesn't cause excessive memory usage.""" # Create a large dataset large_dataset = [ {'id': i, 'logGroupName': f'group-{i}', 'storedBytes': i * 1073741824} for i in range(1000) # 1000 log groups ] mock_analysis_result = { 'status': 'success', 'data': { 'configuration_analysis': { 'log_groups': { 'log_groups': large_dataset } } } } with patch('playbooks.cloudwatch.optimization_orchestrator.CloudWatchAnalysisEngine') as mock_engine_class: mock_engine = AsyncMock() mock_engine.run_analysis.return_value = mock_analysis_result mock_engine_class.return_value = mock_engine orchestrator = CloudWatchOptimizationOrchestrator(region='us-east-1') # Process large dataset with pagination import tracemalloc tracemalloc.start() # This should be memory efficient due to pagination asyncio.run(orchestrator.execute_analysis('general_spend', page=1)) current, peak = tracemalloc.get_traced_memory() tracemalloc.stop() # Memory usage should be reasonable (less than 100MB for this test) assert peak < 100 * 1024 * 1024 # 100MB limit if __name__ == '__main__': pytest.main([__file__])

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aws-samples/sample-cfm-tips-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server