Skip to main content
Glama
aws-samples

CFM Tips - Cost Optimization MCP Server

by aws-samples
ec2_optimization.py59.3 kB
""" EC2 Right Sizing Playbook This module implements the EC2 Right Sizing playbook from AWS Cost Optimization Playbooks. It provides functions to identify and recommend right-sizing opportunities for EC2 instances. Includes both core optimization functions and MCP runbook functions. """ import asyncio import json import logging import time import boto3 from typing import Dict, List, Any, Optional from datetime import datetime, timedelta from mcp.types import TextContent from services.compute_optimizer import get_ec2_recommendations from services.trusted_advisor import get_trusted_advisor_checks from utils.error_handler import ResponseFormatter, handle_aws_error from utils.service_orchestrator import ServiceOrchestrator from utils.parallel_executor import create_task from utils.documentation_links import add_documentation_links logger = logging.getLogger(__name__) def get_underutilized_instances( region: Optional[str] = None, lookback_period_days: int = 14, cpu_threshold: float = 40.0, memory_threshold: Optional[float] = None, network_threshold: Optional[float] = None ) -> Dict[str, Any]: """ Identify underutilized EC2 instances using multiple data sources with fallback logic. Priority: 1) Compute Optimizer 2) Trusted Advisor 3) CloudWatch direct """ # Try Compute Optimizer first (primary) try: logger.info("Attempting EC2 analysis with Compute Optimizer") result = _get_instances_from_compute_optimizer(region, lookback_period_days) if result["status"] == "success" and result["data"]["count"] > 0: result["data_source"] = "Compute Optimizer" return result except Exception as e: logger.warning(f"Compute Optimizer failed: {str(e)}") # Try Trusted Advisor (secondary) try: logger.info("Attempting EC2 analysis with Trusted Advisor") result = _get_instances_from_trusted_advisor(region) if result["status"] == "success" and result["data"]["count"] > 0: result["data_source"] = "Trusted Advisor" return result except Exception as e: logger.warning(f"Trusted Advisor failed: {str(e)}") # Try CloudWatch direct (tertiary) try: logger.info("Attempting EC2 analysis with CloudWatch") result = _get_instances_from_cloudwatch(region, lookback_period_days, cpu_threshold) result["data_source"] = "CloudWatch" return result except Exception as e: logger.error(f"All data sources failed. CloudWatch error: {str(e)}") return { "status": "error", "message": f"All data sources unavailable. Last error: {str(e)}", "attempted_sources": ["Compute Optimizer", "Trusted Advisor", "CloudWatch"] } def _get_instances_from_compute_optimizer(region: Optional[str], lookback_period_days: int) -> Dict[str, Any]: """Get underutilized instances from Compute Optimizer""" recommendations_result = get_ec2_recommendations(region=region) if recommendations_result["status"] != "success": raise Exception("Compute Optimizer not available") recommendations = recommendations_result["data"].get("instanceRecommendations", []) underutilized_instances = [] for rec in recommendations: if rec.get('finding') in ['Underprovisioned', 'Overprovisioned']: instance_details = { 'instance_id': rec.get('instanceArn', '').split('/')[-1] if rec.get('instanceArn') else 'unknown', 'instance_type': rec.get('currentInstanceType', 'unknown'), 'finding': rec.get('finding', 'unknown'), 'lookback_period_days': lookback_period_days } if rec.get('recommendationOptions'): option = rec['recommendationOptions'][0] instance_details['recommendation'] = { 'recommended_instance_type': option.get('instanceType', 'unknown'), 'estimated_monthly_savings': option.get('estimatedMonthlySavings', {}).get('value', 0) } underutilized_instances.append(instance_details) total_monthly_savings = sum( instance.get('recommendation', {}).get('estimated_monthly_savings', 0) for instance in underutilized_instances ) return { "status": "success", "data": { "underutilized_instances": underutilized_instances, "count": len(underutilized_instances), "total_monthly_savings": total_monthly_savings }, "message": f"Found {len(underutilized_instances)} underutilized EC2 instances via Compute Optimizer" } def _get_instances_from_trusted_advisor(region: Optional[str]) -> Dict[str, Any]: """Get underutilized instances from Trusted Advisor""" ta_result = get_trusted_advisor_checks(["cost_optimizing"]) if ta_result["status"] != "success": raise Exception("Trusted Advisor not available") underutilized_instances = [] checks = ta_result["data"].get("checks", []) for check in checks: if "Low Utilization Amazon EC2 Instances" in check.get('name', ''): resources = check.get('result', {}).get('flaggedResources', []) for resource in resources: instance_details = { 'instance_id': resource.get('resourceId', 'unknown'), 'instance_type': resource.get('metadata', {}).get('Instance Type', 'unknown'), 'finding': 'Low Utilization', 'avg_cpu_utilization': float(resource.get('metadata', {}).get('Average CPU Utilization', '0').replace('%', '')), 'recommendation': { 'action': 'Consider downsizing or terminating', 'estimated_monthly_savings': 50 # Placeholder estimate } } underutilized_instances.append(instance_details) return { "status": "success", "data": { "underutilized_instances": underutilized_instances, "count": len(underutilized_instances), "total_monthly_savings": len(underutilized_instances) * 50 }, "message": f"Found {len(underutilized_instances)} underutilized EC2 instances via Trusted Advisor" } def _get_instances_from_cloudwatch(region: Optional[str], lookback_period_days: int, cpu_threshold: float) -> Dict[str, Any]: """Get underutilized instances from CloudWatch metrics directly""" if region: ec2_client = boto3.client('ec2', region_name=region) cloudwatch_client = boto3.client('cloudwatch', region_name=region) else: ec2_client = boto3.client('ec2') cloudwatch_client = boto3.client('cloudwatch') response = ec2_client.describe_instances( Filters=[{'Name': 'instance-state-name', 'Values': ['running']}] ) end_time = datetime.utcnow() start_time = end_time - timedelta(days=lookback_period_days) underutilized_instances = [] for reservation in response['Reservations']: for instance in reservation['Instances']: instance_id = instance['InstanceId'] instance_type = instance['InstanceType'] try: cpu_response = cloudwatch_client.get_metric_statistics( Namespace='AWS/EC2', MetricName='CPUUtilization', Dimensions=[{'Name': 'InstanceId', 'Value': instance_id}], StartTime=start_time, EndTime=end_time, Period=86400, Statistics=['Average'] ) if cpu_response['Datapoints']: avg_cpu = sum(dp['Average'] for dp in cpu_response['Datapoints']) / len(cpu_response['Datapoints']) if avg_cpu < cpu_threshold: underutilized_instances.append({ 'instance_id': instance_id, 'instance_type': instance_type, 'avg_cpu_utilization': round(avg_cpu, 2), 'finding': 'Low CPU Utilization', 'recommendation': { 'action': 'Consider right-sizing', 'estimated_monthly_savings': 30 # Placeholder estimate } }) except Exception: continue return { "status": "success", "data": { "underutilized_instances": underutilized_instances, "count": len(underutilized_instances), "total_monthly_savings": len(underutilized_instances) * 30 }, "message": f"Found {len(underutilized_instances)} underutilized EC2 instances via CloudWatch" } def get_right_sizing_recommendation( instance_type: str, avg_cpu_utilization: float, region: Optional[str] = None ) -> Optional[Dict[str, Any]]: """ Get right-sizing recommendation for an EC2 instance. Args: instance_type: Current instance type avg_cpu_utilization: Average CPU utilization percentage region: AWS region Returns: Dictionary containing right-sizing recommendation """ try: # Use simplified pricing for now pricing_client = None # Get current instance pricing current_price = get_instance_price(instance_type, region, pricing_client) if not current_price: return None # Determine target instance family instance_family = instance_type.split('.')[0] # Simple right-sizing logic based on CPU utilization if avg_cpu_utilization < 20: # Recommend downsizing by 2 sizes within same family target_size = downsize_instance(instance_type, steps=2) elif avg_cpu_utilization < 40: # Recommend downsizing by 1 size within same family target_size = downsize_instance(instance_type, steps=1) else: # No recommendation needed return None if not target_size: return None # Get target instance pricing target_price = get_instance_price(target_size, region, pricing_client) if not target_price: return None # Calculate savings monthly_hours = 730 # Average hours in a month monthly_savings = (current_price - target_price) * monthly_hours return { "current_instance_type": instance_type, "recommended_instance_type": target_size, "current_hourly_cost": current_price, "recommended_hourly_cost": target_price, "estimated_monthly_savings": monthly_savings, "recommendation_reason": f"Average CPU utilization is {avg_cpu_utilization:.2f}%, which is below the threshold" } except Exception as e: logger.error(f"Error getting right-sizing recommendation: {str(e)}") return None def get_instance_price( instance_type: str, region: Optional[str] = None, pricing_client = None ) -> Optional[float]: """Get EC2 instance price from AWS Price List API.""" try: from services.pricing import get_ec2_pricing pricing_result = get_ec2_pricing( instance_type=instance_type, region=region or 'us-east-1' ) if pricing_result.get('status') == 'success': return pricing_result.get('hourly_price', 0.1) else: # Fallback to estimate price_map = { 't2.micro': 0.0116, 't2.small': 0.023, 't2.medium': 0.046, 't3.micro': 0.0104, 't3.small': 0.0208, 't3.medium': 0.0416, 'm5.large': 0.096, 'm5.xlarge': 0.192 } return price_map.get(instance_type, 0.1) except Exception as e: logger.warning(f"Error getting pricing for {instance_type}: {str(e)}") return 0.1 # Default fallback def downsize_instance( instance_type: str, steps: int = 1 ) -> Optional[str]: """ Downsize an EC2 instance type by a number of steps. Args: instance_type: Current instance type steps: Number of steps to downsize Returns: Downsized instance type """ # Instance size hierarchy (from largest to smallest) size_hierarchy = { 'general': ['metal', '48xlarge', '32xlarge', '24xlarge', '18xlarge', '16xlarge', '12xlarge', '9xlarge', '8xlarge', '6xlarge', '4xlarge', '3xlarge', '2xlarge', 'xlarge', 'large', 'medium', 'small', 'micro', 'nano'], 'compute': ['metal', '48xlarge', '32xlarge', '24xlarge', '18xlarge', '16xlarge', '12xlarge', '9xlarge', '8xlarge', '6xlarge', '4xlarge', '3xlarge', '2xlarge', 'xlarge', 'large', 'medium', 'small', 'micro', 'nano'], 'memory': ['metal', '48xlarge', '32xlarge', '24xlarge', '18xlarge', '16xlarge', '12xlarge', '9xlarge', '8xlarge', '6xlarge', '4xlarge', '3xlarge', '2xlarge', 'xlarge', 'large', 'medium', 'small', 'micro', 'nano'], 'storage': ['metal', '48xlarge', '32xlarge', '24xlarge', '18xlarge', '16xlarge', '12xlarge', '9xlarge', '8xlarge', '6xlarge', '4xlarge', '3xlarge', '2xlarge', 'xlarge', 'large', 'medium', 'small', 'micro', 'nano'], 'accelerated': ['metal', '48xlarge', '32xlarge', '24xlarge', '18xlarge', '16xlarge', '12xlarge', '9xlarge', '8xlarge', '6xlarge', '4xlarge', '3xlarge', '2xlarge', 'xlarge', 'large', 'medium', 'small', 'micro', 'nano'] } try: # Parse instance type parts = instance_type.split('.') if len(parts) != 2: return None family = parts[0] size = parts[1] # Determine instance category category = 'general' if family.startswith('c'): category = 'compute' elif family.startswith('r') or family.startswith('x'): category = 'memory' elif family.startswith('d') or family.startswith('i'): category = 'storage' elif family.startswith('p') or family.startswith('g'): category = 'accelerated' # Find current size index hierarchy = size_hierarchy[category] if size not in hierarchy: return None current_index = hierarchy.index(size) # Calculate target index target_index = current_index + steps if target_index >= len(hierarchy): return None # Get target size target_size = hierarchy[target_index] return f"{family}.{target_size}" except Exception as e: logger.error(f"Error downsizing instance: {str(e)}") return None def get_stopped_instances( region: Optional[str] = None, min_stopped_days: int = 7 ) -> Dict[str, Any]: """Identify stopped EC2 instances that could be terminated.""" try: if region: ec2_client = boto3.client('ec2', region_name=region) else: ec2_client = boto3.client('ec2') # Use paginator for EC2 describe_instances paginator = ec2_client.get_paginator('describe_instances') page_iterator = paginator.paginate( Filters=[{'Name': 'instance-state-name', 'Values': ['stopped']}] ) stopped_instances = [] # Process each page of results for page in page_iterator: for reservation in page['Reservations']: for instance in reservation['Instances']: instance_details = { 'instance_id': instance['InstanceId'], 'instance_type': instance['InstanceType'], 'state': instance['State']['Name'], 'launch_time': instance.get('LaunchTime', '').isoformat() if instance.get('LaunchTime') else '', 'tags': {tag['Key']: tag['Value'] for tag in instance.get('Tags', [])}, 'recommendation': 'Consider terminating if no longer needed' } stopped_instances.append(instance_details) return { "status": "success", "data": { "stopped_instances": stopped_instances, "count": len(stopped_instances) }, "message": f"Found {len(stopped_instances)} stopped EC2 instances" } except Exception as e: return {"status": "error", "message": str(e)} def get_unattached_elastic_ips( region: Optional[str] = None ) -> Dict[str, Any]: """Identify unattached Elastic IP addresses.""" try: if region: ec2_client = boto3.client('ec2', region_name=region) else: ec2_client = boto3.client('ec2') # Note: describe_addresses doesn't support pagination via paginator, unattached_eips = [] next_token = None while True: # Prepare pagination parameters params = {} if next_token: params['NextToken'] = next_token # Make the API call response = ec2_client.describe_addresses(**params) # Process results for address in response['Addresses']: if 'InstanceId' not in address and 'NetworkInterfaceId' not in address: eip_details = { 'allocation_id': address.get('AllocationId', 'unknown'), 'public_ip': address.get('PublicIp', 'unknown'), 'domain': address.get('Domain', 'unknown'), 'tags': {tag['Key']: tag['Value'] for tag in address.get('Tags', [])}, 'monthly_cost': 3.65, 'recommendation': 'Release if not needed' } unattached_eips.append(eip_details) # Check if there are more results if 'NextToken' in response: next_token = response['NextToken'] else: break total_monthly_cost = len(unattached_eips) * 3.65 return { "status": "success", "data": { "unattached_eips": unattached_eips, "count": len(unattached_eips), "total_monthly_cost": total_monthly_cost }, "message": f"Found {len(unattached_eips)} unattached Elastic IPs costing ${total_monthly_cost:.2f}/month" } except Exception as e: return {"status": "error", "message": str(e)} def get_old_generation_instances( region: Optional[str] = None ) -> Dict[str, Any]: """Identify old generation EC2 instances.""" try: if region: ec2_client = boto3.client('ec2', region_name=region) else: ec2_client = boto3.client('ec2') response = ec2_client.describe_instances( Filters=[{'Name': 'instance-state-name', 'Values': ['running', 'stopped']}] ) old_generations = ['t1', 't2', 'm1', 'm2', 'm3', 'c1', 'c3', 'r3', 'i2', 'hs1'] old_instances = [] for reservation in response['Reservations']: for instance in reservation['Instances']: instance_type = instance['InstanceType'] instance_family = instance_type.split('.')[0] if instance_family in old_generations: modern_equivalent = _get_modern_equivalent(instance_family) instance_details = { 'instance_id': instance['InstanceId'], 'instance_type': instance_type, 'instance_family': instance_family, 'state': instance['State']['Name'], 'tags': {tag['Key']: tag['Value'] for tag in instance.get('Tags', [])}, 'recommendation': { 'action': 'Upgrade to modern generation', 'suggested_family': modern_equivalent, 'benefits': 'Better performance, lower cost' } } old_instances.append(instance_details) return { "status": "success", "data": { "old_generation_instances": old_instances, "count": len(old_instances) }, "message": f"Found {len(old_instances)} old generation EC2 instances" } except Exception as e: return {"status": "error", "message": str(e)} def _get_modern_equivalent(old_family: str) -> str: """Get modern equivalent for old instance family.""" equivalents = { 't1': 't3', 't2': 't3', 'm1': 'm5', 'm2': 'm5', 'm3': 'm5', 'c1': 'c5', 'c3': 'c5', 'r3': 'r5', 'i2': 'i3', 'hs1': 'd3' } return equivalents.get(old_family, 'm5') def get_instances_without_detailed_monitoring( region: Optional[str] = None ) -> Dict[str, Any]: """Identify instances without detailed monitoring.""" try: if region: ec2_client = boto3.client('ec2', region_name=region) else: ec2_client = boto3.client('ec2') response = ec2_client.describe_instances( Filters=[{'Name': 'instance-state-name', 'Values': ['running']}] ) instances_without_monitoring = [] for reservation in response['Reservations']: for instance in reservation['Instances']: monitoring_state = instance.get('Monitoring', {}).get('State', 'disabled') if monitoring_state != 'enabled': instance_details = { 'instance_id': instance['InstanceId'], 'instance_type': instance['InstanceType'], 'monitoring_state': monitoring_state, 'tags': {tag['Key']: tag['Value'] for tag in instance.get('Tags', [])}, 'recommendation': { 'action': 'Enable detailed monitoring', 'benefit': 'Better insights for right-sizing', 'additional_cost': '$2.10/month per instance' } } instances_without_monitoring.append(instance_details) return { "status": "success", "data": { "instances_without_monitoring": instances_without_monitoring, "count": len(instances_without_monitoring) }, "message": f"Found {len(instances_without_monitoring)} instances without detailed monitoring" } except Exception as e: return {"status": "error", "message": str(e)} def generate_right_sizing_report( underutilized_instances: List[Dict[str, Any]] ) -> Dict[str, Any]: """ Generate a comprehensive right-sizing report. Args: underutilized_instances: List of underutilized instances Returns: Dictionary containing the right-sizing report """ try: # Calculate total savings total_monthly_savings = sum( instance.get('recommendation', {}).get('estimated_monthly_savings', 0) for instance in underutilized_instances ) # Group by instance family family_savings = {} for instance in underutilized_instances: if 'recommendation' not in instance: continue instance_type = instance['instance_type'] family = instance_type.split('.')[0] if family not in family_savings: family_savings[family] = {'count': 0, 'savings': 0} family_savings[family]['count'] += 1 family_savings[family]['savings'] += instance['recommendation'].get('estimated_monthly_savings', 0) # Sort instances by savings potential sorted_instances = sorted( [i for i in underutilized_instances if 'recommendation' in i], key=lambda x: x['recommendation'].get('estimated_monthly_savings', 0), reverse=True ) # Generate top recommendations top_recommendations = sorted_instances[:10] if len(sorted_instances) > 10 else sorted_instances return { "status": "success", "data": { "total_instances": len(underutilized_instances), "total_monthly_savings": total_monthly_savings, "family_savings": family_savings, "top_recommendations": top_recommendations }, "message": f"Generated right-sizing report with potential monthly savings of ${total_monthly_savings:.2f}" } except Exception as e: logger.error(f"Error generating right-sizing report: {str(e)}") return { "status": "error", "message": f"Error generating right-sizing report: {str(e)}" } # Additional EC2 Cost Framework Playbooks def get_graviton_compatible_instances( region: Optional[str] = None ) -> Dict[str, Any]: """Identify instances that can be migrated to Graviton processors.""" try: if region: ec2_client = boto3.client('ec2', region_name=region) else: ec2_client = boto3.client('ec2') response = ec2_client.describe_instances( Filters=[{'Name': 'instance-state-name', 'Values': ['running']}] ) graviton_compatible = [] x86_families = ['m5', 'm4', 'c5', 'c4', 'r5', 'r4', 't3', 't2'] for reservation in response['Reservations']: for instance in reservation['Instances']: instance_type = instance['InstanceType'] family = instance_type.split('.')[0] if family in x86_families: graviton_equivalent = _get_graviton_equivalent(family) if graviton_equivalent: graviton_compatible.append({ 'instance_id': instance['InstanceId'], 'current_type': instance_type, 'graviton_equivalent': graviton_equivalent, 'estimated_savings': 0.2, 'tags': {tag['Key']: tag['Value'] for tag in instance.get('Tags', [])} }) return { "status": "success", "data": { "graviton_compatible_instances": graviton_compatible, "count": len(graviton_compatible) }, "message": f"Found {len(graviton_compatible)} instances compatible with Graviton" } except Exception as e: return {"status": "error", "message": str(e)} def _get_graviton_equivalent(x86_family: str) -> Optional[str]: """Get Graviton equivalent for x86 instance family.""" graviton_map = { 'm5': 'm6g', 'm4': 'm6g', 'c5': 'c6g', 'c4': 'c6g', 'r5': 'r6g', 'r4': 'r6g', 't3': 't4g', 't2': 't4g' } return graviton_map.get(x86_family) def get_burstable_instances_analysis( region: Optional[str] = None, lookback_period_days: int = 14 ) -> Dict[str, Any]: """Analyze burstable instances for credit usage and optimization.""" try: if region: ec2_client = boto3.client('ec2', region_name=region) cloudwatch_client = boto3.client('cloudwatch', region_name=region) else: ec2_client = boto3.client('ec2') cloudwatch_client = boto3.client('cloudwatch') response = ec2_client.describe_instances( Filters=[ {'Name': 'instance-state-name', 'Values': ['running']}, {'Name': 'instance-type', 'Values': ['t2.*', 't3.*', 't4g.*']} ] ) burstable_analysis = [] end_time = datetime.utcnow() start_time = end_time - timedelta(days=lookback_period_days) for reservation in response['Reservations']: for instance in reservation['Instances']: instance_id = instance['InstanceId'] instance_type = instance['InstanceType'] try: credit_response = cloudwatch_client.get_metric_statistics( Namespace='AWS/EC2', MetricName='CPUCreditBalance', Dimensions=[{'Name': 'InstanceId', 'Value': instance_id}], StartTime=start_time, EndTime=end_time, Period=86400, Statistics=['Average'] ) if credit_response['Datapoints']: avg_credits = sum(dp['Average'] for dp in credit_response['Datapoints']) / len(credit_response['Datapoints']) recommendation = 'optimal' if avg_credits < 50: recommendation = 'consider_unlimited_or_larger' elif avg_credits > 500: recommendation = 'consider_smaller_or_standard' burstable_analysis.append({ 'instance_id': instance_id, 'instance_type': instance_type, 'avg_credit_balance': round(avg_credits, 2), 'recommendation': recommendation }) else: logger.info(f"No credit balance data found for instance {instance_id}") except Exception as e: logger.warning(f"Error retrieving credit balance for instance {instance_id}: {str(e)}") # Add instance with error information for better tracking burstable_analysis.append({ 'instance_id': instance_id, 'instance_type': instance_type, 'error': f"Failed to retrieve credit data: {str(e)}", 'recommendation': 'manual_review_needed' }) return { "status": "success", "data": { "burstable_instances": burstable_analysis, "count": len(burstable_analysis) }, "message": f"Analyzed {len(burstable_analysis)} burstable instances" } except Exception as e: return {"status": "error", "message": str(e)} def get_spot_instance_opportunities( region: Optional[str] = None ) -> Dict[str, Any]: """Identify instances suitable for Spot pricing.""" try: if region: ec2_client = boto3.client('ec2', region_name=region) else: ec2_client = boto3.client('ec2') response = ec2_client.describe_instances( Filters=[{'Name': 'instance-state-name', 'Values': ['running']}] ) spot_opportunities = [] for reservation in response['Reservations']: for instance in reservation['Instances']: tags = {tag['Key']: tag['Value'] for tag in instance.get('Tags', [])} is_fault_tolerant = ( 'batch' in str(tags).lower() or 'dev' in str(tags).lower() or 'test' in str(tags).lower() or instance.get('RootDeviceType') == 'instance-store' ) if is_fault_tolerant: spot_opportunities.append({ 'instance_id': instance['InstanceId'], 'instance_type': instance['InstanceType'], 'estimated_savings': 0.7, 'tags': tags, 'reason': 'fault_tolerant_workload' }) return { "status": "success", "data": { "spot_opportunities": spot_opportunities, "count": len(spot_opportunities) }, "message": f"Found {len(spot_opportunities)} instances suitable for Spot pricing" } except Exception as e: return {"status": "error", "message": str(e)} def get_unused_capacity_reservations( region: Optional[str] = None ) -> Dict[str, Any]: """Identify unused On-Demand Capacity Reservations.""" try: if region: ec2_client = boto3.client('ec2', region_name=region) else: ec2_client = boto3.client('ec2') response = ec2_client.describe_capacity_reservations( Filters=[{'Name': 'state', 'Values': ['active']}] ) unused_reservations = [] for reservation in response['CapacityReservations']: if reservation['AvailableInstanceCount'] == reservation['TotalInstanceCount']: unused_reservations.append({ 'reservation_id': reservation['CapacityReservationId'], 'instance_type': reservation['InstanceType'], 'instance_count': reservation['TotalInstanceCount'], 'availability_zone': reservation['AvailabilityZone'], 'monthly_cost': reservation['TotalInstanceCount'] * 100 }) return { "status": "success", "data": { "unused_reservations": unused_reservations, "count": len(unused_reservations), "total_monthly_waste": sum(r['monthly_cost'] for r in unused_reservations) }, "message": f"Found {len(unused_reservations)} unused capacity reservations" } except Exception as e: return {"status": "error", "message": str(e)} def get_scheduling_opportunities( region: Optional[str] = None ) -> Dict[str, Any]: """Identify instances suitable for scheduling optimization.""" try: if region: ec2_client = boto3.client('ec2', region_name=region) else: ec2_client = boto3.client('ec2') response = ec2_client.describe_instances( Filters=[{'Name': 'instance-state-name', 'Values': ['running']}] ) scheduling_opportunities = [] for reservation in response['Reservations']: for instance in reservation['Instances']: tags = {tag['Key']: tag['Value'] for tag in instance.get('Tags', [])} is_schedulable = ( 'dev' in str(tags).lower() or 'test' in str(tags).lower() or 'staging' in str(tags).lower() ) if is_schedulable: scheduling_opportunities.append({ 'instance_id': instance['InstanceId'], 'instance_type': instance['InstanceType'], 'estimated_savings': 0.6, 'tags': tags, 'recommendation': 'implement_start_stop_schedule' }) return { "status": "success", "data": { "scheduling_opportunities": scheduling_opportunities, "count": len(scheduling_opportunities) }, "message": f"Found {len(scheduling_opportunities)} instances suitable for scheduling" } except Exception as e: return {"status": "error", "message": str(e)} def get_commitment_plan_recommendations( region: Optional[str] = None ) -> Dict[str, Any]: """Analyze instances for Reserved Instance and Savings Plans opportunities.""" try: if region: ec2_client = boto3.client('ec2', region_name=region) else: ec2_client = boto3.client('ec2') response = ec2_client.describe_instances( Filters=[{'Name': 'instance-state-name', 'Values': ['running']}] ) commitment_opportunities = [] instance_usage = {} for reservation in response['Reservations']: for instance in reservation['Instances']: instance_type = instance['InstanceType'] launch_time = instance['LaunchTime'].replace(tzinfo=None) uptime_days = (datetime.utcnow() - launch_time).days if instance_type not in instance_usage: instance_usage[instance_type] = [] instance_usage[instance_type].append({ 'instance_id': instance['InstanceId'], 'uptime_days': uptime_days }) for instance_type, instances in instance_usage.items(): stable_instances = [i for i in instances if i['uptime_days'] > 30] if len(stable_instances) >= 1: commitment_opportunities.append({ 'instance_type': instance_type, 'instance_count': len(stable_instances), 'recommendation': 'reserved_instance_or_savings_plan', 'estimated_savings': 0.3, 'commitment_term': '1_year' }) return { "status": "success", "data": { "commitment_opportunities": commitment_opportunities, "count": len(commitment_opportunities) }, "message": f"Found {len(commitment_opportunities)} commitment opportunities" } except Exception as e: return {"status": "error", "message": str(e)} def get_governance_violations( region: Optional[str] = None ) -> Dict[str, Any]: """Detect EC2 governance violations and policy non-compliance.""" try: if region: ec2_client = boto3.client('ec2', region_name=region) else: ec2_client = boto3.client('ec2') response = ec2_client.describe_instances() violations = [] for reservation in response['Reservations']: for instance in reservation['Instances']: tags = {tag['Key']: tag['Value'] for tag in instance.get('Tags', [])} required_tags = ['Environment', 'Owner', 'Project'] missing_tags = [tag for tag in required_tags if tag not in tags] if missing_tags: violations.append({ 'instance_id': instance['InstanceId'], 'violation_type': 'missing_required_tags', 'missing_tags': missing_tags, 'severity': 'medium' }) if instance['InstanceType'].startswith(('x1', 'r5.24xlarge', 'm5.24xlarge')): violations.append({ 'instance_id': instance['InstanceId'], 'violation_type': 'oversized_instance', 'instance_type': instance['InstanceType'], 'severity': 'high' }) return { "status": "success", "data": { "violations": violations, "count": len(violations) }, "message": f"Found {len(violations)} governance violations" } except Exception as e: return {"status": "error", "message": str(e)} def generate_comprehensive_ec2_report( region: Optional[str] = None ) -> Dict[str, Any]: """Generate comprehensive EC2 optimization report covering all playbooks.""" try: report_sections = { 'underutilized_instances': get_underutilized_instances(region), 'stopped_instances': get_stopped_instances(region), 'unattached_eips': get_unattached_elastic_ips(region), 'old_generation': get_old_generation_instances(region), 'graviton_compatible': get_graviton_compatible_instances(region), 'burstable_analysis': get_burstable_instances_analysis(region), 'spot_opportunities': get_spot_instance_opportunities(region), 'unused_reservations': get_unused_capacity_reservations(region), 'scheduling_opportunities': get_scheduling_opportunities(region), 'commitment_opportunities': get_commitment_plan_recommendations(region), 'governance_violations': get_governance_violations(region) } total_savings = 0 for section_name, section_data in report_sections.items(): if section_data.get('status') == 'success': data = section_data.get('data', {}) if 'total_monthly_savings' in data: total_savings += data['total_monthly_savings'] return { "status": "success", "data": { "report_sections": report_sections, "total_estimated_monthly_savings": total_savings, "region": region }, "message": f"Generated comprehensive EC2 optimization report with ${total_savings:.2f} potential monthly savings" } except Exception as e: return { "status": "error", "message": f"Error generating comprehensive report: {str(e)}" } # MCP Runbook Functions # These functions provide MCP-compatible interfaces for the EC2 optimization playbook @handle_aws_error async def run_ec2_right_sizing_analysis(arguments: Dict[str, Any]) -> List[TextContent]: """Run comprehensive EC2 right-sizing analysis with parallel execution and session storage.""" start_time = time.time() try: region = arguments.get("region") lookback_period_days = arguments.get("lookback_period_days", 14) cpu_threshold = arguments.get("cpu_threshold", 40.0) memory_threshold = arguments.get("memory_threshold") network_threshold = arguments.get("network_threshold") # Initialize service orchestrator for parallel execution and session management orchestrator = ServiceOrchestrator() # Define parallel service calls for EC2 analysis service_calls = [ { 'service': 'ec2', 'operation': 'underutilized_instances', 'function': get_underutilized_instances, 'args': { 'region': region, 'lookback_period_days': lookback_period_days, 'cpu_threshold': cpu_threshold, 'memory_threshold': memory_threshold, 'network_threshold': network_threshold } }, { 'service': 'ec2', 'operation': 'stopped_instances', 'function': get_stopped_instances, 'args': { 'region': region, 'min_stopped_days': 7 } } ] # Execute parallel analysis results = orchestrator.execute_parallel_analysis( service_calls=service_calls, store_results=True, timeout=120.0 ) # Add documentation links results = add_documentation_links(results, "ec2") execution_time = time.time() - start_time # Format response with metadata results["ec2_rightsizing"] = { "analysis_type": "comprehensive_ec2_rightsizing", "region": region, "lookback_period_days": lookback_period_days, "cpu_threshold": cpu_threshold, "session_id": results.get("report_metadata", {}).get("session_id"), "parallel_execution": True, "sql_storage": True } return ResponseFormatter.to_text_content( ResponseFormatter.success_response( data=results, message="EC2 right-sizing analysis completed successfully", analysis_type="ec2_rightsizing", execution_time=execution_time ) ) except Exception as e: logger.error(f"Error in EC2 right-sizing analysis: {str(e)}") raise @handle_aws_error async def generate_ec2_right_sizing_report(arguments: Dict[str, Any]) -> List[TextContent]: """Generate detailed EC2 right-sizing report.""" start_time = time.time() try: region = arguments.get("region") include_cost_analysis = arguments.get("include_cost_analysis", True) output_format = arguments.get("output_format", "json") # Get data from playbooks underutilized_result = get_underutilized_instances(region=region) stopped_result = get_stopped_instances(region=region) # Generate comprehensive report report = generate_right_sizing_report( underutilized_instances=underutilized_result.get("underutilized_instances", []), stopped_instances=stopped_result.get("stopped_instances", []), include_cost_analysis=include_cost_analysis, output_format=output_format ) # Add documentation links report = add_documentation_links(report, "ec2") execution_time = time.time() - start_time return ResponseFormatter.to_text_content( ResponseFormatter.success_response( data=report, message="EC2 right-sizing report generated successfully", analysis_type="ec2_report", execution_time=execution_time ) ) except Exception as e: logger.error(f"Error generating EC2 report: {str(e)}") raise @handle_aws_error async def identify_stopped_ec2_instances(arguments: Dict[str, Any]) -> List[TextContent]: """Identify stopped EC2 instances that could be terminated.""" start_time = time.time() try: region = arguments.get("region") min_stopped_days = arguments.get("min_stopped_days", 7) result = get_stopped_instances( region=region, min_stopped_days=min_stopped_days ) # Add documentation links result = add_documentation_links(result, "ec2") execution_time = time.time() - start_time return ResponseFormatter.to_text_content( ResponseFormatter.success_response( data=result, message=f"Found {len(result.get('stopped_instances', []))} stopped instances", analysis_type="ec2_stopped_instances", execution_time=execution_time ) ) except Exception as e: logger.error(f"Error identifying stopped instances: {str(e)}") raise @handle_aws_error async def identify_unattached_elastic_ips(arguments: Dict[str, Any]) -> List[TextContent]: """Identify unattached Elastic IP addresses.""" start_time = time.time() try: region = arguments.get("region") result = get_unattached_elastic_ips(region=region) # Add documentation links result = add_documentation_links(result, "ec2") execution_time = time.time() - start_time return ResponseFormatter.to_text_content( ResponseFormatter.success_response( data=result, message=f"Found {len(result.get('unattached_eips', []))} unattached Elastic IPs", analysis_type="ec2_unattached_eips", execution_time=execution_time ) ) except Exception as e: logger.error(f"Error identifying unattached EIPs: {str(e)}") raise @handle_aws_error async def identify_old_generation_instances(arguments: Dict[str, Any]) -> List[TextContent]: """Identify old generation EC2 instances that should be upgraded.""" start_time = time.time() try: region = arguments.get("region") result = get_old_generation_instances(region=region) # Add documentation links result = add_documentation_links(result, "ec2") execution_time = time.time() - start_time return ResponseFormatter.to_text_content( ResponseFormatter.success_response( data=result, message=f"Found {len(result.get('old_generation_instances', []))} old generation instances", analysis_type="ec2_old_generation", execution_time=execution_time ) ) except Exception as e: logger.error(f"Error identifying old generation instances: {str(e)}") raise @handle_aws_error async def identify_instances_without_monitoring(arguments: Dict[str, Any]) -> List[TextContent]: """Identify instances without detailed monitoring enabled.""" start_time = time.time() try: region = arguments.get("region") result = get_instances_without_detailed_monitoring(region=region) # Add documentation links result = add_documentation_links(result, "ec2") execution_time = time.time() - start_time return ResponseFormatter.to_text_content( ResponseFormatter.success_response( data=result, message=f"Found {len(result.get('instances_without_monitoring', []))} instances without detailed monitoring", analysis_type="ec2_detailed_monitoring", execution_time=execution_time ) ) except Exception as e: logger.error(f"Error identifying instances without monitoring: {str(e)}") raise # Extended EC2 MCP Functions # These provide MCP interfaces for additional EC2 optimization functions @handle_aws_error async def identify_graviton_compatible_instances_mcp(arguments: Dict[str, Any]) -> List[TextContent]: """Identify instances compatible with Graviton processors.""" start_time = time.time() try: region = arguments.get("region") result = get_graviton_compatible_instances(region=region) # Add documentation links result = add_documentation_links(result, "ec2") execution_time = time.time() - start_time return ResponseFormatter.to_text_content( ResponseFormatter.success_response( data=result, message=f"Found {len(result.get('graviton_compatible_instances', []))} Graviton-compatible instances", analysis_type="ec2_graviton_compatible", execution_time=execution_time ) ) except Exception as e: logger.error(f"Error identifying Graviton-compatible instances: {str(e)}") raise @handle_aws_error async def analyze_burstable_instances_mcp(arguments: Dict[str, Any]) -> List[TextContent]: """Analyze burstable instances for credit usage optimization.""" start_time = time.time() try: region = arguments.get("region") lookback_period_days = arguments.get("lookback_period_days", 14) result = get_burstable_instances_analysis( region=region, lookback_period_days=lookback_period_days ) # Add documentation links result = add_documentation_links(result, "ec2") execution_time = time.time() - start_time return ResponseFormatter.to_text_content( ResponseFormatter.success_response( data=result, message=f"Analyzed {len(result.get('burstable_instances', []))} burstable instances", analysis_type="ec2_burstable_analysis", execution_time=execution_time ) ) except Exception as e: logger.error(f"Error analyzing burstable instances: {str(e)}") raise @handle_aws_error async def identify_spot_opportunities_mcp(arguments: Dict[str, Any]) -> List[TextContent]: """Identify instances suitable for Spot pricing.""" start_time = time.time() try: region = arguments.get("region") result = get_spot_instance_opportunities(region=region) # Add documentation links result = add_documentation_links(result, "ec2") execution_time = time.time() - start_time return ResponseFormatter.to_text_content( ResponseFormatter.success_response( data=result, message=f"Found {len(result.get('spot_opportunities', []))} Spot pricing opportunities", analysis_type="ec2_spot_opportunities", execution_time=execution_time ) ) except Exception as e: logger.error(f"Error identifying Spot opportunities: {str(e)}") raise @handle_aws_error async def identify_unused_reservations_mcp(arguments: Dict[str, Any]) -> List[TextContent]: """Identify unused On-Demand Capacity Reservations.""" start_time = time.time() try: region = arguments.get("region") result = get_unused_capacity_reservations(region=region) # Add documentation links result = add_documentation_links(result, "ec2") execution_time = time.time() - start_time return ResponseFormatter.to_text_content( ResponseFormatter.success_response( data=result, message=f"Found {len(result.get('unused_reservations', []))} unused capacity reservations", analysis_type="ec2_unused_reservations", execution_time=execution_time ) ) except Exception as e: logger.error(f"Error identifying unused reservations: {str(e)}") raise @handle_aws_error async def identify_scheduling_opportunities_mcp(arguments: Dict[str, Any]) -> List[TextContent]: """Identify instances suitable for scheduling optimization.""" start_time = time.time() try: region = arguments.get("region") result = get_scheduling_opportunities(region=region) # Add documentation links result = add_documentation_links(result, "ec2") execution_time = time.time() - start_time return ResponseFormatter.to_text_content( ResponseFormatter.success_response( data=result, message=f"Found {len(result.get('scheduling_opportunities', []))} scheduling opportunities", analysis_type="ec2_scheduling_opportunities", execution_time=execution_time ) ) except Exception as e: logger.error(f"Error identifying scheduling opportunities: {str(e)}") raise @handle_aws_error async def analyze_commitment_plans_mcp(arguments: Dict[str, Any]) -> List[TextContent]: """Analyze instances for Reserved Instance and Savings Plans opportunities.""" start_time = time.time() try: region = arguments.get("region") result = get_commitment_plan_recommendations(region=region) # Add documentation links result = add_documentation_links(result, "ec2") execution_time = time.time() - start_time return ResponseFormatter.to_text_content( ResponseFormatter.success_response( data=result, message=f"Found {len(result.get('commitment_opportunities', []))} commitment plan opportunities", analysis_type="ec2_commitment_plans", execution_time=execution_time ) ) except Exception as e: logger.error(f"Error analyzing commitment plans: {str(e)}") raise @handle_aws_error async def identify_governance_violations_mcp(arguments: Dict[str, Any]) -> List[TextContent]: """Detect EC2 governance violations and policy non-compliance.""" start_time = time.time() try: region = arguments.get("region") result = get_governance_violations(region=region) # Add documentation links result = add_documentation_links(result, "ec2") execution_time = time.time() - start_time return ResponseFormatter.to_text_content( ResponseFormatter.success_response( data=result, message=f"Found {len(result.get('governance_violations', []))} governance violations", analysis_type="ec2_governance_violations", execution_time=execution_time ) ) except Exception as e: logger.error(f"Error identifying governance violations: {str(e)}") raise @handle_aws_error async def generate_comprehensive_report_mcp(arguments: Dict[str, Any]) -> List[TextContent]: """Generate comprehensive EC2 optimization report covering all playbooks.""" start_time = time.time() try: region = arguments.get("region") result = generate_comprehensive_ec2_report(region=region) # Add documentation links result = add_documentation_links(result, "ec2") execution_time = time.time() - start_time return ResponseFormatter.to_text_content( ResponseFormatter.success_response( data=result, message="Comprehensive EC2 optimization report generated successfully", analysis_type="ec2_comprehensive_report", execution_time=execution_time ) ) except Exception as e: logger.error(f"Error generating comprehensive EC2 report: {str(e)}") raise

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aws-samples/sample-cfm-tips-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server