aarora79

AWS Cost Explorer MCP Server

get_bedrock_daily_usage_stats

Retrieve daily AWS Bedrock usage statistics with detailed breakdowns for cost analysis and monitoring.

Instructions

Get daily usage statistics with detailed breakdowns.

Args:
    params: Parameters specifying the number of days to look back and region

Returns:
    str: Formatted string representation of daily usage statistics

Input Schema

Name    Required  Description  Default
params  Yes       (none)       (none)
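For illustration, a call to this tool wraps everything in a single `params` object. A hypothetical invocation payload (field names follow the `BedrockLogsParams` schema in the implementation reference; the values are examples only):

```python
import json

# Hypothetical arguments for a get_bedrock_daily_usage_stats call.
# Field names follow the BedrockLogsParams schema; values are examples only.
arguments = {
    "params": {
        "days": 14,
        "region": "us-east-1",
        "log_group_name": "BedrockModelInvocationLogGroup",
    }
}
print(json.dumps(arguments, indent=2))
```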

Implementation Reference

  • The main handler function for the 'get_bedrock_daily_usage_stats' tool. Decorated with @mcp.tool(), it retrieves Bedrock logs, processes them into daily statistics grouped by date, region, model, and user, and returns a plain-text report of formatted summary tables.
    @mcp.tool()
    def get_bedrock_daily_usage_stats(params: BedrockLogsParams) -> str:
        """
        Get daily usage statistics with detailed breakdowns.
    
        Args:
            params: Parameters specifying the number of days to look back and region
    
        Returns:
            str: Formatted string representation of daily usage statistics
        """
        print(f"get_bedrock_daily_usage_stats, params={params}")
        df = get_bedrock_logs(params)
    
        if df is None or df.empty:
            return "No usage data found for the specified period."
        
        # Collect output sections in a list; joined into one string at the end
        result_parts = []
        
        # Add header
        result_parts.append(f"Bedrock Usage Statistics (Past {params.days} days - {params.region})")
        result_parts.append("=" * 80)
        
        # Add a date column for easier grouping
        df['date'] = df['timestamp'].dt.date
        
        # === REGION -> MODEL GROUPING ===
        result_parts.append("\n=== Daily Region-wise -> Model-wise Analysis ===")
        
        # Group by date, region, model and calculate metrics
        region_model_stats = df.groupby(['date', 'region', 'modelId']).agg({
            'inputTokens': ['count', 'sum', 'mean', 'max', 'median'],
            'completionTokens': ['sum', 'mean', 'max', 'median'],
            'totalTokens': ['sum', 'mean', 'max', 'median']
        })
        
        # Flatten the column multi-index
        region_model_stats.columns = [f"{col[0]}_{col[1]}" for col in region_model_stats.columns]
        
        # Reset the index to get a flat dataframe
        flattened_stats = region_model_stats.reset_index()
        
        # Rename inputTokens_count to request_count
        flattened_stats = flattened_stats.rename(columns={'inputTokens_count': 'request_count'})
        
        # Add the flattened stats to result
        result_parts.append(flattened_stats.to_string(index=False))
        
        # Add summary statistics
        result_parts.append("\n=== Summary Statistics ===")
        
        # Total requests and tokens
        total_requests = flattened_stats['request_count'].sum()
        total_input_tokens = flattened_stats['inputTokens_sum'].sum()
        total_completion_tokens = flattened_stats['completionTokens_sum'].sum()
        total_tokens = flattened_stats['totalTokens_sum'].sum()
        
        result_parts.append(f"Total Requests: {total_requests:,}")
        result_parts.append(f"Total Input Tokens: {total_input_tokens:,}")
        result_parts.append(f"Total Completion Tokens: {total_completion_tokens:,}")
        result_parts.append(f"Total Tokens: {total_tokens:,}")
        
        # === REGION SUMMARY ===
        result_parts.append("\n=== Region Summary ===")
        region_summary = df.groupby('region').agg({
            'inputTokens': ['count', 'sum'],
            'completionTokens': ['sum'],
            'totalTokens': ['sum']
        })
        
        # Flatten region summary columns
        region_summary.columns = [f"{col[0]}_{col[1]}" for col in region_summary.columns]
        region_summary = region_summary.reset_index()
        region_summary = region_summary.rename(columns={'inputTokens_count': 'request_count'})
        
        result_parts.append(region_summary.to_string(index=False))
        
        # === MODEL SUMMARY ===
        result_parts.append("\n=== Model Summary ===")
        model_summary = df.groupby('modelId').agg({
            'inputTokens': ['count', 'sum'],
            'completionTokens': ['sum'],
            'totalTokens': ['sum']
        })
        
        # Flatten model summary columns
        model_summary.columns = [f"{col[0]}_{col[1]}" for col in model_summary.columns]
        model_summary = model_summary.reset_index()
        model_summary = model_summary.rename(columns={'inputTokens_count': 'request_count'})
        
        # Format model IDs to be more readable
        model_summary['modelId'] = model_summary['modelId'].apply(
            lambda model: model.split('.')[-1] if '.' in model else model.split('/')[-1]
        )
        
        result_parts.append(model_summary.to_string(index=False))
        
        # === USER SUMMARY ===
        if 'userId' in df.columns:
            result_parts.append("\n=== User Summary ===")
            user_summary = df.groupby('userId').agg({
                'inputTokens': ['count', 'sum'],
                'completionTokens': ['sum'],
                'totalTokens': ['sum']
            })
            
            # Flatten user summary columns
            user_summary.columns = [f"{col[0]}_{col[1]}" for col in user_summary.columns]
            user_summary = user_summary.reset_index()
            user_summary = user_summary.rename(columns={'inputTokens_count': 'request_count'})
            
            result_parts.append(user_summary.to_string(index=False))
            
        # === REGION -> USER -> MODEL DETAILED SUMMARY ===
        if 'userId' in df.columns:
            result_parts.append("\n=== Region -> User -> Model Detailed Summary ===")
            region_user_model_summary = df.groupby(['region', 'userId', 'modelId']).agg({
                'inputTokens': ['count', 'sum', 'mean'],
                'completionTokens': ['sum', 'mean'],
                'totalTokens': ['sum', 'mean']
            })
            
            # Flatten columns
            region_user_model_summary.columns = [f"{col[0]}_{col[1]}" for col in region_user_model_summary.columns]
            region_user_model_summary = region_user_model_summary.reset_index()
            region_user_model_summary = region_user_model_summary.rename(columns={'inputTokens_count': 'request_count'})
            
            # Format model IDs to be more readable
            region_user_model_summary['modelId'] = region_user_model_summary['modelId'].apply(
                lambda model: model.split('.')[-1] if '.' in model else model.split('/')[-1]
            )
            
            result_parts.append(region_user_model_summary.to_string(index=False))
    
        
        # Combine all parts into a single string
        result = "\n".join(result_parts)
        
        return result
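The handler's core transformation is the same pandas pattern repeated for each grouping: groupby + agg, flatten the resulting column MultiIndex, then rename `inputTokens_count` to `request_count`. A minimal sketch on toy data (not the server's actual log schema):

```python
import pandas as pd

# Toy rows standing in for the Bedrock log DataFrame.
df = pd.DataFrame({
    "date": ["2024-01-01", "2024-01-01", "2024-01-02"],
    "modelId": ["claude-v2", "claude-v2", "titan-text"],
    "inputTokens": [100, 200, 50],
})

stats = df.groupby(["date", "modelId"]).agg({"inputTokens": ["count", "sum"]})
# agg() with a list of functions yields MultiIndex columns like
# ("inputTokens", "count"); flatten them to "inputTokens_count".
stats.columns = [f"{col[0]}_{col[1]}" for col in stats.columns]
stats = stats.reset_index().rename(columns={"inputTokens_count": "request_count"})
print(stats.to_string(index=False))
```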
  • Pydantic BaseModel schema defining the input parameters for the tool: days (1-90), region, log_group_name, and optional aws_account_id.
    class BedrockLogsParams(BaseModel):
        """Parameters for retrieving Bedrock invocation logs."""
        days: int = Field(
            default=7,
            description="Number of days to look back for Bedrock logs",
            ge=1,
            le=90
        )
        region: str = Field(
            default="us-east-1",
            description="AWS region to retrieve logs from"
        )
        log_group_name: str = Field(
            description="Bedrock Log Group Name",
            default=os.environ.get('BEDROCK_LOG_GROUP_NAME', 'BedrockModelInvocationLogGroup')
        )
        aws_account_id: Optional[str] = Field(
            description="AWS account id (if different from the current AWS account) of the account for which to retrieve the Bedrock logs",
            default=None
        )
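The only hard constraint here is `days` (1 to 90 inclusive); every field has a default except `aws_account_id`, which defaults to None. A plain-dataclass sketch of the same defaults and bound, for readers without pydantic at hand (the server itself uses the pydantic model above):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class BedrockLogsParamsSketch:
    """Stand-in mirroring BedrockLogsParams defaults and the days bound."""
    days: int = 7
    region: str = "us-east-1"
    log_group_name: str = "BedrockModelInvocationLogGroup"
    aws_account_id: Optional[str] = None

    def __post_init__(self) -> None:
        # Mirrors Field(ge=1, le=90) on days.
        if not 1 <= self.days <= 90:
            raise ValueError("days must be between 1 and 90")

params = BedrockLogsParamsSketch(days=30, region="us-west-2")
print(params.days, params.region)
```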
  • Supporting function that queries CloudWatch Logs for Bedrock model invocation events, parses the JSON log messages to extract timestamp, modelId, userId, input/completion/total tokens, and returns a structured pandas DataFrame.
    def get_bedrock_logs(params: BedrockLogsParams) -> Optional[pd.DataFrame]:
        """
        Retrieve Bedrock invocation logs for the last n days in a given region as a dataframe
    
        Args:
            params: Pydantic model containing parameters:
                - days: Number of days to look back (default: 7)
                - region: AWS region to query (default: us-east-1)
    
        Returns:
            pd.DataFrame: DataFrame containing the log data with columns:
                - timestamp: Timestamp of the invocation
                - region: AWS region
                - modelId: Bedrock model ID
                - userId: User ARN
                - inputTokens: Number of input tokens
                - completionTokens: Number of completion tokens
                - totalTokens: Total tokens used
        """
        # Initialize CloudWatch Logs client
        print(f"get_bedrock_logs, params={params}")
        client = get_aws_service_boto3_client("logs", params.aws_account_id, params.region)
    
        # Calculate time range
        end_time = datetime.now()
        start_time = end_time - timedelta(days=params.days)
    
        # Convert to milliseconds since epoch
        start_time_ms = int(start_time.timestamp() * 1000)
        end_time_ms = int(end_time.timestamp() * 1000)
    
        filtered_logs = []
    
        try:
            paginator = client.get_paginator("filter_log_events")
    
            # Parameters for the log query        
            query_params = {
                "logGroupName": params.log_group_name,  # Use the provided log group name
                "logStreamNames": [
                    "aws/bedrock/modelinvocations"
                ],  # The specific log stream
                "startTime": start_time_ms,
                "endTime": end_time_ms,
            }
            
            # Paginate through results
            for page in paginator.paginate(**query_params):
                for event in page.get("events", []):
                    try:
                        # Parse the message as JSON
    
                        message = json.loads(event["message"])
    
                        # Extract the user prompt from the input messages
                        # (collected for inspection; not included in the
                        # filtered_event rows below)
                        prompt = ""

                        input_data = message.get("input", {})
                        input_json = input_data.get("inputBodyJson", {})
                        messages = input_json.get("messages", None)

                        if messages:
                            for msg in messages:
                                if msg.get("role") == "user" and msg.get("content"):
                                    for content in msg["content"]:
                                        if isinstance(content, dict):
                                            if content.get("text"):
                                                prompt += content["text"] + " "
                                        else:
                                            prompt += content

                            prompt = prompt.strip()
    
                        # Extract only the required fields
    
                        filtered_event = {
                            "timestamp": message.get("timestamp"),
                            "region": message.get("region"),
                            "modelId": message.get("modelId"),
                            "userId": message.get("identity", {}).get("arn"),
                            "inputTokens": message.get("input", {}).get("inputTokenCount"),
                            "completionTokens": message.get("output", {}).get(
                                "outputTokenCount"
                            ),
                            "totalTokens": (
                                message.get("input", {}).get("inputTokenCount", 0)
                                + message.get("output", {}).get("outputTokenCount", 0)
                            ),
                        }
    
                        filtered_logs.append(filtered_event)
                    except json.JSONDecodeError:
                        continue  # Skip non-JSON messages
                    except KeyError:
                        continue  # Skip messages missing required fields
            
            # Create DataFrame if we have logs
            if filtered_logs:
                df = pd.DataFrame(filtered_logs)
                df["timestamp"] = pd.to_datetime(df["timestamp"])
                return df
            else:
                print("No logs found for the specified time period.")
                return None
    
        except client.exceptions.ResourceNotFoundException:
            print(
                f"Log group '{params.log_group_name}' or stream 'aws/bedrock/modelinvocations' not found"
            )
            return None
        except Exception as e:
            print(f"Error retrieving logs: {str(e)}")
            return None
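One presentation detail worth noting: the summaries shorten model IDs by keeping the last dot-separated segment, falling back to the last slash-separated segment when there is no dot. The same one-liner in isolation:

```python
def short_model_id(model: str) -> str:
    # Same expression used in the model and region/user/model summaries.
    return model.split(".")[-1] if "." in model else model.split("/")[-1]

print(short_model_id("anthropic.claude-3-sonnet-20240229-v1:0"))  # claude-3-sonnet-20240229-v1:0
print(short_model_id("foundation-model/titan-text"))              # titan-text
```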
Behavior: 2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

No annotations are provided, so the description carries the full burden. It mentions 'detailed breakdowns' and that it returns a 'formatted string representation,' but it doesn't disclose important behavioral traits: whether this is a read-only operation, which AWS permissions it requires, whether rate limits apply, or what format the string output takes. For a tool with no annotation coverage, this leaves significant gaps in understanding how it behaves.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 4/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is appropriately concise and front-loaded: the first sentence states the purpose, followed by Args and Returns sections. There's minimal waste, though the parameter explanation could be more informative and parameter details could be better integrated into the flow.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 2/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the complexity (4 nested parameters, no annotations, no output schema), the description is incomplete. It doesn't adequately explain the parameters, behavioral aspects, or output format. For a tool that likely interacts with AWS Bedrock and returns usage statistics, more context on permissions, data scope, and result interpretation is needed to be fully helpful.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 2/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

The schema description coverage is 0%, meaning parameters are undocumented in the schema. The description only vaguely mentions 'Parameters specifying the number of days to look back and region,' but doesn't explain the actual parameters (days, region, log_group_name, aws_account_id) or their purposes. It fails to compensate for the schema's lack of descriptions, leaving most parameters semantically unclear.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 4/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the tool's purpose: 'Get daily usage statistics with detailed breakdowns.' It specifies the verb ('Get') and resource ('daily usage statistics'), and the emphasis on 'daily' distinguishes it from the hourly sibling tool. However, it doesn't explicitly differentiate itself from 'get_detailed_breakdown_by_day', which might provide similar functionality.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description provides no guidance on when to use this tool versus alternatives. It doesn't mention the sibling tools (get_bedrock_hourly_usage_stats, get_detailed_breakdown_by_day, get_ec2_spend_last_day) or explain when daily statistics are preferred over hourly ones or other breakdowns. The only implied context is the need for 'daily' statistics, but no explicit usage rules are provided.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.
