Skip to main content
Glama
upload-billing-hours.py4.42 kB
from typing import Iterable, NotRequired, cast
from datetime import date, datetime, timedelta, timezone
import itertools
import logging

from si_lambda import SiLambda, SiLambdaEnv
from si_lago_api import ExternalSubscriptionId, LagoEvent
from si_types import SqlTimestamp

# Lago billable-metric code that resource-hour events are reported under.
billable_metric_code = "resource-hours"
# Price per resource-hour. Not referenced in this file; presumably consumed
# elsewhere (e.g. price configuration) — TODO confirm before removing.
cost_per_resource_hour = 0.007


class UploadBillingHoursEnv(SiLambdaEnv):
    """Lambda event payload. Both keys are optional; defaults applied in __init__."""

    batch_hours: NotRequired[int]  # hours of events uploaded per batch (default 6)
    end_hours_ago: NotRequired[int]  # hours before the last complete hour to start at (default 0)


class UploadBillingHours(SiLambda):
    """Upload per-hour resource-count billing events from Redshift to Lago.

    Walks backwards in time in ``batch_hours``-sized windows, starting
    ``end_hours_ago`` hours before the last complete hour, and stops at the
    first window in which no new events needed to be uploaded (i.e. history
    is already fully uploaded from that point back).
    """

    def __init__(self, event: UploadBillingHoursEnv):
        super().__init__(event)
        self.batch_hours = int(event.get("batch_hours", 6))
        self.end_hours_ago = int(event.get("end_hours_ago", 0))
        # Validate with a real exception rather than `assert`, which is
        # stripped under `python -O`; a non-positive batch size would make
        # run()'s backwards walk nonsensical.
        if self.batch_hours <= 0:
            raise ValueError(f"batch_hours must be positive, got {self.batch_hours}")

    def get_next_uninvoiced_month(self) -> date:
        """Return the first day of the month whose invoice is still open.

        Normally the current month; during the grace period at the start of a
        month (day < 5) the previous month is still being finalized, so its
        first day is returned instead.
        """
        today = date.today()
        next_uninvoiced_month = today.replace(day=1)
        if today.day < 5:
            # Step back one day from the 1st to land in the previous month,
            # then snap to that month's first day.
            next_uninvoiced_month = (next_uninvoiced_month - timedelta(days=1)).replace(day=1)
        return next_uninvoiced_month

    def format_event(
        self, event: tuple[ExternalSubscriptionId, SqlTimestamp, int]
    ) -> LagoEvent:
        """Convert one (subscription, hour_start, resource_count) row into a Lago event.

        The transaction_id embeds the subscription id and the hour, so
        re-uploading the same hour is idempotent on Lago's side.
        Raises AssertionError if hour_start is not an exact hour boundary.
        """
        external_subscription_id, hour_start, resource_count = event
        # hour_start comes from Redshift as a naive "YYYY-MM-DD HH:MM:SS"
        # timestamp; it is treated as UTC.
        event_time = datetime.strptime(hour_start, "%Y-%m-%d %H:%M:%S").replace(
            tzinfo=timezone.utc
        )
        # Sanity check: rows must land exactly on hour boundaries.
        assert (
            event_time.minute == 0
            and event_time.second == 0
            and event_time.microsecond == 0
        )
        return {
            "transaction_id": f"{external_subscription_id}-{datetime.strftime(event_time, '%Y-%m-%d-%H')}",
            "external_subscription_id": external_subscription_id,
            "timestamp": event_time.timestamp(),
            "code": billable_metric_code,
            "properties": {
                "resource_hours": resource_count,
            },
        }

    def run(self):
        """Upload billing events batch-by-batch, walking backwards in time.

        Hour offsets are non-positive integers relative to
        ``last_complete_hour_start`` in the warehouse. The loop terminates when
        a whole batch uploads zero new events.
        """
        last_hour_end = -self.end_hours_ago
        while last_hour_end is not None:
            first_hour_start = last_hour_end - self.batch_hours
            logging.info(
                f"Uploading {self.batch_hours} hours of billing events starting {-first_hour_start} hours ago"
            )

            # Fetch this window's per-subscription, per-hour resource counts.
            # The interpolated offsets are server-derived ints, not user
            # input, so f-string interpolation into the SQL is safe here.
            all_events = cast(
                Iterable[tuple[ExternalSubscriptionId, SqlTimestamp, int]],
                self.redshift.query_raw(
                    f"""
                    SELECT external_subscription_id, hour_start, resource_count
                    FROM workspace_operations.owner_resource_hours_with_subscriptions
                    CROSS JOIN workspace_operations.workspace_update_events_summary
                    WHERE external_subscription_id IS NOT NULL
                    AND DATEADD(HOUR, {first_hour_start}, last_complete_hour_start) <= hour_start
                    AND hour_start < DATEADD(HOUR, {last_hour_end}, last_complete_hour_start)
                    ORDER BY hour_start DESC
                    """
                ),
            )

            uploaded_events = 0
            # Rows are ordered by hour_start, so groupby yields exactly one
            # group per distinct hour.
            for hour_start, hour_events in itertools.groupby(
                all_events, lambda event: event[1]
            ):
                logging.debug(f"Uploading events for hour {hour_start}")
                new_events, total_events = self.lago.upload_events(
                    map(self.format_event, hour_events), dry_run=self.dry_run
                )
                logging.info(f"Uploaded {new_events} / {total_events} for hour {hour_start}")
                if total_events == 0:
                    logging.warning(f"No events found for hour {hour_start}")
                uploaded_events += new_events

            if uploaded_events == 0:
                # Nothing new in this window: assume all older hours were
                # already uploaded and stop walking back.
                logging.info(
                    f"No events needed to be uploaded in the {self.batch_hours}-hour batch starting {-first_hour_start} hours ago! Finished."
                )
                break

            logging.info(
                f"Uploaded {uploaded_events} events in {self.batch_hours}-hour batch starting {-first_hour_start} hours ago!"
            )
            # uploaded_events > 0 is guaranteed here (the == 0 case broke out
            # above), so the original `if uploaded_events > 0 else None` was
            # dead code: just step the window back and continue.
            last_hour_end = first_hour_start


lambda_handler = UploadBillingHours.lambda_handler

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.