Skip to main content
Glama
billing_publish.rs8.68 kB
//! This module provides the ability to publish billing events. #![warn( bad_style, clippy::missing_panics_doc, clippy::panic, clippy::panic_in_result_fn, clippy::unwrap_in_result, clippy::unwrap_used, dead_code, improper_ctypes, missing_debug_implementations, missing_docs, no_mangle_generic_items, non_shorthand_field_patterns, overflowing_literals, path_statements, patterns_in_fns_without_body, unconditional_recursion, unreachable_pub, unused, unused_allocation, unused_comparisons, unused_parens, while_true )] use billing_events::{ BillingEvent, BillingEventKind, BillingEventsError, }; use chrono::Utc; use si_events::FuncRunId; use telemetry::prelude::*; use thiserror::Error; use crate::{ ChangeSet, ChangeSetError, Component, ComponentError, ComponentId, DalContext, SchemaVariant, SchemaVariantError, TransactionsError, resource_metadata::{ self, ResourceMetadataError, }, }; #[allow(missing_docs)] #[remain::sorted] #[derive(Debug, Error)] pub enum BillingPublishError { #[error("billing events error: {0}")] BillingEvents(#[from] BillingEventsError), #[error("change set error: {0}")] ChangeSet(#[from] ChangeSetError), #[error("component error: {0}")] Component(#[from] ComponentError), #[error("resource metadata error: {0}")] ResourceMetadata(#[from] ResourceMetadataError), #[error("schema variant error: {0}")] SchemaVariant(#[from] SchemaVariantError), #[error("transactions error: {0}")] Transactions(#[from] TransactionsError), } type BillingPublishResult<T> = Result<T, BillingPublishError>; #[instrument( name = "billing_publish.for_head_change_set_pointer_update", level = "debug", skip(ctx, change_set) )] /// Publishes a billing event with resource count if the change set that was updated is HEAD pub async fn for_head_change_set_pointer_update( ctx: &DalContext, change_set: &ChangeSet, ) -> BillingPublishResult<()> { if change_set.workspace_id.is_none() || !change_set.is_head(ctx).await? { trace!( ?change_set, "skipping billing event publishing since change set is not HEAD" ); return Ok(()); }; // NOTE(nick): the metadata itself is not sent over the wire at the time of writing due to unbounded size in the // message payload. We should store this locally and batch up in a separate process. let metadata = resource_metadata::list(ctx).await?; let resource_count = metadata.len(); let workspace_id = change_set.workspace_id()?; let event = BillingEvent { workspace_id, workspace_snapshot_address: change_set.workspace_snapshot_address, event_timestamp: Utc::now(), change_set_status: change_set.status.into(), change_set_id: change_set.id, merge_requested_by_user_id: change_set.merge_requested_by_user_id, resource_count: Some(resource_count), component_id: None, component_name: None, schema_variant_id: None, schema_id: None, schema_name: None, func_run_id: None, kind: BillingEventKind::HeadChangeSetPointerUpdate, }; ctx.services_context() .jetstream_streams() .billing_events() .publish_workspace_update(&workspace_id.to_string(), &event) .await?; Ok(()) } #[instrument( name = "billing_publish.for_change_set_status_update", level = "debug", skip(ctx, change_set) )] pub(crate) async fn for_change_set_status_update( ctx: &DalContext, change_set: &ChangeSet, ) -> BillingPublishResult<()> { if change_set.workspace_id.is_none() || !change_set.is_head(ctx).await? { trace!( ?change_set, "skipping billing event publishing since change set is not HEAD" ); return Ok(()); }; // NOTE(nick): the metadata itself is not sent over the wire at the time of writing due to unbounded size in the // message payload. We should store this locally and batch up in a separate process. let metadata = resource_metadata::list(ctx).await?; let resource_count = metadata.len(); let workspace_id = change_set.workspace_id()?; let event = BillingEvent { workspace_id, workspace_snapshot_address: change_set.workspace_snapshot_address, event_timestamp: Utc::now(), change_set_status: change_set.status.into(), change_set_id: change_set.id, merge_requested_by_user_id: change_set.merge_requested_by_user_id, resource_count: Some(resource_count), component_id: None, component_name: None, schema_variant_id: None, schema_id: None, schema_name: None, func_run_id: None, kind: BillingEventKind::ChangeSetStatusUpdate, }; ctx.services_context() .jetstream_streams() .billing_events() .publish_workspace_update(&workspace_id.to_string(), &event) .await?; Ok(()) } #[instrument( name = "billing_publish.for_resource_create", level = "debug", skip(ctx) )] pub(crate) async fn for_resource_create( ctx: &DalContext, component_id: ComponentId, func_run_id: FuncRunId, ) -> BillingPublishResult<()> { let change_set = ctx.change_set()?; if change_set.workspace_id.is_none() || !change_set.is_head(ctx).await? { trace!( ?change_set, "skipping billing event publishing since change set is not HEAD" ); return Ok(()); }; let component_name = Component::name_by_id(ctx, component_id).await?; let schema_variant_id = Component::schema_variant_id(ctx, component_id).await?; let schema = SchemaVariant::schema_for_schema_variant_id(ctx, schema_variant_id).await?; let schema_id = schema.id(); let schema_name = schema.name().to_string(); let workspace_id = change_set.workspace_id()?; let event = BillingEvent { workspace_id, workspace_snapshot_address: change_set.workspace_snapshot_address, event_timestamp: Utc::now(), change_set_status: change_set.status.into(), change_set_id: change_set.id, merge_requested_by_user_id: change_set.merge_requested_by_user_id, resource_count: None, component_id: Some(component_id), component_name: Some(component_name), schema_variant_id: Some(schema_variant_id), schema_id: Some(schema_id), schema_name: Some(schema_name), func_run_id: Some(func_run_id), kind: BillingEventKind::ResourceCreate, }; ctx.services_context() .jetstream_streams() .billing_events() .publish_workspace_update(&workspace_id.to_string(), &event) .await?; Ok(()) } #[instrument( name = "billing_publish.for_resource_delete", level = "debug", skip(ctx) )] pub(crate) async fn for_resource_delete( ctx: &DalContext, component_id: ComponentId, func_run_id: FuncRunId, ) -> BillingPublishResult<()> { let change_set = ctx.change_set()?; if change_set.workspace_id.is_none() || !change_set.is_head(ctx).await? { trace!( ?change_set, "skipping billing event publishing since change set is not HEAD" ); return Ok(()); }; let component_name = Component::name_by_id(ctx, component_id).await?; let schema_variant_id = Component::schema_variant_id(ctx, component_id).await?; let schema = SchemaVariant::schema_for_schema_variant_id(ctx, schema_variant_id).await?; let schema_id = schema.id(); let schema_name = schema.name().to_string(); let workspace_id = change_set.workspace_id()?; let event = BillingEvent { workspace_id, workspace_snapshot_address: change_set.workspace_snapshot_address, event_timestamp: Utc::now(), change_set_status: change_set.status.into(), change_set_id: change_set.id, merge_requested_by_user_id: change_set.merge_requested_by_user_id, resource_count: None, component_id: Some(component_id), component_name: Some(component_name), schema_variant_id: Some(schema_variant_id), schema_id: Some(schema_id), schema_name: Some(schema_name), func_run_id: Some(func_run_id), kind: BillingEventKind::ResourceDelete, }; ctx.services_context() .jetstream_streams() .billing_events() .publish_workspace_update(&workspace_id.to_string(), &event) .await?; Ok(()) }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server