
Convex MCP server

Official
by get-convex
index_range.rs (13.8 kB)
use std::{
    cmp,
    collections::VecDeque,
};

use async_trait::async_trait;
use common::{
    bootstrap_model::index::database_index::IndexedFields,
    components::ComponentId,
    document::DeveloperDocument,
    index::IndexKeyBytes,
    interval::Interval,
    knobs::{
        DEFAULT_QUERY_PREFETCH,
        TRANSACTION_MAX_READ_SIZE_BYTES,
        TRANSACTION_MAX_READ_SIZE_ROWS,
    },
    query::{
        CursorPosition,
        Order,
    },
    runtime::Runtime,
    types::{
        IndexName,
        StableIndexName,
        TabletIndexName,
        WriteTimestamp,
    },
    version::Version,
};
use tokio::task;
use value::TableNamespace;

use super::{
    query_scanned_too_many_documents_error,
    query_scanned_too_much_data,
    DeveloperIndexRangeResponse,
    QueryStream,
    QueryStreamNext,
    MAX_QUERY_FETCH,
};
use crate::{
    metrics,
    transaction::IndexRangeRequest,
    Transaction,
    UserFacingModel,
};

/// A `QueryStream` that scans a range of an index.
pub struct IndexRange {
    namespace: TableNamespace,
    stable_index_name: StableIndexName,
    /// For usage and error messages. If the table mapping has changed, this
    /// might get out of sync with `stable_index_name`, which is the index
    /// actually being walked.
    printable_index_name: IndexName,
    // There is a fixed Interval which is queried by this IndexRange,
    // but we don't need to store it because we have everything we need in
    // cursor_interval, page, and unfetched_interval.
    // interval: Interval,
    order: Order,
    indexed_fields: IndexedFields,
    /// The interval defined by the optional start and end cursors.
    /// The start cursor will move as we produce results, but this
    /// `cursor_interval` must always be a subset of `interval`.
    cursor_interval: CursorInterval,
    intermediate_cursors: Option<Vec<CursorPosition>>,
    page: VecDeque<(IndexKeyBytes, DeveloperDocument, WriteTimestamp)>,
    /// The interval which we have yet to fetch.
    /// This starts as an intersection of the IndexRange's `interval` and
    /// `cursor_interval`, and gets smaller as results are fetched into `page`.
    /// When `unfetched_interval` and `page` are empty, the stream is done.
    /// Note that `cursor_interval.curr_exclusive` advances whenever `next()`
    /// yields a new result, while `unfetched_interval.start` (or `.end` if
    /// order is Desc) advances whenever we repopulate `page`, even if we
    /// haven't yielded the results yet.
    unfetched_interval: Interval,
    /// This is the interval queried trimmed to the cursor start and end. Before
    /// the query has consumed any results, this will be identical to
    /// `unfetched_interval`. This is used to track the intervals read in
    /// the read set as we consume query results.
    initial_unfetched_interval: Interval,
    page_count: usize,
    returned_results: usize,
    rows_read: usize,
    returned_bytes: usize,
    maximum_rows_read: Option<usize>,
    maximum_bytes_read: Option<usize>,
    soft_maximum_rows_read: usize,
    soft_maximum_bytes_read: usize,
    version: Option<Version>,
}

impl IndexRange {
    pub fn new(
        namespace: TableNamespace,
        stable_index_name: StableIndexName,
        printable_index_name: IndexName,
        interval: Interval,
        order: Order,
        indexed_fields: IndexedFields,
        cursor_interval: CursorInterval,
        maximum_rows_read: Option<usize>,
        maximum_bytes_read: Option<usize>,
        should_compute_split_cursor: bool,
        version: Option<Version>,
    ) -> Self {
        // unfetched_interval = intersection of interval with cursor_interval
        let unfetched_interval = match &cursor_interval.curr_exclusive {
            Some(cursor) => {
                let (_, after_curr_cursor_position) = interval.split(cursor.clone(), order);
                after_curr_cursor_position
            },
            None => interval,
        };
        let unfetched_interval = match &cursor_interval.end_inclusive {
            Some(cursor) => {
                let (up_to_end_cursor_position, _) =
                    unfetched_interval.split(cursor.clone(), order);
                up_to_end_cursor_position
            },
            None => unfetched_interval,
        };
        Self {
            namespace,
            stable_index_name,
            printable_index_name,
            order,
            initial_unfetched_interval: unfetched_interval.clone(),
            cursor_interval,
            indexed_fields,
            intermediate_cursors: if should_compute_split_cursor {
                Some(Vec::new())
            } else {
                None
            },
            page: VecDeque::new(),
            unfetched_interval,
            page_count: 0,
            returned_results: 0,
            rows_read: 0,
            returned_bytes: 0,
            maximum_rows_read,
            maximum_bytes_read,
            soft_maximum_rows_read: soft_data_limit(
                maximum_rows_read
                    .unwrap_or(*TRANSACTION_MAX_READ_SIZE_ROWS)
                    .min(*TRANSACTION_MAX_READ_SIZE_ROWS),
            ),
            soft_maximum_bytes_read: soft_data_limit(
                maximum_bytes_read
                    .unwrap_or(*TRANSACTION_MAX_READ_SIZE_BYTES)
                    .min(*TRANSACTION_MAX_READ_SIZE_BYTES),
            ),
            version,
        }
    }

    fn start_next<RT: Runtime>(
        &mut self,
        tx: &mut Transaction<RT>,
        prefetch_hint: Option<usize>,
    ) -> anyhow::Result<QueryStreamNext> {
        // If we have an end cursor, for correctness we need to process
        // the entire interval, so ignore `maximum_rows_read` and `maximum_bytes_read`.
        let enforce_limits = self.cursor_interval.end_inclusive.is_none();

        if enforce_limits
            && let Some(maximum_bytes_read) = self.maximum_bytes_read
            && self.returned_bytes >= maximum_bytes_read
        {
            // Note: we do not need to record the index range in the read set here
            // since we will have recorded it in the previous `start_next` that read the
            // document that put us over the data budget.
            // If we're over our data budget, throw an error.
            // We do this after we've already exceeded the limit to ensure that
            // paginated queries always scan at least one item so they can
            // make progress.
            return Err(query_scanned_too_much_data(self.returned_bytes).into());
        }

        let Some(tablet_index_name) = self.tablet_index_name().cloned() else {
            // This must be a missing index.
            self.cursor_interval.curr_exclusive = Some(
                self.cursor_interval
                    .end_inclusive
                    .clone()
                    .unwrap_or(CursorPosition::End),
            );
            return Ok(QueryStreamNext::Ready(None));
        };

        if let Some((index_position, v, timestamp)) = self.page.pop_front() {
            let index_bytes = index_position.len();
            if let Some(intermediate_cursors) = &mut self.intermediate_cursors {
                intermediate_cursors.push(CursorPosition::After(index_position.clone()));
            }
            let cursor_position = CursorPosition::After(index_position);
            self.cursor_interval.curr_exclusive = Some(cursor_position.clone());
            self.returned_results += 1;
            let (used_interval, _) = self
                .initial_unfetched_interval
                .split(cursor_position, self.order);
            tx.reads.record_indexed_directly(
                tablet_index_name,
                self.indexed_fields.clone(),
                used_interval,
            )?;
            UserFacingModel::new(tx, self.namespace)
                .record_read_document(&v, self.printable_index_name.table())?;
            // Database bandwidth for index reads
            let component_path = tx.must_component_path(ComponentId::from(self.namespace))?;
            tx.usage_tracker.track_database_egress_size(
                component_path,
                self.printable_index_name.table().to_string(),
                index_bytes as u64,
                self.printable_index_name.is_system_owned(),
            );
            self.returned_bytes += v.size();
            return Ok(QueryStreamNext::Ready(Some((v, timestamp))));
        }

        if let Some(CursorPosition::End) = self.cursor_interval.curr_exclusive {
            tx.reads.record_indexed_directly(
                tablet_index_name,
                self.indexed_fields.clone(),
                self.initial_unfetched_interval.clone(),
            )?;
            return Ok(QueryStreamNext::Ready(None));
        }

        if self.unfetched_interval.is_empty() {
            tx.reads.record_indexed_directly(
                tablet_index_name,
                self.indexed_fields.clone(),
                self.initial_unfetched_interval.clone(),
            )?;
            // We're out of results. If we have an end cursor then we must
            // have reached it. Otherwise we're at the end of the entire
            // query.
            self.cursor_interval.curr_exclusive = Some(
                self.cursor_interval
                    .end_inclusive
                    .clone()
                    .unwrap_or(CursorPosition::End),
            );
            return Ok(QueryStreamNext::Ready(None));
        }

        let mut max_rows = prefetch_hint
            .unwrap_or(*DEFAULT_QUERY_PREFETCH)
            .clamp(1, MAX_QUERY_FETCH);
        if enforce_limits && let Some(maximum_rows_read) = self.maximum_rows_read {
            if self.rows_read >= maximum_rows_read {
                return Err(query_scanned_too_many_documents_error(self.rows_read).into());
            }
            max_rows = cmp::min(max_rows, maximum_rows_read - self.rows_read);
        }
        Ok(QueryStreamNext::WaitingOn(IndexRangeRequest {
            stable_index_name: self.stable_index_name.clone(),
            interval: self.unfetched_interval.clone(),
            order: self.order,
            max_rows,
            version: self.version.clone(),
        }))
    }

    fn process_fetch(
        &mut self,
        page: Vec<(IndexKeyBytes, DeveloperDocument, WriteTimestamp)>,
        fetch_cursor: CursorPosition,
    ) -> anyhow::Result<()> {
        let (_, new_unfetched_interval) = self.unfetched_interval.split(fetch_cursor, self.order);
        anyhow::ensure!(self.unfetched_interval != new_unfetched_interval);
        self.unfetched_interval = new_unfetched_interval;
        self.page_count += 1;
        self.rows_read += page.len();
        self.page.extend(page);
        Ok(())
    }
}

pub const fn soft_data_limit(hard_limit: usize) -> usize {
    hard_limit * 3 / 4
}

#[async_trait]
impl QueryStream for IndexRange {
    fn cursor_position(&self) -> &Option<CursorPosition> {
        &self.cursor_interval.curr_exclusive
    }

    fn split_cursor_position(&self) -> Option<&CursorPosition> {
        let intermediate_cursors = self.intermediate_cursors.as_ref()?;
        if intermediate_cursors.len() <= 2 {
            None
        } else {
            intermediate_cursors.get(intermediate_cursors.len() / 2)
        }
    }

    fn is_approaching_data_limit(&self) -> bool {
        self.rows_read > self.soft_maximum_rows_read
            || self.returned_bytes > self.soft_maximum_bytes_read
    }

    async fn next<RT: Runtime>(
        &mut self,
        tx: &mut Transaction<RT>,
        prefetch_hint: Option<usize>,
    ) -> anyhow::Result<QueryStreamNext> {
        task::consume_budget().await;
        self.start_next(tx, prefetch_hint)
    }

    fn feed(&mut self, index_range_response: DeveloperIndexRangeResponse) -> anyhow::Result<()> {
        self.process_fetch(index_range_response.page, index_range_response.cursor)
    }

    fn tablet_index_name(&self) -> Option<&TabletIndexName> {
        self.stable_index_name.tablet_index_name()
    }

    fn printable_index_name(&self) -> &IndexName {
        &self.printable_index_name
    }
}

impl Drop for IndexRange {
    fn drop(&mut self) {
        metrics::log_index_range(
            self.returned_results,
            // If there are many results in the page when the query is over,
            // it means we fetched too much in a single page and may be able to
            // decrease prefetch hints.
            self.page.len(),
            // If we fetched too many pages, it means we weren't prefetching enough.
            self.page_count,
        )
    }
}

/// An interval between two optional cursors.
pub struct CursorInterval {
    pub curr_exclusive: Option<CursorPosition>,
    pub end_inclusive: Option<CursorPosition>,
}

impl CursorInterval {
    pub fn contains(&self, index_key: &IndexKeyBytes) -> bool {
        if let Some(start_exclusive) = &self.curr_exclusive {
            match start_exclusive {
                CursorPosition::After(start_key) => {
                    // If we're before the start cursor, return false.
                    if *index_key <= *start_key {
                        return false;
                    }
                },
                // If the start cursor is at the end, nothing is in range.
                CursorPosition::End => return false,
            }
        }
        if let Some(CursorPosition::After(end_key)) = &self.end_inclusive {
            // If we're after the end cursor, also return false.
            if *index_key > *end_key {
                return false;
            }
        }
        // If we didn't violate a constraint, we're in range.
        true
    }
}
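
The file derives its "soft" limits from the hard limits via `soft_data_limit`, and `is_approaching_data_limit` compares against those soft values so a paginated query can wrap up a page before the hard errors (`query_scanned_too_much_data`, `query_scanned_too_many_documents_error`) would fire. Below is a minimal standalone sketch of that 3/4 relationship; the hard-limit numbers are illustrative placeholders, not the actual `TRANSACTION_MAX_READ_SIZE_ROWS` / `TRANSACTION_MAX_READ_SIZE_BYTES` knob values.

// Standalone sketch mirroring soft_data_limit above: the soft limit is 75% of
// the hard limit, giving queries headroom to stop early.
const fn soft_data_limit(hard_limit: usize) -> usize {
    hard_limit * 3 / 4
}

fn main() {
    // Placeholder hard limits for illustration only.
    let hard_rows = 16_384;
    let hard_bytes = 8 * 1024 * 1024;
    assert_eq!(soft_data_limit(hard_rows), 12_288);
    assert_eq!(soft_data_limit(hard_bytes), 6 * 1024 * 1024);
    println!("soft row limit: {}", soft_data_limit(hard_rows));
    println!("soft byte limit: {}", soft_data_limit(hard_bytes));
}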

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'
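
The same endpoint can be called from Rust. This is a minimal sketch assuming the `reqwest` crate (with the `blocking` and `json` features) and `serde_json`; it makes no assumption about the response schema beyond it being JSON.

// Fetch this server's directory entry and pretty-print the raw JSON response.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let entry: serde_json::Value = reqwest::blocking::get(
        "https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend",
    )?
    .error_for_status()?
    .json()?;
    println!("{}", serde_json::to_string_pretty(&entry)?);
    Ok(())
}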

If you have feedback or need assistance with the MCP directory API, please join our Discord server.