get_bedrock_hourly_usage_stats
Retrieve detailed hourly usage statistics for Amazon Bedrock by specifying the number of days to look back and the AWS region. Analyze AWS cloud costs efficiently with structured, formatted data.
Instructions
Get hourly usage statistics with detailed breakdowns.
Args:

- `params`: Parameters specifying the number of days to look back and the region

Returns:

- `str`: Formatted string representation of hourly usage statistics
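For orientation, here is a minimal sketch of invoking this tool from an MCP client. The session setup, launch command, and transport are assumptions, not part of this server's code; only the tool name and the argument shape (a single `params` object) come from this page.

```python
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main() -> None:
    # Assumed launch command; adjust to however server.py is actually started.
    server = StdioServerParameters(command="python", args=["server.py"])
    async with stdio_client(server) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.call_tool(
                "get_bedrock_hourly_usage_stats",
                arguments={"params": {"days": 7, "region": "us-east-1"}},
            )
            # The tool returns a single formatted text block.
            print(result.content[0].text)


asyncio.run(main())
```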
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| params | Yes | Parameters specifying the number of days to look back and the region (see the BedrockLogsParams schema under Implementation Reference) | (none) |
Implementation Reference
- server.py:451-642 (handler): The main handler function implementing the get_bedrock_hourly_usage_stats tool. It is decorated with @mcp.tool() for registration. It fetches Bedrock logs, computes hourly statistics grouped by time, region, model, and user, and returns the various summaries in a formatted tabular string.

```python
@mcp.tool()
def get_bedrock_hourly_usage_stats(params: BedrockLogsParams) -> str:
    """
    Get hourly usage statistics with detailed breakdowns.

    Args:
        params: Parameters specifying the number of days to look back and region

    Returns:
        str: Formatted string representation of hourly usage statistics
    """
    print(f"get_bedrock_hourly_usage_stats, params={params}")
    df = get_bedrock_logs(params)
    if df is None or df.empty:
        return "No usage data found for the specified period."

    # Initialize result string
    result_parts = []

    # Add header
    result_parts.append(f"Hourly Bedrock Usage Statistics (Past {params.days} days - {params.region})")
    result_parts.append("=" * 80)

    # Add date and hour columns for easier grouping
    df['date'] = df['timestamp'].dt.date
    df['hour'] = df['timestamp'].dt.hour
    df['datetime'] = df['timestamp'].dt.strftime('%Y-%m-%d %H:00')

    # === HOURLY USAGE ANALYSIS ===
    result_parts.append("\n=== Hourly Usage Analysis ===")

    # Group by datetime (date + hour)
    hourly_stats = df.groupby('datetime').agg({
        'inputTokens': ['count', 'sum', 'mean'],
        'completionTokens': ['sum', 'mean'],
        'totalTokens': ['sum', 'mean']
    })

    # Flatten the column multi-index
    hourly_stats.columns = [f"{col[0]}_{col[1]}" for col in hourly_stats.columns]

    # Reset the index to get a flat dataframe
    hourly_stats = hourly_stats.reset_index()

    # Rename inputTokens_count to request_count
    hourly_stats = hourly_stats.rename(columns={'inputTokens_count': 'request_count'})

    # Add the hourly stats to result
    result_parts.append(hourly_stats.to_string(index=False))

    # === HOURLY REGION -> MODEL GROUPING ===
    result_parts.append("\n=== Hourly Region-wise -> Model-wise Analysis ===")

    # Group by datetime, region, model and calculate metrics
    hourly_region_model_stats = df.groupby(['datetime', 'region', 'modelId']).agg({
        'inputTokens': ['count', 'sum', 'mean', 'max', 'median'],
        'completionTokens': ['sum', 'mean', 'max', 'median'],
        'totalTokens': ['sum', 'mean', 'max', 'median']
    })

    # Flatten the column multi-index
    hourly_region_model_stats.columns = [f"{col[0]}_{col[1]}" for col in hourly_region_model_stats.columns]

    # Reset the index to get a flat dataframe
    hourly_region_model_stats = hourly_region_model_stats.reset_index()

    # Rename inputTokens_count to request_count
    hourly_region_model_stats = hourly_region_model_stats.rename(columns={'inputTokens_count': 'request_count'})

    # Format model IDs to be more readable
    hourly_region_model_stats['modelId'] = hourly_region_model_stats['modelId'].apply(
        lambda model: model.split('.')[-1] if '.' in model else model.split('/')[-1]
    )

    # Add the hourly region-model stats to result
    result_parts.append(hourly_region_model_stats.to_string(index=False))

    # Add summary statistics
    result_parts.append("\n=== Summary Statistics ===")

    # Total requests and tokens
    total_requests = hourly_stats['request_count'].sum()
    total_input_tokens = hourly_stats['inputTokens_sum'].sum()
    total_completion_tokens = hourly_stats['completionTokens_sum'].sum()
    total_tokens = hourly_stats['totalTokens_sum'].sum()

    result_parts.append(f"Total Requests: {total_requests:,}")
    result_parts.append(f"Total Input Tokens: {total_input_tokens:,}")
    result_parts.append(f"Total Completion Tokens: {total_completion_tokens:,}")
    result_parts.append(f"Total Tokens: {total_tokens:,}")

    # === REGION SUMMARY ===
    result_parts.append("\n=== Region Summary ===")

    region_summary = df.groupby('region').agg({
        'inputTokens': ['count', 'sum'],
        'completionTokens': ['sum'],
        'totalTokens': ['sum']
    })

    # Flatten region summary columns
    region_summary.columns = [f"{col[0]}_{col[1]}" for col in region_summary.columns]
    region_summary = region_summary.reset_index()
    region_summary = region_summary.rename(columns={'inputTokens_count': 'request_count'})

    result_parts.append(region_summary.to_string(index=False))

    # === MODEL SUMMARY ===
    result_parts.append("\n=== Model Summary ===")

    model_summary = df.groupby('modelId').agg({
        'inputTokens': ['count', 'sum'],
        'completionTokens': ['sum'],
        'totalTokens': ['sum']
    })

    # Flatten model summary columns
    model_summary.columns = [f"{col[0]}_{col[1]}" for col in model_summary.columns]
    model_summary = model_summary.reset_index()
    model_summary = model_summary.rename(columns={'inputTokens_count': 'request_count'})

    # Format model IDs to be more readable
    model_summary['modelId'] = model_summary['modelId'].apply(
        lambda model: model.split('.')[-1] if '.' in model else model.split('/')[-1]
    )

    result_parts.append(model_summary.to_string(index=False))

    # === USER SUMMARY ===
    if 'userId' in df.columns:
        result_parts.append("\n=== User Summary ===")

        user_summary = df.groupby('userId').agg({
            'inputTokens': ['count', 'sum'],
            'completionTokens': ['sum'],
            'totalTokens': ['sum']
        })

        # Flatten user summary columns
        user_summary.columns = [f"{col[0]}_{col[1]}" for col in user_summary.columns]
        user_summary = user_summary.reset_index()
        user_summary = user_summary.rename(columns={'inputTokens_count': 'request_count'})

        result_parts.append(user_summary.to_string(index=False))

    # === HOURLY REGION -> USER -> MODEL DETAILED SUMMARY ===
    if 'userId' in df.columns:
        result_parts.append("\n=== Hourly Region -> User -> Model Detailed Summary ===")

        hourly_region_user_model_summary = df.groupby(['datetime', 'region', 'userId', 'modelId']).agg({
            'inputTokens': ['count', 'sum', 'mean'],
            'completionTokens': ['sum', 'mean'],
            'totalTokens': ['sum', 'mean']
        })

        # Flatten columns
        hourly_region_user_model_summary.columns = [f"{col[0]}_{col[1]}" for col in hourly_region_user_model_summary.columns]
        hourly_region_user_model_summary = hourly_region_user_model_summary.reset_index()
        hourly_region_user_model_summary = hourly_region_user_model_summary.rename(columns={'inputTokens_count': 'request_count'})

        # Format model IDs to be more readable
        hourly_region_user_model_summary['modelId'] = hourly_region_user_model_summary['modelId'].apply(
            lambda model: model.split('.')[-1] if '.' in model else model.split('/')[-1]
        )

        result_parts.append(hourly_region_user_model_summary.to_string(index=False))

    # === HOURLY USAGE PATTERN ANALYSIS ===
    result_parts.append("\n=== Hourly Usage Pattern Analysis ===")

    # Group by hour of day (ignoring date) to see hourly patterns
    hour_pattern = df.groupby(df['timestamp'].dt.hour).agg({
        'inputTokens': ['count', 'sum'],
        'totalTokens': ['sum']
    })

    # Flatten hour pattern columns
    hour_pattern.columns = [f"{col[0]}_{col[1]}" for col in hour_pattern.columns]
    hour_pattern = hour_pattern.reset_index()
    hour_pattern = hour_pattern.rename(columns={
        'timestamp': 'hour_of_day',
        'inputTokens_count': 'request_count'
    })

    # Format the hour to be more readable
    hour_pattern['hour_of_day'] = hour_pattern['hour_of_day'].apply(
        lambda hour: f"{hour:02d}:00 - {hour:02d}:59"
    )

    result_parts.append(hour_pattern.to_string(index=False))

    # Combine all parts into a single string
    result = "\n".join(result_parts)
    return result
```
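Every breakdown in the handler follows the same aggregate, flatten, rename pattern. Below is a standalone sketch of just that pattern, using made-up sample rows rather than real log data:

```python
import pandas as pd

# Tiny made-up sample standing in for the Bedrock log DataFrame.
df = pd.DataFrame({
    "datetime": ["2024-01-01 10:00", "2024-01-01 10:00", "2024-01-01 11:00"],
    "inputTokens": [120, 80, 200],
    "completionTokens": [40, 30, 90],
    "totalTokens": [160, 110, 290],
})

stats = df.groupby("datetime").agg({
    "inputTokens": ["count", "sum", "mean"],
    "completionTokens": ["sum", "mean"],
    "totalTokens": ["sum", "mean"],
})

# Flatten the (column, aggregation) MultiIndex into names like "inputTokens_sum",
# then expose the per-hour request count under a clearer name.
stats.columns = [f"{col}_{agg}" for col, agg in stats.columns]
stats = stats.reset_index().rename(columns={"inputTokens_count": "request_count"})
print(stats.to_string(index=False))
```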
- server.py:31-51 (schema): Pydantic BaseModel schema defining the input parameters for the tool: days (1-90), region, log_group_name, and optional aws_account_id.

```python
class BedrockLogsParams(BaseModel):
    """Parameters for retrieving Bedrock invocation logs."""

    days: int = Field(
        default=7,
        description="Number of days to look back for Bedrock logs",
        ge=1,
        le=90
    )
    region: str = Field(
        default="us-east-1",
        description="AWS region to retrieve logs from"
    )
    log_group_name: str = Field(
        description="Bedrock Log Group Name",
        default=os.environ.get('BEDROCK_LOG_GROUP_NAME', 'BedrockModelInvocationLogGroup')
    )
    aws_account_id: Optional[str] = Field(
        description="AWS account id (if different from the current AWS account) of the account for which to get the cost data",
        default=None
    )
```
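A short sketch of constructing and validating the parameters object; the 1 to 90 bound on `days` comes directly from the Field constraints above. This assumes Pydantic v2 (`model_dump()`; on v1 use `.dict()`).

```python
from pydantic import ValidationError

# BedrockLogsParams as defined in server.py above.
# Defaults fill in region, log_group_name, and aws_account_id.
params = BedrockLogsParams(days=3)
print(params.model_dump())

# Values outside the declared bounds are rejected by Pydantic.
try:
    BedrockLogsParams(days=0)  # violates ge=1
except ValidationError as err:
    print(err.errors()[0]["msg"])
```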
- server.py:136-251 (helper): Core helper function that queries CloudWatch Logs for Bedrock model invocation events, parses JSON logs, extracts timestamp, modelId, userId, and input/completion/total tokens, and returns a pandas DataFrame. Called by the main handler.

```python
def get_bedrock_logs(params: BedrockLogsParams) -> Optional[pd.DataFrame]:
    """
    Retrieve Bedrock invocation logs for the last n days in a given region as a dataframe

    Args:
        params: Pydantic model containing parameters:
            - days: Number of days to look back (default: 7)
            - region: AWS region to query (default: us-east-1)

    Returns:
        pd.DataFrame: DataFrame containing the log data with columns:
            - timestamp: Timestamp of the invocation
            - region: AWS region
            - modelId: Bedrock model ID
            - userId: User ARN
            - inputTokens: Number of input tokens
            - completionTokens: Number of completion tokens
            - totalTokens: Total tokens used
    """
    # Initialize CloudWatch Logs client
    print(f"get_bedrock_logs, params={params}")
    client = get_aws_service_boto3_client("logs", params.aws_account_id, params.region)

    # Calculate time range
    end_time = datetime.now()
    start_time = end_time - timedelta(days=params.days)

    # Convert to milliseconds since epoch
    start_time_ms = int(start_time.timestamp() * 1000)
    end_time_ms = int(end_time.timestamp() * 1000)

    filtered_logs = []

    try:
        paginator = client.get_paginator("filter_log_events")

        # Parameters for the log query
        query_params = {
            "logGroupName": params.log_group_name,  # Use the provided log group name
            "logStreamNames": [
                "aws/bedrock/modelinvocations"
            ],  # The specific log stream
            "startTime": start_time_ms,
            "endTime": end_time_ms,
        }

        # Paginate through results
        for page in paginator.paginate(**query_params):
            for event in page.get("events", []):
                try:
                    # Parse the message as JSON
                    message = json.loads(event["message"])

                    # Get user prompt from the input messages
                    prompt = ""
                    input = message.get("input", {})
                    input_json = input.get("inputBodyJson", {})
                    messages = input_json.get("messages", None)
                    if messages:
                        for msg in message["input"]["inputBodyJson"]["messages"]:
                            # print(f"debug 2.2, {type(msg)}")
                            if msg.get("role") == "user" and msg.get("content"):
                                for content in msg["content"]:
                                    if isinstance(content, dict):
                                        if content.get("text"):
                                            prompt += content["text"] + " "
                                    else:
                                        prompt += content
                        prompt = prompt.strip()

                    # Extract only the required fields
                    filtered_event = {
                        "timestamp": message.get("timestamp"),
                        "region": message.get("region"),
                        "modelId": message.get("modelId"),
                        "userId": message.get("identity", {}).get("arn"),
                        "inputTokens": message.get("input", {}).get("inputTokenCount"),
                        "completionTokens": message.get("output", {}).get(
                            "outputTokenCount"
                        ),
                        "totalTokens": (
                            message.get("input", {}).get("inputTokenCount", 0)
                            + message.get("output", {}).get("outputTokenCount", 0)
                        ),
                    }

                    filtered_logs.append(filtered_event)
                except json.JSONDecodeError:
                    continue  # Skip non-JSON messages
                except KeyError:
                    continue  # Skip messages missing required fields

        # Create DataFrame if we have logs
        if filtered_logs:
            df = pd.DataFrame(filtered_logs)
            df["timestamp"] = pd.to_datetime(df["timestamp"])
            return df
        else:
            print("No logs found for the specified time period.")
            return None

    except client.exceptions.ResourceNotFoundException:
        print(
            f"Log group '{params.log_group_name}' or stream 'aws/bedrock/modelinvocations' not found"
        )
        return None
    except Exception as e:
        print(f"Error retrieving logs: {str(e)}")
        return None
```
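For context, a hedged sketch of calling the helper directly; it requires valid AWS credentials and an existing Bedrock invocation log group, so treat it as illustrative rather than a guaranteed-working snippet:

```python
# Uses BedrockLogsParams and get_bedrock_logs as defined in server.py above.
df = get_bedrock_logs(BedrockLogsParams(days=1, region="us-east-1"))
if df is not None:
    # Columns documented above: timestamp, region, modelId, userId,
    # inputTokens, completionTokens, totalTokens
    print(df.columns.tolist())
    print(f"{len(df)} invocations, {df['totalTokens'].sum():,} total tokens")
```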
- server.py:75-135 (helper): Utility helper that creates boto3 clients for AWS services (such as logs), handling cross-account role assumption via STS if aws_account_id differs from the current account.

```python
def get_aws_service_boto3_client(service: str,
                                 aws_account_id: Optional[str],
                                 region_name: str,
                                 account_b_role_name: Optional[str] = CROSS_ACCOUNT_ROLE_NAME):
    """
    Creates a boto3 client for the specified service in this current AWS account
    or in a different account if an account id is specified.

    Args:
        service (str): AWS service name (e.g., 'logs', 'cloudwatch')
        region_name (str): AWS region (e.g. 'us-east-1')
        aws_account_id (str): AWS account ID to access, this is the account in which the role is to be assumed
        account_b_role_name (str): IAM role name to assume

    Returns:
        boto3.client: Service client with assumed role credentials
    """
    try:
        this_account = boto3.client('sts').get_caller_identity()['Account']
        if aws_account_id is not None and this_account != aws_account_id:
            # the request is for a different account, we need to assume a role in that account
            print(f"Request is for a different account: {aws_account_id}, current account: {this_account}")
            # Create STS client
            sts_client = boto3.client('sts')
            current_identity = sts_client.get_caller_identity()
            print(f"Current identity: {current_identity}")

            # Define the role ARN
            role_arn = f"arn:aws:iam::{aws_account_id}:role/{account_b_role_name}"
            print(f"Attempting to assume role: {role_arn}")

            # Assume the role
            assumed_role = sts_client.assume_role(
                RoleArn=role_arn,
                RoleSessionName="CrossAccountSession"
            )

            # Extract temporary credentials
            credentials = assumed_role['Credentials']

            # Create client with assumed role credentials
            client = boto3.client(
                service,
                region_name=region_name,
                aws_access_key_id=credentials['AccessKeyId'],
                aws_secret_access_key=credentials['SecretAccessKey'],
                aws_session_token=credentials['SessionToken']
            )
            print(f"Successfully created cross-account client for {service} in account {aws_account_id}")
            return client
        else:
            client = boto3.client(
                service,
                region_name=region_name
            )
            print(f"Successfully created client for {service} in the current AWS account {this_account}")
            return client
    except Exception as e:
        print(f"Error creating cross-account client for {service}: {e}")
        raise e
```
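Cross-account access only works if the target account exposes an IAM role named by CROSS_ACCOUNT_ROLE_NAME whose trust policy allows this caller to assume it. A hedged sketch of requesting a CloudWatch Logs client for another account (the account id below is a placeholder):

```python
# Placeholder account id; the role referenced by CROSS_ACCOUNT_ROLE_NAME must
# already exist in that account and trust the identity running this code.
logs_client = get_aws_service_boto3_client(
    service="logs",
    aws_account_id="111122223333",
    region_name="us-east-1",
)
print(logs_client.meta.region_name)
```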
- server.py:451-451 (registration): The @mcp.tool() decorator registers the get_bedrock_hourly_usage_stats function as an MCP tool.

```python
@mcp.tool()
```
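The decorator implies a server object named `mcp` defined elsewhere in server.py. A minimal sketch of that surrounding setup, assuming the FastMCP server from the official MCP Python SDK; the actual server name and run configuration in server.py may differ.

```python
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("bedrock-usage-stats")  # assumed server name


@mcp.tool()
def get_bedrock_hourly_usage_stats(params: BedrockLogsParams) -> str:
    """Get hourly usage statistics with detailed breakdowns."""
    # BedrockLogsParams is the Pydantic model shown earlier on this page.
    ...


if __name__ == "__main__":
    mcp.run()
```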