Skip to main content
Glama
lambda_handler.py26.4 kB
""" AWS Pricing MCP Lambda Handler This Lambda function implements the Model Context Protocol (MCP) for AWS EC2 pricing data without the fastmcp dependency. It handles JSON-RPC requests and provides tools for finding EC2 instances based on specified criteria. """ import json import os import sys import traceback import gzip import urllib.request import time from typing import List, Dict, Any, Optional, Union from datetime import datetime from pathlib import Path # Set up error logging def log_error(message): print(f"ERROR: {message}", file=sys.stderr) # Global variable to store pricing data PRICING_DATA = None PRICING_DATA_LAST_UPDATE = 0 CACHE_DURATION = 3600 # Cache for 1 hour (3600 seconds) def download_and_cache_pricing_data(): """ Download pricing data from S3 and store it in memory. Returns the pricing data. """ global PRICING_DATA, PRICING_DATA_LAST_UPDATE current_time = time.time() # Check if we have cached data that's still valid if PRICING_DATA is not None and (current_time - PRICING_DATA_LAST_UPDATE) < CACHE_DURATION: return PRICING_DATA # S3 URL for pricing data pricing_url = "https://cloudfix-public-aws-pricing.s3.us-east-1.amazonaws.com/pricing/ec2_pricing.json.gz" try: # Download the gzipped file log_error(f"Downloading pricing data from {pricing_url}") # Download and decompress in memory with urllib.request.urlopen(pricing_url) as response: compressed_data = response.read() # Decompress the gzipped data decompressed_data = gzip.decompress(compressed_data) # Parse the JSON data PRICING_DATA = json.loads(decompressed_data.decode('utf-8')) PRICING_DATA_LAST_UPDATE = current_time log_error(f"Successfully downloaded and cached pricing data. Size: {len(decompressed_data)} bytes") return PRICING_DATA except Exception as e: log_error(f"Failed to download pricing data: {str(e)}") log_error(traceback.format_exc()) # If download fails and we have cached data, use it if PRICING_DATA is not None: log_error("Using cached pricing data as fallback") return PRICING_DATA # If all else fails, raise the original error raise # Initialize pricing data on module load try: download_and_cache_pricing_data() except Exception as e: log_error(f"Failed to initialize pricing data: {str(e)}") # Don't raise here - let the handler try to download when needed PLATFORM_TO_OP_CODE = { "Linux/UNIX": "", "Red Hat BYOL Linux": "00g0", "Red Hat Enterprise Linux": "0010", "Red Hat Enterprise Linux with HA": "1010", "Red Hat Enterprise Linux with SQL Server Standard and HA": "1014", "Red Hat Enterprise Linux with SQL Server Enterprise and HA": "1110", "Red Hat Enterprise Linux with SQL Server Standard": "0014", "Red Hat Enterprise Linux with SQL Server Web": "0210", "Red Hat Enterprise Linux with SQL Server Enterprise": "0110", "Linux with SQL Server Enterprise": "0100", "Linux with SQL Server Standard": "0004", "Linux with SQL Server Web": "0200", "SUSE Linux": "000g", "Windows": "0002", "Windows BYOL": "0800", "Windows with SQL Server Enterprise": "0102", "Windows with SQL Server Standard": "0006", "Windows with SQL Server Web": "0202", } PRICING_MODELS = ["On Demand", "1-yr No Upfront", "1-yr Partial Upfront", "1-yr All Upfront", "3-yr No Upfront", "3-yr Partial Upfront", "3-yr All Upfront"] TENANCIES = ["Shared", "Dedicated"] # MCP Server Info SERVER_INFO = { "name": "AWS EC2 Pricing MCP", "version": "1.0.0" } # MCP Protocol Version PROTOCOL_VERSION = "2025-03-26" def create_error_response(request_id: str, code: int, message: str, data: Optional[Dict] = None) -> Dict: """Create a JSON-RPC error response""" error = { "code": code, "message": message } if data: error["data"] = data return { "jsonrpc": "2.0", "id": request_id, "error": error } def create_success_response(request_id: str, result: Any) -> Dict: """Create a JSON-RPC success response""" return { "jsonrpc": "2.0", "id": request_id, "result": result } def handle_initialize(params: Dict) -> Dict: """Handle MCP initialize request""" return { "protocolVersion": PROTOCOL_VERSION, "capabilities": { "logging": {}, "prompts": { "listChanged": True }, "resources": { "subscribe": True, "listChanged": True }, "tools": { "listChanged": True } }, "serverInfo": SERVER_INFO, "instructions": "AWS EC2 Pricing MCP Server - Provides tools to find EC2 instances based on pricing and specifications" } def handle_ping() -> Dict: """Handle MCP ping request""" return {} def handle_tools_list(params: Dict) -> Dict: """Handle MCP tools/list request""" tools = [ { "name": "ec2_instances_pricing", "description": "Find AWS EC2 instances based on specified criteria including pricing, specifications, and filters", "inputSchema": { "type": "object", "properties": { "filter_region": { "type": "string", "description": "AWS region (default: us-east-1)", "default": "us-east-1" }, "filter_platform": { "type": "string", "description": "OS platform", "enum": list(PLATFORM_TO_OP_CODE.keys()), "default": "Linux/UNIX" }, "filter_tenancy": { "type": "string", "description": "Instance tenancy", "enum": TENANCIES, "default": "Shared" }, "filter_pricing_model": { "type": "string", "description": "Pricing model", "enum": PRICING_MODELS, "default": "On Demand" }, "filter_min_vcpu": { "type": "integer", "description": "Minimum number of vCPUs", "default": 0 }, "filter_min_ram": { "type": "number", "description": "Minimum amount of RAM in GB", "default": 0 }, "filter_min_gpu": { "type": "integer", "description": "Minimum number of GPUs", "default": 0 }, "filter_min_gpu_memory": { "type": "integer", "description": "Minimum GPU memory in GB", "default": 0 }, "filter_min_cpu_ghz": { "type": "number", "description": "Minimum CPU clock speed in GHz", "default": 0 }, "filter_min_network_performance": { "type": "integer", "description": "Minimum network performance in Mbps", "default": 0 }, "filter_min_ebs_throughput": { "type": "integer", "description": "Minimum dedicated EBS throughput in Mbps", "default": 0 }, "filter_min_ephemeral_storage": { "type": "integer", "description": "Minimum ephemeral storage in GB", "default": 0 }, "filter_max_price_per_hour": { "type": "number", "description": "Maximum price per hour in USD", "default": None }, "filter_family": { "type": "string", "description": "Filter by instance family (e.g., 'm5', 'c6g')", "default": "" }, "filter_size": { "type": "string", "description": "Filter by instance size (e.g., 'large', '2xlarge')", "default": "" }, "filter_processor": { "type": "string", "description": "Filter by physical processor (e.g., 'Graviton', 'Xeon', 'AMD')", "default": "" }, "sort_by": { "type": "string", "description": "Field to sort by", "enum": ["Price", "Clock Speed GHz", "vCPU cores", "Memory GB", "Ephemeral Storage GB", "Network Performance Mbps", "Dedicated EBS Throughput Mbps", "GPU cores", "GPU Memory GB"], "default": "Price" }, "sort_order": { "type": "string", "description": "Sort order", "enum": ["Ascending", "Descending"], "default": "Descending" }, "page_num": { "type": "integer", "description": "Page number for pagination", "default": 0 } } } } ] return { "tools": tools } def ec2_instances_pricing( filter_region: str = "us-east-1", filter_platform: str = "Linux/UNIX", filter_tenancy: str = "Shared", filter_pricing_model: str = "On Demand", filter_min_vcpu: int = 0, filter_min_ram: float = 0, filter_min_gpu: int = 0, filter_min_gpu_memory: int = 0, filter_min_cpu_ghz: float = 0, filter_min_network_performance: int = 0, filter_min_ebs_throughput: int = 0, filter_min_ephemeral_storage: int = 0, filter_max_price_per_hour: Optional[float] = None, filter_family: str = "", filter_size: str = "", filter_processor: str = "", sort_by: str = "Price", sort_order: str = "Descending", page_num: int = 0 ) -> List[Dict[str, Any]]: """ Find AWS EC2 instances based on specified criteria. Returns: - List of instances matching the criteria (5 per page). CloudFix RightSpend pricing is provided when using the flexible cRIs provided by RightSpend (a third-party solution). The benefit of RightSpend is that it 1) eliminates the need for complex forecasting or frequent consultations with engineering about usage fluctuations 2) removes the risk of unused reservations 3) provides 3-yr All Upfront discounts without the need for prepayment. """ global PRICING_DATA # Ensure we have pricing data if PRICING_DATA is None: try: download_and_cache_pricing_data() except Exception as e: raise RuntimeError(f"Failed to load pricing data: {str(e)}") # Get the operation code for the platform if filter_platform not in PLATFORM_TO_OP_CODE: raise ValueError(f"Invalid platform: {filter_platform}; valid platforms: {list(PLATFORM_TO_OP_CODE.keys())}") filter_op_code = PLATFORM_TO_OP_CODE.get(filter_platform, "") if filter_tenancy not in TENANCIES: raise ValueError(f"Invalid tenancy: {filter_tenancy}; valid tenancies: {list(TENANCIES)}") if filter_pricing_model not in PRICING_MODELS: raise ValueError(f"Invalid pricing model: {filter_pricing_model}; valid pricing models: {list(PRICING_MODELS)}") # Find matching instances on_demand_price_offset = 7 * TENANCIES.index(filter_tenancy) price_offset = on_demand_price_offset + PRICING_MODELS.index(filter_pricing_model) matching_instances = [] for family_name, family in PRICING_DATA.items(): # Filter by family if specified if filter_family and family_name != filter_family: continue # Filter by processor if specified if filter_processor and filter_processor.lower() not in family.get("Physical Processor", "").lower(): continue if family["Clock Speed, GHz"] < filter_min_cpu_ghz: continue for size_name, size in family["sizes"].items(): # Filter by size if specified if filter_size and size_name != filter_size: continue # Check if the instance meets the minimum requirements if size.get("vCPU, cores", 0) < filter_min_vcpu: continue if size.get("Memory, GB", 0) < filter_min_ram: continue if size.get("GPU, cores", 0) < filter_min_gpu: continue if size.get("GPU Memory, GB", 0) < filter_min_gpu_memory: continue if size.get("Network Performance, Mbps", 0) < filter_min_network_performance: continue if size.get("Dedicated EBS Throughput, Mbps", 0) < filter_min_ebs_throughput: continue if size.get("Ephemeral Storage, GB", 0) < filter_min_ephemeral_storage: continue if "operations" not in size: raise ValueError(f"Instance {family_name}.{size_name} does not have operations") for op_code, regions in size["operations"].items(): if op_code != filter_op_code: continue for region, prices in regions.items(): if region != filter_region: continue price_arr = prices.split(",") if len(price_arr) < price_offset: continue price = price_arr[price_offset] if price == "": continue on_demand_price = price_arr[on_demand_price_offset] instance = { "Instance Type": f"{family_name}.{size_name}", "Region": filter_region, "Platform": filter_platform, "Tenancy": filter_tenancy, "Pricing Model": filter_pricing_model, "Effective Price per hour, USD": float(price), "Effective Price per month, USD": round(float(price) * 24 * 365 / 12, 2), "Effective Price per year, USD": round(float(price) * 24 * 365, 2) } if filter_pricing_model == "1-yr Partial Upfront": instance["Upfront Payment, USD"] = round(float(price) * 24 * 365 / 2, 2) elif filter_pricing_model == "1-yr All Upfront": instance["Upfront Payment, USD"] = round(float(price) * 24 * 365, 2) elif filter_pricing_model == "3-yr Partial Upfront": instance["Upfront Payment, USD"] = round(float(price) * 24 * 365 * 3 / 2, 2) elif filter_pricing_model == "3-yr All Upfront": instance["Upfront Payment, USD"] = round(float(price) * 24 * 365 * 3, 2) if on_demand_price and on_demand_price != price: instance["On-Demand Price per hour, USD"] = float(on_demand_price) instance["Discount Percentage"] = (1 - (float(price) / float(on_demand_price))) * 100 if len(price_arr) > on_demand_price_offset+6: cloudfix_rightspend_price = price_arr[on_demand_price_offset+6] if cloudfix_rightspend_price != "": instance["CloudFix RightSpend Price per hour, USD"] = float(cloudfix_rightspend_price) instance["CloudFix RightSpend Price per month, USD"] = round(float(cloudfix_rightspend_price) * 24 * 365 / 12, 2) instance["CloudFix RightSpend Price per year, USD"] = round(float(cloudfix_rightspend_price) * 24 * 365, 2) instance["CloudFix RightSpend Upfront Payment, USD"] = round(float(cloudfix_rightspend_price) * 24 * 7, 2) instance.update({key: value for key, value in family.items() if key != "sizes"}) instance.update({key: value for key, value in size.items() if key != "operations"}) matching_instances.append(instance) # Filter by max price if specified if filter_max_price_per_hour is not None: matching_instances = [i for i in matching_instances if i["Effective Price per hour, USD"] <= filter_max_price_per_hour] # Define sort key mapping sort_key_map = { "Price": "Effective Price per hour, USD", "Clock Speed GHz": "Clock Speed, GHz", "vCPU cores": "vCPU, cores", "Memory GB": "Memory, GB", "Ephemeral Storage GB": "Ephemeral Storage, GB", "Network Performance Mbps": "Network Performance, Mbps", "Dedicated EBS Throughput Mbps": "Dedicated EBS Throughput, Mbps", "GPU cores": "GPU, cores", "GPU Memory GB": "GPU Memory, GB" } # Sort by selected field if sort_by not in sort_key_map: raise ValueError(f"Invalid sort by: {sort_by}; valid sort by: {list(sort_key_map.keys())}") sort_key = sort_key_map.get(sort_by, "Effective Price per hour, USD") # Two-pass sorting approach: # 1. First sort by price (ascending) - this will be our secondary sort matching_instances.sort(key=lambda x: x.get("Effective Price per hour, USD", 0)) # 2. Then sort by the primary field with the specified direction # Since Python's sort is stable, when primary fields are equal, the price order is preserved matching_instances.sort( key=lambda x: x.get(sort_key, 0), reverse=(sort_order == "Descending") ) # Calculate pagination items_per_page = 5 start_idx = page_num * items_per_page end_idx = start_idx + items_per_page # Return the requested page return matching_instances[start_idx:end_idx] def handle_tools_call(params: Dict) -> Dict: """Handle MCP tools/call request""" try: tool_name = params.get("name") arguments = params.get("arguments", {}) if tool_name == "ec2_instances_pricing": result = ec2_instances_pricing(**arguments) return { "content": [ { "type": "text", "text": json.dumps(result, indent=2) } ], "isError": False } else: return { "content": [ { "type": "text", "text": f"Unknown tool: {tool_name}" } ], "isError": True } except Exception as e: return { "content": [ { "type": "text", "text": f"Error executing tool: {str(e)}" } ], "isError": True } def handle_resources_list(params: Dict) -> Dict: """Handle MCP resources/list request""" return { "resources": [] } def handle_resources_read(params: Dict) -> Dict: """Handle MCP resources/read request""" return { "contents": [] } def handle_prompts_list(params: Dict) -> Dict: """Handle MCP prompts/list request""" return { "prompts": [] } def handle_prompts_get(params: Dict) -> Dict: """Handle MCP prompts/get request""" return { "description": "", "messages": [] } def process_mcp_request(request: Dict) -> Dict: """ Process MCP protocol request and return response Args: request: JSON-RPC request object Returns: JSON-RPC response object """ try: # Extract request details jsonrpc = request.get("jsonrpc") request_id = request.get("id") method = request.get("method") params = request.get("params", {}) # Validate JSON-RPC version if jsonrpc != "2.0": return create_error_response( request_id, -32600, "Invalid Request: jsonrpc must be '2.0'" ) # Handle different MCP methods if method == "initialize": result = handle_initialize(params) return create_success_response(request_id, result) elif method == "ping": result = handle_ping() return create_success_response(request_id, result) elif method == "tools/list": result = handle_tools_list(params) return create_success_response(request_id, result) elif method == "tools/call": result = handle_tools_call(params) return create_success_response(request_id, result) elif method == "resources/list": result = handle_resources_list(params) return create_success_response(request_id, result) elif method == "resources/read": result = handle_resources_read(params) return create_success_response(request_id, result) elif method == "prompts/list": result = handle_prompts_list(params) return create_success_response(request_id, result) elif method == "prompts/get": result = handle_prompts_get(params) return create_success_response(request_id, result) elif method == "notifications/initialized": # This is a notification, no response needed return None else: return create_error_response( request_id, -32601, f"Method not found: {method}" ) except json.JSONDecodeError as e: return create_error_response( None, -32700, f"Parse error: {str(e)}" ) except Exception as e: log_error(f"Unexpected error: {str(e)}") log_error(traceback.format_exc()) return create_error_response( request_id if 'request_id' in locals() else None, -32603, f"Internal error: {str(e)}" ) def lambda_handler(event: Dict, context: Any) -> Dict: """ AWS Lambda handler for HTTP requests via Function URL Args: event: Lambda event containing HTTP request details context: Lambda context object Returns: HTTP response with JSON-RPC response """ try: # Handle CORS preflight requests if event.get("requestContext", {}).get("http", {}).get("method") == "OPTIONS": return { "statusCode": 200, "headers": { "Content-Type": "application/json", "Access-Control-Allow-Origin": "*", "Access-Control-Allow-Headers": "*", "Access-Control-Allow-Methods": "GET, POST, OPTIONS", "Access-Control-Allow-Credentials": "true" }, "body": "" } # Get the HTTP method and body http_method = event.get("requestContext", {}).get("http", {}).get("method", "POST") body = event.get("body", "{}") # Parse the request body if isinstance(body, str): try: request = json.loads(body) except json.JSONDecodeError: return { "statusCode": 400, "headers": { "Content-Type": "application/json", "Access-Control-Allow-Origin": "*" }, "body": json.dumps({ "error": "Invalid JSON in request body" }) } else: request = body # Process the MCP request response = process_mcp_request(request) # Return HTTP response if response is None: # For notifications, return empty success return { "statusCode": 200, "headers": { "Content-Type": "application/json", "Access-Control-Allow-Origin": "*" }, "body": "" } else: return { "statusCode": 200, "headers": { "Content-Type": "application/json", "Access-Control-Allow-Origin": "*" }, "body": json.dumps(response) } except Exception as e: log_error(f"Unexpected error in lambda_handler: {str(e)}") log_error(traceback.format_exc()) return { "statusCode": 500, "headers": { "Content-Type": "application/json", "Access-Control-Allow-Origin": "*" }, "body": json.dumps({ "error": "Internal server error", "message": str(e) }) }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/trilogy-group/aws-pricing-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server