server.rs
use std::{
    collections::HashMap,
    convert::Infallible,
    fmt,
    future::{
        Future,
        IntoFuture as _,
    },
    io,
    sync::Arc,
    time::Duration,
};

use futures::{
    StreamExt,
    join,
};
use naxum::{
    MessageHead,
    ServiceBuilder,
    ServiceExt as _,
    TowerServiceExt as _,
    extract::MatchedSubject,
    handler::Handler as _,
    middleware::{
        ack::AckLayer,
        matched_subject::{
            ForSubject,
            MatchedSubjectLayer,
        },
        trace::TraceLayer,
    },
    response::{
        IntoResponse,
        Response,
    },
};
use si_crypto::VeritechDecryptionKey;
use si_data_nats::{
    NatsClient,
    NatsConfig,
    Subscriber,
    async_nats,
    jetstream,
};
use si_pool_noodle::{
    KillExecutionRequest,
    PoolNoodle,
    Spec,
    instance::cyclone::{
        LocalUdsInstance,
        LocalUdsInstanceSpec,
    },
    pool_noodle::PoolNoodleConfig,
};
use telemetry::prelude::*;
use telemetry_utils::metric;
use tokio::sync::{
    Mutex,
    oneshot,
};
use tokio_util::sync::CancellationToken;
use veritech_core::{
    ExecutionId,
    GetNatsSubjectFor,
    incoming_subject,
    veritech_work_queue,
};

use crate::{
    Config,
    ServerError,
    ServerResult,
    app_state::{
        AppState,
        KillAppState,
    },
    config::CycloneSpec,
    handlers,
    heartbeat::HeartbeatApp,
};

const CONSUMER_NAME: &str = "veritech-server";
const CONSUMER_MAX_DELIVERY: i64 = 5;

/// Server metadata, used with telemetry.
#[derive(Clone, Debug)]
pub struct ServerMetadata {
    #[allow(dead_code)]
    instance_id: String,
}

impl ServerMetadata {
    /// Returns the server's unique instance id.
    #[allow(dead_code)]
    pub fn instance_id(&self) -> &str {
        &self.instance_id
    }
}

pub struct Server {
    metadata: Arc<ServerMetadata>,
    inner: Box<dyn Future<Output = io::Result<()>> + Unpin + Send>,
    kill_inner: Box<dyn Future<Output = io::Result<()>> + Unpin + Send>,
    shutdown_token: CancellationToken,
}

impl fmt::Debug for Server {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Server")
            .field("metadata", &self.metadata)
            .field("shutdown_token", &self.shutdown_token)
            .finish()
    }
}

impl Server {
    #[instrument(name = "veritech.init.from_config", level = "info", skip_all)]
    pub async fn from_config(
        config: Config,
        token: CancellationToken,
    ) -> ServerResult<(Self, Option<HeartbeatApp>)> {
        let nats = Self::connect_to_nats(config.nats()).await?;
        let mut nats_config = config.nats().clone();
        if let Some(name) = nats_config.connection_name {
            nats_config.connection_name = Some(format!("{name}-stream"));
        }
        let nats_jetstream = Self::connect_to_nats(&nats_config).await?;

        let metadata = Arc::new(ServerMetadata {
            instance_id: config.instance_id().into(),
        });

        let decryption_key = VeritechDecryptionKey::from_config(config.crypto().clone()).await?;

        let kill_senders = Arc::new(Mutex::new(HashMap::new()));

        match config.cyclone_spec() {
            CycloneSpec::LocalHttp(_spec) => {
                //
                // TODO(fnichol): Hi there! Ultimately, the Veritech server should be able to work
                // with a LocalUds Cyclone backend and a LocalHttp version. But since this involves
                // threading through some generic types which themselves have trait
                // constraints--well we can add this back in the near future... Good luck to us
                // all.
                //
                // let mut cyclone_pool: PoolNoodle<LocalUdsInstance, LocalUdsInstanceSpec> =
                //     PoolNoodle::new(spec.pool_size.into(), spec.clone(), token.clone());
                //
                // spec.clone()
                //     .setup()
                //     .await
                //     .map_err(|e| ServerError::CycloneSetupError(Box::new(e)))?;
                // cyclone_pool
                //     .start(config.healthcheck_pool())
                //     .map_err(|e| ServerError::CyclonePool(Box::new(e)))?;
                //
                // ...
                //
                // Ok(Server {
                //     metadata,
                //     inner: inner_future,
                //     kill_inner: kill_inner_future,
                //     shutdown_token: token,
                // })
                unimplemented!("get ready for a surprise!!")
            }
            CycloneSpec::LocalUds(spec) => {
                let pool_config = PoolNoodleConfig {
                    check_health: config.healthcheck_pool(),
                    pool_size: spec.pool_size,
                    shutdown_token: token.clone(),
                    spec: spec.clone(),
                    ..Default::default()
                };
                let mut cyclone_pool: PoolNoodle<LocalUdsInstance, LocalUdsInstanceSpec> =
                    PoolNoodle::new(pool_config).await;

                spec.clone()
                    .setup()
                    .await
                    .map_err(|e| ServerError::CycloneSetupError(Box::new(e)))?;
                cyclone_pool
                    .run()
                    .map_err(|e| ServerError::CyclonePool(Box::new(e)))?;

                // Reset metrics before creating the naxum apps.
                metric!(counter.veritech.handlers_doing_work = 0);
                metric!(counter.veritech.pool_exhausted = 0);

                let inner_future = Self::build_app(
                    metadata.clone(),
                    config.veritech_requests_concurrency_limit(),
                    cyclone_pool,
                    Arc::new(decryption_key),
                    config.cyclone_client_execution_timeout(),
                    nats.clone(),
                    nats_jetstream.clone(),
                    kill_senders.clone(),
                    token.clone(),
                )
                .await?;
                let kill_inner_future = Self::build_kill_app(
                    metadata.clone(),
                    nats.clone(),
                    kill_senders,
                    token.clone(),
                )
                .await?;

                let maybe_heartbeat_app = if config.heartbeat_app() {
                    Some(HeartbeatApp::new(
                        nats,
                        token.clone(),
                        config.instance_id(),
                        config.heartbeat_app_sleep_duration(),
                        config.heartbeat_app_publish_timeout_duration(),
                    ))
                } else {
                    None
                };

                Ok((
                    Server {
                        metadata,
                        inner: inner_future,
                        kill_inner: kill_inner_future,
                        shutdown_token: token,
                    },
                    maybe_heartbeat_app,
                ))
            }
        }
    }

    #[inline]
    pub async fn run(self) {
        if let Err(err) = self.try_run().await {
            error!(si.error.message = ?err, "error while running veritech main loop");
        }
    }

    pub async fn try_run(self) -> ServerResult<()> {
        let (inner_result, kill_inner_result) =
            join!(tokio::spawn(self.inner), tokio::spawn(self.kill_inner));

        // NOTE(nick,fletcher): we need to cancel before going to the kill app in case the stream
        // has closed, but the cancellation token has not been cancelled. This is a valid way to
        // close and we should handle it more gracefully in the future, but explicitly calling the
        // "cancel" method is sufficient here.
        match inner_result? {
            Ok(()) => self.shutdown_token.cancel(),
            Err(err) => return Err(ServerError::Naxum(err)),
        }
        kill_inner_result?.map_err(ServerError::Naxum)?;

        info!("veritech main loop shutdown complete");
        Ok(())
    }

    #[allow(clippy::too_many_arguments)]
    async fn build_app(
        metadata: Arc<ServerMetadata>,
        concurrency_limit: usize,
        cyclone_pool: PoolNoodle<LocalUdsInstance, LocalUdsInstanceSpec>,
        decryption_key: Arc<VeritechDecryptionKey>,
        cyclone_client_execution_timeout: Duration,
        nats: NatsClient,
        nats_jetstream: NatsClient,
        kill_senders: Arc<Mutex<HashMap<ExecutionId, oneshot::Sender<()>>>>,
        token: CancellationToken,
    ) -> ServerResult<Box<dyn Future<Output = io::Result<()>> + Unpin + Send>> {
        let connection_metadata = nats_jetstream.metadata_clone();

        // Take the *active* subject prefix from the connected NATS client
        let prefix = nats_jetstream
            .metadata()
            .subject_prefix()
            .map(|s| s.to_owned());

        let incoming = {
            let context = jetstream::new(nats_jetstream);
            veritech_work_queue(&context, prefix.as_deref())
                .await?
                .create_consumer(Self::incoming_consumer_config(prefix.as_deref()))
                .await?
                .messages()
                .await?
        };

        let state = AppState::new(
            metadata,
            cyclone_pool,
            decryption_key,
            cyclone_client_execution_timeout,
            nats,
            kill_senders,
        );

        let app = ServiceBuilder::new()
            .layer(
                MatchedSubjectLayer::new()
                    .for_subject(VeritechForSubject::with_prefix(prefix.as_deref())),
            )
            .layer(
                TraceLayer::new()
                    .make_span_with(
                        telemetry_nats::NatsMakeSpan::builder(connection_metadata).build(),
                    )
                    .on_response(telemetry_nats::NatsOnResponse::new()),
            )
            .layer(AckLayer::new())
            .service(handlers::process_request.with_state(state))
            .map_response(Response::into_response);

        let inner =
            naxum::serve_with_incoming_limit(incoming, app.into_make_service(), concurrency_limit)
                .with_graceful_shutdown(naxum::wait_on_cancelled(token));

        Ok(Box::new(inner.into_future()))
    }

    async fn build_kill_app(
        metadata: Arc<ServerMetadata>,
        nats: NatsClient,
        kill_senders: Arc<Mutex<HashMap<ExecutionId, oneshot::Sender<()>>>>,
        token: CancellationToken,
    ) -> ServerResult<Box<dyn Future<Output = io::Result<()>> + Unpin + Send>> {
        let connection_metadata = nats.metadata_clone();

        let incoming = {
            let prefix = nats.metadata().subject_prefix().map(|s| s.to_owned());
            Self::kill_subscriber(&nats, prefix.as_deref())
                .await?
                .map(|msg| msg.into_parts().0)
                // Core NATS subscriptions are a stream of `Option<Message>` so we convert this
                // into a stream of `Option<Result<Message, Infallible>>`
                .map(Ok::<_, Infallible>)
        };

        let state = KillAppState::new(metadata, nats, kill_senders);

        let app = ServiceBuilder::new()
            .layer(
                TraceLayer::new()
                    .make_span_with(
                        telemetry_nats::NatsMakeSpan::builder(connection_metadata).build(),
                    )
                    .on_response(telemetry_nats::NatsOnResponse::new()),
            )
            .service(handlers::process_kill_request.with_state(state))
            .map_response(Response::into_response);

        let inner = naxum::serve(incoming, app.into_make_service())
            .with_graceful_shutdown(naxum::wait_on_cancelled(token));

        Ok(Box::new(inner.into_future()))
    }

    // NOTE(nick,fletcher): it's a little funky that the prefix is taken from the nats client, but
    // we ask for both here. We don't want users to forget the prefix, so being explicit is
    // helpful, but maybe we can change this in the future.
    async fn kill_subscriber(nats: &NatsClient, prefix: Option<&str>) -> ServerResult<Subscriber> {
        // we have to make a dummy request here to get the nats subject from it
        let dummy_request = KillExecutionRequest {
            execution_id: "".into(),
        };
        let subject = dummy_request.nats_subject(prefix, None, None);
        nats.subscribe(subject.clone())
            .await
            .map_err(|err| ServerError::NatsSubscribe(subject, err))
    }

    #[instrument(name = "veritech.init.connect_to_nats", level = "info", skip_all)]
    async fn connect_to_nats(config: &NatsConfig) -> ServerResult<NatsClient> {
        let client = NatsClient::new(config)
            .await
            .map_err(ServerError::NatsClient)?;
        debug!("successfully connected nats client");
        Ok(client)
    }

    #[inline]
    fn incoming_consumer_config(
        subject_prefix: Option<&str>,
    ) -> async_nats::jetstream::consumer::pull::Config {
        async_nats::jetstream::consumer::pull::Config {
            durable_name: Some(CONSUMER_NAME.to_owned()),
            filter_subject: incoming_subject(subject_prefix).to_string(),
            max_deliver: CONSUMER_MAX_DELIVERY,
            ..Default::default()
        }
    }
}

#[derive(Clone, Debug)]
struct VeritechForSubject {
    prefix: Option<()>,
}

impl VeritechForSubject {
    fn with_prefix(prefix: Option<&str>) -> Self {
        Self {
            prefix: prefix.map(|_p| ()),
        }
    }
}

impl<R> ForSubject<R> for VeritechForSubject
where
    R: MessageHead,
{
    fn call(&mut self, req: &mut naxum::Message<R>) {
        let mut parts = req.subject().split('.');

        match self.prefix {
            Some(_) => {
                if let (
                    Some(prefix),
                    Some(p1),
                    Some(p2),
                    Some(_workspace_id),
                    Some(_change_set_id),
                    Some(kind),
                    None,
                ) = (
                    parts.next(),
                    parts.next(),
                    parts.next(),
                    parts.next(),
                    parts.next(),
                    parts.next(),
                    parts.next(),
                ) {
                    let matched = format!("{prefix}.{p1}.{p2}.:workspace_id.:change_set_id.{kind}");
                    req.extensions_mut().insert(MatchedSubject::from(matched));
                };
            }
            None => {
                if let (
                    Some(p1),
                    Some(p2),
                    Some(_workspace_id),
                    Some(_change_set_id),
                    Some(kind),
                    None,
                ) = (
                    parts.next(),
                    parts.next(),
                    parts.next(),
                    parts.next(),
                    parts.next(),
                    parts.next(),
                ) {
                    let matched = format!("{p1}.{p2}.:workspace_id.:change_set_id.{kind}");
                    req.extensions_mut().insert(MatchedSubject::from(matched));
                };
            }
        }
    }
}
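For orientation, below is a minimal sketch of how a binary might boot and run this server. It assumes the crate is `veritech_server` and re-exports `Config`, `Server`, and `ServerResult` at its root (as the `crate::{...}` import above suggests); `launch` is a hypothetical helper, not the actual entry point in systeminit/si, and running the returned `HeartbeatApp` is omitted because its run method is not shown in this file.

use tokio_util::sync::CancellationToken;
use veritech_server::{Config, Server, ServerResult};

/// Sketch: boot the server from an already-built `Config` and run it until Ctrl-C.
async fn launch(config: Config) -> ServerResult<()> {
    // Cancelling this token drives the graceful-shutdown paths wired up in
    // `build_app` and `build_kill_app` above.
    let token = CancellationToken::new();

    // `from_config` connects to NATS (core + jetstream), boots the cyclone pool, and
    // also returns an optional `HeartbeatApp` when heartbeats are enabled in the config.
    let (server, _maybe_heartbeat_app) = Server::from_config(config, token.clone()).await?;

    // Cancel the shared token on Ctrl-C so both naxum apps shut down gracefully.
    let shutdown_token = token.clone();
    tokio::spawn(async move {
        let _ = tokio::signal::ctrl_c().await;
        shutdown_token.cancel();
    });

    // `run` logs any error from `try_run`; call `try_run` directly to propagate it instead.
    server.run().await;
    Ok(())
}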
