Skip to main content
Glama

Convex MCP server

Official
by get-convex
test_persistence.rs14.5 kB
use std::{ collections::{ BTreeMap, BTreeSet, }, iter, ops::Bound, sync::Arc, }; use async_trait::async_trait; use cmd_util::env::config_test; use futures::{ stream, StreamExt, }; use itertools::Itertools; use parking_lot::Mutex; use serde_json::Value as JsonValue; use value::{ InternalDocumentId, TabletId, }; #[cfg(test)] use super::persistence_test_suite; use crate::{ document::ResolvedDocument, index::{ IndexEntry, IndexKeyBytes, }, interval::{ End, Interval, StartIncluded, }, persistence::{ ConflictStrategy, DocumentLogEntry, DocumentPrevTsQuery, DocumentStream, IndexStream, LatestDocument, Persistence, PersistenceGlobalKey, PersistenceIndexEntry, PersistenceReader, RetentionValidator, TimestampRange, }, query::Order, run_persistence_test_suite, types::{ IndexId, PersistenceVersion, Timestamp, }, }; #[derive(Clone)] pub struct TestPersistence { inner: Arc<Mutex<Inner>>, } impl TestPersistence { pub fn new() -> Self { config_test(); let inner = Inner { is_fresh: true, is_read_only: false, log: BTreeMap::new(), index: BTreeMap::new(), persistence_globals: BTreeMap::new(), }; Self::new_inner(Arc::new(Mutex::new(inner)), false).unwrap() } /// Pass in an Inner to store state across TestPersistence instances. fn new_inner(inner: Arc<Mutex<Inner>>, allow_read_only: bool) -> anyhow::Result<Self> { anyhow::ensure!(allow_read_only || !inner.lock().is_read_only); Ok(Self { inner }) } } #[async_trait] impl Persistence for TestPersistence { fn is_fresh(&self) -> bool { self.inner.lock().is_fresh } fn reader(&self) -> Arc<dyn PersistenceReader> { Arc::new(self.clone()) } async fn write<'a>( &self, documents: &'a [DocumentLogEntry], indexes: &'a [PersistenceIndexEntry], conflict_strategy: ConflictStrategy, ) -> anyhow::Result<()> { let mut inner = self.inner.lock(); for update in documents { anyhow::ensure!( conflict_strategy == ConflictStrategy::Overwrite || !inner.log.contains_key(&(update.ts, update.id)), "Unique constraint not satisfied. Failed to write document at ts {} with id {}: \ (document, ts) pair already exists", update.ts, update.id ); inner.log.insert( (update.ts, update.id), (update.value.clone(), update.prev_ts), ); } inner.is_fresh = false; for update in indexes { let index_key_bytes = update.key.clone(); anyhow::ensure!( conflict_strategy == ConflictStrategy::Overwrite || !inner .index .get(&update.index_id) .map(|idx| idx.contains_key(&(index_key_bytes.clone(), update.ts))) .unwrap_or(false), "Unique constraint not satisfied. Failed to write to index {} at ts {} with key \ {:?}: (key, ts) pair already exists", update.index_id, update.ts, index_key_bytes ); inner .index .entry(update.index_id) .or_default() .insert((index_key_bytes, update.ts), update.value); } Ok(()) } async fn set_read_only(&self, read_only: bool) -> anyhow::Result<()> { self.inner.lock().is_read_only = read_only; Ok(()) } async fn write_persistence_global( &self, key: PersistenceGlobalKey, value: JsonValue, ) -> anyhow::Result<()> { self.inner.lock().persistence_globals.insert(key, value); Ok(()) } async fn load_index_chunk( &self, cursor: Option<IndexEntry>, chunk_size: usize, ) -> anyhow::Result<Vec<IndexEntry>> { let mut inner = self.inner.lock(); let index = &mut inner.index; let index_entries = index .iter() .flat_map(|(index_id, tree)| { tree.iter().map(|((key, ts), v)| IndexEntry { index_id: *index_id, deleted: v.is_none(), key_prefix: key.0.clone(), key_suffix: None, key_sha256: key.0.clone(), ts: *ts, }) }) .filter(|index_entry| match cursor { None => true, Some(ref cursor) => index_entry > cursor, }) .take(chunk_size) .collect(); Ok(index_entries) } async fn delete_index_entries(&self, expired_rows: Vec<IndexEntry>) -> anyhow::Result<usize> { let mut inner = self.inner.lock(); let index = &mut inner.index; let mut total_deleted = 0; for expired_row in expired_rows { if index .get_mut(&expired_row.index_id) .unwrap() .remove(&(IndexKeyBytes(expired_row.key_prefix), expired_row.ts)) .is_some() { total_deleted += 1; } } Ok(total_deleted) } async fn delete( &self, documents: Vec<(Timestamp, InternalDocumentId)>, ) -> anyhow::Result<usize> { let mut inner = self.inner.lock(); let log = &mut inner.log; let mut total_deleted = 0; for expired_doc in documents { if log.remove(&expired_doc).is_some() { total_deleted += 1; } } Ok(total_deleted) } } #[async_trait] impl PersistenceReader for TestPersistence { fn load_documents( &self, range: TimestampRange, order: Order, _page_size: u32, _retention_validator: Arc<dyn RetentionValidator>, ) -> DocumentStream<'_> { let log = { self.inner.lock().log.clone() }; let iter = log .into_iter() .map(|((ts, id), (value, prev_ts))| DocumentLogEntry { ts, id, value, prev_ts, }) .filter(move |entry| range.contains(entry.ts)) // Mimic the sort in Postgres that is by internal id. .sorted_by_key(|entry| (entry.ts, entry.id.internal_id())) .map(Ok); match order { Order::Asc => stream::iter(iter).boxed(), Order::Desc => stream::iter(iter.rev()).boxed(), } } async fn previous_revisions( &self, ids: BTreeSet<(InternalDocumentId, Timestamp)>, retention_validator: Arc<dyn RetentionValidator>, ) -> anyhow::Result<BTreeMap<(InternalDocumentId, Timestamp), DocumentLogEntry>> { let min_ts = ids.iter().map(|(_, ts)| *ts).min(); let result = { let inner = self.inner.lock(); let result = ids .into_iter() .filter_map(|(id, ts)| { inner .log .iter() .filter(|((log_ts, log_id), _)| log_id == &id && log_ts < &ts) .max_by_key(|(log_ts, _)| *log_ts) .map(|((log_ts, _), (doc, prev_ts))| { ( (id, ts), DocumentLogEntry { id, ts: *log_ts, value: doc.clone(), prev_ts: *prev_ts, }, ) }) }) .collect(); result }; if let Some(min_ts) = min_ts { retention_validator .validate_document_snapshot(min_ts) .await?; } Ok(result) } async fn previous_revisions_of_documents( &self, ids: BTreeSet<DocumentPrevTsQuery>, retention_validator: Arc<dyn RetentionValidator>, ) -> anyhow::Result<BTreeMap<DocumentPrevTsQuery, DocumentLogEntry>> { let min_ts = ids.iter().map(|DocumentPrevTsQuery { ts, .. }| *ts).min(); let result = { let inner = self.inner.lock(); let result = ids .into_iter() .filter_map(|DocumentPrevTsQuery { id, ts, prev_ts }| { inner.log.get(&(prev_ts, id)).map(|(doc, prev_prev_ts)| { ( DocumentPrevTsQuery { id, ts, prev_ts }, DocumentLogEntry { id, ts: prev_ts, value: doc.clone(), prev_ts: *prev_prev_ts, }, ) }) }) .collect(); result }; if let Some(min_ts) = min_ts { retention_validator .validate_document_snapshot(min_ts) .await?; } Ok(result) } fn index_scan( &self, index_id: IndexId, _tablet_id: TabletId, read_timestamp: Timestamp, interval: &Interval, order: Order, _size_hint: usize, _retention_validator: Arc<dyn RetentionValidator>, ) -> IndexStream<'_> { let interval = interval.clone(); // Add timestamp. let lower = match interval.start { StartIncluded(v) => Bound::Included((v.into(), Timestamp::MIN)), }; let upper = match interval.end { End::Excluded(v) => Bound::Excluded((v.into(), Timestamp::MIN)), End::Unbounded => Bound::Unbounded, }; let lock = self.inner.lock(); let index = lock.index.get(&index_id); // BTreeMap is not happy if you give it an empty range. Copy how it detects // the range is empty and a void calling it. let index = match (&lower, &upper) { (Bound::Excluded(s), Bound::Excluded(e)) if s == e => None, (Bound::Included(s) | Bound::Excluded(s), Bound::Included(e) | Bound::Excluded(e)) if s > e => { None }, _ => index, }; let it: Box<dyn Iterator<Item = _> + Send> = match index { Some(index) => { let it = index.range((lower, upper)); match order { Order::Asc => Box::new(it), Order::Desc => Box::new(it.rev()), } }, None => Box::new(iter::empty()), }; let mut results: Vec<(IndexKeyBytes, Timestamp, InternalDocumentId)> = Vec::new(); let mut maybe_add_value = |entry: Option<(&(IndexKeyBytes, Timestamp), &Option<InternalDocumentId>)>| match entry { None | Some((_, None)) => {}, Some(((k, ts), Some(doc_id))) => { // Lookup the document by id and timestamp. results.push((k.clone(), *ts, *doc_id)); }, }; let mut previous: Option<(&(IndexKeyBytes, Timestamp), &Option<InternalDocumentId>)> = None; for current in it { if current.0 .1 > read_timestamp { // Outside of read snapshot. continue; } let different = match previous { Some(previous) => previous.0 .0 != current.0 .0, None => true, }; if different { match order { Order::Asc => maybe_add_value(previous), Order::Desc => maybe_add_value(Some(current)), }; } previous = Some(current); } // Yield the last value if applicable. match order { Order::Asc => maybe_add_value(previous), Order::Desc => {}, }; let results: Vec<anyhow::Result<(IndexKeyBytes, LatestDocument)>> = results .into_iter() .map(|(k, ts, doc_id)| -> anyhow::Result<_> { let (value, prev_ts) = lock.lookup(doc_id, ts)?; Ok((k, LatestDocument { ts, value, prev_ts })) }) .collect(); stream::iter(results).boxed() } async fn get_persistence_global( &self, key: PersistenceGlobalKey, ) -> anyhow::Result<Option<JsonValue>> { Ok(self.inner.lock().persistence_globals.get(&key).cloned()) } fn version(&self) -> PersistenceVersion { PersistenceVersion::default() } } struct Inner { is_fresh: bool, is_read_only: bool, log: BTreeMap<(Timestamp, InternalDocumentId), (Option<ResolvedDocument>, Option<Timestamp>)>, index: BTreeMap<IndexId, BTreeMap<(IndexKeyBytes, Timestamp), Option<InternalDocumentId>>>, persistence_globals: BTreeMap<PersistenceGlobalKey, JsonValue>, } impl Inner { // Lookup object by (id, timestamp). The document must exist. fn lookup( &self, doc_id: InternalDocumentId, ts: Timestamp, ) -> anyhow::Result<(ResolvedDocument, Option<Timestamp>)> { let (value, prev_ts) = self .log .get(&(ts, doc_id)) .cloned() .ok_or_else(|| anyhow::anyhow!("Dangling index reference"))?; Ok(( value.ok_or_else(|| anyhow::anyhow!("Index reference to deleted document"))?, prev_ts, )) } } run_persistence_test_suite!( db, Arc::new(Mutex::new(Inner { is_fresh: true, is_read_only: false, log: BTreeMap::new(), index: BTreeMap::new(), persistence_globals: BTreeMap::new(), })), TestPersistence::new_inner(db.clone(), false)?, TestPersistence::new_inner(db.clone(), true)? );

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server