Skip to main content
Glama
server.rs — 11 kB
use std::{
    fmt,
    future::IntoFuture as _,
    net::SocketAddr,
    path::PathBuf,
    sync::Arc,
};

use audit_database::AuditDatabaseContext;
use axum::{
    Router,
    async_trait,
    routing::IntoMakeService,
};
use dal::ServicesContext;
use edda_client::EddaClient;
use frigg::{
    FriggStore,
    frigg_kv,
};
use hyper::server::accept::Accept;
use nats_multiplexer::Multiplexer;
use nats_multiplexer_client::MultiplexerClient;
use sdf_core::nats_multiplexer::EddaUpdatesMultiplexerClient;
use si_data_nats::jetstream;
use si_data_spicedb::SpiceDbClient;
use si_jwt_public_key::JwtPublicSigningKeyChain;
use si_posthog::PosthogClient;
use telemetry::prelude::*;
use tokio::{
    io::{
        AsyncRead,
        AsyncWrite,
    },
    signal,
    sync::RwLock,
};
use tokio_util::{
    sync::CancellationToken,
    task::TaskTracker,
};

use crate::{
    ApplicationRuntimeMode,
    AxumApp,
    Config,
    IncomingStream,
    ServerError,
    ServerResult,
    WorkspacePermissions,
    WorkspacePermissionsMode,
    init,
    nats_multiplexer::{
        CRDT_MULTIPLEXER_SUBJECT,
        WS_MULTIPLEXER_SUBJECT,
    },
    runnable::Runnable,
    uds::UdsIncomingStream,
};

/// Server metadata, used with telemetry.
#[derive(Clone, Debug)]
pub struct ServerMetadata {
    // Unique id for this server instance, taken from `Config::instance_id` in
    // `from_config` (see `Self::from_services` below).
    instance_id: String,
}

impl ServerMetadata {
    /// Returns the server's unique instance id.
    pub fn instance_id(&self) -> &str {
        &self.instance_id
    }
}

/// The luminork HTTP server: a bound Axum/hyper server plus the metadata and
/// socket description used for logging and telemetry.
///
/// Construct with [`Server::from_config`] (full bootstrap) or
/// [`Server::from_services`] (pre-built dependencies), then drive it with
/// [`Server::run`] or `Runnable::try_run`.
pub struct Server {
    metadata: Arc<ServerMetadata>,
    // The bound inner server, type-erased so TCP- and UDS-backed variants can
    // share one field (see `InnerServer` below).
    inner: Box<dyn Runnable + Send>,
    // Which address/path the server is bound to; kept for `Debug` output.
    socket: ServerSocket,
}

// Manual `Debug`: `inner` is a `dyn Runnable` and cannot be derived, so only
// `metadata` and `socket` are shown.
impl fmt::Debug for Server {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Server")
            .field("metadata", &self.metadata)
            .field("socket", &self.socket)
            .finish()
    }
}

#[async_trait]
impl Runnable for Server {
    /// Runs the inner server to completion, logging once its main loop has
    /// shut down. Errors from the inner server are propagated unchanged.
    async fn try_run(self) -> ServerResult<()> {
        self.inner.try_run().await?;
        info!("luminork main loop shutdown complete");
        Ok(())
    }
}

