Skip to main content
Glama
nats.rs (5.78 kB)
use std::time::Duration; use si_data_nats::{ async_nats::jetstream, jetstream::Context, }; pub const NATS_HEADER_DB_NAME: &str = "X-DB-NAME"; pub const NATS_HEADER_KEY: &str = "X-KEY"; pub const NATS_HEADER_INSTANCE_ID: &str = "X-INSTANCE-ID"; const NATS_EVENTS_STREAM_NAME: &str = "LAYERDB_EVENTS"; // Stream that covers messages across the following subjects: // ``` // si.layerdb.events.$workspace_pk.$change_set_pk.$table_name.$event_kind // ``` const NATS_EVENT_STREAM_SUBJECTS: &[&str] = &["si.layerdb.events.*.*.*.*"]; const NATS_ACTIVITIES_STREAM_NAME: &str = "LAYERDB_ACTIVITIES"; const NATS_ACTIVITIES_STREAM_SUBJECTS: &[&str] = &["si.layerdb.activities.>"]; const MAX_BYTES: i64 = 1024 * 1024; // mirrors settings in Synadia NATs /// Returns a Jetstream Stream and creates it if it doesn't yet exist. pub async fn layerdb_events_stream( context: &Context, prefix: Option<&str>, ) -> Result<jetstream::stream::Stream, jetstream::context::CreateStreamError> { let subjects: Vec<_> = NATS_EVENT_STREAM_SUBJECTS .iter() .map(|suffix| nats_std::subject::prefixed(prefix, suffix).to_string()) .collect(); let stream = context .get_or_create_stream(jetstream::stream::Config { name: nats_std::jetstream::prefixed(prefix, NATS_EVENTS_STREAM_NAME), description: Some("Layerdb events".to_owned()), subjects, retention: jetstream::stream::RetentionPolicy::Limits, discard: jetstream::stream::DiscardPolicy::Old, // TODO(fnichol): this likely needs tuning max_age: Duration::from_secs(60 * 60 * 6), ..Default::default() }) .await?; Ok(stream) } pub async fn layerdb_activities_stream( context: &Context, prefix: Option<&str>, ) -> Result<jetstream::stream::Stream, jetstream::context::CreateStreamError> { let subjects: Vec<_> = NATS_ACTIVITIES_STREAM_SUBJECTS .iter() .map(|suffix| nats_std::subject::prefixed(prefix, suffix).to_string()) .collect(); let stream = context .get_or_create_stream(jetstream::stream::Config { name: nats_std::jetstream::prefixed(prefix, NATS_ACTIVITIES_STREAM_NAME), 
description: Some("Layerdb activities".to_owned()), subjects, retention: jetstream::stream::RetentionPolicy::Limits, discard: jetstream::stream::DiscardPolicy::Old, // TODO(fnichol): this likely needs tuning max_age: Duration::from_secs(60 * 60 * 6), max_bytes: MAX_BYTES, ..Default::default() }) .await?; Ok(stream) } pub mod subject { use si_data_nats::Subject; use crate::{ activities::{ Activity, ActivityPayloadDiscriminants, }, event::LayeredEvent, }; const EVENTS_PREFIX: &str = "si.layerdb.events"; const ACTIVITIES_PREFIX: &str = "si.layerdb.activities"; pub fn for_event(prefix: Option<&str>, event: &LayeredEvent) -> Subject { // Cuts down on the amount of `String` allocations dealing with Ulids let mut buf = [0; ulid::ULID_LEN]; // A string with enough capacity to avoid multiple reallocations let mut suffix = String::with_capacity( EVENTS_PREFIX.len() + (2 * ulid::ULID_LEN) + event.payload.db_name.len() + 4, ); suffix.push_str(EVENTS_PREFIX); suffix.push('.'); suffix.push_str(event.metadata.tenancy.workspace_pk.array_to_str(&mut buf)); suffix.push('.'); suffix.push_str(event.metadata.tenancy.change_set_id.array_to_str(&mut buf)); suffix.push('.'); suffix.push_str(&event.payload.db_name); suffix.push('.'); suffix.push_str(event.event_kind.as_ref()); nats_std::subject::prefixed(prefix, suffix) } pub fn for_activity(prefix: Option<&str>, activity: &Activity) -> Subject { // Cuts down on the amount of `String` allocations dealing with Ulids let mut buf = [0; ulid::ULID_LEN]; // A string with enough capacity to avoid multiple reallocations let mut suffix = String::with_capacity( ACTIVITIES_PREFIX.len() + (2 * ulid::ULID_LEN) + activity.payload.to_subject().len() + 3, ); suffix.push_str(ACTIVITIES_PREFIX); suffix.push('.'); suffix.push_str( activity .metadata .tenancy .workspace_pk .array_to_str(&mut buf), ); suffix.push('.'); suffix.push_str( activity .metadata .tenancy .change_set_id .array_to_str(&mut buf), ); suffix.push('.'); 
suffix.push_str(&activity.payload.to_subject()); nats_std::subject::prefixed(prefix, suffix) } pub fn for_activity_discriminate( prefix: Option<&str>, activity_payload_discriminate: ActivityPayloadDiscriminants, ) -> Subject { // Cuts down on the amount of `String` allocations dealing with Ulids let _buf = [0; ulid::ULID_LEN]; // A string with enough capacity to avoid multiple reallocations let mut suffix = String::with_capacity( ACTIVITIES_PREFIX.len() + activity_payload_discriminate.to_subject().len() + 5, ); suffix.push_str(ACTIVITIES_PREFIX); suffix.push('.'); suffix.push('*'); suffix.push('.'); suffix.push('*'); suffix.push('.'); suffix.push_str(&activity_payload_discriminate.to_subject()); nats_std::subject::prefixed(prefix, suffix) } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.