audit_logs.rs
use std::{
    future::{
        Future,
        IntoFuture as _,
    },
    io,
    sync::Arc,
    time::Duration,
};

use app_state::AppState;
use audit_database::AuditDatabaseContext;
use audit_logs_stream::{
    AuditLogsStream,
    AuditLogsStreamError,
};
use nats_dead_letter_queue::{
    DeadLetterQueue,
    NatsDeadLetterQueueError,
};
use naxum::{
    MessageHead,
    ServiceBuilder,
    ServiceExt as _,
    TowerServiceExt as _,
    extract::MatchedSubject,
    handler::Handler as _,
    middleware::{
        ack::AckLayer,
        matched_subject::{
            ForSubject,
            MatchedSubjectLayer,
        },
        trace::TraceLayer,
    },
    response::{
        IntoResponse,
        Response,
    },
};
use si_data_nats::{
    ConnectionMetadata,
    async_nats::{
        self,
        error::Error as AsyncNatsError,
        jetstream::{
            consumer::StreamErrorKind,
            stream::ConsumerErrorKind,
        },
    },
    jetstream::Context,
};
use telemetry::prelude::*;
use thiserror::Error;
use tokio_util::sync::CancellationToken;

mod app_state;
mod handlers;

#[derive(Debug, Error)]
pub enum AuditLogsAppSetupError {
    #[error("async nats consumer error: {0}")]
    AsyncNatsConsumer(#[from] AsyncNatsError<ConsumerErrorKind>),
    #[error("async nats stream error: {0}")]
    AsyncNatsStream(#[from] AsyncNatsError<StreamErrorKind>),
    #[error("audit logs stream error: {0}")]
    AuditLogsStream(#[from] AuditLogsStreamError),
    #[error("failed to create dead letter stream: {0}")]
    NatsDeadLetterQueue(#[from] NatsDeadLetterQueueError),
}

type Result<T> = std::result::Result<T, AuditLogsAppSetupError>;

/// Builds a naxum app for audit logs. Note that despite having an ack layer, all audit logs remain on the stream when
/// processed. This is because the audit logs stream is limits-based and is not a work queue. Sneaky!
#[instrument(
    name = "forklift.init.app.audit_logs.build_and_run",
    level = "debug",
    skip_all
)]
pub(crate) async fn build_and_run(
    jetstream_context: Context,
    durable_consumer_name: String,
    connection_metadata: Arc<ConnectionMetadata>,
    audit_database_context: AuditDatabaseContext,
    insert_concurrency_limit: usize,
    token: CancellationToken,
) -> Result<Box<dyn Future<Output = io::Result<()>> + Unpin + Send>> {
    DeadLetterQueue::create_stream(jetstream_context.clone()).await?;

    let incoming = {
        let stream = AuditLogsStream::get_or_create(jetstream_context.clone()).await?;
        let consumer_subject = stream.consuming_subject_for_all_workspaces();
        stream
            .stream()
            .await?
            .create_consumer(async_nats::jetstream::consumer::pull::Config {
                durable_name: Some(durable_consumer_name.clone()),
                filter_subject: consumer_subject.into_string(),
                max_deliver: 4,
                backoff: vec![
                    Duration::from_secs(5),
                    Duration::from_secs(10),
                    Duration::from_secs(15),
                ],
                ..Default::default()
            })
            .await?
            .messages()
            .await?
    };

    let state = AppState::new(
        audit_database_context,
        connection_metadata.subject_prefix().is_some(),
    );

    // NOTE(nick,fletcher): the "NatsMakeSpan" builder defaults to "info" level logging. Bump it down, if needed.
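    // Layer ordering note: assuming naxum's `ServiceBuilder` follows tower's
    // semantics, the first layer added is the outermost, so each message passes
    // through the consumer-lag gauge, subject matching, tracing, and the ack
    // layer (in that order) before reaching the handler.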
    let app = ServiceBuilder::new()
        .layer(
            crate::middleware::consumer_lag_gauge::ConsumerLagGaugeLayer::new(|lag| {
                use telemetry_utils::gauge;
                gauge!(audit_logs_consumer_lag = lag as f64);
            }),
        )
        .layer(
            MatchedSubjectLayer::new().for_subject(ForkliftAuditLogsForSubject::with_prefix(
                connection_metadata.subject_prefix(),
            )),
        )
        .layer(
            TraceLayer::new()
                .make_span_with(telemetry_nats::NatsMakeSpan::builder(connection_metadata).build())
                .on_response(telemetry_nats::NatsOnResponse::new()),
        )
        .layer(AckLayer::new())
        .service(handlers::default.with_state(state))
        .map_response(Response::into_response);

    let inner = naxum::serve_with_incoming_limit(
        incoming,
        app.into_make_service(),
        insert_concurrency_limit,
    )
    .with_graceful_shutdown(naxum::wait_on_cancelled(token));

    Ok(Box::new(inner.into_future()))
}

#[derive(Clone, Debug)]
struct ForkliftAuditLogsForSubject {
    prefix: Option<()>,
}

impl ForkliftAuditLogsForSubject {
    fn with_prefix(prefix: Option<&str>) -> Self {
        Self {
            prefix: prefix.map(|_p| ()),
        }
    }
}

impl<R> ForSubject<R> for ForkliftAuditLogsForSubject
where
    R: MessageHead,
{
    fn call(&mut self, req: &mut naxum::Message<R>) {
        let mut parts = req.subject().split('.');

        match self.prefix {
            Some(_) => {
                match (
                    parts.next(),
                    parts.next(),
                    parts.next(),
                    parts.next(),
                    parts.next(),
                    parts.next(),
                ) {
                    // Subject with change set id
                    (
                        Some(prefix),
                        Some(p1),
                        Some(p2),
                        Some(_workspace_id),
                        Some(_change_set_id),
                        None,
                    ) => {
                        let matched = format!("{prefix}.{p1}.{p2}.:workspace_id.:change_set_id");
                        req.extensions_mut().insert(MatchedSubject::from(matched));
                    }
                    // Subject without change set id
                    (Some(prefix), Some(p1), Some(p2), Some(_workspace_id), None, None) => {
                        let matched = format!("{prefix}.{p1}.{p2}.:workspace_id.");
                        req.extensions_mut().insert(MatchedSubject::from(matched));
                    }
                    _ => {}
                }
            }
            None => {
                match (
                    parts.next(),
                    parts.next(),
                    parts.next(),
                    parts.next(),
                    parts.next(),
                ) {
                    // Subject with change set id
                    (Some(p1), Some(p2), Some(_workspace_id), Some(_change_set_id), None) => {
                        let matched = format!("{p1}.{p2}.:workspace_id.:change_set_id");
                        req.extensions_mut().insert(MatchedSubject::from(matched));
                    }
                    // Subject without change set id
                    (Some(p1), Some(p2), Some(_workspace_id), None, None) => {
                        let matched = format!("{p1}.{p2}.:workspace_id.");
                        req.extensions_mut().insert(MatchedSubject::from(matched));
                    }
                    _ => {}
                }
            }
        }
    }
}
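For intuition, here is a minimal, dependency-free sketch of the parametrization that `ForkliftAuditLogsForSubject` performs: it keeps the leading subject segments verbatim and collapses the workspace and change set segments into `:workspace_id`/`:change_set_id` placeholders. The function name and the sample subjects below are made up for illustration; the real implementation stores the result in the message's extensions via `MatchedSubject` rather than returning a string.

// Illustrative only: mirrors the matching logic above with hypothetical subjects.
fn parametrize(subject: &str, has_prefix: bool) -> Option<String> {
    let parts: Vec<&str> = subject.split('.').collect();
    let skip = usize::from(has_prefix);
    // A prefixed subject keeps its first segment verbatim.
    let head = if has_prefix {
        format!("{}.", parts[0])
    } else {
        String::new()
    };
    match parts.len().checked_sub(skip)? {
        // <p1>.<p2>.<workspace_id>.<change_set_id>
        4 => Some(format!(
            "{head}{}.{}.:workspace_id.:change_set_id",
            parts[skip],
            parts[skip + 1],
        )),
        // <p1>.<p2>.<workspace_id> (trailing dot preserved, as in the original)
        3 => Some(format!(
            "{head}{}.{}.:workspace_id.",
            parts[skip],
            parts[skip + 1],
        )),
        _ => None,
    }
}

fn main() {
    // Hypothetical subjects; the real audit log subject segments may differ.
    assert_eq!(
        parametrize("audit.log.WS123.CS456", false).as_deref(),
        Some("audit.log.:workspace_id.:change_set_id"),
    );
    assert_eq!(
        parametrize("pfx.audit.log.WS123", true).as_deref(),
        Some("pfx.audit.log.:workspace_id."),
    );
}

The likely payoff of this parametrization is telemetry hygiene: the trace layer can name spans by the matched subject template instead of the concrete subject, keeping high-cardinality workspace and change set IDs out of span and metric names.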