impl Server {
    /// Builds a [`Server`] from configuration, constructing every dependency
    /// along the way: services context, JWT signing keys, PostHog client,
    /// NATS multiplexers (WS, CRDT, edda updates), optional SpiceDB client,
    /// Frigg store, audit database context, and edda client.
    ///
    /// Long-lived background tasks (layer-db shutdown watcher, PostHog
    /// sender, the three multiplexers) are spawned onto
    /// `helping_tasks_tracker` so the caller can await their graceful
    /// shutdown; they observe `helping_tasks_token` for cancellation, while
    /// `token` cancels the HTTP server itself.
    ///
    /// # Errors
    ///
    /// Returns an error if any dependency fails to initialize (NATS,
    /// SpiceDB, Frigg KV, audit database, signal-handler registration, ...).
    #[instrument(name = "luminork.init.from_config", level = "info", skip_all)]
    pub async fn from_config(
        config: Config,
        token: CancellationToken,
        helping_tasks_tracker: &TaskTracker,
        helping_tasks_token: CancellationToken,
    ) -> ServerResult<Self> {
        let (services_context, layer_db_graceful_shutdown) =
            init::services_context_from_config(&config, helping_tasks_token.clone()).await?;
        let jwt_public_signing_key = init::load_jwt_public_signing_key(
            config.jwt_signing_public_key().clone(),
            config.jwt_secondary_signing_public_key().cloned(),
        )
        .await?;
        let (posthog_sender, posthog_client) =
            init::initialize_posthog(config.posthog(), helping_tasks_token.clone())?;

        // Each multiplexer comes as a (runner, client) pair: the runner is
        // spawned below; the client is handed to the Axum app.
        let (ws_multiplexer, ws_multiplexer_client) = Multiplexer::new(
            services_context.nats_conn(),
            WS_MULTIPLEXER_SUBJECT,
            helping_tasks_token.clone(),
        )
        .await?;
        let (crdt_multiplexer, crdt_multiplexer_client) = Multiplexer::new(
            services_context.nats_conn(),
            CRDT_MULTIPLEXER_SUBJECT,
            helping_tasks_token.clone(),
        )
        .await?;
        // The edda-updates subject is derived from the NATS subject prefix
        // rather than a static constant like the two above.
        let (edda_updates_multiplexer, edda_updates_multiplexer_client) = Multiplexer::new(
            services_context.nats_conn(),
            edda_core::nats::subject::all_edda_updates(
                services_context.nats_conn().metadata().subject_prefix(),
            ),
            helping_tasks_token.clone(),
        )
        .await?;
        let edda_updates_multiplexer_client =
            EddaUpdatesMultiplexerClient::new(edda_updates_multiplexer_client);

        // Shared runtime-mode flag, flipped between Running/Maintenance by
        // the SIGUSR2 watcher installed below.
        let application_runtime_mode = Arc::new(RwLock::new(ApplicationRuntimeMode::Running));

        // SpiceDB is optional: only construct a client when enabled in config.
        let mut spicedb_client = None;
        if config.spicedb().enabled {
            spicedb_client = Some(SpiceDbClient::new(config.spicedb()).await?);
        }

        let frigg = {
            let nats = services_context.nats_conn().clone();
            let context = jetstream::new(nats.clone());
            FriggStore::new(
                nats,
                frigg_kv(&context, context.metadata().subject_prefix()).await?,
            )
        };

        // Install the SIGUSR2 maintenance-mode toggle before serving traffic.
        prepare_maintenance_mode_watcher(application_runtime_mode.clone(), token.clone())?;

        // Spawn helping tasks and track them for graceful shutdown
        helping_tasks_tracker.spawn(layer_db_graceful_shutdown.into_future());
        helping_tasks_tracker.spawn(posthog_sender.run());
        helping_tasks_tracker.spawn(ws_multiplexer.run());
        helping_tasks_tracker.spawn(crdt_multiplexer.run());
        helping_tasks_tracker.spawn(edda_updates_multiplexer.run());

        let audit_database_context = AuditDatabaseContext::from_config(config.audit()).await?;

        let edda_client = edda_client::EddaClient::new(services_context.nats_conn().clone()).await?;

        Self::from_services(
            config.instance_id().to_string(),
            config.incoming_stream().clone(),
            services_context,
            jwt_public_signing_key,
            posthog_client,
            config.auth_api_url(),
            ws_multiplexer_client,
            crdt_multiplexer_client,
            edda_updates_multiplexer_client,
            *config.create_workspace_permissions(),
            config.create_workspace_allowlist().clone(),
            application_runtime_mode,
            token,
            spicedb_client,
            frigg,
            audit_database_context,
            edda_client,
        )
        .await
    }

    /// Builds a [`Server`] from already-constructed dependencies: assembles
    /// the Axum app, then binds it either to a TCP socket or to a Unix
    /// domain socket depending on `incoming_stream`.
    ///
    /// `token` is threaded into the inner server and triggers its graceful
    /// shutdown when cancelled.
    ///
    /// # Errors
    ///
    /// Returns an error if the Unix domain socket cannot be created
    /// (`IncomingStream::UnixDomainSocket` only; TCP bind panics inside
    /// `axum::Server::bind` rather than returning an error).
    #[instrument(name = "luminork.init.from_services", level = "info", skip_all)]
    #[allow(clippy::too_many_arguments)]
    pub async fn from_services(
        instance_id: impl Into<String>,
        incoming_stream: IncomingStream,
        services_context: ServicesContext,
        jwt_public_signing_key_chain: JwtPublicSigningKeyChain,
        posthog_client: PosthogClient,
        auth_api_url: impl AsRef<str>,
        ws_multiplexer_client: MultiplexerClient,
        crdt_multiplexer_client: MultiplexerClient,
        edda_updates_multiplexer_client: EddaUpdatesMultiplexerClient,
        create_workspace_permissions: WorkspacePermissionsMode,
        create_workspace_allowlist: Vec<WorkspacePermissions>,
        application_runtime_mode: Arc<RwLock<ApplicationRuntimeMode>>,
        token: CancellationToken,
        spicedb_client: Option<SpiceDbClient>,
        frigg: FriggStore,
        audit_database_context: AuditDatabaseContext,
        edda_client: EddaClient,
    ) -> ServerResult<Self> {
        let app = AxumApp::from_services(
            services_context.clone(),
            jwt_public_signing_key_chain,
            posthog_client,
            auth_api_url,
            ws_multiplexer_client,
            crdt_multiplexer_client,
            edda_updates_multiplexer_client,
            create_workspace_permissions,
            create_workspace_allowlist,
            application_runtime_mode,
            token.clone(),
            spicedb_client,
            frigg,
            audit_database_context.clone(),
            edda_client,
        )
        .into_inner();

        // Bind the app to its transport. Both arms produce the same
        // type-erased pair: the runnable server and a description of where
        // it is listening.
        let (inner, socket): (Box<dyn Runnable + Send>, _) = match incoming_stream {
            IncomingStream::TcpSocket(socket_addr) => {
                debug!(%socket_addr, "binding to tcp socket");
                let inner = axum::Server::bind(&socket_addr).serve(app.into_make_service());
                // Read back the actual bound address (relevant when the
                // configured port is 0).
                let socket = inner.local_addr();
                info!(%socket, "http service bound to tcp socket");
                (
                    Box::new(InnerServer { inner, token }),
                    ServerSocket::SocketAddr(socket),
                )
            }
            IncomingStream::UnixDomainSocket(path) => {
                debug!(path = %path.display(), "binding to unix domain socket");
                let inner = axum::Server::builder(UdsIncomingStream::create(&path).await?)
                    .serve(app.into_make_service());
                let socket = path;
                info!(socket = %socket.display(), "http service bound to unix domain socket");
                (
                    Box::new(InnerServer { inner, token }),
                    ServerSocket::DomainSocket(socket),
                )
            }
        };

        let metadata = Arc::new(ServerMetadata {
            instance_id: instance_id.into(),
        });

        Ok(Self {
            metadata,
            inner,
            socket,
        })
    }

