server.rs
use std::{
    fmt,
    future::Future,
    io,
    sync::Arc,
};

use audit_database::{
    AuditDatabaseContext,
    AuditDatabaseContextError,
};
use si_data_nats::{
    ConnectionMetadata,
    NatsClient,
    jetstream,
};
use si_data_pg::{
    PgPool,
    PgPoolConfig,
};
use si_layer_cache::event::LayeredEventClient;
use snapshot_eviction::SnapshotEvictor;
use telemetry::prelude::*;
use thiserror::Error;
use tokio::task::JoinError;
use tokio_util::sync::CancellationToken;

use crate::config::Config;

mod app;

pub(crate) use app::AppSetupError;

const DURABLE_CONSUMER_NAME: &str = "forklift-server";

#[derive(Debug, Error)]
pub enum ServerError {
    #[error("app setup error: {0}")]
    AppSetup(#[from] AppSetupError),
    #[error("audit database context error: {0}")]
    AuditDatabaseContext(#[from] AuditDatabaseContextError),
    #[error("join error: {0}")]
    Join(#[from] JoinError),
    #[error("naxum error: {0}")]
    Naxum(#[source] io::Error),
    #[error("pg pool error: {0}")]
    PgPool(#[from] si_data_pg::PgPoolError),
    #[error("si data nats error: {0}")]
    SiDataNats(#[from] si_data_nats::Error),
    #[error("ulid decode error: {0}")]
    UlidDecode(#[from] ulid::DecodeError),
    #[error("snapshot eviction error: {0}")]
    SnapshotEviction(#[from] snapshot_eviction::SnapshotEvictionError),
}

type Result<T> = std::result::Result<T, ServerError>;

/// Server metadata, used with telemetry.
#[derive(Clone, Debug)]
pub(crate) struct ServerMetadata {
    #[allow(dead_code)]
    instance_id: String,
    #[allow(dead_code)]
    job_invoked_provider: &'static str,
}

impl ServerMetadata {
    /// Returns the server's unique instance id.
    #[allow(dead_code)]
    pub(crate) fn instance_id(&self) -> &str {
        &self.instance_id
    }

    /// Returns the job invoked provider.
    #[allow(dead_code)]
    pub(crate) fn job_invoked_provider(&self) -> &str {
        self.job_invoked_provider
    }
}

/// The forklift server instance with its inner naxum task.
pub struct Server {
    metadata: Arc<ServerMetadata>,
    shutdown_token: CancellationToken,
    // TODO(nick): remove option once this is working.
    inner_audit_logs: Option<Box<dyn Future<Output = io::Result<()>> + Unpin + Send>>,
    inner_billing_events: Box<dyn Future<Output = io::Result<()>> + Unpin + Send>,
    snapshot_evictor: SnapshotEvictor,
}

impl fmt::Debug for Server {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Server")
            .field("metadata", &self.metadata)
            .field("shutdown_token", &self.shutdown_token)
            .finish()
    }
}

impl Server {
    /// Creates a forklift server with a running naxum task from a given configuration.
    #[instrument(name = "forklift.init.from_config", level = "info", skip_all)]
    pub async fn from_config(config: Config, token: CancellationToken) -> Result<Self> {
        let nats = Self::connect_to_nats(&config).await?;
        let connection_metadata = nats.metadata_clone();
        let jetstream_context = jetstream::new(nats.clone());

        let audit_bag = if config.enable_audit_logs_app() {
            let insert_concurrency_limit = config.audit().insert_concurrency_limit;
            let audit_database_context = AuditDatabaseContext::from_config(config.audit()).await?;
            Some((audit_database_context, insert_concurrency_limit))
        } else {
            None
        };

        // Initialize pools for eviction task
        let si_db_pool = Self::create_si_db_pool(&config.snapshot_eviction().si_db).await?;
        let layer_cache_pool =
            Self::create_layer_cache_pool(&config.snapshot_eviction().layer_cache_pg).await?;

        // Create LayeredEventClient for eviction
        let layered_event_client =
            Self::create_layered_event_client(&nats, config.instance_id(), &jetstream_context)?;

        // Validate and clamp eviction config
        let mut eviction_config = config.snapshot_eviction().clone();
        eviction_config.validate_and_clamp();

        Self::from_services(
            connection_metadata,
            jetstream_context,
            config.instance_id(),
            config.concurrency_limit(),
            audit_bag,
            config.data_warehouse_stream_name(),
            si_db_pool,
            layer_cache_pool,
            layered_event_client,
            eviction_config,
            token,
        )
        .await
    }

    /// Creates a forklift server with a running naxum task with running services.
    #[allow(clippy::too_many_arguments)]
    #[instrument(name = "forklift.init.from_services", level = "info", skip_all)]
    pub async fn from_services(
        connection_metadata: Arc<ConnectionMetadata>,
        jetstream_context: jetstream::Context,
        instance_id: &str,
        concurrency_limit: usize,
        audit_bag: Option<(AuditDatabaseContext, usize)>,
        data_warehouse_stream_name: Option<&str>,
        si_db_pool: PgPool,
        layer_cache_pool: PgPool,
        layered_event_client: LayeredEventClient,
        snapshot_eviction_config: snapshot_eviction::SnapshotEvictionConfig,
        token: CancellationToken,
    ) -> Result<Self> {
        let metadata = Arc::new(ServerMetadata {
            instance_id: instance_id.into(),
            job_invoked_provider: "si",
        });

        let inner_audit_logs =
            if let Some((audit_database_context, insert_concurrency_limit)) = audit_bag {
                Some(
                    app::audit_logs(
                        jetstream_context.clone(),
                        DURABLE_CONSUMER_NAME.to_string(),
                        connection_metadata.clone(),
                        audit_database_context,
                        insert_concurrency_limit,
                        token.clone(),
                    )
                    .await?,
                )
            } else {
                None
            };

        let inner_billing_events = app::billing_events(
            jetstream_context,
            DURABLE_CONSUMER_NAME.to_string(),
            connection_metadata,
            concurrency_limit,
            data_warehouse_stream_name,
            token.clone(),
        )
        .await?;

        // Create snapshot evictor
        let snapshot_evictor = SnapshotEvictor::new(
            si_db_pool,
            layer_cache_pool,
            layered_event_client,
            snapshot_eviction_config,
        );

        Ok(Self {
            metadata,
            inner_audit_logs,
            inner_billing_events,
            snapshot_evictor,
            shutdown_token: token,
        })
    }

    /// Infallible wrapper around running the inner naxum task(s).
    #[inline]
    pub async fn run(self) {
        if let Err(err) = self.try_run().await {
            error!(error = ?err, "error while running forklift main loop");
        }
    }

    /// Fallibly awaits the inner naxum task(s).
    pub async fn try_run(self) -> Result<()> {
        // Extract fields to avoid partial move issues
        let snapshot_evictor = self.snapshot_evictor;
        let inner_audit_logs = self.inner_audit_logs;
        let inner_billing_events = self.inner_billing_events;
        let shutdown_token = self.shutdown_token;

        // Spawn snapshot eviction background task
        let eviction_shutdown = shutdown_token.clone();
        let eviction_task =
            tokio::spawn(async move { snapshot_evictor.run(eviction_shutdown).await });
        info!("Snapshot eviction task spawned");

        // Run existing app tasks
        let result = match inner_audit_logs {
            Some(inner_audit_logs) => {
                info!("running three apps: audit logs, billing events, and snapshot eviction");
                let (eviction_result, audit_result, billing_result) = futures::join!(
                    eviction_task,
                    tokio::spawn(inner_audit_logs),
                    tokio::spawn(inner_billing_events)
                );

                // Check eviction task
                eviction_result??;

                // Check existing tasks
                audit_result?.map_err(ServerError::Naxum)?;
                billing_result?.map_err(ServerError::Naxum)?;

                Ok(())
            }
            None => {
                info!("running two apps: billing events and snapshot eviction");
                let (eviction_result, billing_result) =
                    futures::join!(eviction_task, tokio::spawn(inner_billing_events));

                // Check eviction task
                eviction_result??;

                // Check billing task
                billing_result?.map_err(ServerError::Naxum)?;

                Ok(())
            }
        };

        info!("forklift main loop shutdown complete");
        result
    }

    #[instrument(name = "forklift.init.create_si_db_pool", level = "info", skip_all)]
    async fn create_si_db_pool(config: &PgPoolConfig) -> Result<PgPool> {
        let mut pool_config = config.clone();
        // Minimal pool for single eviction task
        pool_config.pool_max_size = 2;
        let pool = PgPool::new(&pool_config).await?;
        debug!("si-db pool initialized for eviction (pool_max_size=2)");
        Ok(pool)
    }

    #[instrument(
        name = "forklift.init.create_layer_cache_pool",
        level = "info",
        skip_all
    )]
    async fn create_layer_cache_pool(config: &PgPoolConfig) -> Result<PgPool> {
        let mut pool_config = config.clone();
        // Minimal pool for single eviction task
        pool_config.pool_max_size = 2;
        let pool = PgPool::new(&pool_config).await?;
        debug!("layer-cache pool initialized for eviction (pool_max_size=2)");
        Ok(pool)
    }

    #[instrument(
        name = "forklift.init.create_layered_event_client",
        level = "info",
        skip_all
    )]
    fn create_layered_event_client(
        nats: &NatsClient,
        instance_id: &str,
        jetstream_context: &jetstream::Context,
    ) -> Result<LayeredEventClient> {
        let instance_id_ulid = ulid::Ulid::from_string(instance_id)?;
        let client = LayeredEventClient::new(
            nats.metadata().subject_prefix().map(|s| s.to_owned()),
            instance_id_ulid,
            jetstream_context.clone(),
        );
        debug!("LayeredEventClient created for snapshot eviction");
        Ok(client)
    }

    #[instrument(name = "forklift.init.connect_to_nats", level = "info", skip_all)]
    async fn connect_to_nats(config: &Config) -> Result<NatsClient> {
        let client = NatsClient::new(config.nats()).await?;
        debug!("successfully connected nats client");
        Ok(client)
    }
}
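
For context, a minimal sketch of how a binary might wire this server up. The `Config::default()` call and Ctrl-C wiring below are assumptions for illustration; the real forklift binary loads configuration and initializes telemetry through its own startup path.

// Sketch of a binary entrypoint for the forklift server (assumptions noted below).
// Imports of `Server` and `Config` from the forklift crate are elided.
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let token = CancellationToken::new();

    // Cancel the token on Ctrl-C so the naxum apps and the snapshot
    // eviction task shut down cooperatively.
    let shutdown = token.clone();
    tokio::spawn(async move {
        let _ = tokio::signal::ctrl_c().await;
        shutdown.cancel();
    });

    let config = Config::default(); // hypothetical constructor; real config loading differs
    let server = Server::from_config(config, token).await?;

    // `run` is the infallible wrapper: it awaits `try_run` and logs any error.
    server.run().await;
    Ok(())
}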
