Skip to main content
Glama
compressing_stream.rs99.7 kB
use std::{ collections::{ HashMap, VecDeque, }, error, fmt, mem, pin::Pin, result, task::{ Context, Poll, }, time::Instant, }; use edda_core::api_types::{ ContentInfo, ContentInfoError, Negotiate, NegotiateError, }; use futures::{ FutureExt, Stream, StreamExt, TryStreamExt, future::BoxFuture, }; use naxum::MessageHead; use pin_project_lite::pin_project; use serde::Serialize; use si_data_nats::{ HeaderMap, Subject, async_nats::jetstream::{ self, stream::DeleteMessageError, }, }; use strum::AsRefStr; use telemetry::{ OpenTelemetrySpanExt, opentelemetry::trace::{ SpanContext, TraceContextExt, }, prelude::*, }; use telemetry_nats::propagation; use telemetry_utils::{ histogram, monotonic, }; use thiserror::Error; use tokio::sync::watch; use crate::{ api_types::{ CompressFromRequests, CompressedRequestError, }, local_message::LocalMessage, }; #[remain::sorted] #[derive(Debug, Error)] pub enum CompressingStreamError { #[error("failed to get count of messages on subject {0}; setting value to `1`")] CalculateReadWindowCountNotFound(Subject), #[error("failed to compress requests; skipping requests & deleting messages: {0}")] CompressedRequest(CompressedRequestError), #[error("failed to compress requests; skipping requests, deleting messages & closing: {0}")] CompressedRequestBeforeClose(CompressedRequestError), #[error("failed to delete message from stream; skipping message & deleting remaining: {0}")] DeleteStreamMessage(DeleteMessageError), #[error("failed to delete message from stream; skipping message, & deleting remaining: {0}")] DeleteStreamMessageAfterCompressError(DeleteMessageError), #[error( "failed to delete message from stream; skipping message, deleting remaining & closing: {0}" )] DeleteStreamMessageAfterCompressErrorBeforeClose(DeleteMessageError), #[error( "failed to delete message from stream; skipping message, deleting remaining & closing: {0}" )] DeleteStreamMessageBeforeClose(DeleteMessageError), #[error("failed to parse info from first message; skipping message: {0}")] FirstMessageInfoParse(Box<dyn error::Error + Send + Sync + 'static>), #[error( "failed to parse info from next message; skipping message & compressing remaining: {0}" )] NextMessageInfoParse(Box<dyn error::Error + Send + Sync + 'static>), #[error("failed to parse api request from first message; deleting message: {0}")] ParseFirstRequest(NegotiateError), #[error( "failed to parse api request from next message; skipping message & compressing remaining: {0}" )] ParseNextRequestInWindow(NegotiateError), #[error("error on subscription stream on first read; skipping message: {0}")] ReadFirstMessage(Box<dyn error::Error + Send + Sync + 'static>), #[error( "error on subscription stream on next read; skipping message & compressing remaining: {0}" )] ReadNextMessageInWindow(Box<dyn error::Error + Send + Sync + 'static>), #[error("failed to serialize compressed request to local message: {0}")] SerializeLocalMessage(serde_json::Error), #[error("failed to serialize compressed request to local message; closing: {0}")] SerializeLocalMessageBeforeClose(serde_json::Error), } type Result<T> = result::Result<T, CompressingStreamError>; type Error = CompressingStreamError; /// Internal state machine of [`CompressingStream`]. #[derive(AsRefStr, Default)] enum State<R, C> { #[default] /// 1. Reading the first message from the subscription ReadFirstMessage, /// 2. Calculating the number of messages to read, a.k.a the "read window" CalculateReadWindow { /// The [`Subject`] to be used on the compressed request ([`Option`] for `mem::take()`) subject: Option<Subject>, /// The message to be parsed ([`Option`] for `mem::take()`) message: Box<Option<jetstream::Message>>, /// The stream sequence number of the first message message_stream_sequence: u64, /// A [`Future`] that calculates the read window calculate_read_window_fut: BoxFuture< 'static, result::Result<usize, Box<dyn error::Error + Send + Sync + 'static>>, >, }, /// 3. Parsing the first message into an API request ParseFirstRequest { /// The [`Subject`] to be used on the compressed request ([`Option`] for `mem::take()`) subject: Option<Subject>, /// The number of messages to read read_window: usize, /// The stream sequence number of the first message message_stream_sequence: u64, /// A [`Future`] that parses a Jetstream message into an API request and extracts headers parse_message_fut: BoxFuture<'static, result::Result<(R, Option<HeaderMap>), NegotiateError>>, }, /// 4. Reading the next message from the subscription in the read window ReadNextMessageInWindow { /// The [`Subject`] to be used on the compressed request ([`Option`] for `mem::take()`) subject: Option<Subject>, /// The number of messages to read read_window: usize, /// The accumulated list of read and parsed API requests requests: Vec<R>, /// The accumulated list of stream sequence numbers to later delete stream_sequence_numbers: VecDeque<u64>, /// The accumulated list of span contexts from incoming messages for creating span links span_contexts: Vec<SpanContext>, }, /// 5. Parsing the next message into an API request in the read window ParseNextRequestInWindow { /// The [`Subject`] to be used on the compressed request ([`Option`] for `mem::take()`) subject: Option<Subject>, /// The number of messages to read read_window: usize, /// The stream sequence number of the initial message message_stream_sequence: u64, /// A [`Future`] that parses a Jetstream message into an API request and extracts headers parse_message_fut: BoxFuture<'static, result::Result<(R, Option<HeaderMap>), NegotiateError>>, /// The accumulated list of read and parsed API requests requests: Vec<R>, /// The accumulated list of stream sequence numbers to later delete stream_sequence_numbers: VecDeque<u64>, /// The accumulated list of span contexts from incoming messages for creating span links span_contexts: Vec<SpanContext>, }, /// 5. Compressing multiple API requests into a single compressed request CompressRequests { /// The [`Subject`] to be used on the compressed request ([`Option`] for `mem::take()`) subject: Option<Subject>, /// A [`Future`] that compresses multiple API requests into a single "compressed" request compress_messages_fut: BoxFuture<'static, result::Result<C, CompressedRequestError>>, /// The accumulated list of stream sequence numbers to later delete stream_sequence_numbers: VecDeque<u64>, /// The accumulated list of span contexts from incoming messages for creating span links span_contexts: Vec<SpanContext>, }, /// 7. Deleting a message from the Jetstream stream DeleteStreamMessage { /// The [`Subject`] to be used on the compressed request ([`Option`] for `mem::take()`) subject: Option<Subject>, /// A [`Future`] that deletes a message from the Jetstream stream delete_message_fut: BoxFuture<'static, result::Result<(), DeleteMessageError>>, /// The remaining list of stream sequence numbers to delete stream_sequence_numbers: VecDeque<u64>, /// The compressed request to later yield (outer [`Option`] for `mem::take()`, and inner is /// when there were no requests to be compressed) compressed_request: Option<Option<C>>, /// The number of successfully deleted messages deleted_count: usize, }, /// 8. Converting request into final message to yield from [`Stream`] YieldItem { /// The [`Subject`] to be used on the compressed request ([`Option`] for `mem::take()`) subject: Option<Subject>, /// The compressed request to later yield ([`Option`] for `mem::take()`) compressed_request: Option<C>, }, /// 3.1 Deleting the first message after error DeleteFirstMessageAfterError { /// The [`Subject`] to be used on the compressed request ([`Option`] for `mem::take()`) subject: Option<Subject>, /// A [`Future`] that deletes a message from the Jetstream stream delete_message_fut: BoxFuture<'static, result::Result<(), DeleteMessageError>>, }, /// 4.1 Compressing remaining requests before closing [`Stream`] CompressRequestsAndClose { /// The [`Subject`] to be used on the compressed request ([`Option`] for `mem::take()`) subject: Option<Subject>, /// A [`Future`] that compresses multiple API requests into a single "compressed" request compress_messages_fut: BoxFuture<'static, result::Result<C, CompressedRequestError>>, /// The accumulated list of stream sequence numbers to later delete stream_sequence_numbers: VecDeque<u64>, /// The accumulated list of span contexts from incoming messages for creating span links span_contexts: Vec<SpanContext>, }, /// 4.2 Deleting messages from the Jetstream stream before closing [`Stream`] DeleteStreamMessageAndClose { /// The [`Subject`] to be used on the compressed request ([`Option`] for `mem::take()`) subject: Option<Subject>, /// A [`Future`] that deletes a message from the Jetstream stream delete_message_fut: BoxFuture<'static, result::Result<(), DeleteMessageError>>, /// The compressed request to later yield (outer [`Option`] for `mem::take()`, and inner is /// when there were no requests to be compressed) compressed_request: Option<Option<C>>, /// The remaining list of stream sequence numbers to delete stream_sequence_numbers: VecDeque<u64>, /// The number of successfully deleted messages deleted_count: usize, }, /// 4.3. Converting request into final message to yield from [`Stream`] before closing /// [`Stream`] YieldItemAndClose { /// The [`Subject`] to be used on the compressed request ([`Option`] for `mem::take()`) subject: Option<Subject>, /// The compressed request to later yield ([`Option`] for `mem::take()`) compressed_request: Option<C>, }, /// 4.1.1 Closing the stream CloseStream, /// 6.1 Deleting messages from the Jetstream stream after failing to compress requests DeleteStreamMessageAfterCompressError { /// A [`Future`] that deletes a message from the Jetstream stream delete_message_fut: BoxFuture<'static, result::Result<(), DeleteMessageError>>, /// The remaining list of stream sequence numbers to delete stream_sequence_numbers: VecDeque<u64>, /// The number of successfully deleted messages deleted_count: usize, }, /// 4.1.2 Deleting messages from the Jetstream stream after failing to compress requests before /// closing [`Stream`] DeleteStreamMessageAfterCompressErrorAndClose { /// A [`Future`] that deletes a message from the Jetstream stream delete_message_fut: BoxFuture<'static, result::Result<(), DeleteMessageError>>, /// The remaining list of stream sequence numbers to delete stream_sequence_numbers: VecDeque<u64>, /// The number of successfully deleted messages deleted_count: usize, }, } pin_project! { pub struct CompressingStream<S, R, C> { #[pin] subscription: S, stream: jetstream::stream::Stream, state: State<R, C>, last_compressing_heartbeat_tx: Option<watch::Sender<Instant>>, span: Option<Span>, span_started_at: Option<Instant>, } } impl<S, R, C> CompressingStream<S, R, C> { /// Creates and return a new CompressingStream. pub fn new( subscription: S, stream: jetstream::stream::Stream, last_compressing_heartbeat_tx: impl Into<Option<watch::Sender<Instant>>>, ) -> Self { Self { subscription, stream, state: Default::default(), last_compressing_heartbeat_tx: last_compressing_heartbeat_tx.into(), span: None, span_started_at: None, } } } impl<S, R, C> fmt::Debug for CompressingStream<S, R, C> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("CompressingStream").finish_non_exhaustive() } } impl<S, R, C, E> Stream for CompressingStream<S, R, C> where S: Stream<Item = result::Result<jetstream::Message, E>>, R: Negotiate + Send + 'static, C: Serialize + CompressFromRequests<Request = R> + AsRef<str>, E: error::Error + Send + Sync + 'static, { type Item = Result<LocalMessage>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); loop { let span = this.span.get_or_insert_with(|| { let follows_from = Span::current(); let s = span!( parent: None, Level::INFO, "compressing_stream.next", messages.deleted.count = Empty, messaging.destination.name = Empty, read_window.count = Empty, requests.count = Empty, compressed.kind = Empty, task.state = Empty, ); s.follows_from(follows_from); // Track when span started for total duration metric *this.span_started_at = Some(Instant::now()); s }); let _guard = span.enter(); match this.state { // 1. Reading the first message from the subscription State::ReadFirstMessage => { // Read first message from subscription let poll_start = Instant::now(); match this.subscription.poll_next_unpin(cx) { // Read the first Jetstream message successfully Poll::Ready(Some(Ok(message))) => { let poll_duration = poll_start.elapsed(); debug!( poll_duration_ms = poll_duration.as_millis(), "subscription.poll_next_unpin completed for first message", ); // Metrics: Track subscription poll latency and message count histogram!( compressing_stream_subscription_poll_latency_ms = poll_duration.as_millis() as f64, message_position = "first" ); monotonic!( compressing_stream_messages_read = 1, message_position = "first" ); update_heartbeat(this.last_compressing_heartbeat_tx); // Determine the stream sequence number of this message so we can // delete it later. let message_stream_sequence = match message.info() { // Info parsed successfully from the message Ok(info) => info.stream_sequence, // Failed to parse [`Info`] from message Err(err) => { // We can't delete this message easily as the sequence number // comes from the [`Info`] struct, so we're going to restart // the whole process trace!( si.error.message = ?err, messaging.destination.name = message.subject.as_str(), concat!( "failed to parse Info from first message; ", "skipping message", ), ); // Set next state and return error *this.state = State::ReadFirstMessage; span.record("task.state", this.state.as_ref()); let err = span.record_err(err); drop(_guard); *this.span = None; return Poll::Ready(Some(Err(Error::FirstMessageInfoParse( err, )))); } }; span.record("messaging.destination.name", message.subject.as_str()); let subject = Some(message.subject.clone()); let fut_subject = message.subject.clone(); let stream = this.stream.clone(); // Set next state and continue loop *this.state = State::CalculateReadWindow { subject, message: Box::new(Some(message)), message_stream_sequence, calculate_read_window_fut: Box::pin(async move { let info: HashMap<_, _> = stream .info_with_subjects(fut_subject.as_str()) .await? .try_collect() .await?; let message_count_on_subject = info.get(fut_subject.as_str()).copied().ok_or( Error::CalculateReadWindowCountNotFound(fut_subject), )?; Ok(message_count_on_subject) }), }; continue; } // Subscription stream yielded an error as the next item Poll::Ready(Some(Err(err))) => { update_heartbeat(this.last_compressing_heartbeat_tx); // We can't delete this message easily as the sequence number // comes from the [`Info`] struct, so we're going to restart // the whole process trace!( si.error.message = ?err, "error on subscription stream on first read; skipping message", ); // Set next state and return error *this.state = State::ReadFirstMessage; span.record("task.state", this.state.as_ref()); let err = span.record_err(err); drop(_guard); *this.span = None; return Poll::Ready(Some(Err(Error::ReadFirstMessage(Box::new(err))))); } // Subscription stream has closed, so we close Poll::Ready(None) => { span.record("task.state", this.state.as_ref()); span.record_ok(); return Poll::Ready(None); } // Pending on the first message, so we are pending too Poll::Pending => { let pending_duration = poll_start.elapsed(); if pending_duration.as_millis() > 0 { debug!( pending_duration_ms = pending_duration.as_millis(), "subscription.poll_next_unpin returned Pending for first message", ); } return Poll::Pending; } } } // 2. Calculating the number of messages to read, a.k.a the "read window" State::CalculateReadWindow { subject, message, message_stream_sequence, calculate_read_window_fut, } => { // Caclulate the number of messages available to read, a.k.a the "read window". // This is the number of messages we will unconditionally read in as our "read // window". let calc_start = Instant::now(); match calculate_read_window_fut.poll_unpin(cx) { // Read window calculated successfully Poll::Ready(Ok(read_window)) => { let calc_duration = calc_start.elapsed(); debug!( calc_duration_ms = calc_duration.as_millis(), read_window = read_window, "calculate_read_window completed", ); // Metrics: Track read window calculation latency and size histogram!( compressing_stream_read_window_calc_latency_ms = calc_duration.as_millis() as f64 ); histogram!(compressing_stream_read_window_size = read_window as f64); update_heartbeat(this.last_compressing_heartbeat_tx); span.record("read_window.count", read_window); let message = message .take() .expect("extracting owned value only happens once"); debug!( first_message_info_pending = message .info() .map(|info| info.pending as isize) .unwrap_or(-1), stream_info_with_subjects_count = read_window, "calculated number of messages to read" ); // Set next state and continue loop *this.state = State::ParseFirstRequest { subject: subject.take(), read_window, message_stream_sequence: *message_stream_sequence, parse_message_fut: Box::pin(async move { parse_message(message) }), }; continue; } // Failed to determine read window Poll::Ready(Err(err)) => { update_heartbeat(this.last_compressing_heartbeat_tx); // We can't determine the read window, so we'll set it to `1` and // continue as there is work we can do with the first message warn!( si.error.message = ?err, concat!( "failed calculate read window; ", "setting value to `1`", ), ); let read_window = 1; let message = message .take() .expect("extracting owned value only happens once"); // Set next state and continue loop *this.state = State::ParseFirstRequest { subject: subject.take(), read_window, message_stream_sequence: *message_stream_sequence, parse_message_fut: Box::pin(async move { parse_message(message) }), }; continue; } // Pending on parse message, so we are pending too Poll::Pending => return Poll::Pending, } } // 3. Parsing the first message into an API request State::ParseFirstRequest { subject, read_window, message_stream_sequence, parse_message_fut, } => { // Parse API request from Jetstream message match parse_message_fut.poll_unpin(cx) { // API request parsed successfully Poll::Ready(Ok((request, headers))) => { update_heartbeat(this.last_compressing_heartbeat_tx); let mut requests = Vec::with_capacity(*read_window); requests.push(request); let mut stream_sequence_numbers = VecDeque::with_capacity(*read_window); stream_sequence_numbers.push_back(*message_stream_sequence); // Extract span context from headers for span linking. This is the // first request, so we need to initialize the contexts vec collection. let mut span_contexts = Vec::with_capacity(*read_window); if let Some(ref headers) = headers { let otel_ctx = propagation::extract_opentelemetry_context(headers); let span_ctx = otel_ctx.span().span_context().clone(); span_contexts.push(span_ctx); } // We've read all message in the read window if requests.len() == *read_window { // There are no requests to compress if requests.is_empty() { // Pop the first sequence number off the delete list match stream_sequence_numbers.pop_front() { // A message was popped off list Some(message_stream_sequence) => { let stream = this.stream.clone(); // Set next state and continue loop *this.state = State::DeleteStreamMessage { subject: subject.take(), delete_message_fut: Box::pin(async move { stream .delete_message(message_stream_sequence) .await .map(|_| ()) }), compressed_request: Some(None), stream_sequence_numbers, deleted_count: 0, }; continue; } // The delete list is empty None => { // Nothing to compress and nothing to delete, so // re-start state machine span.record("messages.deleted.count", 0); // Set next state and continue loop *this.state = State::ReadFirstMessage; drop(_guard); *this.span = None; continue; } } } // There are requests to compress else { span.record("requests.count", requests.len()); // Set next state and continue loop *this.state = State::CompressRequests { subject: subject.take(), compress_messages_fut: Box::pin(async move { C::compress_from_requests(requests).await }), stream_sequence_numbers, span_contexts, }; continue; } } // There are remaining messages in the read window else { // Set next state and continue loop *this.state = State::ReadNextMessageInWindow { subject: subject.take(), read_window: *read_window, requests, stream_sequence_numbers, span_contexts, }; continue; } } // Failed to parse API request from message Poll::Ready(Err(err)) => { update_heartbeat(this.last_compressing_heartbeat_tx); // Set next state to delete this message and restart the state trace!( si.error.message = ?err, "failed to parse api request from first message; deleting message", ); let stream = this.stream.clone(); let message_stream_sequence = *message_stream_sequence; // Set next state and return error *this.state = State::DeleteFirstMessageAfterError { subject: subject.take(), delete_message_fut: Box::pin(async move { stream .delete_message(message_stream_sequence) .await .map(|_| ()) }), }; span.record("task.state", this.state.as_ref()); let err = span.record_err(err); return Poll::Ready(Some(Err(Error::ParseFirstRequest(err)))); } // Pending on parse message, so we are pending too Poll::Pending => return Poll::Pending, } } // 4. Reading the next message from the subscription in the read window State::ReadNextMessageInWindow { subject, read_window, requests, stream_sequence_numbers, span_contexts, } => { // Read next message from subscription in read window let poll_start = Instant::now(); match this.subscription.poll_next_unpin(cx) { Poll::Ready(Some(Ok(message))) => { let poll_duration = poll_start.elapsed(); debug!( poll_duration_ms = poll_duration.as_millis(), requests_so_far = requests.len(), read_window = *read_window, "subscription.poll_next_unpin completed for next message in window", ); // Metrics: Track subscription poll latency and message count histogram!( compressing_stream_subscription_poll_latency_ms = poll_duration.as_millis() as f64, message_position = "subsequent" ); monotonic!( compressing_stream_messages_read = 1, message_position = "subsequent" ); update_heartbeat(this.last_compressing_heartbeat_tx); // Determine the stream sequence number of this message so we can // delete it later. let message_stream_sequence = match message.info() { // Info parsed successfully from the message Ok(info) => info.stream_sequence, // Failed to parse [`Info`] from message Err(err) => { // We can't delete this message easily as the sequence number // comes from the [`Info`] struct, so we're going to move to // compress what has been accumulated trace!( si.error.message = ?err, messaging.destination.name = message.subject.as_str(), concat!( "failed to parse info from next message; ", "skipping message & compressing remaining", ), ); let requests = mem::take(requests); span.record("requests.count", requests.len()); // Set next state and return error *this.state = State::CompressRequests { subject: subject.take(), compress_messages_fut: Box::pin(async move { C::compress_from_requests(requests).await }), stream_sequence_numbers: mem::take(stream_sequence_numbers), span_contexts: mem::take(span_contexts), }; span.record("task.state", this.state.as_ref()); let err = span.record_err(err); return Poll::Ready(Some(Err(Error::NextMessageInfoParse( err, )))); } }; // Set next state and continue loop *this.state = State::ParseNextRequestInWindow { subject: subject.take(), read_window: *read_window, message_stream_sequence, parse_message_fut: Box::pin(async move { parse_message(message) }), requests: mem::take(requests), stream_sequence_numbers: mem::take(stream_sequence_numbers), span_contexts: mem::take(span_contexts), }; continue; } // Subscription stream yielded an error as the next item Poll::Ready(Some(Err(err))) => { update_heartbeat(this.last_compressing_heartbeat_tx); // Set up state to move into compression of what has been accumulated trace!( si.error.message = ?err, concat!( "error on subscription stream on next read; ", "skipping message & compressing remaining", ), ); let requests = mem::take(requests); span.record("requests.count", requests.len()); // Set next state and return error *this.state = State::CompressRequests { subject: subject.take(), compress_messages_fut: Box::pin(async move { C::compress_from_requests(requests).await }), stream_sequence_numbers: mem::take(stream_sequence_numbers), span_contexts: mem::take(span_contexts), }; span.record("task.state", this.state.as_ref()); let err = span.record_err(err); return Poll::Ready(Some(Err(Error::ReadNextMessageInWindow( Box::new(err), )))); } // Subscription stream has closed Poll::Ready(None) => { // Set up state to compress remaining and then close our stream trace!(concat!( "subscription stream is closed on next read; ", "compressing remaining & closing stream", )); let requests = mem::take(requests); // Set next state and continue loop *this.state = State::CompressRequestsAndClose { subject: subject.take(), compress_messages_fut: Box::pin(async move { C::compress_from_requests(requests).await }), stream_sequence_numbers: mem::take(stream_sequence_numbers), span_contexts: mem::take(span_contexts), }; continue; } // Pending on the next message, so we are pending too Poll::Pending => { let pending_duration = poll_start.elapsed(); if pending_duration.as_millis() > 0 { debug!( pending_duration_ms = pending_duration.as_millis(), requests_so_far = requests.len(), read_window = *read_window, "subscription.poll_next_unpin returned Pending for next message in window", ); } return Poll::Pending; } } } // 5. Parsing the next message into an API request in the read window State::ParseNextRequestInWindow { subject, read_window, message_stream_sequence, parse_message_fut, requests, stream_sequence_numbers, span_contexts, } => { // Parse API request from Jetstream message match parse_message_fut.poll_unpin(cx) { // API request parsed successfully Poll::Ready(Ok((request, headers))) => { update_heartbeat(this.last_compressing_heartbeat_tx); requests.push(request); stream_sequence_numbers.push_back(*message_stream_sequence); // Extract span context from headers for span linking. We already have // a span context from the first message, so now we'll add another. if let Some(ref headers) = headers { let otel_ctx = propagation::extract_opentelemetry_context(headers); let span_ctx = otel_ctx.span().span_context().clone(); span_contexts.push(span_ctx); } let requests = mem::take(requests); let stream_sequence_numbers = mem::take(stream_sequence_numbers); let span_contexts = mem::take(span_contexts); // We've read all message in the read window if requests.len() == *read_window { span.record("requests.count", requests.len()); // Set next state and continue loop *this.state = State::CompressRequests { subject: subject.take(), compress_messages_fut: Box::pin(async move { C::compress_from_requests(requests).await }), stream_sequence_numbers, span_contexts, }; continue; } // There are remaining messages in the read window else { // Set next state and continue loop *this.state = State::ReadNextMessageInWindow { subject: subject.take(), read_window: *read_window, requests, stream_sequence_numbers, span_contexts, }; continue; } } // Failed to parse API request from message Poll::Ready(Err(err)) => { update_heartbeat(this.last_compressing_heartbeat_tx); // Set next state to compress remaining requests trace!( si.error.message = ?err, concat!( "failed to parse api request from next message; ", "skipping message & compressing remaining", ), ); // Add current message to list of messages to delete stream_sequence_numbers.push_back(*message_stream_sequence); let requests = mem::take(requests); let stream_sequence_numbers = mem::take(stream_sequence_numbers); let span_contexts = mem::take(span_contexts); span.record("requests.count", requests.len()); // Set next state and return error *this.state = State::CompressRequests { subject: subject.take(), compress_messages_fut: Box::pin(async move { C::compress_from_requests(requests).await }), stream_sequence_numbers, span_contexts, }; span.record("task.state", this.state.as_ref()); let err = span.record_err(err); return Poll::Ready(Some(Err(Error::ParseNextRequestInWindow(err)))); } // Pending on parse message, so we are pending too Poll::Pending => return Poll::Pending, } } // 6. Compressing multiple API requests into a single compressed request State::CompressRequests { subject, compress_messages_fut, stream_sequence_numbers, span_contexts, } => { // Add span links to track the lineage from the original incoming requests to // the compressed request. Links must be added before any other work is done // within the span. for span_context in span_contexts.iter() { if span_context.is_valid() { span.add_link(span_context.clone()); } } // Compress multiple API requests into a single compressed request let compress_start = Instant::now(); match compress_messages_fut.poll_unpin(cx) { // Requests compressed successfully Poll::Ready(Ok(compressed_request)) => { let compress_duration = compress_start.elapsed(); // Metrics: Track compression operation histogram!( compressing_stream_compress_latency_ms = compress_duration.as_millis() as f64 ); monotonic!( compressing_stream_compress_operations = 1, result = "success" ); update_heartbeat(this.last_compressing_heartbeat_tx); span.record("compressed.kind", compressed_request.as_ref()); // Pop the first sequence number off the delete list match stream_sequence_numbers.pop_front() { // A message was popped off list Some(message_stream_sequence) => { let stream = this.stream.clone(); // Set next state and continue loop *this.state = State::DeleteStreamMessage { subject: subject.take(), delete_message_fut: Box::pin(async move { stream .delete_message(message_stream_sequence) .await .map(|_| ()) }), compressed_request: Some(Some(compressed_request)), stream_sequence_numbers: mem::take(stream_sequence_numbers), deleted_count: 0, }; continue; } // The delete list is empty None => { span.record("messages.deleted.count", 0); // Set next state and continue loop *this.state = State::YieldItem { subject: subject.take(), compressed_request: Some(compressed_request), }; continue; } } } // Error while compressing requests Poll::Ready(Err(err)) => { // Metrics: Track compression failures monotonic!( compressing_stream_compress_operations = 1, result = "error" ); update_heartbeat(this.last_compressing_heartbeat_tx); // Nothing much we can do at this point, if we can't compress then we // throw all the API requests away and delete the associated messages trace!( si.error.message = ?err, concat!( "failed to compress requests; ", "skipping requests & deleting messages", ), ); // Pop the first sequence number off the delete list match stream_sequence_numbers.pop_front() { // A message was popped off the delete list Some(message_stream_sequence) => { let stream = this.stream.clone(); // Set next state and return error *this.state = State::DeleteStreamMessageAfterCompressError { delete_message_fut: Box::pin(async move { stream .delete_message(message_stream_sequence) .await .map(|_| ()) }), stream_sequence_numbers: mem::take(stream_sequence_numbers), deleted_count: 0, }; span.record("task.state", this.state.as_ref()); let err = span.record_err(err); return Poll::Ready(Some(Err(Error::CompressedRequest(err)))); } // The delete list is empty None => { span.record("messages.deleted.count", 0); // Nothing to compress and nothing to delete, so // re-start state machine // Set next state and return error *this.state = State::ReadFirstMessage; span.record("task.state", this.state.as_ref()); let err = span.record_err(err); drop(_guard); *this.span = None; return Poll::Ready(Some(Err(Error::CompressedRequest(err)))); } } } // Pending on compressing messages, so we are pending too Poll::Pending => return Poll::Pending, } } // 7. Deleting a message from the Jetstream stream State::DeleteStreamMessage { subject, delete_message_fut, compressed_request, stream_sequence_numbers, deleted_count, } => { // Delete a message let delete_start = Instant::now(); match delete_message_fut.poll_unpin(cx) { // Message was deleted successfully Poll::Ready(Ok(_)) => { let delete_duration = delete_start.elapsed(); // Metrics: Track message deletion histogram!( compressing_stream_message_delete_latency_ms = delete_duration.as_millis() as f64 ); monotonic!(compressing_stream_messages_deleted = 1, result = "success"); update_heartbeat(this.last_compressing_heartbeat_tx); *deleted_count += 1; let compressed_request = compressed_request .take() .expect("extracting owned value only happens once"); // Pop the next sequence number off the delete list match stream_sequence_numbers.pop_front() { // A message was popped off list Some(message_stream_sequence) => { let stream = this.stream.clone(); // Set next state and continue loop *this.state = State::DeleteStreamMessage { subject: subject.take(), delete_message_fut: Box::pin(async move { stream .delete_message(message_stream_sequence) .await .map(|_| ()) }), compressed_request: Some(compressed_request), stream_sequence_numbers: mem::take(stream_sequence_numbers), deleted_count: *deleted_count, }; continue; } // The delete list is empty None => { span.record("messages.deleted.count", deleted_count); // Do we have a compressed request? match compressed_request { // Compressed request found Some(compressed_request) => { // Set next state and continue loop *this.state = State::YieldItem { subject: subject.take(), compressed_request: Some(compressed_request), }; continue; } // No compressed request None => { // Nothing to yield so re-start state machine // Set next state and continue loop *this.state = State::ReadFirstMessage; drop(_guard); *this.span = None; continue; } } } } } // Error when deleting a message Poll::Ready(Err(err)) => { update_heartbeat(this.last_compressing_heartbeat_tx); // This message failed to delete, so return the error but set state to // continue the process of deleting the remaining messages trace!( si.error.message = ?err, concat!( "failed to delete message from stream; ", "skipping message & deleting remaining", ), ); let compressed_request = compressed_request .take() .expect("extracting owned value only happens once"); // Pop the next sequence number off the delete list match stream_sequence_numbers.pop_front() { // A message was popped off list Some(message_stream_sequence) => { let stream = this.stream.clone(); // Set next state and return error *this.state = State::DeleteStreamMessage { subject: subject.take(), delete_message_fut: Box::pin(async move { stream .delete_message(message_stream_sequence) .await .map(|_| ()) }), compressed_request: Some(compressed_request), stream_sequence_numbers: mem::take(stream_sequence_numbers), deleted_count: *deleted_count, }; span.record("task.state", this.state.as_ref()); let err = span.record_err(err); return Poll::Ready(Some(Err(Error::DeleteStreamMessage(err)))); } // The delete list is empty None => { span.record("messages.deleted.count", deleted_count); // Do we have a compressed request? match compressed_request { // Compressed request found Some(compressed_request) => { // Set next state and return error *this.state = State::YieldItem { subject: subject.take(), compressed_request: Some(compressed_request), }; span.record("task.state", this.state.as_ref()); let err = span.record_err(err); return Poll::Ready(Some(Err( Error::DeleteStreamMessage(err), ))); } // No compressed request None => { // Nothing to yield so re-start state machine // Set next state and return error *this.state = State::ReadFirstMessage; span.record("task.state", this.state.as_ref()); let err = span.record_err(err); drop(_guard); *this.span = None; return Poll::Ready(Some(Err( Error::DeleteStreamMessage(err), ))); } } } } } // Pending on deleting message, so we are pending too Poll::Pending => return Poll::Pending, } } // 8. Converting request into final message to yield from [`Stream`] State::YieldItem { subject, compressed_request, } => { update_heartbeat(this.last_compressing_heartbeat_tx); // Metrics: Track total span duration from start to yield if let Some(span_start) = this.span_started_at.take() { let total_duration = span_start.elapsed(); histogram!( compressing_stream_total_span_duration_ms = total_duration.as_millis() as f64 ); } monotonic!(compressing_stream_items_yielded = 1); let subject = subject .take() .expect("extracting owned value only happens once"); let compressed_request = compressed_request .take() .expect("extracting owned value only happens once"); let payload = match serde_json::to_vec(&compressed_request) { // Compressed requests serialized to bytes successfully Ok(vec) => vec.into(), // Failed to serialize Err(err) => { // It's too bad this error (which it shouldn't) because all we can do // is throw away the compressed request and re-start the state machine trace!( si.error.message = ?err, messaging.destination.name = subject.as_str(), "failed to serialize compressed request to local message", ); // Set next state and return error *this.state = State::ReadFirstMessage; span.record("task.state", this.state.as_ref()); let err = span.record_err(err); drop(_guard); *this.span = None; return Poll::Ready(Some(Err(Error::SerializeLocalMessage(err)))); } }; // We need to inject headers with our span and all of its links that we've // built. Without this, the trace would end here. let mut headers = HeaderMap::new(); propagation::inject_headers(&mut headers); let local_message = LocalMessage { subject, headers: Some(headers), payload, }; // Set next state and return item *this.state = State::ReadFirstMessage; span.record("task.state", this.state.as_ref()); span.record_ok(); drop(_guard); *this.span = None; return Poll::Ready(Some(Ok(local_message))); } // 3.1 Deleting the initial message after error State::DeleteFirstMessageAfterError { subject, delete_message_fut, } => { // Delete the message match delete_message_fut.poll_unpin(cx) { // Message was deleted successfully Poll::Ready(Ok(_)) => { update_heartbeat(this.last_compressing_heartbeat_tx); span.record("messages.deleted.count", 1); } // Error when deleting message Poll::Ready(Err(err)) => { update_heartbeat(this.last_compressing_heartbeat_tx); let subject = subject .take() .expect("extracting owned value only happens once"); error!( si.error.message = ?err, messaging.destination.name = subject.as_str(), "failed to delete message from stream", ); } // Pending on deleting message, so we are pending too Poll::Pending => return Poll::Pending, }; // Set next state and continue loop *this.state = State::ReadFirstMessage; drop(_guard); *this.span = None; continue; } // 4.1 Compressing remaining requests before closing [`Stream`] State::CompressRequestsAndClose { subject, compress_messages_fut, stream_sequence_numbers, span_contexts, } => { // Add span links to track the lineage from the original incoming requests to // the compressed request. Links must be added before any other work is done // within the span. for span_context in span_contexts.iter() { if span_context.is_valid() { span.add_link(span_context.clone()); } } // Compress multiple API requests into a single compressed request match compress_messages_fut.poll_unpin(cx) { // Requests compressed successfully Poll::Ready(Ok(compressed_request)) => { update_heartbeat(this.last_compressing_heartbeat_tx); // Pop the first sequence number off the delete list match stream_sequence_numbers.pop_front() { // A message was popped off list Some(message_stream_sequence) => { let stream = this.stream.clone(); // Set next state and continue loop *this.state = State::DeleteStreamMessageAndClose { subject: subject.take(), delete_message_fut: Box::pin(async move { stream .delete_message(message_stream_sequence) .await .map(|_| ()) }), compressed_request: Some(Some(compressed_request)), stream_sequence_numbers: mem::take(stream_sequence_numbers), deleted_count: 0, }; continue; } // The delete list is empty None => { span.record("messages.deleted.count", 0); // Set next state and continue loop *this.state = State::YieldItemAndClose { subject: subject.take(), compressed_request: Some(compressed_request), }; continue; } } } // Error while compressing requests Poll::Ready(Err(err)) => { update_heartbeat(this.last_compressing_heartbeat_tx); // Nothing much we can do at this point, if we can't compress then we // throw all the API requests away and delete the associated messages trace!( si.error.message = ?err, concat!( "failed to compress requests; ", "skipping requests, deleting messages & closing", ), ); // Pop the first sequence number off the delete list match stream_sequence_numbers.pop_front() { // A message was popped off the delete list Some(message_stream_sequence) => { let stream = this.stream.clone(); // Set next state and return error *this.state = State::DeleteStreamMessageAfterCompressErrorAndClose { delete_message_fut: Box::pin(async move { stream .delete_message(message_stream_sequence) .await .map(|_| ()) }), stream_sequence_numbers: mem::take( stream_sequence_numbers, ), deleted_count: 0, }; span.record("task.state", this.state.as_ref()); let err = span.record_err(err); return Poll::Ready(Some(Err( Error::CompressedRequestBeforeClose(err), ))); } // The delete list is empty None => { span.record("messages.deleted.count", 0); // Nothing to compress and nothing to delete, so // move to close stream // Set next state and return error *this.state = State::CloseStream; span.record("task.state", this.state.as_ref()); let err = span.record_err(err); return Poll::Ready(Some(Err( Error::CompressedRequestBeforeClose(err), ))); } } } // Pending on compressing messages, so we are pending too Poll::Pending => return Poll::Pending, } } // 4.2 Deleting messages from the Jetstream stream before closing [`Stream`] State::DeleteStreamMessageAndClose { subject, delete_message_fut, compressed_request, stream_sequence_numbers, deleted_count, } => { // Delete a message match delete_message_fut.poll_unpin(cx) { // Message was deleted successfully Poll::Ready(Ok(_)) => { update_heartbeat(this.last_compressing_heartbeat_tx); *deleted_count += 1; let compressed_request = compressed_request .take() .expect("extracting owned value only happens once"); // Pop the next sequence number off the delete list match stream_sequence_numbers.pop_front() { // A message was popped off list Some(message_stream_sequence) => { let stream = this.stream.clone(); // Set next state and continue loop *this.state = State::DeleteStreamMessageAndClose { subject: subject.take(), delete_message_fut: Box::pin(async move { stream .delete_message(message_stream_sequence) .await .map(|_| ()) }), compressed_request: Some(compressed_request), stream_sequence_numbers: mem::take(stream_sequence_numbers), deleted_count: *deleted_count, }; continue; } // The delete list is empty None => { span.record("messages.deleted.count", deleted_count); // Do we have a compressed request? match compressed_request { // Compressed request found Some(compressed_request) => { // Set next state and continue loop *this.state = State::YieldItemAndClose { subject: subject.take(), compressed_request: Some(compressed_request), }; continue; } // No compressed request None => { // Nothing to yield so move to close stream // Set next state and continue loop *this.state = State::CloseStream; continue; } } } } } // Error when deleting a message Poll::Ready(Err(err)) => { update_heartbeat(this.last_compressing_heartbeat_tx); // This message failed to delete, so return the error but set state to // continue the process of deleting the remaining messages trace!( si.error.message = ?err, concat!( "failed to delete message from stream; ", "skipping message, deleting remaining & closing", ), ); let compressed_request = compressed_request .take() .expect("extracting owned value only happens once"); // Pop the next sequence number off the delete list match stream_sequence_numbers.pop_front() { // A message was popped off list Some(message_stream_sequence) => { let stream = this.stream.clone(); // Set next state and return error *this.state = State::DeleteStreamMessage { subject: subject.take(), delete_message_fut: Box::pin(async move { stream .delete_message(message_stream_sequence) .await .map(|_| ()) }), compressed_request: Some(compressed_request), stream_sequence_numbers: mem::take(stream_sequence_numbers), deleted_count: *deleted_count, }; span.record("task.state", this.state.as_ref()); let err = span.record_err(err); return Poll::Ready(Some(Err( Error::DeleteStreamMessageBeforeClose(err), ))); } // The delete list is empty None => { span.record("messages.deleted.count", deleted_count); // Do we have a compressed request? match compressed_request { // Compressed request found Some(compressed_request) => { // Set next state and return error *this.state = State::YieldItemAndClose { subject: subject.take(), compressed_request: Some(compressed_request), }; span.record("task.state", this.state.as_ref()); let err = span.record_err(err); return Poll::Ready(Some(Err( Error::DeleteStreamMessageBeforeClose(err), ))); } // No compressed request None => { // Nothing to yield so re-start state machine // Set next state and return error *this.state = State::CloseStream; span.record("task.state", this.state.as_ref()); let err = span.record_err(err); return Poll::Ready(Some(Err( Error::DeleteStreamMessageBeforeClose(err), ))); } } } } } // Pending on deleting message, so we are pending too Poll::Pending => return Poll::Pending, } } // 4.3. Converting request into final message to yield from [`Stream`] before // closing [`Stream`] State::YieldItemAndClose { subject, compressed_request, } => { update_heartbeat(this.last_compressing_heartbeat_tx); let subject = subject .take() .expect("extracting owned value only happens once"); let compressed_request = compressed_request .take() .expect("extracting owned value only happens once"); let payload = match serde_json::to_vec(&compressed_request) { // Compressed requests serialized to bytes successfully Ok(vec) => vec.into(), // Failed to serialize Err(err) => { // It's too bad this error (which it shouldn't) because all we can do // is throw away the compressed request and close the stream trace!( si.error.message = ?err, messaging.destination.name = subject.as_str(), concat!( "failed to serialize compressed request to local message; ", "closing", ), ); // Set next state and return error *this.state = State::CloseStream; span.record("task.state", this.state.as_ref()); let err = span.record_err(err); return Poll::Ready(Some(Err( Error::SerializeLocalMessageBeforeClose(err), ))); } }; // We need to inject headers with our span and all of its links that we've // built. Without this, the trace would end here. let mut headers = HeaderMap::new(); propagation::inject_headers(&mut headers); let local_message = LocalMessage { subject, headers: Some(headers), payload, }; // Set next state and return item *this.state = State::CloseStream; span.record("task.state", this.state.as_ref()); span.record_ok(); return Poll::Ready(Some(Ok(local_message))); } // 4.1.1 Closing the stream State::CloseStream => { span.record("task.state", this.state.as_ref()); // Don't record span either way as it may have already been marked ok/err return Poll::Ready(None); } // 6.1 Deleting messages from the Jetstream stream after failing to compress // requests State::DeleteStreamMessageAfterCompressError { delete_message_fut, stream_sequence_numbers, deleted_count, } => { // Delete a message match delete_message_fut.poll_unpin(cx) { // Message was deleted successfully Poll::Ready(Ok(_)) => { update_heartbeat(this.last_compressing_heartbeat_tx); *deleted_count += 1; // Pop the next sequence number off the delete list match stream_sequence_numbers.pop_front() { // A message was popped off list Some(message_stream_sequence) => { let stream = this.stream.clone(); // Set next state and continue loop *this.state = State::DeleteStreamMessageAfterCompressError { delete_message_fut: Box::pin(async move { stream .delete_message(message_stream_sequence) .await .map(|_| ()) }), stream_sequence_numbers: mem::take(stream_sequence_numbers), deleted_count: *deleted_count, }; continue; } // The delete list is empty None => { span.record("messages.deleted.count", deleted_count); // Nothing to yield so re-start state machine // Set next state and continue loop *this.state = State::ReadFirstMessage; drop(_guard); *this.span = None; continue; } } } // Error when deleting a message Poll::Ready(Err(err)) => { update_heartbeat(this.last_compressing_heartbeat_tx); // This message failed to delete, so return the error but set state to // continue the process of deleting the remaining messages trace!( si.error.message = ?err, concat!( "failed to delete message from stream; ", "skipping message & deleting remaining", ), ); // Pop the next sequence number off the delete list match stream_sequence_numbers.pop_front() { // A message was popped off list Some(message_stream_sequence) => { let stream = this.stream.clone(); // Set next state and return error *this.state = State::DeleteStreamMessageAfterCompressError { delete_message_fut: Box::pin(async move { stream .delete_message(message_stream_sequence) .await .map(|_| ()) }), stream_sequence_numbers: mem::take(stream_sequence_numbers), deleted_count: *deleted_count, }; span.record("task.state", this.state.as_ref()); let err = span.record_err(err); return Poll::Ready(Some(Err( Error::DeleteStreamMessageAfterCompressError(err), ))); } // The delete list is empty None => { span.record("messages.deleted.count", deleted_count); // Nothing to yield so re-start state machine // Set next state and return error *this.state = State::ReadFirstMessage; span.record("task.state", this.state.as_ref()); let err = span.record_err(err); drop(_guard); *this.span = None; return Poll::Ready(Some(Err( Error::DeleteStreamMessageAfterCompressError(err), ))); } } } // Pending on deleting message, so we are pending too Poll::Pending => return Poll::Pending, } } // 4.1.2 Deleting messages from the Jetstream stream after failing to compress // requests before closing [`Stream`] State::DeleteStreamMessageAfterCompressErrorAndClose { delete_message_fut, stream_sequence_numbers, deleted_count, } => { // Delete a message match delete_message_fut.poll_unpin(cx) { // Message was deleted successfully Poll::Ready(Ok(_)) => { update_heartbeat(this.last_compressing_heartbeat_tx); *deleted_count += 1; // Pop the next sequence number off the delete list match stream_sequence_numbers.pop_front() { // A message was popped off list Some(message_stream_sequence) => { let stream = this.stream.clone(); // Set next state and continue loop *this.state = State::DeleteStreamMessageAfterCompressErrorAndClose { delete_message_fut: Box::pin(async move { stream .delete_message(message_stream_sequence) .await .map(|_| ()) }), stream_sequence_numbers: mem::take( stream_sequence_numbers, ), deleted_count: *deleted_count, }; continue; } // The delete list is empty None => { span.record("messages.deleted.count", deleted_count); // Nothing to yield so re-start state machine // Set next state and continue loop *this.state = State::CloseStream; continue; } } } // Error when deleting a message Poll::Ready(Err(err)) => { update_heartbeat(this.last_compressing_heartbeat_tx); // This message failed to delete, so return the error but set state to // continue the process of deleting the remaining messages trace!( si.error.message = ?err, concat!( "failed to delete message from stream; ", "skipping message, deleting remaining & closing", ), ); // Pop the next sequence number off the delete list match stream_sequence_numbers.pop_front() { // A message was popped off list Some(message_stream_sequence) => { let stream = this.stream.clone(); // Set next state and return error *this.state = State::DeleteStreamMessageAfterCompressErrorAndClose { delete_message_fut: Box::pin(async move { stream .delete_message(message_stream_sequence) .await .map(|_| ()) }), stream_sequence_numbers: mem::take( stream_sequence_numbers, ), deleted_count: *deleted_count, }; span.record("task.state", this.state.as_ref()); let err = span.record_err(err); return Poll::Ready(Some(Err( Error::DeleteStreamMessageAfterCompressErrorBeforeClose( err, ), ))); } // The delete list is empty None => { span.record("messages.deleted.count", deleted_count); // Nothing to yield so close stream // Set next state and return error *this.state = State::CloseStream; span.record("task.state", this.state.as_ref()); let err = span.record_err(err); return Poll::Ready(Some(Err( Error::DeleteStreamMessageAfterCompressErrorBeforeClose( err, ), ))); } } } // Pending on deleting message, so we are pending too Poll::Pending => return Poll::Pending, } } } } } } #[inline] fn update_heartbeat(heartbeat_tx: &mut Option<watch::Sender<Instant>>) { if let Some(heartbeat_tx) = heartbeat_tx { // Update the "liveness" of the stream to prevent a quiescent period if there is // still work to do heartbeat_tx.send_replace(Instant::now()); } } #[inline] fn parse_message<R>( message: jetstream::Message, ) -> result::Result<(R, Option<HeaderMap>), NegotiateError> where R: Negotiate + Send + 'static, { let (head, payload) = message.into_head_and_payload(); let content_info = ContentInfo::try_from(head.headers.as_ref()).map_err(ContentInfoError::from_err)?; let request = R::negotiate(&content_info, &payload)?; Ok((request, head.headers)) }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server