    /// Runs the server to completion, logging (rather than propagating) any
    /// error from the main loop. Use `Runnable::try_run` to handle errors
    /// yourself.
    #[inline]
    pub async fn run(self) {
        if let Err(err) = self.try_run().await {
            error!(error = ?err, "error while running luminork main loop");
        }
    }
}

/// Where a [`Server`] is listening: a filesystem path for Unix domain
/// sockets or a socket address for TCP.
#[derive(Debug)]
#[remain::sorted]
pub enum ServerSocket {
    DomainSocket(PathBuf),
    SocketAddr(SocketAddr),
}

/// A bound hyper/axum server paired with the cancellation token that
/// triggers its graceful shutdown. Generic over the acceptor `I` so the
/// same type serves both TCP and Unix-domain-socket transports.
struct InnerServer<I> {
    inner: axum::Server<I, IntoMakeService<Router>>,
    token: CancellationToken,
}

#[async_trait]
impl<I, IO, IE> Runnable for InnerServer<I>
where
    I: Accept<Conn = IO, Error = IE> + Send + Sync,
    IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
    IE: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    /// Serves until `token` is cancelled, then performs hyper's graceful
    /// shutdown (stops accepting, drains in-flight connections). Serve
    /// errors are wrapped in `ServerError::Axum`.
    async fn try_run(self) -> ServerResult<()> {
        let token = self.token;
        self.inner
            .with_graceful_shutdown(async {
                token.cancelled().await;
            })
            .await
            .map_err(ServerError::Axum)
    }
}

/// Installs a background task that toggles `mode` between `Running` and
/// `Maintenance` each time the process receives SIGUSR2. The task exits
/// when `cancellation_token` is cancelled.
///
/// # Errors
///
/// Returns `ServerError::Signal` if the SIGUSR2 handler cannot be
/// registered (Unix-only: `signal::unix`).
fn prepare_maintenance_mode_watcher(
    mode: Arc<RwLock<ApplicationRuntimeMode>>,
    cancellation_token: CancellationToken,
) -> ServerResult<()> {
    let mut sigusr2_watcher = signal::unix::signal(signal::unix::SignalKind::user_defined2())
        .map_err(ServerError::Signal)?;
    // NOTE: spawned with `tokio::spawn` (detached), not onto a task tracker;
    // it is stopped via `cancellation_token` instead.
    tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = sigusr2_watcher.recv() => {
                    info!("received SIGUSR2 signal, changing application runtime mode");
                    // Hold the write lock only long enough to flip the mode.
                    let mut mode = mode.write().await;
                    info!(?mode, "current application runtime mode (changing it...)");
                    *mode = match *mode {
                        ApplicationRuntimeMode::Maintenance => ApplicationRuntimeMode::Running,
                        ApplicationRuntimeMode::Running => ApplicationRuntimeMode::Maintenance,
                    };
                    info!(?mode, "new application runtime mode (changed!)");
                }
                _ = cancellation_token.cancelled() => { break }
                else => {
                    // All other arms are closed, nothing left to do but return
                    trace!("returning from graceful shutdown with all select arms closed");
                    break
                }
            }
        }
    });
    Ok(())
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.