
AWS Cost Explorer MCP Server

get_bedrock_daily_usage_stats

Retrieve daily AWS Bedrock usage statistics with detailed breakdowns for cost analysis and monitoring.

Instructions

Get daily usage statistics with detailed breakdowns.

Args:
    params: Parameters specifying the number of days to look back and region

Returns:
    str: Formatted string representation of daily usage statistics

Input Schema

Name     Required   Description                                                                Default
params   Yes        BedrockLogsParams object (days, region, log_group_name, aws_account_id)   -
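
The params argument is a BedrockLogsParams object (see Implementation Reference below). As a rough illustration, a client might send arguments shaped like the following; the values are examples only, and every field inside params has a default:

    # Illustrative tool arguments; values are examples, not requirements.
    arguments = {
        "params": {
            "days": 7,                 # 1-90; defaults to 7
            "region": "us-east-1",     # defaults to "us-east-1"
            "log_group_name": "BedrockModelInvocationLogGroup",  # defaults to $BEDROCK_LOG_GROUP_NAME, else this value
            "aws_account_id": None,    # optional cross-account lookup
        }
    }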

Implementation Reference

  • The main handler function for the 'get_bedrock_daily_usage_stats' tool. Decorated with @mcp.tool(), it retrieves Bedrock logs, processes them into daily statistics grouped by date, region, model, and user, and returns a plain-text summary built from pandas to_string tables (a usage sketch and a short pandas illustration follow this reference list).
    @mcp.tool()
    def get_bedrock_daily_usage_stats(params: BedrockLogsParams) -> str:
        """
        Get daily usage statistics with detailed breakdowns.

        Args:
            params: Parameters specifying the number of days to look back and region

        Returns:
            str: Formatted string representation of daily usage statistics
        """
        print(f"get_bedrock_daily_usage_stats, params={params}")
        df = get_bedrock_logs(params)
        if df is None or df.empty:
            return "No usage data found for the specified period."

        # Initialize result string
        result_parts = []

        # Add header
        result_parts.append(f"Bedrock Usage Statistics (Past {params.days} days - {params.region})")
        result_parts.append("=" * 80)

        # Add a date column for easier grouping
        df['date'] = df['timestamp'].dt.date

        # === REGION -> MODEL GROUPING ===
        result_parts.append("\n=== Daily Region-wise -> Model-wise Analysis ===")

        # Group by date, region, model and calculate metrics
        region_model_stats = df.groupby(['date', 'region', 'modelId']).agg({
            'inputTokens': ['count', 'sum', 'mean', 'max', 'median'],
            'completionTokens': ['sum', 'mean', 'max', 'median'],
            'totalTokens': ['sum', 'mean', 'max', 'median']
        })

        # Flatten the column multi-index
        region_model_stats.columns = [f"{col[0]}_{col[1]}" for col in region_model_stats.columns]

        # Reset the index to get a flat dataframe
        flattened_stats = region_model_stats.reset_index()

        # Rename inputTokens_count to request_count
        flattened_stats = flattened_stats.rename(columns={'inputTokens_count': 'request_count'})

        # Add the flattened stats to result
        result_parts.append(flattened_stats.to_string(index=False))

        # Add summary statistics
        result_parts.append("\n=== Summary Statistics ===")

        # Total requests and tokens
        total_requests = flattened_stats['request_count'].sum()
        total_input_tokens = flattened_stats['inputTokens_sum'].sum()
        total_completion_tokens = flattened_stats['completionTokens_sum'].sum()
        total_tokens = flattened_stats['totalTokens_sum'].sum()

        result_parts.append(f"Total Requests: {total_requests:,}")
        result_parts.append(f"Total Input Tokens: {total_input_tokens:,}")
        result_parts.append(f"Total Completion Tokens: {total_completion_tokens:,}")
        result_parts.append(f"Total Tokens: {total_tokens:,}")

        # === REGION SUMMARY ===
        result_parts.append("\n=== Region Summary ===")
        region_summary = df.groupby('region').agg({
            'inputTokens': ['count', 'sum'],
            'completionTokens': ['sum'],
            'totalTokens': ['sum']
        })

        # Flatten region summary columns
        region_summary.columns = [f"{col[0]}_{col[1]}" for col in region_summary.columns]
        region_summary = region_summary.reset_index()
        region_summary = region_summary.rename(columns={'inputTokens_count': 'request_count'})
        result_parts.append(region_summary.to_string(index=False))

        # === MODEL SUMMARY ===
        result_parts.append("\n=== Model Summary ===")
        model_summary = df.groupby('modelId').agg({
            'inputTokens': ['count', 'sum'],
            'completionTokens': ['sum'],
            'totalTokens': ['sum']
        })

        # Flatten model summary columns
        model_summary.columns = [f"{col[0]}_{col[1]}" for col in model_summary.columns]
        model_summary = model_summary.reset_index()
        model_summary = model_summary.rename(columns={'inputTokens_count': 'request_count'})

        # Format model IDs to be more readable
        model_summary['modelId'] = model_summary['modelId'].apply(
            lambda model: model.split('.')[-1] if '.' in model else model.split('/')[-1]
        )
        result_parts.append(model_summary.to_string(index=False))

        # === USER SUMMARY ===
        if 'userId' in df.columns:
            result_parts.append("\n=== User Summary ===")
            user_summary = df.groupby('userId').agg({
                'inputTokens': ['count', 'sum'],
                'completionTokens': ['sum'],
                'totalTokens': ['sum']
            })

            # Flatten user summary columns
            user_summary.columns = [f"{col[0]}_{col[1]}" for col in user_summary.columns]
            user_summary = user_summary.reset_index()
            user_summary = user_summary.rename(columns={'inputTokens_count': 'request_count'})
            result_parts.append(user_summary.to_string(index=False))

        # === REGION -> USER -> MODEL DETAILED SUMMARY ===
        if 'userId' in df.columns:
            result_parts.append("\n=== Region -> User -> Model Detailed Summary ===")
            region_user_model_summary = df.groupby(['region', 'userId', 'modelId']).agg({
                'inputTokens': ['count', 'sum', 'mean'],
                'completionTokens': ['sum', 'mean'],
                'totalTokens': ['sum', 'mean']
            })

            # Flatten columns
            region_user_model_summary.columns = [f"{col[0]}_{col[1]}" for col in region_user_model_summary.columns]
            region_user_model_summary = region_user_model_summary.reset_index()
            region_user_model_summary = region_user_model_summary.rename(columns={'inputTokens_count': 'request_count'})

            # Format model IDs to be more readable
            region_user_model_summary['modelId'] = region_user_model_summary['modelId'].apply(
                lambda model: model.split('.')[-1] if '.' in model else model.split('/')[-1]
            )
            result_parts.append(region_user_model_summary.to_string(index=False))

        # Combine all parts into a single string
        result = "\n".join(result_parts)
        return result
  • Pydantic BaseModel schema defining the input parameters for the tool: days (1-90), region, log_group_name, and optional aws_account_id.
    class BedrockLogsParams(BaseModel):
        """Parameters for retrieving Bedrock invocation logs."""
        days: int = Field(
            default=7,
            description="Number of days to look back for Bedrock logs",
            ge=1,
            le=90
        )
        region: str = Field(
            default="us-east-1",
            description="AWS region to retrieve logs from"
        )
        log_group_name: str = Field(
            description="Bedrock Log Group Name",
            default=os.environ.get('BEDROCK_LOG_GROUP_NAME', 'BedrockModelInvocationLogGroup')
        )
        aws_account_id: Optional[str] = Field(
            description="AWS account id (if different from the current AWS account) of the account for which to get the cost data",
            default=None
        )
  • Supporting function that queries CloudWatch Logs for Bedrock model invocation events, parses the JSON log messages to extract timestamp, modelId, userId, input/completion/total tokens, and returns a structured pandas DataFrame.
    def get_bedrock_logs(params: BedrockLogsParams) -> Optional[pd.DataFrame]:
        """
        Retrieve Bedrock invocation logs for the last n days in a given region as a dataframe

        Args:
            params: Pydantic model containing parameters:
                - days: Number of days to look back (default: 7)
                - region: AWS region to query (default: us-east-1)

        Returns:
            pd.DataFrame: DataFrame containing the log data with columns:
                - timestamp: Timestamp of the invocation
                - region: AWS region
                - modelId: Bedrock model ID
                - userId: User ARN
                - inputTokens: Number of input tokens
                - completionTokens: Number of completion tokens
                - totalTokens: Total tokens used
        """
        # Initialize CloudWatch Logs client
        print(f"get_bedrock_logs, params={params}")
        client = get_aws_service_boto3_client("logs", params.aws_account_id, params.region)

        # Calculate time range
        end_time = datetime.now()
        start_time = end_time - timedelta(days=params.days)

        # Convert to milliseconds since epoch
        start_time_ms = int(start_time.timestamp() * 1000)
        end_time_ms = int(end_time.timestamp() * 1000)

        filtered_logs = []

        try:
            paginator = client.get_paginator("filter_log_events")

            # Parameters for the log query
            query_params = {
                "logGroupName": params.log_group_name,  # Use the provided log group name
                "logStreamNames": ["aws/bedrock/modelinvocations"],  # The specific log stream
                "startTime": start_time_ms,
                "endTime": end_time_ms,
            }

            # Paginate through results
            for page in paginator.paginate(**query_params):
                for event in page.get("events", []):
                    try:
                        # Parse the message as JSON
                        message = json.loads(event["message"])

                        # Get user prompt from the input messages
                        prompt = ""
                        input = message.get("input", {})
                        input_json = input.get("inputBodyJson", {})
                        messages = input_json.get("messages", None)
                        if messages:
                            for msg in message["input"]["inputBodyJson"]["messages"]:
                                #print(f"debug 2.2, {type(msg)}")
                                if msg.get("role") == "user" and msg.get("content"):
                                    for content in msg["content"]:
                                        if isinstance(content, dict):
                                            if content.get("text"):
                                                prompt += content["text"] + " "
                                        else:
                                            prompt += content
                            prompt = prompt.strip()

                        # Extract only the required fields
                        filtered_event = {
                            "timestamp": message.get("timestamp"),
                            "region": message.get("region"),
                            "modelId": message.get("modelId"),
                            "userId": message.get("identity", {}).get("arn"),
                            "inputTokens": message.get("input", {}).get("inputTokenCount"),
                            "completionTokens": message.get("output", {}).get("outputTokenCount"),
                            "totalTokens": (
                                message.get("input", {}).get("inputTokenCount", 0)
                                + message.get("output", {}).get("outputTokenCount", 0)
                            ),
                        }
                        filtered_logs.append(filtered_event)
                    except json.JSONDecodeError:
                        continue  # Skip non-JSON messages
                    except KeyError:
                        continue  # Skip messages missing required fields

            # Create DataFrame if we have logs
            if filtered_logs:
                df = pd.DataFrame(filtered_logs)
                df["timestamp"] = pd.to_datetime(df["timestamp"])
                return df
            else:
                print("No logs found for the specified time period.")
                return None

        except client.exceptions.ResourceNotFoundException:
            print(
                f"Log group '{params.log_group_name}' or stream 'aws/bedrock/modelinvocations' not found"
            )
            return None
        except Exception as e:
            print(f"Error retrieving logs: {str(e)}")
            return None
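
Every summary above uses the same pandas idiom: groupby/agg produces a column MultiIndex, which is flattened into single-level names such as inputTokens_sum and then rendered with to_string. A tiny standalone illustration of that pattern, using toy data rather than real Bedrock logs:

    import pandas as pd

    # Toy invocation records mimicking the columns produced by get_bedrock_logs
    df = pd.DataFrame({
        "region": ["us-east-1", "us-east-1", "us-west-2"],
        "modelId": ["anthropic.claude-3-haiku", "anthropic.claude-3-haiku", "amazon.titan-text"],
        "inputTokens": [120, 80, 50],
        "completionTokens": [40, 60, 10],
        "totalTokens": [160, 140, 60],
    })

    stats = df.groupby(["region", "modelId"]).agg({
        "inputTokens": ["count", "sum"],
        "completionTokens": ["sum"],
        "totalTokens": ["sum"],
    })

    # agg() yields MultiIndex columns like ('inputTokens', 'sum'); flatten them
    stats.columns = [f"{a}_{b}" for a, b in stats.columns]
    stats = stats.reset_index().rename(columns={"inputTokens_count": "request_count"})
    print(stats.to_string(index=False))

For local testing outside an MCP client, the functions can also be exercised directly. The sketch below is illustrative only: it assumes the module is importable as server (the real module name may differ), that AWS credentials are configured, that Bedrock model invocation logging is already enabled for the configured log group, and that @mcp.tool() returns the original function so it remains directly callable.

    # Local sketch only; see the assumptions stated above.
    from server import BedrockLogsParams, get_bedrock_logs, get_bedrock_daily_usage_stats

    params = BedrockLogsParams(days=3, region="us-east-1")  # days is validated to 1-90

    # Raw per-invocation records (or None if nothing was logged in the window)
    df = get_bedrock_logs(params)
    if df is not None:
        print(df[["modelId", "inputTokens", "completionTokens", "totalTokens"]].head())

    # Full plain-text report, as an MCP client would receive it
    print(get_bedrock_daily_usage_stats(params))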

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aarora79/aws-cost-explorer-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.