aarora79

AWS Cost Explorer MCP Server

get_bedrock_hourly_usage_stats

Retrieve hourly AWS Bedrock usage statistics with detailed breakdowns to monitor model invocation patterns and analyze cloud spending trends.

Instructions

Get hourly usage statistics with detailed breakdowns.

Args:
    params: Parameters specifying the number of days to look back and region

Returns:
    str: Formatted string representation of hourly usage statistics

Input Schema

Name      Required    Description    Default
params    Yes
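
The schema lists only the top-level `params` object. Going by the `BedrockLogsParams` model shown under Implementation Reference, a call would likely carry the four fields below. This is a minimal sketch rather than the server's code: `build_params` is a hypothetical helper that mirrors the model's defaults and the 1 to 90 bound on `days` using only the standard library, and it ignores the `BEDROCK_LOG_GROUP_NAME` environment override.

```python
from typing import Optional


def build_params(
    days: int = 7,
    region: str = "us-east-1",
    log_group_name: str = "BedrockModelInvocationLogGroup",
    aws_account_id: Optional[str] = None,
) -> dict:
    """Hypothetical helper: build the arguments object for the tool call."""
    # The schema declares ge=1 and le=90 for `days`.
    if not 1 <= days <= 90:
        raise ValueError("days must be between 1 and 90")
    return {
        "params": {
            "days": days,
            "region": region,
            "log_group_name": log_group_name,
            "aws_account_id": aws_account_id,  # None means the current account
        }
    }


arguments = build_params(days=3, region="us-west-2")
print(arguments)
```

Wrapping the fields under a single `params` key follows from the handler signature, which takes one `params` argument.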

Implementation Reference

  • Main handler function implementing the get_bedrock_hourly_usage_stats tool. Retrieves Bedrock logs, groups them by hour, aggregates request and token counts, and builds tabular reports broken down by hour, by region and model, and by user, plus an hour-of-day usage pattern.
    @mcp.tool()
    def get_bedrock_hourly_usage_stats(params: BedrockLogsParams) -> str:
        """
        Get hourly usage statistics with detailed breakdowns.
    
        Args:
            params: Parameters specifying the number of days to look back and region
    
        Returns:
            str: Formatted string representation of hourly usage statistics
        """
        print(f"get_bedrock_hourly_usage_stats, params={params}")
        df = get_bedrock_logs(params)
    
        if df is None or df.empty:
            return "No usage data found for the specified period."
        
        # Initialize result string
        result_parts = []
        
        # Add header
        result_parts.append(f"Hourly Bedrock Usage Statistics (Past {params.days} days - {params.region})")
        result_parts.append("=" * 80)
        
        # Add date and hour columns for easier grouping
        df['date'] = df['timestamp'].dt.date
        df['hour'] = df['timestamp'].dt.hour
        df['datetime'] = df['timestamp'].dt.strftime('%Y-%m-%d %H:00')
        
        # === HOURLY USAGE ANALYSIS ===
        result_parts.append("\n=== Hourly Usage Analysis ===")
        
        # Group by datetime (date + hour)
        hourly_stats = df.groupby('datetime').agg({
            'inputTokens': ['count', 'sum', 'mean'],
            'completionTokens': ['sum', 'mean'],
            'totalTokens': ['sum', 'mean']
        })
        
        # Flatten the column multi-index
        hourly_stats.columns = [f"{col[0]}_{col[1]}" for col in hourly_stats.columns]
        
        # Reset the index to get a flat dataframe
        hourly_stats = hourly_stats.reset_index()
        
        # Rename inputTokens_count to request_count
        hourly_stats = hourly_stats.rename(columns={'inputTokens_count': 'request_count'})
        
        # Add the hourly stats to result
        result_parts.append(hourly_stats.to_string(index=False))
        
        # === HOURLY REGION -> MODEL GROUPING ===
        result_parts.append("\n=== Hourly Region-wise -> Model-wise Analysis ===")
        
        # Group by datetime, region, model and calculate metrics
        hourly_region_model_stats = df.groupby(['datetime', 'region', 'modelId']).agg({
            'inputTokens': ['count', 'sum', 'mean', 'max', 'median'],
            'completionTokens': ['sum', 'mean', 'max', 'median'],
            'totalTokens': ['sum', 'mean', 'max', 'median']
        })
        
        # Flatten the column multi-index
        hourly_region_model_stats.columns = [f"{col[0]}_{col[1]}" for col in hourly_region_model_stats.columns]
        
        # Reset the index to get a flat dataframe
        hourly_region_model_stats = hourly_region_model_stats.reset_index()
        
        # Rename inputTokens_count to request_count
        hourly_region_model_stats = hourly_region_model_stats.rename(columns={'inputTokens_count': 'request_count'})
        
        # Format model IDs to be more readable
        hourly_region_model_stats['modelId'] = hourly_region_model_stats['modelId'].apply(
            lambda model: model.split('.')[-1] if '.' in model else model.split('/')[-1]
        )
        
        # Add the hourly region-model stats to result
        result_parts.append(hourly_region_model_stats.to_string(index=False))
        
        # Add summary statistics
        result_parts.append("\n=== Summary Statistics ===")
        
        # Total requests and tokens
        total_requests = hourly_stats['request_count'].sum()
        total_input_tokens = hourly_stats['inputTokens_sum'].sum()
        total_completion_tokens = hourly_stats['completionTokens_sum'].sum()
        total_tokens = hourly_stats['totalTokens_sum'].sum()
        
        result_parts.append(f"Total Requests: {total_requests:,}")
        result_parts.append(f"Total Input Tokens: {total_input_tokens:,}")
        result_parts.append(f"Total Completion Tokens: {total_completion_tokens:,}")
        result_parts.append(f"Total Tokens: {total_tokens:,}")
        
        # === REGION SUMMARY ===
        result_parts.append("\n=== Region Summary ===")
        region_summary = df.groupby('region').agg({
            'inputTokens': ['count', 'sum'],
            'completionTokens': ['sum'],
            'totalTokens': ['sum']
        })
        
        # Flatten region summary columns
        region_summary.columns = [f"{col[0]}_{col[1]}" for col in region_summary.columns]
        region_summary = region_summary.reset_index()
        region_summary = region_summary.rename(columns={'inputTokens_count': 'request_count'})
        
        result_parts.append(region_summary.to_string(index=False))
        
        # === MODEL SUMMARY ===
        result_parts.append("\n=== Model Summary ===")
        model_summary = df.groupby('modelId').agg({
            'inputTokens': ['count', 'sum'],
            'completionTokens': ['sum'],
            'totalTokens': ['sum']
        })
        
        # Flatten model summary columns
        model_summary.columns = [f"{col[0]}_{col[1]}" for col in model_summary.columns]
        model_summary = model_summary.reset_index()
        model_summary = model_summary.rename(columns={'inputTokens_count': 'request_count'})
        
        # Format model IDs to be more readable
        model_summary['modelId'] = model_summary['modelId'].apply(
            lambda model: model.split('.')[-1] if '.' in model else model.split('/')[-1]
        )
        
        result_parts.append(model_summary.to_string(index=False))
        
        # === USER SUMMARY ===
        if 'userId' in df.columns:
            result_parts.append("\n=== User Summary ===")
            user_summary = df.groupby('userId').agg({
                'inputTokens': ['count', 'sum'],
                'completionTokens': ['sum'],
                'totalTokens': ['sum']
            })
            
            # Flatten user summary columns
            user_summary.columns = [f"{col[0]}_{col[1]}" for col in user_summary.columns]
            user_summary = user_summary.reset_index()
            user_summary = user_summary.rename(columns={'inputTokens_count': 'request_count'})
            
            result_parts.append(user_summary.to_string(index=False))
            
        # === HOURLY REGION -> USER -> MODEL DETAILED SUMMARY ===
        if 'userId' in df.columns:
            result_parts.append("\n=== Hourly Region -> User -> Model Detailed Summary ===")
            hourly_region_user_model_summary = df.groupby(['datetime', 'region', 'userId', 'modelId']).agg({
                'inputTokens': ['count', 'sum', 'mean'],
                'completionTokens': ['sum', 'mean'],
                'totalTokens': ['sum', 'mean']
            })
            
            # Flatten columns
            hourly_region_user_model_summary.columns = [f"{col[0]}_{col[1]}" for col in hourly_region_user_model_summary.columns]
            hourly_region_user_model_summary = hourly_region_user_model_summary.reset_index()
            hourly_region_user_model_summary = hourly_region_user_model_summary.rename(columns={'inputTokens_count': 'request_count'})
            
            # Format model IDs to be more readable
            hourly_region_user_model_summary['modelId'] = hourly_region_user_model_summary['modelId'].apply(
                lambda model: model.split('.')[-1] if '.' in model else model.split('/')[-1]
            )
            
            result_parts.append(hourly_region_user_model_summary.to_string(index=False))
        
        # === HOURLY USAGE PATTERN ANALYSIS ===
        result_parts.append("\n=== Hourly Usage Pattern Analysis ===")
        
        # Group by hour of day (ignoring date) to see hourly patterns
        hour_pattern = df.groupby(df['timestamp'].dt.hour).agg({
            'inputTokens': ['count', 'sum'],
            'totalTokens': ['sum']
        })
        
        # Flatten hour pattern columns
        hour_pattern.columns = [f"{col[0]}_{col[1]}" for col in hour_pattern.columns]
        hour_pattern = hour_pattern.reset_index()
        hour_pattern = hour_pattern.rename(columns={
            'timestamp': 'hour_of_day',
            'inputTokens_count': 'request_count'
        })
        
        # Format the hour to be more readable
        hour_pattern['hour_of_day'] = hour_pattern['hour_of_day'].apply(
            lambda hour: f"{hour:02d}:00 - {hour:02d}:59"
        )
        
        result_parts.append(hour_pattern.to_string(index=False))
        
        # Combine all parts into a single string
        result = "\n".join(result_parts)
        
        return result
  • Pydantic schema defining input parameters for Bedrock usage stats tools, including days, region, log group, and optional AWS account ID.
    class BedrockLogsParams(BaseModel):
        """Parameters for retrieving Bedrock invocation logs."""
        days: int = Field(
            default=7,
            description="Number of days to look back for Bedrock logs",
            ge=1,
            le=90
        )
        region: str = Field(
            default="us-east-1",
            description="AWS region to retrieve logs from"
        )
        log_group_name: str = Field(
            description="Bedrock Log Group Name",
            default=os.environ.get('BEDROCK_LOG_GROUP_NAME', 'BedrockModelInvocationLogGroup')
        )
        aws_account_id: Optional[str] = Field(        
            description="AWS account id (if different from the current AWS account) of the account for which to get the cost data",
            default=None
        )
  • server.py:451-451 (registration)
    MCP tool registration decorator applied to the handler function.
    @mcp.tool()
  • Core helper function that fetches Bedrock model invocation logs from CloudWatch Logs, parses JSON events for token counts, timestamps, models, users, and returns a structured pandas DataFrame used by the handler.
    def get_bedrock_logs(params: BedrockLogsParams) -> Optional[pd.DataFrame]:
        """
        Retrieve Bedrock invocation logs for the last n days in a given region as a dataframe
    
        Args:
            params: Pydantic model containing parameters:
                - days: Number of days to look back (default: 7)
                - region: AWS region to query (default: us-east-1)
    
        Returns:
            pd.DataFrame: DataFrame containing the log data with columns:
                - timestamp: Timestamp of the invocation
                - region: AWS region
                - modelId: Bedrock model ID
                - userId: User ARN
                - inputTokens: Number of input tokens
                - completionTokens: Number of completion tokens
                - totalTokens: Total tokens used
        """
        # Initialize CloudWatch Logs client
        print(f"get_bedrock_logs, params={params}")
        client = get_aws_service_boto3_client("logs", params.aws_account_id, params.region)
    
        # Calculate time range
        end_time = datetime.now()
        start_time = end_time - timedelta(days=params.days)
    
        # Convert to milliseconds since epoch
        start_time_ms = int(start_time.timestamp() * 1000)
        end_time_ms = int(end_time.timestamp() * 1000)
    
        filtered_logs = []
    
        try:
            paginator = client.get_paginator("filter_log_events")
    
            # Parameters for the log query        
            query_params = {
                "logGroupName": params.log_group_name,  # Use the provided log group name
                "logStreamNames": [
                    "aws/bedrock/modelinvocations"
                ],  # The specific log stream
                "startTime": start_time_ms,
                "endTime": end_time_ms,
            }
            
            # Paginate through results
            for page in paginator.paginate(**query_params):
                for event in page.get("events", []):
                    try:
                        # Parse the message as JSON
    
                        message = json.loads(event["message"])
    
                        # Collect the user prompt text from the input messages.
                        # Note: the prompt is not included in the returned DataFrame.
                        prompt = ""
    
                        # Named input_data to avoid shadowing the built-in input()
                        input_data = message.get("input", {})
                        input_json = input_data.get("inputBodyJson", {})
                        messages = input_json.get("messages", None)
    
                        if messages:
                            for msg in messages:
                                if msg.get("role") == "user" and msg.get("content"):
                                    for content in msg["content"]:
                                        if isinstance(content, dict):
                                            if content.get("text"):
                                                prompt += content["text"] + " "
                                        else:
                                            prompt += content
    
                            prompt = prompt.strip()
    
                        # Extract only the required fields
    
                        filtered_event = {
                            "timestamp": message.get("timestamp"),
                            "region": message.get("region"),
                            "modelId": message.get("modelId"),
                            "userId": message.get("identity", {}).get("arn"),
                            "inputTokens": message.get("input", {}).get("inputTokenCount"),
                            "completionTokens": message.get("output", {}).get(
                                "outputTokenCount"
                            ),
                            "totalTokens": (
                                message.get("input", {}).get("inputTokenCount", 0)
                                + message.get("output", {}).get("outputTokenCount", 0)
                            ),
                        }
    
                        filtered_logs.append(filtered_event)
                    except json.JSONDecodeError:
                        continue  # Skip non-JSON messages
                    except (KeyError, TypeError, AttributeError):
                        # Skip messages with missing or malformed fields so one bad
                        # event does not abort the whole retrieval
                        continue
            
            # Create DataFrame if we have logs
            if filtered_logs:
                df = pd.DataFrame(filtered_logs)
                df["timestamp"] = pd.to_datetime(df["timestamp"])
                return df
            else:
                print("No logs found for the specified time period.")
                return None
    
        except client.exceptions.ResourceNotFoundException:
            print(
                f"Log group '{params.log_group_name}' or stream 'aws/bedrock/modelinvocations' not found"
            )
            return None
        except Exception as e:
            print(f"Error retrieving logs: {str(e)}")
            return None
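
Two pure-Python steps from the code above can be tried without AWS access: extracting the token fields from one log message, and shortening a model ID for display. The sketch below assumes a made-up log message that contains only the keys the helper reads; `extract_usage` and `shorten_model_id` are illustrative names, not functions from the server.

```python
import json
from typing import Optional


def extract_usage(raw_message: str) -> Optional[dict]:
    """Keep only the fields the helper stores for one log event."""
    try:
        message = json.loads(raw_message)
    except json.JSONDecodeError:
        return None  # non-JSON messages are skipped
    input_tokens = message.get("input", {}).get("inputTokenCount", 0)
    output_tokens = message.get("output", {}).get("outputTokenCount", 0)
    return {
        "timestamp": message.get("timestamp"),
        "region": message.get("region"),
        "modelId": message.get("modelId"),
        "userId": message.get("identity", {}).get("arn"),
        "inputTokens": input_tokens,
        "completionTokens": output_tokens,
        "totalTokens": input_tokens + output_tokens,
    }


def shorten_model_id(model: str) -> str:
    """Same rule as the handler's lambda: text after the last '.', else after the last '/'."""
    return model.split(".")[-1] if "." in model else model.split("/")[-1]


# Hypothetical log message, for illustration only.
sample = json.dumps({
    "timestamp": "2025-01-15T10:23:45Z",
    "region": "us-east-1",
    "modelId": "anthropic.claude-3-sonnet-20240229-v1:0",
    "identity": {"arn": "arn:aws:iam::123456789012:user/example"},
    "input": {"inputTokenCount": 120},
    "output": {"outputTokenCount": 45},
})

record = extract_usage(sample)
print(record["totalTokens"], shorten_model_id(record["modelId"]))
```

Unlike the helper, this sketch defaults a missing token count to 0 for every field, which keeps the arithmetic simple.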
Behavior: 2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

No annotations are provided, so the description carries full burden. While it mentions the tool returns 'Formatted string representation of hourly usage statistics,' it doesn't disclose important behavioral traits like whether this is a read-only operation, authentication requirements, rate limits, data freshness, or what happens if parameters are invalid. The description is minimal and lacks operational context.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 3/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is brief (4 sentences) and front-loaded with the core purpose. However, the 'Args' and 'Returns' sections are redundant since they don't add value beyond the tool name and basic function. The structure is clear but includes unnecessary boilerplate that doesn't enhance understanding.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 2/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the complexity (4 parameters with 0% schema coverage, no annotations, no output schema), the description is inadequate. It doesn't explain what 'hourly usage statistics' includes, how data is aggregated, what format the returned string uses, or provide context about the Bedrock service. For a tool with multiple parameters and no structured documentation, the description should do much more to compensate.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.
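
For reference, the handler's first report buckets invocations by calendar hour (`%Y-%m-%d %H:00`) and reports a request count plus token sums and means per bucket. The following is a minimal standard-library sketch of that grouping, using made-up records rather than real log data; the handler itself does this with pandas `groupby` and `agg`, and `hourly_stats` is an illustrative name.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical records, one per invocation, in the shape the log helper returns.
records = [
    {"timestamp": datetime(2025, 1, 15, 10, 5), "inputTokens": 100, "completionTokens": 40},
    {"timestamp": datetime(2025, 1, 15, 10, 50), "inputTokens": 200, "completionTokens": 60},
    {"timestamp": datetime(2025, 1, 15, 11, 15), "inputTokens": 50, "completionTokens": 10},
]


def hourly_stats(rows):
    """Group rows by calendar hour and compute per-hour counts and token totals."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[row["timestamp"].strftime("%Y-%m-%d %H:00")].append(row)

    stats = {}
    for hour, items in sorted(buckets.items()):
        total_in = sum(r["inputTokens"] for r in items)
        total_out = sum(r["completionTokens"] for r in items)
        stats[hour] = {
            "request_count": len(items),
            "inputTokens_sum": total_in,
            "inputTokens_mean": total_in / len(items),
            "completionTokens_sum": total_out,
            "totalTokens_sum": total_in + total_out,
        }
    return stats


stats = hourly_stats(records)
for hour, row in stats.items():
    print(hour, row)
```

The column names match the flattened `<column>_<aggregate>` names the handler produces, with the input-token count renamed to `request_count`.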

Parameters: 1/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

The description states 'Parameters specifying the number of days to look back and region,' but the actual input schema shows 4 parameters (days, region, log_group_name, aws_account_id) with 0% schema description coverage. The description fails to mention two parameters entirely (log_group_name and aws_account_id) and provides no meaningful semantic context beyond what's minimally implied by parameter names.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 4/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the tool's purpose: 'Get hourly usage statistics with detailed breakdowns.' It specifies the verb ('Get'), resource ('hourly usage statistics'), and scope ('detailed breakdowns'). However, it doesn't explicitly differentiate from sibling tools like 'get_bedrock_daily_usage_stats' or 'get_detailed_breakdown_by_day'.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

No guidance is provided on when to use this tool versus alternatives. The description doesn't mention sibling tools, specify use cases, or provide any context about when this hourly breakdown is preferred over daily statistics or other available tools.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aarora79/aws-cost-explorer-mcp-server'
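
The same request can be made from Python. This is a sketch using only the standard library; it assumes the endpoint returns a JSON body, which is not stated here, and `build_request` and `fetch_server_info` are illustrative names.

```python
import json
import urllib.request

API_URL = "https://glama.ai/api/mcp/v1/servers/aarora79/aws-cost-explorer-mcp-server"


def build_request(url: str = API_URL) -> urllib.request.Request:
    """Prepare the GET request without sending it."""
    return urllib.request.Request(
        url, method="GET", headers={"Accept": "application/json"}
    )


def fetch_server_info(url: str = API_URL) -> dict:
    """Send the request and decode the body, assuming it is JSON."""
    with urllib.request.urlopen(build_request(url), timeout=10) as response:
        return json.loads(response.read().decode("utf-8"))


request = build_request()
print(request.get_method(), request.full_url)
```

Calling `fetch_server_info()` performs the network request; building the request separately makes it easy to inspect first.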

If you have feedback or need assistance with the MCP directory API, please join our Discord server.