billing_events.rs
use std::{
    future::{
        Future,
        IntoFuture as _,
    },
    io,
    sync::Arc,
};

use app_state::{
    AppState,
    NoopAppState,
};
use billing_events::{
    BillingEventsError,
    BillingEventsWorkQueue,
};
use data_warehouse_stream_client::{
    DataWarehouseStreamClient,
    DataWarehouseStreamClientError,
};
use naxum::{
    MessageHead,
    ServiceBuilder,
    ServiceExt as _,
    TowerServiceExt as _,
    extract::MatchedSubject,
    handler::Handler as _,
    middleware::{
        ack::AckLayer,
        matched_subject::{
            ForSubject,
            MatchedSubjectLayer,
        },
        trace::TraceLayer,
    },
    response::{
        IntoResponse,
        Response,
    },
};
use si_data_nats::{
    ConnectionMetadata,
    async_nats::{
        self,
        error::Error as AsyncNatsError,
        jetstream::{
            consumer::{
                StreamErrorKind,
                pull::Stream,
            },
            stream::ConsumerErrorKind,
        },
    },
    jetstream::Context,
};
use telemetry::prelude::*;
use thiserror::Error;
use tokio_util::sync::CancellationToken;

mod app_state;
mod handlers;

/// Errors that can occur while setting up the billing events app.
#[derive(Debug, Error)]
pub enum BillingEventsAppSetupError {
    #[error("async nats consumer error: {0}")]
    AsyncNatsConsumer(#[from] AsyncNatsError<ConsumerErrorKind>),
    #[error("async nats stream error: {0}")]
    AsyncNatsStream(#[from] AsyncNatsError<StreamErrorKind>),
    #[error("billing events error: {0}")]
    BillingEvents(#[from] BillingEventsError),
    #[error("data warehouse error: {0}")]
    DataWarehouse(#[from] DataWarehouseStreamClientError),
}

type Result<T> = std::result::Result<T, BillingEventsAppSetupError>;

/// Creates the durable JetStream consumer for billing events and returns a future that serves
/// incoming messages until the cancellation token is triggered.
#[instrument(
    name = "forklift.init.app.billing_events.build_and_run",
    level = "debug",
    skip_all
)]
pub(crate) async fn build_and_run(
    jetstream_context: Context,
    durable_consumer_name: String,
    connection_metadata: Arc<ConnectionMetadata>,
    concurrency_limit: usize,
    data_warehouse_stream_name: Option<&str>,
    token: CancellationToken,
) -> Result<Box<dyn Future<Output = io::Result<()>> + Unpin + Send>> {
    let incoming = {
        let queue = BillingEventsWorkQueue::get_or_create(jetstream_context).await?;
        let consumer_subject = queue.workspace_update_subject("*");
        queue
            .stream()
            .await?
            .create_consumer(async_nats::jetstream::consumer::pull::Config {
                durable_name: Some(durable_consumer_name),
                filter_subject: consumer_subject,
                ..Default::default()
            })
            .await?
            .messages()
            .await?
    };

    let inner = match data_warehouse_stream_name {
        Some(stream_name) => {
            info!(%stream_name, "creating billing events app in data warehouse stream delivery mode...");
            let client = DataWarehouseStreamClient::new(stream_name).await?;
            let state = AppState::new(client);
            build_app(
                state,
                connection_metadata,
                incoming,
                concurrency_limit,
                token.clone(),
            )?
        }
        None => {
            info!("creating billing events app in no-op mode...");
            let state = NoopAppState::new();
            build_noop_app(
                state,
                connection_metadata,
                incoming,
                concurrency_limit,
                token.clone(),
            )?
        }
    };

    Ok(inner)
}

/// Builds the app that delivers billing events to the data warehouse stream.
#[instrument(
    name = "forklift.init.app.billing_events.build_and_run.build_app",
    level = "debug",
    skip_all
)]
fn build_app(
    state: AppState,
    connection_metadata: Arc<ConnectionMetadata>,
    incoming: Stream,
    concurrency_limit: usize,
    token: CancellationToken,
) -> Result<Box<dyn Future<Output = io::Result<()>> + Unpin + Send>> {
    let app = ServiceBuilder::new()
        .layer(MatchedSubjectLayer::new().for_subject(
            ForkliftBillingEventsForSubject::with_prefix(connection_metadata.subject_prefix()),
        ))
        .layer(
            TraceLayer::new()
                .make_span_with(telemetry_nats::NatsMakeSpan::builder(connection_metadata).build())
                .on_response(telemetry_nats::NatsOnResponse::new()),
        )
        .layer(AckLayer::new())
        .service(handlers::process_request.with_state(state))
        .map_response(Response::into_response);

    let inner =
        naxum::serve_with_incoming_limit(incoming, app.into_make_service(), concurrency_limit)
            .with_graceful_shutdown(naxum::wait_on_cancelled(token));

    Ok(Box::new(inner.into_future()))
}

/// Builds the no-op app, which processes billing events without delivering them to a data
/// warehouse stream.
#[instrument(
    name = "forklift.init.app.billing_events.build_and_run.build_noop_app",
    level = "debug",
    skip_all
)]
fn build_noop_app(
    state: NoopAppState,
    connection_metadata: Arc<ConnectionMetadata>,
    incoming: Stream,
    concurrency_limit: usize,
    token: CancellationToken,
) -> Result<Box<dyn Future<Output = io::Result<()>> + Unpin + Send>> {
    let app = ServiceBuilder::new()
        .layer(MatchedSubjectLayer::new().for_subject(
            ForkliftBillingEventsForSubject::with_prefix(connection_metadata.subject_prefix()),
        ))
        .layer(
            TraceLayer::new()
                .make_span_with(telemetry_nats::NatsMakeSpan::builder(connection_metadata).build())
                .on_response(telemetry_nats::NatsOnResponse::new()),
        )
        .layer(AckLayer::new())
        .service(handlers::process_request_noop.with_state(state))
        .map_response(Response::into_response);

    let inner =
        naxum::serve_with_incoming_limit(incoming, app.into_make_service(), concurrency_limit)
            .with_graceful_shutdown(naxum::wait_on_cancelled(token));

    Ok(Box::new(inner.into_future()))
}

/// Derives a `MatchedSubject` by replacing the trailing workspace id segment of the incoming
/// subject with `:workspace_id`.
#[derive(Clone, Debug)]
struct ForkliftBillingEventsForSubject {
    prefix: Option<()>,
}

impl ForkliftBillingEventsForSubject {
    fn with_prefix(prefix: Option<&str>) -> Self {
        Self {
            prefix: prefix.map(|_p| ()),
        }
    }
}

impl<R> ForSubject<R> for ForkliftBillingEventsForSubject
where
    R: MessageHead,
{
    fn call(&mut self, req: &mut naxum::Message<R>) {
        let mut parts = req.subject().split('.');

        match self.prefix {
            Some(_) => {
                if let (Some(prefix), Some(p1), Some(p2), Some(_workspace_id), None) = (
                    parts.next(),
                    parts.next(),
                    parts.next(),
                    parts.next(),
                    parts.next(),
                ) {
                    let matched = format!("{prefix}.{p1}.{p2}.:workspace_id");
                    req.extensions_mut().insert(MatchedSubject::from(matched));
                };
            }
            None => {
                if let (Some(p1), Some(p2), Some(_workspace_id), None) =
                    (parts.next(), parts.next(), parts.next(), parts.next())
                {
                    let matched = format!("{p1}.{p2}.:workspace_id");
                    req.extensions_mut().insert(MatchedSubject::from(matched));
                };
            }
        }
    }
}
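A minimal usage sketch, for reference: the future returned by `build_and_run` is `Unpin + Send`, so a caller within the crate can await or spawn it directly and stop it through the `CancellationToken`. The identifiers and values below (`jetstream_context`, `connection_metadata`, the consumer name, concurrency limit, and stream name) are illustrative assumptions, not part of this module.

// Hypothetical wiring (identifiers are illustrative only):
let token = CancellationToken::new();

let app = build_and_run(
    jetstream_context,                     // si_data_nats `jetstream::Context`
    "forklift-billing-events".to_string(), // example durable consumer name
    connection_metadata,                   // `Arc<ConnectionMetadata>`
    16,                                    // example concurrency limit
    Some("billing-data-warehouse"),        // pass `None` to run in no-op mode
    token.clone(),
)
.await?;

// Cancel `token` from a shutdown signal handler elsewhere; the serve future
// then resolves after graceful shutdown.
app.await?;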
