
AWS Cost Explorer MCP Server

get_detailed_breakdown_by_day

Analyze AWS daily spending by region, service, and instance type to identify cost drivers and optimize cloud expenses.

Instructions

Retrieve daily spend breakdown by region, service, and instance type.

Args: params — parameters specifying the number of days to look back, the AWS region, and an optional AWS account id.

Returns: str — a formatted report of daily costs organized by date, region, and service (with instance type breakdowns for EC2 and SageMaker), or an error message if the lookup fails.

Input Schema

Name   | Required | Description                                                                          | Default
params | Yes      | EC2Params object: days to look back (1-90), AWS region, and optional aws_account_id | —
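
For orientation, an MCP client sends a single params object whose fields mirror the EC2Params model shown under Implementation Reference; the values below are illustrative, not server defaults except where noted.

    # Illustrative tool-call arguments for get_detailed_breakdown_by_day.
    # Field values are examples only; defaults come from the EC2Params model.
    arguments = {
        "params": {
            "days": 7,                 # 1-90; defaults to 1 if omitted
            "region": "us-east-1",     # defaults to us-east-1 if omitted
            "aws_account_id": None     # set only when querying another account
        }
    }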

Implementation Reference

  • Main asynchronous handler function implementing the tool logic. Uses the AWS Cost Explorer API to fetch daily cost breakdowns by region and service, with additional instance type breakdowns for EC2 and SageMaker, and formats the output as a detailed tabular report. It relies on a get_aws_service_boto3_client helper that is not reproduced on this page; a hedged sketch of such a helper appears after this reference list.
    @mcp.tool()
    async def get_detailed_breakdown_by_day(params: EC2Params) -> str:
        """
        Retrieve daily spend breakdown by region, service, and instance type.

        Args:
            params: Parameters specifying the number of days to look back

        Returns:
            str: A formatted report with cost data organized by date, region, and service,
                 or an error message if an error occurs.
        """
        print(f"get_detailed_breakdown_by_day, params={params}")

        # Initialize the Cost Explorer client
        ce_client = get_aws_service_boto3_client("ce", params.aws_account_id, params.region)

        # Get the days parameter
        days = params.days

        # Calculate the time period
        end_date = datetime.now().strftime('%Y-%m-%d')
        start_date = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')

        # Initialize output buffer
        output_buffer = []

        try:
            output_buffer.append(f"\nDetailed Cost Breakdown by Region, Service, and Instance Type ({days} days):")
            output_buffer.append("-" * 75)

            # First get the daily costs by region and service
            response = ce_client.get_cost_and_usage(
                TimePeriod={
                    'Start': start_date,
                    'End': end_date
                },
                Granularity='DAILY',
                Metrics=['UnblendedCost'],
                GroupBy=[
                    {'Type': 'DIMENSION', 'Key': 'REGION'},
                    {'Type': 'DIMENSION', 'Key': 'SERVICE'}
                ]
            )

            # Create data structure to hold the results
            all_data = defaultdict(lambda: defaultdict(lambda: defaultdict(float)))

            # Process the results
            for time_data in response['ResultsByTime']:
                date = time_data['TimePeriod']['Start']
                output_buffer.append(f"\nDate: {date}")
                output_buffer.append("=" * 50)

                if 'Groups' in time_data and time_data['Groups']:
                    # Create data structure for this date
                    region_services = defaultdict(lambda: defaultdict(float))

                    # Process groups
                    for group in time_data['Groups']:
                        region, service = group['Keys']
                        cost = float(group['Metrics']['UnblendedCost']['Amount'])
                        currency = group['Metrics']['UnblendedCost']['Unit']
                        region_services[region][service] = cost
                        all_data[date][region][service] = cost

                    # Add the results for this date to the buffer
                    for region in sorted(region_services.keys()):
                        output_buffer.append(f"\nRegion: {region}")
                        output_buffer.append("-" * 40)

                        # Create a DataFrame for this region's services
                        services_df = pd.DataFrame({
                            'Service': list(region_services[region].keys()),
                            'Cost': list(region_services[region].values())
                        })

                        # Sort by cost descending and keep the top services
                        services_df = services_df.sort_values('Cost', ascending=False)
                        top_services = services_df.head(5)

                        # Add this region's services table to the buffer
                        output_buffer.append(tabulate(top_services.round(2), headers='keys', tablefmt='pretty', showindex=False))

                        # If there are more services, indicate the total for the rest
                        if len(services_df) > 5:
                            other_cost = services_df.iloc[5:]['Cost'].sum()
                            output_buffer.append(f"... and {len(services_df) - 5} more services totaling {other_cost:.2f} {currency}")

                        # For EC2, get instance type breakdown
                        if any(s.startswith('Amazon Elastic Compute') for s in region_services[region].keys()):
                            try:
                                instance_response = get_instance_type_breakdown(
                                    ce_client,
                                    date,
                                    region,
                                    'Amazon Elastic Compute Cloud - Compute',
                                    'INSTANCE_TYPE'
                                )

                                if instance_response is not None and not instance_response.empty:
                                    output_buffer.append("\n  EC2 Instance Type Breakdown:")
                                    output_buffer.append("  " + "-" * 38)

                                    # Indent the table under the heading
                                    instance_table = tabulate(instance_response.round(2), headers='keys', tablefmt='pretty', showindex=False)
                                    for line in instance_table.split('\n'):
                                        output_buffer.append(f"  {line}")
                            except Exception as e:
                                output_buffer.append(f"  Note: Could not retrieve EC2 instance type breakdown: {str(e)}")

                        # For SageMaker, get instance type and usage type breakdowns
                        if any(s == 'Amazon SageMaker' for s in region_services[region].keys()):
                            try:
                                sagemaker_instance_response = get_instance_type_breakdown(
                                    ce_client,
                                    date,
                                    region,
                                    'Amazon SageMaker',
                                    'INSTANCE_TYPE'
                                )

                                if sagemaker_instance_response is not None and not sagemaker_instance_response.empty:
                                    output_buffer.append("\n  SageMaker Instance Type Breakdown:")
                                    output_buffer.append("  " + "-" * 38)

                                    # Indent the table under the heading
                                    sagemaker_table = tabulate(sagemaker_instance_response.round(2), headers='keys', tablefmt='pretty', showindex=False)
                                    for line in sagemaker_table.split('\n'):
                                        output_buffer.append(f"  {line}")

                                # Also try to get usage type breakdown for SageMaker (notebooks, endpoints, etc.)
                                sagemaker_usage_response = get_instance_type_breakdown(
                                    ce_client,
                                    date,
                                    region,
                                    'Amazon SageMaker',
                                    'USAGE_TYPE'
                                )

                                if sagemaker_usage_response is not None and not sagemaker_usage_response.empty:
                                    output_buffer.append("\n  SageMaker Usage Type Breakdown:")
                                    output_buffer.append("  " + "-" * 38)

                                    # Indent the table under the heading
                                    usage_table = tabulate(sagemaker_usage_response.round(2), headers='keys', tablefmt='pretty', showindex=False)
                                    for line in usage_table.split('\n'):
                                        output_buffer.append(f"  {line}")
                            except Exception as e:
                                output_buffer.append(f"  Note: Could not retrieve SageMaker breakdown: {str(e)}")
                else:
                    output_buffer.append("No data found for this date")

                output_buffer.append("\n" + "-" * 75)

            # Join the buffer into a single report string
            formatted_output = "\n".join(output_buffer)
            return formatted_output

        except Exception as e:
            error_message = f"Error retrieving detailed breakdown: {str(e)}"
            return error_message
  • Pydantic BaseModel schema defining input parameters for the tool: days (1-90), region (default us-east-1), and an optional aws_account_id for cross-account access. A short validation example follows the reference list.
    class EC2Params(BaseModel):
        """Parameters for retrieving EC2 Cost Explorer information."""
        days: int = Field(
            default=1,
            description="Number of days of cost data to look back",
            ge=1,
            le=90
        )
        region: str = Field(
            default="us-east-1",
            description="AWS region to retrieve cost data from"
        )
        aws_account_id: Optional[str] = Field(
            default=None,
            description="AWS account id (if different from the current AWS account) of the account for which to get the cost data"
        )
  • server.py:735-735 (registration)
    The @mcp.tool() decorator registers the get_detailed_breakdown_by_day function as an MCP tool.
    @mcp.tool()
  • Helper function called within the handler to fetch detailed breakdowns by INSTANCE_TYPE or USAGE_TYPE for services such as EC2 and SageMaker. A standalone usage sketch follows the reference list.
    def get_instance_type_breakdown(ce_client, date, region, service, dimension_key):
        """
        Helper function to get instance type or usage type breakdown for a specific service.

        Args:
            ce_client: The Cost Explorer client
            date: The date to query
            region: The AWS region
            service: The AWS service name
            dimension_key: The dimension to group by (e.g., 'INSTANCE_TYPE' or 'USAGE_TYPE')

        Returns:
            DataFrame containing the breakdown or None if no data
        """
        tomorrow = (datetime.strptime(date, '%Y-%m-%d') + timedelta(days=1)).strftime('%Y-%m-%d')

        instance_response = ce_client.get_cost_and_usage(
            TimePeriod={
                'Start': date,
                'End': tomorrow
            },
            Granularity='DAILY',
            Filter={
                'And': [
                    {'Dimensions': {'Key': 'REGION', 'Values': [region]}},
                    {'Dimensions': {'Key': 'SERVICE', 'Values': [service]}}
                ]
            },
            Metrics=['UnblendedCost'],
            GroupBy=[
                {'Type': 'DIMENSION', 'Key': dimension_key}
            ]
        )

        if ('ResultsByTime' in instance_response and
                instance_response['ResultsByTime'] and
                'Groups' in instance_response['ResultsByTime'][0] and
                instance_response['ResultsByTime'][0]['Groups']):

            instance_data = instance_response['ResultsByTime'][0]
            instance_costs = []

            for instance_group in instance_data['Groups']:
                type_value = instance_group['Keys'][0]
                cost_value = float(instance_group['Metrics']['UnblendedCost']['Amount'])

                # Use a friendlier column label for the chosen dimension
                column_name = 'Instance Type' if dimension_key == 'INSTANCE_TYPE' else 'Usage Type'
                instance_costs.append({
                    column_name: type_value,
                    'Cost': cost_value
                })

            # Create DataFrame and sort by cost
            result_df = pd.DataFrame(instance_costs)
            if not result_df.empty:
                result_df = result_df.sort_values('Cost', ascending=False)
            return result_df

        return None
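
The get_aws_service_boto3_client helper used by the main handler is not reproduced on this page. A minimal sketch of what such a helper could look like follows, assuming a cross-account role is assumed via STS when aws_account_id is provided; the role name CostExplorerAccessRole is a placeholder for illustration, not necessarily what this server uses.

    import boto3

    def get_aws_service_boto3_client(service_name, aws_account_id=None, region="us-east-1"):
        """Hedged sketch: create a boto3 client, optionally through an assumed cross-account role."""
        if aws_account_id is None:
            # Same-account case: rely on the default credential chain
            return boto3.client(service_name, region_name=region)

        # Cross-account case: assume a role in the target account.
        # The role name below is a placeholder assumption for illustration.
        sts = boto3.client("sts", region_name=region)
        creds = sts.assume_role(
            RoleArn=f"arn:aws:iam::{aws_account_id}:role/CostExplorerAccessRole",
            RoleSessionName="aws-cost-explorer-mcp",
        )["Credentials"]
        return boto3.client(
            service_name,
            region_name=region,
            aws_access_key_id=creds["AccessKeyId"],
            aws_secret_access_key=creds["SecretAccessKey"],
            aws_session_token=creds["SessionToken"],
        )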
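
The EC2Params model can also be exercised on its own to confirm its defaults and the 1-90 day bound; this is a quick sanity check, not part of the server code, and assumes the class shown above is in scope.

    from pydantic import ValidationError

    # Defaults: one day of data in us-east-1 for the current account
    params = EC2Params()
    print(params.days, params.region, params.aws_account_id)  # -> 1 us-east-1 None

    # Values outside the ge/le bounds are rejected by validation
    try:
        EC2Params(days=120)
    except ValidationError as err:
        print("rejected:", err.errors()[0]["loc"])  # -> rejected: ('days',)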
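
Finally, a standalone usage sketch for get_instance_type_breakdown: it assumes default AWS credentials with Cost Explorer access, and the date and service name are illustrative.

    import boto3
    from tabulate import tabulate

    ce = boto3.client("ce", region_name="us-east-1")
    df = get_instance_type_breakdown(
        ce_client=ce,
        date="2024-01-15",    # any past date in YYYY-MM-DD form
        region="us-east-1",
        service="Amazon Elastic Compute Cloud - Compute",
        dimension_key="INSTANCE_TYPE",
    )
    if df is not None and not df.empty:
        print(tabulate(df.round(2), headers="keys", tablefmt="pretty", showindex=False))
    else:
        print("No instance-type data for that day")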

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aarora79/aws-cost-explorer-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.