Skip to main content
Glama
kv_history.rs4.37 kB
// Internal impl of a `Watch` type vendored from the `async-nats` crate. // // See: https://github.com/nats-io/nats.rs/blob/7d63f1dd725c86a4f01723ea3194f17e30a0561b/async-nats/src/jetstream/kv/mod.rs#L1263-L1323 use std::{ result, str::FromStr as _, task::Poll, }; use futures::StreamExt as _; use si_data_nats::async_nats::{ self, jetstream::{ consumer::push::{ Ordered, OrderedError, }, kv::{ Entry, Operation, ParseOperationError, WatcherErrorKind, }, }, }; use thiserror::Error; const KV_OPERATION: &str = "KV-Operation"; /// A structure representing the history of a key-value bucket, yielding past values. pub struct History { pub subscription: Ordered, pub done: bool, pub prefix: String, pub bucket: String, } #[derive(Debug, Error)] pub enum WatcherError { #[error("{0}")] Default(WatcherErrorKind, String), #[error("{0}")] Ordered(#[from] OrderedError), } impl futures::Stream for History { type Item = result::Result<Entry, WatcherError>; fn poll_next( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll<Option<Self::Item>> { if self.done { return Poll::Ready(None); } match self.subscription.poll_next_unpin(cx) { Poll::Ready(message) => match message { None => Poll::Ready(None), Some(message) => { let message = message?; let info = message.info().map_err(|err| { WatcherError::Default( WatcherErrorKind::Other, format!("failed to parse message metadata: {err}"), ) })?; if info.pending == 0 { self.done = true; } let operation = kv_operation_from_message(&message).unwrap_or(Operation::Put); let key = message .subject .strip_prefix(&self.prefix) .map(|s| s.to_string()) .unwrap(); Poll::Ready(Some(Ok(Entry { bucket: self.bucket.clone(), key, value: message.payload.clone(), revision: info.stream_sequence, created: info.published, delta: info.pending, operation, seen_current: self.done, }))) } }, std::task::Poll::Pending => Poll::Pending, } } fn size_hint(&self) -> (usize, Option<usize>) { (0, None) } } pub struct Keys { pub inner: History, } impl futures::Stream for Keys { type Item = Result<String, WatcherError>; fn poll_next( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll<Option<Self::Item>> { loop { match self.inner.poll_next_unpin(cx) { Poll::Ready(None) => return Poll::Ready(None), Poll::Ready(Some(res)) => match res { Ok(entry) => { // Skip purged and deleted keys if matches!(entry.operation, Operation::Purge | Operation::Delete) { // Try to poll again if we skip this one continue; } else { return Poll::Ready(Some(Ok(entry.key))); } } Err(e) => return Poll::Ready(Some(Err(e))), }, Poll::Pending => return Poll::Pending, } } } } fn kv_operation_from_message( message: &async_nats::message::Message, ) -> Result<Operation, ParseOperationError> { let headers = match message.headers.as_ref() { Some(headers) => headers, None => return Ok(Operation::Put), }; if let Some(op) = headers.get(KV_OPERATION) { Operation::from_str(op.as_str()) } else { Ok(Operation::Put) } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server