AWS Cost Explorer MCP Server

get_bedrock_hourly_usage_stats

Retrieve hourly AWS Bedrock usage statistics with detailed breakdowns to monitor model invocation patterns and analyze cloud spending trends.

Instructions

Get hourly usage statistics with detailed breakdowns.

Args:
    params: Parameters specifying the number of days to look back and region

Returns:
    str: Formatted string representation of hourly usage statistics

Input Schema

Name     Required   Description                                                        Default
params   Yes        Parameters specifying the number of days to look back and region   —
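
For illustration, a hypothetical arguments object for calling this tool (the values are made up; every field of params has a default per the BedrockLogsParams schema shown under Implementation Reference, so an empty params object is also valid):

    arguments = {
        "params": {
            "days": 7,              # 1-90, default 7
            "region": "us-east-1",  # default "us-east-1"
        }
    }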

Implementation Reference

  • Main handler function implementing the get_bedrock_hourly_usage_stats tool. It retrieves the Bedrock logs, groups tokens and requests by hour, and builds tabular reports for hourly, region/model, and per-user breakdowns as well as hour-of-day usage patterns; a direct-call sketch follows the code.
    @mcp.tool()
    def get_bedrock_hourly_usage_stats(params: BedrockLogsParams) -> str:
        """
        Get hourly usage statistics with detailed breakdowns.
    
        Args:
            params: Parameters specifying the number of days to look back and region
    
        Returns:
            str: Formatted string representation of hourly usage statistics
        """
        print(f"get_bedrock_hourly_usage_stats, params={params}")
        df = get_bedrock_logs(params)
    
        if df is None or df.empty:
            return "No usage data found for the specified period."
        
        # Initialize result string
        result_parts = []
        
        # Add header
        result_parts.append(f"Hourly Bedrock Usage Statistics (Past {params.days} days - {params.region})")
        result_parts.append("=" * 80)
        
        # Add date and hour columns for easier grouping
        df['date'] = df['timestamp'].dt.date
        df['hour'] = df['timestamp'].dt.hour
        df['datetime'] = df['timestamp'].dt.strftime('%Y-%m-%d %H:00')
        
        # === HOURLY USAGE ANALYSIS ===
        result_parts.append("\n=== Hourly Usage Analysis ===")
        
        # Group by datetime (date + hour)
        hourly_stats = df.groupby('datetime').agg({
            'inputTokens': ['count', 'sum', 'mean'],
            'completionTokens': ['sum', 'mean'],
            'totalTokens': ['sum', 'mean']
        })
        
        # Flatten the column multi-index
        hourly_stats.columns = [f"{col[0]}_{col[1]}" for col in hourly_stats.columns]
        
        # Reset the index to get a flat dataframe
        hourly_stats = hourly_stats.reset_index()
        
        # Rename inputTokens_count to request_count
        hourly_stats = hourly_stats.rename(columns={'inputTokens_count': 'request_count'})
        
        # Add the hourly stats to result
        result_parts.append(hourly_stats.to_string(index=False))
        
        # === HOURLY REGION -> MODEL GROUPING ===
        result_parts.append("\n=== Hourly Region-wise -> Model-wise Analysis ===")
        
        # Group by datetime, region, model and calculate metrics
        hourly_region_model_stats = df.groupby(['datetime', 'region', 'modelId']).agg({
            'inputTokens': ['count', 'sum', 'mean', 'max', 'median'],
            'completionTokens': ['sum', 'mean', 'max', 'median'],
            'totalTokens': ['sum', 'mean', 'max', 'median']
        })
        
        # Flatten the column multi-index
        hourly_region_model_stats.columns = [f"{col[0]}_{col[1]}" for col in hourly_region_model_stats.columns]
        
        # Reset the index to get a flat dataframe
        hourly_region_model_stats = hourly_region_model_stats.reset_index()
        
        # Rename inputTokens_count to request_count
        hourly_region_model_stats = hourly_region_model_stats.rename(columns={'inputTokens_count': 'request_count'})
        
        # Format model IDs to be more readable
        hourly_region_model_stats['modelId'] = hourly_region_model_stats['modelId'].apply(
            lambda model: model.split('.')[-1] if '.' in model else model.split('/')[-1]
        )
        
        # Add the hourly region-model stats to result
        result_parts.append(hourly_region_model_stats.to_string(index=False))
        
        # Add summary statistics
        result_parts.append("\n=== Summary Statistics ===")
        
        # Total requests and tokens
        total_requests = hourly_stats['request_count'].sum()
        total_input_tokens = hourly_stats['inputTokens_sum'].sum()
        total_completion_tokens = hourly_stats['completionTokens_sum'].sum()
        total_tokens = hourly_stats['totalTokens_sum'].sum()
        
        result_parts.append(f"Total Requests: {total_requests:,}")
        result_parts.append(f"Total Input Tokens: {total_input_tokens:,}")
        result_parts.append(f"Total Completion Tokens: {total_completion_tokens:,}")
        result_parts.append(f"Total Tokens: {total_tokens:,}")
        
        # === REGION SUMMARY ===
        result_parts.append("\n=== Region Summary ===")
        region_summary = df.groupby('region').agg({
            'inputTokens': ['count', 'sum'],
            'completionTokens': ['sum'],
            'totalTokens': ['sum']
        })
        
        # Flatten region summary columns
        region_summary.columns = [f"{col[0]}_{col[1]}" for col in region_summary.columns]
        region_summary = region_summary.reset_index()
        region_summary = region_summary.rename(columns={'inputTokens_count': 'request_count'})
        
        result_parts.append(region_summary.to_string(index=False))
        
        # === MODEL SUMMARY ===
        result_parts.append("\n=== Model Summary ===")
        model_summary = df.groupby('modelId').agg({
            'inputTokens': ['count', 'sum'],
            'completionTokens': ['sum'],
            'totalTokens': ['sum']
        })
        
        # Flatten model summary columns
        model_summary.columns = [f"{col[0]}_{col[1]}" for col in model_summary.columns]
        model_summary = model_summary.reset_index()
        model_summary = model_summary.rename(columns={'inputTokens_count': 'request_count'})
        
        # Format model IDs to be more readable
        model_summary['modelId'] = model_summary['modelId'].apply(
            lambda model: model.split('.')[-1] if '.' in model else model.split('/')[-1]
        )
        
        result_parts.append(model_summary.to_string(index=False))
        
        # === USER SUMMARY ===
        if 'userId' in df.columns:
            result_parts.append("\n=== User Summary ===")
            user_summary = df.groupby('userId').agg({
                'inputTokens': ['count', 'sum'],
                'completionTokens': ['sum'],
                'totalTokens': ['sum']
            })
            
            # Flatten user summary columns
            user_summary.columns = [f"{col[0]}_{col[1]}" for col in user_summary.columns]
            user_summary = user_summary.reset_index()
            user_summary = user_summary.rename(columns={'inputTokens_count': 'request_count'})
            
            result_parts.append(user_summary.to_string(index=False))
            
        # === HOURLY REGION -> USER -> MODEL DETAILED SUMMARY ===
        if 'userId' in df.columns:
            result_parts.append("\n=== Hourly Region -> User -> Model Detailed Summary ===")
            hourly_region_user_model_summary = df.groupby(['datetime', 'region', 'userId', 'modelId']).agg({
                'inputTokens': ['count', 'sum', 'mean'],
                'completionTokens': ['sum', 'mean'],
                'totalTokens': ['sum', 'mean']
            })
            
            # Flatten columns
            hourly_region_user_model_summary.columns = [f"{col[0]}_{col[1]}" for col in hourly_region_user_model_summary.columns]
            hourly_region_user_model_summary = hourly_region_user_model_summary.reset_index()
            hourly_region_user_model_summary = hourly_region_user_model_summary.rename(columns={'inputTokens_count': 'request_count'})
            
            # Format model IDs to be more readable
            hourly_region_user_model_summary['modelId'] = hourly_region_user_model_summary['modelId'].apply(
                lambda model: model.split('.')[-1] if '.' in model else model.split('/')[-1]
            )
            
            result_parts.append(hourly_region_user_model_summary.to_string(index=False))
        
        # === HOURLY USAGE PATTERN ANALYSIS ===
        result_parts.append("\n=== Hourly Usage Pattern Analysis ===")
        
        # Group by hour of day (ignoring date) to see hourly patterns
        hour_pattern = df.groupby(df['timestamp'].dt.hour).agg({
            'inputTokens': ['count', 'sum'],
            'totalTokens': ['sum']
        })
        
        # Flatten hour pattern columns
        hour_pattern.columns = [f"{col[0]}_{col[1]}" for col in hour_pattern.columns]
        hour_pattern = hour_pattern.reset_index()
        hour_pattern = hour_pattern.rename(columns={
            'timestamp': 'hour_of_day',
            'inputTokens_count': 'request_count'
        })
        
        # Format the hour to be more readable
        hour_pattern['hour_of_day'] = hour_pattern['hour_of_day'].apply(
            lambda hour: f"{hour:02d}:00 - {hour:02d}:59"
        )
        
        result_parts.append(hour_pattern.to_string(index=False))
        
        # Combine all parts into a single string
        result = "\n".join(result_parts)
        
        return result
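
    As a minimal sketch, the handler can be exercised directly in Python outside of an MCP client (parameter values are illustrative; this assumes AWS credentials and Bedrock invocation logging are configured, and that the MCP SDK's tool decorator returns the plain function, which can vary by SDK version):

        # Hypothetical direct invocation for testing; not part of the server code
        params = BedrockLogsParams(days=7, region="us-east-1")
        print(get_bedrock_hourly_usage_stats(params))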
  • Pydantic schema defining the input parameters for the Bedrock usage stats tools: days, region, log group name, and an optional AWS account ID; a validation sketch follows the class.
    class BedrockLogsParams(BaseModel):
        """Parameters for retrieving Bedrock invocation logs."""
        days: int = Field(
            default=7,
            description="Number of days to look back for Bedrock logs",
            ge=1,
            le=90
        )
        region: str = Field(
            default="us-east-1",
            description="AWS region to retrieve logs from"
        )
        log_group_name: str = Field(
            description="Bedrock Log Group Name",
            default=os.environ.get('BEDROCK_LOG_GROUP_NAME', 'BedrockModelInvocationLogGroup')
        )
        aws_account_id: Optional[str] = Field(
            description="AWS account ID (if different from the current AWS account) for which to retrieve the data",
            default=None
        )
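
    A minimal validation sketch, assuming pydantic is installed (the values are illustrative):

        from pydantic import ValidationError

        # Every field has a default, so no arguments are required
        params = BedrockLogsParams()
        print(params.days, params.region)  # 7 us-east-1

        # days is constrained to 1-90 by ge/le, so this raises
        try:
            BedrockLogsParams(days=120)
        except ValidationError as err:
            print(err)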
  • server.py:451-451 (registration)
    MCP tool registration decorator applied to the handler function.
    @mcp.tool()
  • Core helper that fetches Bedrock model invocation logs from CloudWatch Logs, parses each JSON event for its timestamp, model, user, and token counts, and returns a pandas DataFrame consumed by the handler; a usage sketch follows the code.
    def get_bedrock_logs(params: BedrockLogsParams) -> Optional[pd.DataFrame]:
        """
        Retrieve Bedrock invocation logs for the last n days in a given region as a dataframe
    
        Args:
            params: Pydantic model containing parameters:
                - days: Number of days to look back (default: 7)
                - region: AWS region to query (default: us-east-1)
                - log_group_name: Bedrock invocation log group name
                - aws_account_id: Optional AWS account ID, if different from the current account
    
        Returns:
            Optional[pd.DataFrame]: DataFrame containing the log data, or None if no logs are found, with columns:
                - timestamp: Timestamp of the invocation
                - region: AWS region
                - modelId: Bedrock model ID
                - userId: User ARN
                - inputTokens: Number of input tokens
                - completionTokens: Number of completion tokens
                - totalTokens: Total tokens used
        """
        # Initialize CloudWatch Logs client
        print(f"get_bedrock_logs, params={params}")
        client = get_aws_service_boto3_client("logs", params.aws_account_id, params.region)
    
        # Calculate time range
        end_time = datetime.now()
        start_time = end_time - timedelta(days=params.days)
    
        # Convert to milliseconds since epoch
        start_time_ms = int(start_time.timestamp() * 1000)
        end_time_ms = int(end_time.timestamp() * 1000)
    
        filtered_logs = []
    
        try:
            paginator = client.get_paginator("filter_log_events")
    
            # Parameters for the log query        
            query_params = {
                "logGroupName": params.log_group_name,  # Use the provided log group name
                "logStreamNames": [
                    "aws/bedrock/modelinvocations"
                ],  # The specific log stream
                "startTime": start_time_ms,
                "endTime": end_time_ms,
            }
            
            # Paginate through results
            for page in paginator.paginate(**query_params):
                for event in page.get("events", []):
                    try:
                        # Parse the message as JSON
                        message = json.loads(event["message"])

                        # Reconstruct the user prompt from the input messages
                        # (extracted here but not included in the returned DataFrame)
                        prompt = ""
                        input_body = message.get("input", {})
                        input_json = input_body.get("inputBodyJson", {})
                        messages = input_json.get("messages", None)

                        if messages:
                            for msg in messages:
                                if msg.get("role") == "user" and msg.get("content"):
                                    for content in msg["content"]:
                                        if isinstance(content, dict):
                                            if content.get("text"):
                                                prompt += content["text"] + " "
                                        else:
                                            prompt += content

                            prompt = prompt.strip()
    
                        # Extract only the required fields
    
                        filtered_event = {
                            "timestamp": message.get("timestamp"),
                            "region": message.get("region"),
                            "modelId": message.get("modelId"),
                            "userId": message.get("identity", {}).get("arn"),
                            "inputTokens": message.get("input", {}).get("inputTokenCount"),
                            "completionTokens": message.get("output", {}).get(
                                "outputTokenCount"
                            ),
                            "totalTokens": (
                                message.get("input", {}).get("inputTokenCount", 0)
                                + message.get("output", {}).get("outputTokenCount", 0)
                            ),
                        }
    
                        filtered_logs.append(filtered_event)
                    except json.JSONDecodeError:
                        continue  # Skip non-JSON messages
                    except KeyError:
                        continue  # Skip messages missing required fields
            
            # Create DataFrame if we have logs
            if filtered_logs:
                df = pd.DataFrame(filtered_logs)
                df["timestamp"] = pd.to_datetime(df["timestamp"])
                return df
            else:
                print("No logs found for the specified time period.")
                return None
    
        except client.exceptions.ResourceNotFoundException:
            print(
                f"Log group '{params.log_group_name}' or stream 'aws/bedrock/modelinvocations' not found"
            )
            return None
        except Exception as e:
            print(f"Error retrieving logs: {str(e)}")
            return None
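
    A hedged usage sketch: fetch the DataFrame and reproduce the hourly token bucketing the handler applies (this assumes the log group exists and AWS credentials are configured; the values are illustrative):

        params = BedrockLogsParams(days=1, region="us-east-1")
        df = get_bedrock_logs(params)

        if df is not None:
            # Same "YYYY-MM-DD HH:00" bucketing used by the handler
            df["datetime"] = df["timestamp"].dt.strftime("%Y-%m-%d %H:00")
            print(df.groupby("datetime")["totalTokens"].sum())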

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aarora79/aws-cost-explorer-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.