Skip to main content
Glama

Convex MCP server

Official
by get-convex
system_query.rs25.7 kB
//! Provides a simplified, more-typed API for querying system metadata tables //! (as compared to [`common::query::Query`] / [`crate::query::ResolvedQuery`]). use std::{ mem, ops::{ Bound, RangeBounds, }, sync::Arc, }; use anyhow::Context as _; use common::{ document::{ ParseDocument as _, ParsedDocument, }, interval::{ BinaryKey, End, Interval, StartIncluded, }, knobs::DEFAULT_QUERY_PREFETCH, query::Order, runtime::Runtime, }; use indexing::backend_in_memory_indexes::{ LazyDocument, MemoryDocument, RangeRequest, }; use value::{ serde::ConvexSerializable, sorting::write_sort_key, walk::ConvexValueWalker, DeveloperDocumentId, TableNamespace, TabletId, }; use crate::{ system_tables::{ SystemIndex, SystemTable, SystemTableMetadata, }, Transaction, }; pub struct EqFields { prefix: Vec<u8>, fields: usize, } pub struct Unlimited; pub struct SystemQueryBuilder<'a, 'b, RT: Runtime, T: SystemTable, R> { tx: &'a mut Transaction<RT>, namespace: TableNamespace, index: &'b SystemIndex<T>, tablet_id: TabletId, index_range: R, order: Order, } pub struct SystemQuery<'a, 'b, RT: Runtime, T: SystemTable> { tx: &'a mut Transaction<RT>, index: &'b SystemIndex<T>, tablet_id: TabletId, index_range: Interval, order: Order, } impl<RT: Runtime> Transaction<RT> { /// Start building a query over `index`. /// The query will initially range over the entire index. To narrow the /// range, use the `.eq()` and `.range()` builder methods. pub fn query_system<'a, 'b, T: SystemTable>( &'a mut self, namespace: TableNamespace, index: &'b SystemIndex<T>, ) -> anyhow::Result<SystemQueryBuilder<'a, 'b, RT, T, EqFields>> where T::Metadata: ConvexSerializable, { // TODO: this should possibly return an empty vector? let tablet_id = self .table_mapping() .namespace(namespace) .id_if_exists(T::table_name()) .with_context(|| format!("Index {:?} not present in {namespace:?}", index.name()))?; Ok(SystemQueryBuilder { tx: self, namespace, index, tablet_id, index_range: EqFields { prefix: vec![], fields: 0, }, order: Order::Asc, }) } } impl<'a, 'b, RT: Runtime, T: SystemTable> SystemQueryBuilder<'a, 'b, RT, T, EqFields> { /// Specify equality constraints for the next fields in the index. /// This will return an error if the total number of `.eq()` fields exceeds /// the number of indexed fields. /// /// This is not type-safe with respect to the actual field types - providing /// an eq() constraint of the wrong type will find nothing. pub fn eq<F: ConvexValueWalker + Copy>(mut self, fields: &[F]) -> anyhow::Result<Self> { let indexed_fields_len = self.index.fields.len() + 1; // include _id self.index_range.push(fields, indexed_fields_len)?; Ok(self) } /// Apply a one- or two-sided inequality to the index range. /// /// The inequality is applied to the same number of index fields as the /// number of values provided. For example, if the next fields in the index /// are `["nextTs", "_creationTime", "_id"]`, then calling /// `query.range(..=[a, b])` ranges over `(nextTs, _creationTime) <= (a, /// b)` and ignores `_id`. pub fn range<F: ConvexValueWalker + Copy, A: AsRef<[F]>>( self, range: impl RangeBounds<A>, ) -> anyhow::Result<SystemQueryBuilder<'a, 'b, RT, T, Interval>> { let SystemQueryBuilder { tx, namespace, index, tablet_id, index_range, order, } = self; let indexed_fields_len = self.index.fields.len() + 1; // include _id let index_range = index_range.range( range.start_bound().map(AsRef::as_ref), range.end_bound().map(AsRef::as_ref), indexed_fields_len, )?; Ok(SystemQueryBuilder { tx, namespace, index, tablet_id, index_range, order, }) } /// Sets the iteration order of the query. The query is ascending by /// default. pub fn order(mut self, order: Order) -> Self { self.order = order; self } } impl<'a, 'b, RT: Runtime, T: SystemTable, R: Into<Interval>> SystemQueryBuilder<'a, 'b, RT, T, R> { /// Builds the query so that it can be iterated one page at a time with /// [`SystemQuery::next_page`]. pub fn build(self) -> SystemQuery<'a, 'b, RT, T> { let Self { tx, namespace: _, index, tablet_id, index_range, order, } = self; SystemQuery { tx, index, tablet_id, index_range: index_range.into(), order, } } /// Returns all the documents in the index range. pub async fn all(self) -> anyhow::Result<Vec<Arc<ParsedDocument<T::Metadata>>>> where T::Metadata: ConvexSerializable, { let mut q = self.build(); let mut docs = vec![]; loop { let (page, has_more) = q.next_page(*DEFAULT_QUERY_PREFETCH).await?; docs.extend(page); if !has_more { break; } } Ok(docs) } /// Returns the unique document, if any, in the range. Raises an error if /// more than one document is found. pub async fn unique(self) -> anyhow::Result<Option<Arc<ParsedDocument<T::Metadata>>>> where T::Metadata: ConvexSerializable, { let mut q = self.build(); let (mut docs, _has_more) = q.next_page(2).await?; anyhow::ensure!( docs.len() <= 1, "expected at most 1 document from index {:?}, got {}", q.index.name(), docs.len() ); Ok(docs.pop()) } } impl<RT: Runtime, T: SystemTable> SystemQuery<'_, '_, RT, T> { /// Returns the next (up to) `n` documents and a boolean indicating if more /// results are expected. Note that this could return `true` even if there /// aren't actually any more documents, if there's still an unexplored index /// range that happens to have no documents in it. pub async fn next_page( &mut self, n: usize, ) -> anyhow::Result<(Vec<Arc<ParsedDocument<T::Metadata>>>, bool)> where T::Metadata: ConvexSerializable, { if self.index_range.is_empty() { return Ok((vec![], false)); } let Ok(tablet_index_id) = self .index .name .clone() .map_table(&|_| Ok::<_, !>(self.tablet_id)); let request = RangeRequest { index_name: tablet_index_id, printable_index_name: self.index.name(), order: self.order, interval: mem::replace(&mut self.index_range, Interval::empty()), max_size: n, }; let [result] = self .tx .index .range_no_deps(&[&request]) .await .try_into() .map_err(|_| anyhow::anyhow!("returned wrong batch size"))?; let (page, cursor) = result?; let (page_range, remaining) = request.interval.split(cursor, self.order); self.index_range = remaining; self.tx.reads.record_indexed_directly( request.index_name, self.index.fields.clone(), page_range, )?; for (_, doc, _) in &page { // NOTE: since this is a system read, we don't bother tracking usage; // we only update `system_tx_size` for stats self.tx .reads .record_read_system_document(doc.approximate_size()); } Ok(( page.into_iter() .map(|(_index_key, doc, _ts)| { Ok(match doc { LazyDocument::Resolved(doc) => { Arc::new(SystemTableMetadata::parse_from_doc(doc)?) }, LazyDocument::Memory(doc) if !T::FOR_MIGRATION => { doc.force::<T::Metadata>()? }, LazyDocument::Memory(MemoryDocument { packed_document: doc, .. }) | LazyDocument::Packed(doc) => Arc::new(doc.parse()?), }) }) .collect::<anyhow::Result<Vec<_>>>()?, !self.index_range.is_empty(), )) } } impl<RT: Runtime> Transaction<RT> { /// Queries the document with the given `id` from table `T`. /// /// Note that if `id` actually belongs to a different table, this will /// return `Ok(None)` (and not raise any error). pub async fn get_system<T: SystemTable>( &mut self, namespace: TableNamespace, id: DeveloperDocumentId, ) -> anyhow::Result<Option<Arc<ParsedDocument<T::Metadata>>>> where T::Metadata: ConvexSerializable, { self.query_system(namespace, &SystemIndex::<T>::by_id())? .eq(&[id.encode_into(&mut Default::default())])? .unique() .await } } impl EqFields { fn push<F: ConvexValueWalker + Copy>( &mut self, fields: &[F], indexed_fields_len: usize, ) -> anyhow::Result<()> { for &value in fields { write_sort_key(value, &mut self.prefix).map_err(Into::into)?; self.fields += 1; } // Sanity checks against developer errors anyhow::ensure!( self.fields <= indexed_fields_len, "invalid system query: provided {} eq() fields but index only has {} fields", self.fields, indexed_fields_len ); Ok(()) } fn range<F: ConvexValueWalker + Copy>( self, lower_bound: Bound<&[F]>, upper_bound: Bound<&[F]>, indexed_fields_len: usize, ) -> anyhow::Result<Interval> { for bound in [lower_bound, lower_bound] { if let Bound::Excluded(b) | Bound::Included(b) = bound { anyhow::ensure!( self.fields + b.len() <= indexed_fields_len, "invalid system query: provided {} eq() fields and {} bound fields but index \ only has {} fields", self.fields, b.len(), indexed_fields_len ); } } let start = match lower_bound { bound @ (Bound::Included(fields) | Bound::Excluded(fields)) => { let mut start = self.prefix.clone(); for &value in fields { write_sort_key(value, &mut start).map_err(Into::into)?; } if let Bound::Excluded(_) = bound { let Some(key) = BinaryKey::from(start).increment() else { return Ok(Interval::empty()); }; key } else { BinaryKey::from(start) } }, Bound::Unbounded => BinaryKey::from(self.prefix.clone()), }; let end = match upper_bound { bound @ (Bound::Included(fields) | Bound::Excluded(fields)) => { let mut end = self.prefix; for &value in fields { write_sort_key(value, &mut end).map_err(Into::into)?; } if let Bound::Included(_) = bound { End::after_prefix(&end.into()) } else { End::Excluded(end.into()) } }, Bound::Unbounded => End::after_prefix(&self.prefix.into()), }; Ok(Interval { start: StartIncluded(start), end, }) } } impl From<EqFields> for Interval { #[inline] fn from(value: EqFields) -> Self { Interval::prefix(value.prefix.into()) } } #[cfg(test)] mod tests { use std::{ fmt::Debug, ops::Bound, sync::{ Arc, LazyLock, }, }; use common::{ document::ParsedDocument, interval::{ BinaryKey, End, Interval, IntervalSet, StartIncluded, }, testing::assert_contains, }; use proptest_derive::Arbitrary; use runtime::testing::TestRuntime; use serde::{ Deserialize, Serialize, }; use value::{ assert_obj, codegen_convex_serialization, values_to_bytes, ConvexValue, DeveloperDocumentId, ResolvedDocumentId, TableName, TableNamespace, }; use crate::{ system_tables::{ SystemIndex, SystemTable, }, test_helpers::DbFixtures, Database, TestFacingModel, Transaction, }; static TEST_TABLE_NAME: LazyLock<TableName> = LazyLock::new(|| "test".parse().unwrap()); static TEST_INDEX: LazyLock<SystemIndex<TestTable>> = LazyLock::new(|| { SystemIndex::new("by_a_b", [&"a".parse().unwrap(), &"b".parse().unwrap()]).unwrap() }); struct TestTable; impl SystemTable for TestTable { type Metadata = TestMetadata; fn table_name() -> &'static TableName { &TEST_TABLE_NAME } fn indexes() -> Vec<SystemIndex<Self>> { vec![TEST_INDEX.clone()] } } #[derive(Serialize, Deserialize, Clone, Arbitrary, Debug, PartialEq)] struct TestMetadata { #[serde(default, skip_serializing_if = "Option::is_none")] a: Option<i64>, #[serde(default, skip_serializing_if = "Option::is_none")] b: Option<i64>, } codegen_convex_serialization!(TestMetadata, TestMetadata); async fn setup_db(rt: &TestRuntime) -> anyhow::Result<Database<TestRuntime>> { let DbFixtures { db, tp, .. } = DbFixtures::new(rt).await?; db.create_backfilled_index_for_test( tp, TableNamespace::test_user(), TEST_INDEX.name(), TEST_INDEX.fields.clone(), ) .await?; Ok(db) } fn index_reads<T: SystemTable>( tx: &Transaction<TestRuntime>, test_index: &SystemIndex<T>, namespace: TableNamespace, ) -> IntervalSet { let name = test_index .name() .map_table( &tx.metadata .table_mapping() .namespace(namespace) .name_to_tablet(), ) .unwrap(); tx.reads.read_set().index_reads_for_test(&name) } fn i(start: BinaryKey, end: End) -> Interval { Interval { start: StartIncluded(start), end, } } fn intervals<const N: usize>(intervals: [Interval; N]) -> IntervalSet { let mut set = IntervalSet::new(); for interval in intervals { set.add(interval); } set } fn k<T: TryInto<ConvexValue, Error: Debug>, const N: usize>(values: [T; N]) -> BinaryKey { BinaryKey::from(values_to_bytes( &values .into_iter() .map(|v| Some(v.try_into().unwrap())) .collect::<Vec<_>>(), )) } #[convex_macro::test_runtime] async fn test_system_query(rt: TestRuntime) -> anyhow::Result<()> { let db = setup_db(&rt).await?; let namespace = TableNamespace::test_user(); let mut tx = db.begin_system().await?; let doc1 = TestFacingModel::new(&mut tx) .insert_and_get(TEST_TABLE_NAME.clone(), assert_obj! {}) .await?; let doc2 = TestFacingModel::new(&mut tx) .insert_and_get(TEST_TABLE_NAME.clone(), assert_obj! { "a" => 1i64, }) .await?; db.commit(tx).await?; let mut tx = db.begin_system().await?; let docs = tx.query_system(namespace, &TEST_INDEX)?.all().await?; assert_eq!(docs.len(), 2); assert_eq!(docs[0].id(), doc1.id()); assert_eq!(**docs[0], TestMetadata { a: None, b: None }); assert_eq!(docs[1].id(), doc2.id()); assert_eq!( **docs[1], TestMetadata { a: Some(1), b: None, } ); assert_eq!(index_reads(&tx, &TEST_INDEX, namespace), IntervalSet::All); // Step through a query one page at a time let creation_time_index = SystemIndex::<TestTable>::by_creation_time(); let mut query = tx.query_system(namespace, &creation_time_index)?.build(); let (page, has_more) = query.next_page(1).await?; assert_eq!(page.len(), 1); assert_eq!(page[0].id(), doc1.id()); assert!(has_more); // Check that the read set is updated incrementally for each document read assert_eq!( index_reads(query.tx, &creation_time_index, namespace), intervals([i( BinaryKey::min(), End::after_prefix(&k::<ConvexValue, 2>([ doc1.creation_time().into(), doc1.developer_id().encode().try_into().unwrap() ])) )]) ); let (page, has_more) = query.next_page(1).await?; assert_eq!(page.len(), 1); assert_eq!(page[0].id(), doc2.id()); assert!(has_more); assert_eq!( index_reads(query.tx, &creation_time_index, namespace), intervals([i( BinaryKey::min(), End::after_prefix(&k::<ConvexValue, 2>([ doc2.creation_time().into(), doc2.developer_id().encode().try_into().unwrap() ])) )]) ); let (page, has_more) = query.next_page(1).await?; assert_eq!(page, vec![]); assert!(!has_more); assert_eq!( index_reads(&tx, &creation_time_index, namespace), IntervalSet::All ); Ok(()) } #[convex_macro::test_runtime] async fn test_system_query_eq(rt: TestRuntime) -> anyhow::Result<()> { let db = setup_db(&rt).await?; let namespace = TableNamespace::test_user(); let mut tx = db.begin_system().await?; let doc = TestFacingModel::new(&mut tx) .insert_and_get( TEST_TABLE_NAME.clone(), assert_obj! { "a" => 1i64, "b" => 2i64 }, ) .await?; db.commit(tx).await?; let mut tx = db.begin_system().await?; let queried = tx .query_system(namespace, &TEST_INDEX)? .eq(&[1i64, 2])? .unique() .await?; assert_eq!(queried.as_ref().unwrap().id(), doc.id()); assert_eq!( index_reads(&tx, &TEST_INDEX, namespace), intervals([Interval::prefix(k([1i64, 2i64]))]) ); assert_eq!( tx.query_system(namespace, &TEST_INDEX)? .eq(&[1i64])? .unique() .await?, queried ); assert_eq!( tx.query_system(namespace, &TEST_INDEX)? .eq(&[2i64])? .unique() .await?, None ); // Too many fields provided for the index assert_contains( &tx.query_system(namespace, &TEST_INDEX)? .eq(&[1i64; 5]) .err() .unwrap(), "invalid system query", ); Ok(()) } #[convex_macro::test_runtime] async fn test_range(rt: TestRuntime) -> anyhow::Result<()> { let db = setup_db(&rt).await?; let namespace = TableNamespace::test_user(); let mut tx = db.begin_system().await?; let doc1 = TestFacingModel::new(&mut tx) .insert_and_get( TEST_TABLE_NAME.clone(), assert_obj! { "a" => 1i64, "b" => 2i64 }, ) .await?; let doc2 = TestFacingModel::new(&mut tx) .insert_and_get( TEST_TABLE_NAME.clone(), assert_obj! { "a" => 1i64, "b" => 3i64 }, ) .await?; let doc3 = TestFacingModel::new(&mut tx) .insert_and_get( TEST_TABLE_NAME.clone(), assert_obj! { "a" => 2i64, "b" => 1i64 }, ) .await?; db.commit(tx).await?; fn ids(docs: Vec<Arc<ParsedDocument<TestMetadata>>>) -> Vec<ResolvedDocumentId> { docs.into_iter().map(|d| d.id()).collect() } // half-exclusive range on `a` let mut tx = db.begin_system().await?; let queried = tx .query_system(namespace, &TEST_INDEX)? .range([1i64]..[2i64])? .all() .await?; assert_eq!(ids(queried), [doc1.id(), doc2.id()]); assert_eq!( index_reads(&tx, &TEST_INDEX, namespace), intervals([i(k([1i64]), End::Excluded(k([2i64])))]) ); drop(tx); // inclusive range on `a` let mut tx = db.begin_system().await?; let queried = tx .query_system(namespace, &TEST_INDEX)? .range([1i64]..=[2i64])? .all() .await?; assert_eq!(ids(queried), [doc1.id(), doc2.id(), doc3.id()]); assert_eq!( index_reads(&tx, &TEST_INDEX, namespace), intervals([i(k([1i64]), End::after_prefix(&k([2i64])))]) ); drop(tx); // range bounds do not have to be the same length let mut tx = db.begin_system().await?; let queried = tx .query_system(namespace, &TEST_INDEX)? .range([1i64, 3].as_slice()..=[2i64].as_slice())? .all() .await?; assert_eq!(ids(queried), [doc2.id(), doc3.id()]); assert_eq!( index_reads(&tx, &TEST_INDEX, namespace), intervals([i(k([1i64, 3i64]), End::after_prefix(&k([2i64])))]) ); drop(tx); // can use an exclusive start bound (with ugly syntax) let mut tx = db.begin_system().await?; let queried = tx .query_system(namespace, &TEST_INDEX)? .range((Bound::Excluded([1i64]), Bound::Unbounded))? .all() .await?; assert_eq!(ids(queried), [doc3.id()]); assert_eq!( index_reads(&tx, &TEST_INDEX, namespace), intervals([i(k([1i64]).increment().unwrap(), End::Unbounded)]) ); drop(tx); // an empty range works and returns nothing let mut tx = db.begin_system().await?; let queried = tx .query_system(namespace, &TEST_INDEX)? .range([1i64]..[1i64])? .all() .await?; assert_eq!(queried, []); assert_eq!(index_reads(&tx, &TEST_INDEX, namespace), IntervalSet::new()); drop(tx); // can use .eq() then .range() let mut tx = db.begin_system().await?; let queried = tx .query_system(namespace, &TEST_INDEX)? .eq(&[1i64])? .range([2i64]..[3i64])? .all() .await?; assert_eq!(ids(queried), [doc1.id()]); // this is the same as [1, 2]..[1, 3] assert_eq!( index_reads(&tx, &TEST_INDEX, namespace), intervals([i(k([1i64, 2]), End::Excluded(k([1i64, 3])))]) ); drop(tx); // too many fields in range() returns an error let mut tx = db.begin_system().await?; assert_contains( &tx.query_system(namespace, &TEST_INDEX)? .eq(&[1i64])? .range([2i64; 4]..) .err() .unwrap(), "invalid system query", ); drop(tx); Ok(()) } #[convex_macro::test_runtime] async fn test_get_system(rt: TestRuntime) -> anyhow::Result<()> { let db = setup_db(&rt).await?; let namespace = TableNamespace::test_user(); let mut tx = db.begin_system().await?; let doc = TestFacingModel::new(&mut tx) .insert_and_get(TEST_TABLE_NAME.clone(), assert_obj! {}) .await?; db.commit(tx).await?; let mut tx = db.begin_system().await?; assert_eq!( tx.get_system::<TestTable>(namespace, doc.developer_id()) .await? .unwrap() .id(), doc.id() ); assert_eq!( tx.get_system::<TestTable>(namespace, DeveloperDocumentId::MIN) .await?, None ); let doc_key = k([doc.developer_id().encode()]); let missing_key = k([DeveloperDocumentId::MIN.encode()]); assert_eq!( index_reads(&tx, &SystemIndex::<TestTable>::by_id(), namespace), intervals([Interval::prefix(missing_key), Interval::prefix(doc_key),]) ); Ok(()) } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server