Skip to main content
Glama
aws-samples

CFM Tips - Cost Optimization MCP Server

by aws-samples
test_aws_session_management.py50.6 kB
#!/usr/bin/env python3 """ AWS Account Session Management Test Suite Tests session management functionality with real AWS APIs: 1. Parallel API calls for multiple MCP tools 2. Intelligent caching for avoiding redundant API calls 3. Automatic session clearing This script uses actual AWS services to verify session management. """ import asyncio import json import logging import time import sys import os from datetime import datetime, timedelta from typing import Dict, List, Any, Optional # Add current directory to path sys.path.append(os.path.dirname(os.path.abspath(__file__))) # Setup logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Import MCP server components from utils.session_manager import get_session_manager from utils.service_orchestrator import ServiceOrchestrator from utils.parallel_executor import get_parallel_executor from utils.intelligent_cache import get_pricing_cache, get_bucket_metadata_cache, get_analysis_results_cache # Import runbook functions directly from playbooks from playbooks.ec2.ec2_optimization import run_ec2_right_sizing_analysis from playbooks.ebs.ebs_optimization import run_ebs_optimization_analysis from playbooks.rds.rds_optimization import run_rds_optimization_analysis from playbooks.aws_lambda.lambda_optimization import run_lambda_optimization_analysis # Import services for direct API calls from services.cost_explorer import get_cost_and_usage from services.compute_optimizer import get_ec2_recommendations from services.trusted_advisor import get_trusted_advisor_checks # Import playbook functions from playbooks.ec2.ec2_optimization import get_underutilized_instances from playbooks.ebs.ebs_optimization import get_underutilized_volumes from playbooks.rds.rds_optimization import get_underutilized_rds_instances from playbooks.aws_lambda.lambda_optimization import get_underutilized_lambda_functions class AWSSessionManagementTester: """Test class for verifying session management with real AWS APIs.""" def __init__(self, region: str = "us-east-1"): self.region = region self.session_manager = get_session_manager() self.parallel_executor = get_parallel_executor() self.pricing_cache = get_pricing_cache() self.bucket_cache = get_bucket_metadata_cache() self.analysis_cache = get_analysis_results_cache() # Complete list of all 46 MCP tools self.all_mcp_tools = [ # Core AWS Service Tools (8 tools) "get_cost_explorer_data", "list_coh_enrollment", "get_coh_recommendations", "get_coh_summaries", "get_coh_recommendation", "get_compute_optimizer_recommendations", "get_trusted_advisor_checks", "get_performance_insights_metrics", # EC2 Optimization Tools (14 tools) "ec2_rightsizing", "ec2_report", "ec2_stopped_instances", "ec2_unattached_eips", "ec2_old_generation", "ec2_detailed_monitoring", "ec2_graviton_compatible", "ec2_burstable_analysis", "ec2_spot_opportunities", "ec2_unused_reservations", "ec2_scheduling_opportunities", "ec2_commitment_plans", "ec2_governance_violations", "ec2_comprehensive_report", # EBS Optimization Tools (3 tools) "ebs_optimization", "ebs_unused", "ebs_report", # RDS Optimization Tools (3 tools) "rds_optimization", "rds_idle", "rds_report", # Lambda Optimization Tools (3 tools) "lambda_optimization", "lambda_unused", "lambda_report", # CloudTrail Optimization Tools (3 tools) "get_management_trails", "run_cloudtrail_trails_analysis", "generate_cloudtrail_report", # S3 Optimization Tools (9 tools) "s3_general_spend_analysis", "s3_storage_class_selection", "s3_storage_class_validation", "s3_archive_optimization", "s3_api_cost_minimization", "s3_multipart_cleanup", "s3_governance_check", "s3_comprehensive_optimization_tool", "s3_comprehensive_analysis", "s3_quick_analysis", "s3_bucket_analysis", # Comprehensive Analysis Tools (1 tool) "comprehensive_analysis" ] logger.info(f"AWSSessionManagementTester initialized for region: {region}") logger.info(f"Total MCP tools to test: {len(self.all_mcp_tools)}") def test_parallel_aws_api_calls(self) -> Dict[str, Any]: """Test 1: Verify parallel API calls with real AWS services.""" logger.info("=== Testing Parallel AWS API Calls ===") start_time = time.time() # Create a service orchestrator orchestrator = ServiceOrchestrator() # Define real AWS service calls for parallel execution service_calls = [ { 'service': 'ec2', 'operation': 'underutilized_instances', 'function': get_underutilized_instances, 'kwargs': { 'region': self.region, 'lookback_period_days': 7, # Shorter period for faster testing 'cpu_threshold': 40.0 }, 'priority': 3, 'timeout': 60.0 }, { 'service': 'ebs', 'operation': 'underutilized_volumes', 'function': get_underutilized_volumes, 'kwargs': { 'region': self.region, 'lookback_period_days': 7, 'iops_threshold': 100.0 }, 'priority': 2, 'timeout': 45.0 }, { 'service': 'rds', 'operation': 'underutilized_instances', 'function': get_underutilized_rds_instances, 'kwargs': { 'region': self.region, 'lookback_period_days': 7, 'cpu_threshold': 40.0 }, 'priority': 2, 'timeout': 50.0 }, { 'service': 'lambda', 'operation': 'underutilized_functions', 'function': get_underutilized_lambda_functions, 'kwargs': { 'region': self.region, 'lookback_period_days': 7, 'memory_utilization_threshold': 50.0 }, 'priority': 1, 'timeout': 40.0 }, { 'service': 'cost_explorer', 'operation': 'cost_and_usage', 'function': get_cost_and_usage, 'kwargs': { 'start_date': (datetime.now() - timedelta(days=7)).strftime('%Y-%m-%d'), 'end_date': datetime.now().strftime('%Y-%m-%d'), 'granularity': 'DAILY', 'metrics': ['BlendedCost'] }, 'priority': 1, 'timeout': 30.0 } ] # Execute parallel analysis logger.info(f"Starting parallel execution of {len(service_calls)} AWS service calls") execution_summary = orchestrator.execute_parallel_analysis( service_calls, store_results=True, timeout=120.0 ) parallel_time = time.time() - start_time # Test sequential execution for comparison (with shorter timeout) logger.info("Starting sequential execution for comparison") start_time = time.time() sequential_results = [] sequential_errors = [] for call_def in service_calls[:3]: # Only test first 3 for sequential to save time try: result = call_def['function'](**call_def['kwargs']) sequential_results.append(result) except Exception as e: sequential_errors.append(str(e)) logger.warning(f"Sequential call failed: {e}") sequential_time = time.time() - start_time # Calculate performance improvement performance_improvement = 0 if sequential_time > 0: performance_improvement = (sequential_time - (parallel_time * 0.6)) / sequential_time * 100 # Adjust for fewer sequential calls # Analyze stored data stored_tables = orchestrator.get_stored_tables() total_stored_records = 0 for table in stored_tables: try: records = orchestrator.query_session_data(f'SELECT COUNT(*) as count FROM "{table}"') if records: total_stored_records += records[0].get('count', 0) except Exception as e: logger.warning(f"Error querying table {table}: {e}") test_result = { "test_name": "parallel_aws_api_calls", "status": "success" if execution_summary['successful'] > 0 else "failed", "region": self.region, "parallel_execution_time": parallel_time, "sequential_execution_time": sequential_time, "performance_improvement_percent": performance_improvement, "aws_service_calls": { "total_attempted": execution_summary['total_tasks'], "successful": execution_summary['successful'], "failed": execution_summary['failed'], "timeout": execution_summary['timeout'] }, "session_storage": { "session_id": orchestrator.session_id, "stored_tables": len(stored_tables), "total_records_stored": total_stored_records, "table_names": stored_tables }, "service_results": { task_id: { "service": result['service'], "operation": result['operation'], "status": result['status'], "execution_time": result['execution_time'], "has_stored_data": 'stored_table' in result } for task_id, result in execution_summary['results'].items() }, "sequential_comparison": { "successful_calls": len(sequential_results), "errors": sequential_errors } } logger.info(f"Parallel AWS calls: {parallel_time:.2f}s, Sequential: {sequential_time:.2f}s") logger.info(f"AWS API calls successful: {execution_summary['successful']}/{execution_summary['total_tasks']}") logger.info(f"Data stored in {len(stored_tables)} tables with {total_stored_records} total records") return test_result def test_intelligent_caching_with_aws(self) -> Dict[str, Any]: """Test 2: Verify intelligent caching with real AWS data.""" logger.info("=== Testing Intelligent Caching with AWS Data ===") # Test 1: Cost Explorer data caching cost_cache_key = ["cost_explorer", self.region, "daily", "7days"] # First call (cache miss) start_time = time.time() cached_cost_data = self.analysis_cache.get(cost_cache_key, "not_found") miss_time = time.time() - start_time # Make actual AWS call and cache the result try: cost_data = get_cost_and_usage( start_date=(datetime.now() - timedelta(days=7)).strftime('%Y-%m-%d'), end_date=datetime.now().strftime('%Y-%m-%d'), granularity='DAILY', metrics=['BlendedCost'] ) # Cache the result self.analysis_cache.put(cost_cache_key, cost_data, ttl_seconds=1800) # 30 minutes # Second call (cache hit) start_time = time.time() cached_cost_data = self.analysis_cache.get(cost_cache_key) hit_time = time.time() - start_time cost_caching_success = cached_cost_data == cost_data except Exception as e: logger.warning(f"Cost Explorer caching test failed: {e}") cost_caching_success = False hit_time = miss_time # Test 2: EC2 instance data caching ec2_cache_key = ["ec2_instances", self.region, "underutilized"] try: # Make AWS call for EC2 data ec2_start_time = time.time() ec2_data = get_underutilized_instances(region=self.region, lookback_period_days=3) ec2_call_time = time.time() - ec2_start_time # Cache the result self.analysis_cache.put(ec2_cache_key, ec2_data, ttl_seconds=3600) # 1 hour # Test cache retrieval start_time = time.time() cached_ec2_data = self.analysis_cache.get(ec2_cache_key) ec2_hit_time = time.time() - start_time ec2_caching_success = cached_ec2_data == ec2_data except Exception as e: logger.warning(f"EC2 caching test failed: {e}") ec2_caching_success = False ec2_call_time = 0 ec2_hit_time = 0 # Test 3: Pricing data caching pricing_cache_key = ["ec2_pricing", self.region, "m5.large"] pricing_data = { "instance_type": "m5.large", "region": self.region, "on_demand_price": 0.096, "currency": "USD", "last_updated": datetime.now().isoformat() } self.pricing_cache.put(pricing_cache_key, pricing_data, ttl_seconds=21600) # 6 hours start_time = time.time() cached_pricing = self.pricing_cache.get(pricing_cache_key) pricing_hit_time = time.time() - start_time pricing_caching_success = cached_pricing == pricing_data # Get comprehensive cache statistics pricing_stats = self.pricing_cache.get_statistics() bucket_stats = self.bucket_cache.get_statistics() analysis_stats = self.analysis_cache.get_statistics() test_result = { "test_name": "intelligent_caching_with_aws", "status": "success", "region": self.region, "cost_explorer_caching": { "success": cost_caching_success, "cache_miss_time_ms": miss_time * 1000, "cache_hit_time_ms": hit_time * 1000, "performance_improvement": (miss_time / hit_time) if hit_time > 0 else float('inf') }, "ec2_data_caching": { "success": ec2_caching_success, "aws_call_time_ms": ec2_call_time * 1000, "cache_hit_time_ms": ec2_hit_time * 1000, "cache_vs_api_improvement": (ec2_call_time / ec2_hit_time) if ec2_hit_time > 0 else float('inf') }, "pricing_data_caching": { "success": pricing_caching_success, "cache_hit_time_ms": pricing_hit_time * 1000 }, "cache_statistics": { "pricing_cache": { "hit_rate_percent": pricing_stats["cache_performance"]["hit_rate_percent"], "total_entries": pricing_stats["cache_size"]["current_entries"], "size_mb": pricing_stats["cache_size"]["current_size_mb"] }, "bucket_cache": { "hit_rate_percent": bucket_stats["cache_performance"]["hit_rate_percent"], "total_entries": bucket_stats["cache_size"]["current_entries"], "size_mb": bucket_stats["cache_size"]["current_size_mb"] }, "analysis_cache": { "hit_rate_percent": analysis_stats["cache_performance"]["hit_rate_percent"], "total_entries": analysis_stats["cache_size"]["current_entries"], "size_mb": analysis_stats["cache_size"]["current_size_mb"] } } } logger.info(f"Cost Explorer caching: {cost_caching_success}, EC2 caching: {ec2_caching_success}") logger.info(f"Cache hit improvement: {(miss_time/hit_time):.1f}x faster" if hit_time > 0 else "Cache instantaneous") return test_result def test_aws_session_persistence(self) -> Dict[str, Any]: """Test 3: Verify session persistence with AWS data.""" logger.info("=== Testing AWS Session Persistence ===") # Create multiple sessions with real AWS data test_sessions = [] session_data_summary = [] for i in range(2): # Create 2 test sessions session_id = self.session_manager.create_session(f"aws_test_session_{i}_{int(time.time())}") test_sessions.append(session_id) # Store real AWS data in each session try: if i == 0: # Session 1: EC2 data ec2_data = get_underutilized_instances(region=self.region, lookback_period_days=3) if ec2_data.get('status') == 'success' and ec2_data.get('data'): instances = ec2_data['data'].get('underutilized_instances', []) if instances: # Convert to list of dicts for storage storage_data = [] for instance in instances[:5]: # Limit to 5 instances storage_data.append({ 'instance_id': instance.get('instance_id', 'unknown'), 'instance_type': instance.get('instance_type', 'unknown'), 'cpu_utilization': instance.get('cpu_utilization', 0), 'region': self.region, 'analysis_type': 'ec2_underutilized' }) success = self.session_manager.store_data(session_id, "ec2_analysis", storage_data) session_data_summary.append({ 'session': i, 'type': 'ec2_analysis', 'records': len(storage_data), 'stored': success }) else: # Session 2: Cost Explorer data cost_data = get_cost_and_usage( start_date=(datetime.now() - timedelta(days=3)).strftime('%Y-%m-%d'), end_date=datetime.now().strftime('%Y-%m-%d'), granularity='DAILY', metrics=['BlendedCost'] ) if cost_data.get('status') == 'success' and cost_data.get('data'): # Convert cost data to storage format storage_data = [] results_by_time = cost_data['data'].get('ResultsByTime', []) for result in results_by_time[:3]: # Limit to 3 days storage_data.append({ 'time_period': result.get('TimePeriod', {}).get('Start', 'unknown'), 'total_cost': float(result.get('Total', {}).get('BlendedCost', {}).get('Amount', 0)), 'currency': result.get('Total', {}).get('BlendedCost', {}).get('Unit', 'USD'), 'region': self.region, 'analysis_type': 'cost_explorer' }) success = self.session_manager.store_data(session_id, "cost_analysis", storage_data) session_data_summary.append({ 'session': i, 'type': 'cost_analysis', 'records': len(storage_data), 'stored': success }) except Exception as e: logger.warning(f"Error storing data in session {i}: {e}") session_data_summary.append({ 'session': i, 'type': 'error', 'error': str(e), 'stored': False }) # Test session info retrieval session_info_results = [] for session_id in test_sessions: info = self.session_manager.get_session_info(session_id) session_info_results.append({ 'session_id': session_id, 'has_info': 'error' not in info, 'tables': info.get('tables', []), 'created_at': info.get('created_at') }) # Test SQL queries on stored AWS data query_results = [] for i, session_id in enumerate(test_sessions): try: if i == 0: # Query EC2 data result = self.session_manager.execute_query( session_id, 'SELECT COUNT(*) as instance_count, AVG(CAST(cpu_utilization AS REAL)) as avg_cpu FROM ec2_analysis' ) query_results.append({ 'session': i, 'query_type': 'ec2_analysis', 'success': len(result) > 0, 'result': result[0] if result else None }) else: # Query cost data result = self.session_manager.execute_query( session_id, 'SELECT COUNT(*) as day_count, SUM(CAST(total_cost AS REAL)) as total_cost FROM cost_analysis' ) query_results.append({ 'session': i, 'query_type': 'cost_analysis', 'success': len(result) > 0, 'result': result[0] if result else None }) except Exception as e: query_results.append({ 'session': i, 'query_type': 'error', 'success': False, 'error': str(e) }) # Test manual session cleanup cleanup_success = self.session_manager.close_session(test_sessions[0]) remaining_sessions = self.session_manager.list_sessions() test_result = { "test_name": "aws_session_persistence", "status": "success", "region": self.region, "test_sessions_created": len(test_sessions), "aws_data_storage": session_data_summary, "session_info_retrieval": session_info_results, "sql_query_results": query_results, "session_cleanup": { "manual_cleanup_success": cleanup_success, "remaining_sessions": len(remaining_sessions) }, "session_manager_status": { "active_sessions": len(self.session_manager.active_sessions), "cleanup_thread_active": self.session_manager._cleanup_thread.is_alive() if self.session_manager._cleanup_thread else False } } # Clean up remaining test session try: self.session_manager.close_session(test_sessions[1]) except Exception as e: logger.warning(f"Error cleaning up remaining session: {e}") logger.info(f"AWS session persistence test completed. Data stored: {len([s for s in session_data_summary if s.get('stored')])}") return test_result async def test_all_46_mcp_tools(self) -> Dict[str, Any]: """Test all 46 MCP tools for session management functionality.""" logger.info("=== Testing All 46 MCP Tools ===") start_time = time.time() # Import the MCP server functions try: # Import functions directly from playbooks from playbooks.ec2.ec2_optimization import run_ec2_right_sizing_analysis from playbooks.ebs.ebs_optimization import run_ebs_optimization_analysis except ImportError as e: logger.error(f"Failed to import runbook functions: {e}") return {"status": "error", "error": "Failed to import runbook functions"} # Define test parameters for each tool category tool_test_configs = { # Core AWS Service Tools "get_cost_explorer_data": { "args": { "start_date": (datetime.now() - timedelta(days=7)).strftime('%Y-%m-%d'), "end_date": datetime.now().strftime('%Y-%m-%d'), "granularity": "DAILY" }, "timeout": 30 }, "list_coh_enrollment": {"args": {}, "timeout": 20}, "get_coh_recommendations": {"args": {"max_results": 10}, "timeout": 30}, "get_coh_summaries": {"args": {"max_results": 5}, "timeout": 25}, "get_coh_recommendation": {"args": {"recommendation_id": "test-rec-123"}, "timeout": 20}, "get_compute_optimizer_recommendations": {"args": {"resource_type": "Ec2Instance"}, "timeout": 30}, "get_trusted_advisor_checks": {"args": {"check_categories": ["cost_optimizing"]}, "timeout": 40}, "get_performance_insights_metrics": {"args": {"db_instance_identifier": "test-db"}, "timeout": 25}, # EC2 Tools "ec2_rightsizing": {"args": {"region": self.region, "lookback_period_days": 3}, "timeout": 45}, "ec2_report": {"args": {"region": self.region}, "timeout": 35}, "ec2_stopped_instances": {"args": {"region": self.region}, "timeout": 25}, "ec2_unattached_eips": {"args": {"region": self.region}, "timeout": 20}, "ec2_old_generation": {"args": {"region": self.region}, "timeout": 25}, "ec2_detailed_monitoring": {"args": {"region": self.region}, "timeout": 20}, "ec2_graviton_compatible": {"args": {"region": self.region}, "timeout": 25}, "ec2_burstable_analysis": {"args": {"region": self.region}, "timeout": 30}, "ec2_spot_opportunities": {"args": {"region": self.region}, "timeout": 25}, "ec2_unused_reservations": {"args": {"region": self.region}, "timeout": 20}, "ec2_scheduling_opportunities": {"args": {"region": self.region}, "timeout": 25}, "ec2_commitment_plans": {"args": {"region": self.region}, "timeout": 30}, "ec2_governance_violations": {"args": {"region": self.region}, "timeout": 25}, "ec2_comprehensive_report": {"args": {"region": self.region}, "timeout": 60}, # EBS Tools "ebs_optimization": {"args": {"region": self.region, "lookback_period_days": 7}, "timeout": 40}, "ebs_unused": {"args": {"region": self.region}, "timeout": 30}, "ebs_report": {"args": {"region": self.region}, "timeout": 35}, # RDS Tools "rds_optimization": {"args": {"region": self.region, "lookback_period_days": 7}, "timeout": 45}, "rds_idle": {"args": {"region": self.region}, "timeout": 30}, "rds_report": {"args": {"region": self.region}, "timeout": 35}, # Lambda Tools "lambda_optimization": {"args": {"region": self.region, "lookback_period_days": 7}, "timeout": 40}, "lambda_unused": {"args": {"region": self.region}, "timeout": 30}, "lambda_report": {"args": {"region": self.region}, "timeout": 35}, # CloudTrail Tools "get_management_trails": {"args": {"region": self.region}, "timeout": 25}, "run_cloudtrail_trails_analysis": {"args": {"region": self.region}, "timeout": 35}, "generate_cloudtrail_report": {"args": {"region": self.region}, "timeout": 30}, # S3 Tools (with reduced timeouts to prevent long waits) "s3_general_spend_analysis": {"args": {"region": self.region, "lookback_months": 3}, "timeout": 45}, "s3_storage_class_selection": {"args": {"region": self.region}, "timeout": 20}, "s3_storage_class_validation": {"args": {"region": self.region}, "timeout": 40}, "s3_archive_optimization": {"args": {"region": self.region}, "timeout": 40}, "s3_api_cost_minimization": {"args": {"region": self.region}, "timeout": 35}, "s3_multipart_cleanup": {"args": {"region": self.region}, "timeout": 35}, "s3_governance_check": {"args": {"region": self.region}, "timeout": 30}, "s3_comprehensive_optimization_tool": {"args": {"region": self.region, "timeout_seconds": 30}, "timeout": 40}, "s3_comprehensive_analysis": {"args": {"region": self.region}, "timeout": 35}, "s3_quick_analysis": {"args": {"region": self.region}, "timeout": 35}, "s3_bucket_analysis": {"args": {"bucket_names": ["test-bucket-123"], "region": self.region}, "timeout": 40}, # Comprehensive Analysis "comprehensive_analysis": {"args": {"region": self.region, "services": ["ec2", "ebs"], "lookback_period_days": 3}, "timeout": 90} } # Test results tracking test_results = { "total_tools": len(self.all_mcp_tools), "tested_tools": 0, "successful_tools": 0, "failed_tools": 0, "skipped_tools": 0, "tool_results": {}, "session_data": {}, "performance_metrics": {} } # Function mapping for available functions available_functions = {} try: # Extended functions are now in EC2 playbook try: from playbooks.ec2.ec2_optimization import identify_graviton_compatible_instances_mcp except ImportError: identify_graviton_compatible_instances_mcp = None logger.warning("Extended EC2 functions not available") # Core service functions are now in MCP server directly core_service_functions = { "get_cost_explorer_data": None, # These are in MCP server, not playbooks "list_coh_enrollment": None, "get_coh_recommendations": None, "get_coh_summaries": None, "get_coh_recommendation": None, "get_compute_optimizer_recommendations": None, "get_trusted_advisor_checks": None, "get_performance_insights_metrics": None, } # EC2 functions are now in EC2 playbook ec2_functions = { "ec2_rightsizing": run_ec2_right_sizing_analysis, "ec2_report": None, # Available in playbook "ec2_stopped_instances": None, # Available in playbook "ec2_unattached_eips": None, # Available in playbook "ec2_old_generation": None, # Available in playbook "ec2_detailed_monitoring": None, # Available in playbook "ec2_comprehensive_report": None, # Available in playbook } # EC2 extended functions are now in EC2 playbook if identify_graviton_compatible_instances_mcp: ec2_extended_functions = { "ec2_graviton_compatible": identify_graviton_compatible_instances_mcp, "ec2_burstable_analysis": None, # Available in EC2 playbook "ec2_spot_opportunities": None, # Available in EC2 playbook "ec2_unused_reservations": None, # Available in EC2 playbook "ec2_scheduling_opportunities": None, # Available in EC2 playbook "ec2_commitment_plans": None, # Available in EC2 playbook "ec2_governance_violations": None, # Available in EC2 playbook } ec2_functions.update(ec2_extended_functions) # EBS functions are now in EBS playbook ebs_functions = { "ebs_optimization": run_ebs_optimization_analysis, "ebs_unused": None, # Available in EBS playbook "ebs_report": None, # Available in EBS playbook } # RDS functions are now in RDS playbook rds_functions = { "rds_optimization": run_rds_optimization_analysis, "rds_idle": None, # Available in RDS playbook "rds_report": None, # Available in RDS playbook } # Lambda functions are now in Lambda playbook lambda_functions = { "lambda_optimization": run_lambda_optimization_analysis, "lambda_unused": None, # Available in Lambda playbook "lambda_report": None, # Available in Lambda playbook } # CloudTrail functions are now in CloudTrail playbook cloudtrail_functions = { "get_management_trails": None, # Available in CloudTrail playbook "run_cloudtrail_trails_analysis": None, # Available in CloudTrail playbook "generate_cloudtrail_report": getattr(runbook_functions, 'generate_cloudtrail_report', None), } # S3 functions s3_functions = { "s3_general_spend_analysis": getattr(runbook_functions, 'run_s3_general_spend_analysis', None), "s3_storage_class_selection": getattr(runbook_functions, 'run_s3_storage_class_selection', None), "s3_storage_class_validation": getattr(runbook_functions, 'run_s3_storage_class_validation', None), "s3_archive_optimization": getattr(runbook_functions, 'run_s3_archive_optimization', None), "s3_api_cost_minimization": getattr(runbook_functions, 'run_s3_api_cost_minimization', None), "s3_multipart_cleanup": getattr(runbook_functions, 'run_s3_multipart_cleanup', None), "s3_governance_check": getattr(runbook_functions, 'run_s3_governance_check', None), "s3_comprehensive_optimization_tool": getattr(runbook_functions, 'run_s3_comprehensive_optimization_tool', None), "s3_comprehensive_analysis": getattr(runbook_functions, 'run_s3_comprehensive_analysis', None), "s3_quick_analysis": getattr(runbook_functions, 'run_s3_quick_analysis', None), "s3_bucket_analysis": getattr(runbook_functions, 'run_s3_bucket_analysis', None), } # Comprehensive analysis comprehensive_functions = { "comprehensive_analysis": getattr(runbook_functions, 'run_comprehensive_cost_analysis', None), } # Combine all functions available_functions.update(core_service_functions) available_functions.update(ec2_functions) available_functions.update(ebs_functions) available_functions.update(rds_functions) available_functions.update(lambda_functions) available_functions.update(cloudtrail_functions) available_functions.update(s3_functions) available_functions.update(comprehensive_functions) # Remove None values available_functions = {k: v for k, v in available_functions.items() if v is not None} logger.info(f"Found {len(available_functions)} available functions out of {len(self.all_mcp_tools)} total tools") except Exception as e: logger.warning(f"Error setting up function mapping: {e}") # Test each tool for tool_name in self.all_mcp_tools: logger.info(f"Testing tool: {tool_name}") test_results["tested_tools"] += 1 tool_start_time = time.time() try: # Get test configuration config = tool_test_configs.get(tool_name, {"args": {"region": self.region}, "timeout": 30}) # Get the function func = available_functions.get(tool_name) if not func: test_results["tool_results"][tool_name] = { "status": "skipped", "reason": "Function not available in test environment", "execution_time": 0 } test_results["skipped_tools"] += 1 continue # Execute the function with timeout try: if asyncio.iscoroutinefunction(func): result = await asyncio.wait_for(func(config["args"]), timeout=config["timeout"]) else: # For sync functions, call them directly and wrap result sync_result = func(config["args"]) # Wrap sync result to match async format if isinstance(sync_result, dict): result = [type('MockTextContent', (), {'text': json.dumps(sync_result, default=str)})()] else: result = [type('MockTextContent', (), {'text': str(sync_result)})()] except asyncio.TimeoutError: test_results["tool_results"][tool_name] = { "status": "timeout", "execution_time": config["timeout"], "timeout_seconds": config["timeout"] } test_results["failed_tools"] += 1 continue execution_time = time.time() - tool_start_time # Analyze result if result and len(result) > 0: try: result_data = json.loads(result[0].text) if hasattr(result[0], 'text') else result[0] # Check for session metadata has_session_data = any(key in str(result_data) for key in ['session_id', 'session_metadata', 'stored_table']) test_results["tool_results"][tool_name] = { "status": "success", "execution_time": execution_time, "has_session_data": has_session_data, "result_size": len(str(result_data)) } test_results["successful_tools"] += 1 except Exception as parse_error: test_results["tool_results"][tool_name] = { "status": "success_parse_error", "execution_time": execution_time, "parse_error": str(parse_error) } test_results["successful_tools"] += 1 else: test_results["tool_results"][tool_name] = { "status": "success_no_result", "execution_time": execution_time } test_results["successful_tools"] += 1 except Exception as e: execution_time = time.time() - tool_start_time test_results["tool_results"][tool_name] = { "status": "failed", "execution_time": execution_time, "error": str(e) } test_results["failed_tools"] += 1 logger.warning(f"Tool {tool_name} failed: {e}") # Calculate performance metrics total_execution_time = time.time() - start_time successful_executions = [r for r in test_results["tool_results"].values() if r["status"].startswith("success")] test_results["performance_metrics"] = { "total_execution_time": total_execution_time, "average_tool_time": sum(r["execution_time"] for r in test_results["tool_results"].values()) / len(test_results["tool_results"]) if test_results["tool_results"] else 0, "fastest_tool_time": min(r["execution_time"] for r in test_results["tool_results"].values()) if test_results["tool_results"] else 0, "slowest_tool_time": max(r["execution_time"] for r in test_results["tool_results"].values()) if test_results["tool_results"] else 0, "success_rate": (test_results["successful_tools"] / test_results["tested_tools"] * 100) if test_results["tested_tools"] > 0 else 0 } # Session data analysis session_tools = sum(1 for r in successful_executions if r.get("has_session_data", False)) test_results["session_data"] = { "tools_with_session_data": session_tools, "session_data_rate": (session_tools / len(successful_executions) * 100) if successful_executions else 0 } logger.info(f"All 46 MCP tools test completed in {total_execution_time:.2f}s") logger.info(f"Success rate: {test_results['performance_metrics']['success_rate']:.1f}%") return test_results async def run_comprehensive_aws_test(self) -> Dict[str, Any]: """Run comprehensive AWS session management test.""" logger.info("Starting comprehensive AWS session management test") start_time = time.time() test_results = { "test_suite": "aws_session_management_verification", "region": self.region, "timestamp": datetime.now().isoformat(), "tests": {} } try: # Test 1: Parallel AWS API calls logger.info("Running Test 1: Parallel AWS API Calls") test_results["tests"]["parallel_aws_api_calls"] = self.test_parallel_aws_api_calls() # Test 2: Intelligent caching with AWS data logger.info("Running Test 2: Intelligent Caching with AWS Data") test_results["tests"]["intelligent_caching_with_aws"] = self.test_intelligent_caching_with_aws() # Test 3: AWS session persistence logger.info("Running Test 3: AWS Session Persistence") test_results["tests"]["aws_session_persistence"] = self.test_aws_session_persistence() # Test 4: All 46 MCP Tools (new comprehensive test) logger.info("Running Test 4: All 46 MCP Tools Session Management") test_results["tests"]["all_46_mcp_tools"] = await self.test_all_46_mcp_tools() # Overall results all_tests_passed = all( test.get("status") == "success" or test.get("performance_metrics", {}).get("success_rate", 0) > 70 for test in test_results["tests"].values() ) test_results["overall_status"] = "success" if all_tests_passed else "partial_success" test_results["total_execution_time"] = time.time() - start_time test_results["tests_passed"] = sum(1 for test in test_results["tests"].values() if test.get("status") == "success" or test.get("performance_metrics", {}).get("success_rate", 0) > 70) test_results["total_tests"] = len(test_results["tests"]) # Enhanced summary statistics total_aws_calls = 0 successful_aws_calls = 0 total_cached_items = 0 total_stored_records = 0 total_mcp_tools_tested = 0 successful_mcp_tools = 0 for test_name, test_result in test_results["tests"].items(): if "aws_service_calls" in test_result: total_aws_calls += test_result["aws_service_calls"]["total_attempted"] successful_aws_calls += test_result["aws_service_calls"]["successful"] if "session_storage" in test_result: total_stored_records += test_result["session_storage"]["total_records_stored"] if "cache_statistics" in test_result: for cache_name, cache_stats in test_result["cache_statistics"].items(): total_cached_items += cache_stats["total_entries"] if "tested_tools" in test_result: total_mcp_tools_tested += test_result["tested_tools"] successful_mcp_tools += test_result["successful_tools"] test_results["summary"] = { "total_aws_api_calls": total_aws_calls, "successful_aws_api_calls": successful_aws_calls, "aws_success_rate": (successful_aws_calls / total_aws_calls * 100) if total_aws_calls > 0 else 0, "total_cached_items": total_cached_items, "total_stored_records": total_stored_records, "total_mcp_tools_tested": total_mcp_tools_tested, "successful_mcp_tools": successful_mcp_tools, "mcp_tools_success_rate": (successful_mcp_tools / total_mcp_tools_tested * 100) if total_mcp_tools_tested > 0 else 0 } except Exception as e: test_results["overall_status"] = "error" test_results["error"] = str(e) logger.error(f"Error during comprehensive AWS test: {e}") logger.info(f"Comprehensive AWS test completed in {test_results.get('total_execution_time', 0):.2f}s") return test_results async def main(): """Main function to run AWS session management tests.""" print("CFM Tips MCP Server - AWS Session Management Verification") print("=" * 65) # Check for region argument region = "us-east-1" if len(sys.argv) > 1: region = sys.argv[1] print(f"Testing with AWS region: {region}") print("Note: This test will make actual AWS API calls to your account") print(f"Testing all {46} MCP tools for session management functionality") print("-" * 65) tester = AWSSessionManagementTester(region=region) results = await tester.run_comprehensive_aws_test() # Print results print("\nAWS Test Results:") print("=" * 50) print(json.dumps(results, indent=2, default=str)) # Summary print(f"\nSummary:") print(f"Overall Status: {results['overall_status']}") print(f"Tests Passed: {results.get('tests_passed', 0)}/{results.get('total_tests', 0)}") print(f"Execution Time: {results.get('total_execution_time', 0):.2f}s") print(f"AWS Region: {results['region']}") print(f"Total MCP Tools: {results.get('total_mcp_tools', 0)}") if 'summary' in results: summary = results['summary'] print(f"AWS API Calls: {summary['successful_aws_api_calls']}/{summary['total_aws_api_calls']} successful ({summary['aws_success_rate']:.1f}%)") print(f"MCP Tools: {summary.get('successful_mcp_tools', 0)}/{summary.get('total_mcp_tools_tested', 0)} successful ({summary.get('mcp_tools_success_rate', 0):.1f}%)") print(f"Cached Items: {summary['total_cached_items']}") print(f"Stored Records: {summary['total_stored_records']}") # Individual test status print("\nDetailed Results:") for test_name, test_result in results.get("tests", {}).items(): if test_name == "all_46_mcp_tools": success_rate = test_result.get("performance_metrics", {}).get("success_rate", 0) status_icon = "✅" if success_rate > 70 else "⚠️" if success_rate > 50 else "❌" print(f"{status_icon} {test_name}: {success_rate:.1f}% success rate") print(f" └─ Tools: {test_result.get('successful_tools', 0)}/{test_result.get('tested_tools', 0)} successful") print(f" └─ Avg Time: {test_result.get('performance_metrics', {}).get('average_tool_time', 0):.2f}s per tool") else: status_icon = "✅" if test_result.get("status") == "success" else "⚠️" if test_result.get("status") == "partial_success" else "❌" print(f"{status_icon} {test_name}: {test_result.get('status', 'unknown')}") # Show key metrics for each test if test_name == "parallel_aws_api_calls" and "aws_service_calls" in test_result: calls = test_result["aws_service_calls"] print(f" └─ AWS Calls: {calls['successful']}/{calls['total_attempted']} successful") elif test_name == "intelligent_caching_with_aws" and "cache_statistics" in test_result: total_entries = sum(cache["total_entries"] for cache in test_result["cache_statistics"].values()) print(f" └─ Cache Entries: {total_entries}") elif test_name == "aws_session_persistence" and "aws_data_storage" in test_result: stored_count = len([s for s in test_result["aws_data_storage"] if s.get("stored")]) print(f" └─ Data Storage: {stored_count} successful") if __name__ == "__main__": asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aws-samples/sample-cfm-tips-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server