Convex MCP server

by get-convex
in_memory_indexes.rs (24 kB)
use std::{
    cmp,
    collections::BTreeMap,
    sync::Arc,
};

use anyhow::Context;
use async_lru::{
    async_lru::SizedValue,
    multi_type_async_lru::{
        LruKey,
        MultiTypeAsyncLru,
    },
};
use async_trait::async_trait;
use common::{
    document::{
        CreationTime,
        PackedDocument,
        ParseDocument,
        ResolvedDocument,
    },
    index::IndexKeyBytes,
    interval::Interval,
    knobs::{
        FUNRUN_INDEX_CACHE_CONCURRENCY,
        FUNRUN_INDEX_CACHE_SIZE,
    },
    persistence::{
        PersistenceReader,
        PersistenceSnapshot,
        RepeatablePersistence,
        RetentionValidator,
    },
    query::Order,
    runtime::Runtime,
    types::{
        IndexId,
        RepeatableTimestamp,
    },
    virtual_system_mapping::VirtualSystemMapping,
};
use database::{
    BootstrapMetadata,
    ComponentRegistry,
    DatabaseSnapshot,
    SchemaRegistry,
    TableCountSnapshot,
    TableRegistry,
    Transaction,
    TransactionIdGenerator,
    TransactionIndex,
    TransactionReadSet,
    TransactionTextSnapshot,
    COMPONENTS_TABLE,
    SCHEMAS_TABLE,
};
use futures::{
    FutureExt,
    TryStreamExt,
};
use indexing::{
    backend_in_memory_indexes::{
        DatabaseIndexSnapshot,
        InMemoryIndexes,
        MemoryDocument,
        SystemDocument,
    },
    index_registry::IndexRegistry,
};
use keybroker::Identity;
use model::virtual_system_mapping;
use sync_types::Timestamp;
use usage_tracking::FunctionUsageTracker;
use value::{
    heap_size::HeapSize,
    InternalId,
    TableName,
    TableNamespace,
    TabletId,
};

use super::metrics::{
    load_index_timer,
    log_funrun_index_cache_get,
    log_funrun_index_load_rows,
};
use crate::{
    metrics::begin_tx_timer,
    FunctionWrites,
};

fn make_transaction<RT: Runtime>(
    ts: RepeatableTimestamp,
    identity: Identity,
    rt: RT,
    table_registry: TableRegistry,
    schema_registry: SchemaRegistry,
    component_registry: ComponentRegistry,
    index_registry: IndexRegistry,
    table_count_snapshot: Arc<dyn TableCountSnapshot>,
    database_index_snapshot: DatabaseIndexSnapshot,
    text_index_snapshot: Arc<dyn TransactionTextSnapshot>,
    retention_validator: Arc<dyn RetentionValidator>,
    virtual_system_mapping: VirtualSystemMapping,
    usage_tracker: FunctionUsageTracker,
) -> anyhow::Result<Transaction<RT>> {
    let id_generator = TransactionIdGenerator::new(&rt)?;
    // The transaction timestamp might be a few minutes behind if the backend
    // has been idle. Make sure creation time is always recent.
    let creation_time = CreationTime::try_from(cmp::max(*ts, rt.generate_timestamp()?))?;
    let transaction_index =
        TransactionIndex::new(index_registry, database_index_snapshot, text_index_snapshot);
    Ok(Transaction::new(
        identity,
        id_generator,
        creation_time,
        transaction_index,
        table_registry,
        schema_registry,
        component_registry,
        table_count_snapshot,
        rt,
        usage_tracker,
        retention_validator,
        virtual_system_mapping,
    ))
}

#[derive(Hash, PartialEq, Eq, Clone, Debug)]
struct IndexCacheKey {
    instance_name: String,
    index_id: InternalId,
    last_modified: Timestamp,
}

impl LruKey for IndexCacheKey {
    type Value = IndexCacheValue;
}

/// The cache value is the same as [DatabaseIndexMap] apart from keeping track
/// of last modified timestamps. The [BTreeMap] keys are the index keys.
struct IndexCacheValue {
    map: BTreeMap<Vec<u8>, (Timestamp, MemoryDocument)>,
    size: usize,
}

impl SizedValue for IndexCacheValue {
    fn size(&self) -> u64 {
        self.size as u64
    }
}

/// Caches in-memory indexes for many instances.
/// This object is cheaply cloneable and clones reference the same underlying
/// cache.
#[derive(Clone)]
pub(crate) struct InMemoryIndexCache<RT: Runtime> {
    cache: MultiTypeAsyncLru<RT>,
    rt: RT,
}

#[fastrace::trace]
async fn load_index(
    instance_name: String,
    index_id: IndexId,
    persistence_snapshot: PersistenceSnapshot,
    tablet_id: TabletId,
    table_name: String,
) -> anyhow::Result<Arc<IndexCacheValue>> {
    let _timer = load_index_timer(&table_name, &instance_name);
    let mut size = 0;
    let index_map: BTreeMap<Vec<u8>, _> = persistence_snapshot
        .index_scan(
            index_id,
            tablet_id,
            &Interval::all(),
            Order::Asc,
            usize::MAX,
        )
        .map_ok(|(key, rev)| {
            let doc = PackedDocument::pack(&rev.value);
            // This doesn't take into account the future size of the cached
            // SystemDocument, so the cache will use more memory than its
            // nominal size
            size += key.0.heap_size() + doc.heap_size();
            (
                key.0,
                (
                    rev.ts,
                    MemoryDocument {
                        packed_document: doc,
                        cached_system_document: SystemDocument::new(),
                    },
                ),
            )
        })
        .try_collect()
        .await?;
    log_funrun_index_load_rows(index_map.len() as u64, &table_name, &instance_name);
    Ok(Arc::new(IndexCacheValue {
        map: index_map,
        size,
    }))
}

#[fastrace::trace]
async fn load_unpacked_index(
    instance_name: &str,
    index_id: IndexId,
    persistence_snapshot: &PersistenceSnapshot,
    tablet_id: TabletId,
    table_name: &str,
) -> anyhow::Result<(Vec<ResolvedDocument>, u64)> {
    let _timer = load_index_timer(table_name, instance_name);
    let documents: Vec<ResolvedDocument> = persistence_snapshot
        .index_scan(
            index_id,
            tablet_id,
            &Interval::all(),
            Order::Asc,
            usize::MAX,
        )
        .map_ok(|(_, rev)| rev.value)
        .try_collect()
        .await?;
    log_funrun_index_load_rows(documents.len() as u64, table_name, instance_name);
    let size = documents.iter().map(|d| d.size() as u64).sum();
    Ok((documents, size))
}

struct WithSize<T>(T, u64);

impl<T> SizedValue for WithSize<T> {
    fn size(&self) -> u64 {
        self.1
    }
}

impl<RT: Runtime> InMemoryIndexCache<RT> {
    pub fn new(rt: RT) -> Self {
        Self {
            cache: MultiTypeAsyncLru::new(
                rt.clone(),
                *FUNRUN_INDEX_CACHE_SIZE,
                *FUNRUN_INDEX_CACHE_CONCURRENCY,
                "funrun_index_cache",
            ),
            rt,
        }
    }
}

impl<RT: Runtime> FunctionRunnerInMemoryIndexes<RT> {
    /// Get the index from the cache or load it from persistence and put it in
    /// the cache. If the index is not in the last_modified map, it is not an
    /// in-memory index and should not be cached.
    #[fastrace::trace]
    async fn get_or_load(
        &self,
        index_id: IndexId,
        tablet_id: TabletId,
        table_name: TableName,
    ) -> anyhow::Result<Option<Arc<IndexCacheValue>>> {
        let Some(key) = self
            .backend_last_modified
            .get(&index_id)
            .map(|ts| IndexCacheKey {
                instance_name: self.instance_name.clone(),
                index_id,
                last_modified: *ts,
            })
        else {
            return Ok(None);
        };
        let table_name = table_name.to_string();
        let cache_value_result = self
            .cache
            .get(
                key,
                load_index(
                    self.instance_name.clone(),
                    index_id,
                    self.persistence_snapshot.clone(),
                    tablet_id,
                    table_name.clone(),
                )
                .boxed(),
            )
            .await
            .map(Some);
        log_funrun_index_cache_get(&table_name, &self.instance_name);
        cache_value_result
    }

    /// Returns the TableRegistry together with its last-modified time
    #[fastrace::trace]
    async fn load_table_registry(
        &self,
        &BootstrapMetadata {
            tables_by_id,
            tables_tablet_id,
            ..
        }: &BootstrapMetadata,
    ) -> anyhow::Result<(Timestamp, TableRegistry)> {
        #[derive(Hash, PartialEq, Eq, Debug, Clone)]
        struct Key {
            instance_name: String,
            tables_last_modified: Timestamp,
        }
        impl LruKey for Key {
            type Value = WithSize<TableRegistry>;
        }
        let tables_last_modified = *self
            .backend_last_modified
            .get(&tables_by_id)
            .context("_tables not configured to be in-memory")?;
        const NAME: &str = "_table_registry";
        log_funrun_index_cache_get(NAME, &self.instance_name);
        let instance_name = self.instance_name.clone();
        let persistence_snapshot = self.persistence_snapshot.clone();
        let table_registry = self
            .cache
            .get(
                Key {
                    instance_name: self.instance_name.clone(),
                    tables_last_modified,
                },
                async move {
                    let (documents, size) = load_unpacked_index(
                        &instance_name,
                        tables_by_id,
                        &persistence_snapshot,
                        tables_tablet_id,
                        NAME,
                    )
                    .await?;
                    let (table_mapping, table_states) =
                        DatabaseSnapshot::<RT>::table_mapping_and_states(
                            documents.into_iter().map(|doc| doc.parse()).try_collect()?,
                        );
                    let registry = TableRegistry::bootstrap(
                        table_mapping,
                        table_states,
                        persistence_snapshot.persistence().version(),
                    )?;
                    // We don't have `HeapSize` implemented for `TableRegistry`
                    // so just approximate its size using the size of the
                    // documents it was made from.
                    Ok(WithSize(registry, size))
                }
                .boxed(),
            )
            .await?;
        Ok((tables_last_modified, table_registry.0.clone()))
    }

    #[fastrace::trace]
    async fn load_index_registry(
        &self,
        table_registry: (Timestamp, TableRegistry),
        &BootstrapMetadata {
            index_by_id,
            index_tablet_id,
            ..
        }: &BootstrapMetadata,
    ) -> anyhow::Result<(Timestamp, IndexRegistry)> {
        #[derive(Hash, PartialEq, Eq, Debug, Clone)]
        struct Key {
            instance_name: String,
            last_modified: Timestamp,
        }
        impl LruKey for Key {
            type Value = WithSize<IndexRegistry>;
        }
        let indexes_last_modified = *self
            .backend_last_modified
            .get(&index_by_id)
            .context("_index not configured to be in-memory")?;
        const NAME: &str = "_index_registry";
        log_funrun_index_cache_get(NAME, &self.instance_name);
        let instance_name = self.instance_name.clone();
        let persistence_snapshot = self.persistence_snapshot.clone();
        let index_registry = self
            .cache
            .get(
                Key {
                    instance_name: self.instance_name.clone(),
                    // We use the max of the two timestamps as our cache key
                    // because it's "as if" `load_unpacked_index` is reading at
                    // that timestamp.
                    last_modified: table_registry.0.max(indexes_last_modified),
                },
                async move {
                    let (documents, size) = load_unpacked_index(
                        &instance_name,
                        index_by_id,
                        &persistence_snapshot,
                        index_tablet_id,
                        NAME,
                    )
                    .await?;
                    let index_registry = IndexRegistry::bootstrap(
                        table_registry.1.table_mapping(),
                        documents.into_iter(),
                        persistence_snapshot.persistence().version(),
                    )?;
                    DatabaseSnapshot::<RT>::verify_invariants(&table_registry.1, &index_registry)?;
                    Ok(WithSize(index_registry, size))
                }
                .boxed(),
            )
            .await?;
        Ok((indexes_last_modified, index_registry.0.clone()))
    }

    #[fastrace::trace]
    async fn load_component_registry(
        &self,
        table_registry: (Timestamp, TableRegistry),
        index_registry: (Timestamp, IndexRegistry),
    ) -> anyhow::Result<(Timestamp, ComponentRegistry)> {
        #[derive(Hash, PartialEq, Eq, Debug, Clone)]
        struct Key {
            instance_name: String,
            last_modified: Timestamp,
        }
        impl LruKey for Key {
            type Value = WithSize<ComponentRegistry>;
        }
        let component_tablet_id = table_registry
            .1
            .table_mapping()
            .namespace(TableNamespace::Global)
            .id(&COMPONENTS_TABLE)?
            .tablet_id;
        let components_by_id = index_registry.1.must_get_by_id(component_tablet_id)?.id;
        let components_last_modified = *self
            .backend_last_modified
            .get(&components_by_id)
            .context("_components not configured to be in-memory")?;
        const NAME: &str = "_component_registry";
        log_funrun_index_cache_get(NAME, &self.instance_name);
        let instance_name = self.instance_name.clone();
        let persistence_snapshot = self.persistence_snapshot.clone();
        let component_registry = self
            .cache
            .get(
                Key {
                    instance_name: self.instance_name.clone(),
                    last_modified: table_registry
                        .0
                        .max(index_registry.0)
                        .max(components_last_modified),
                },
                async move {
                    let (documents, size) = load_unpacked_index(
                        &instance_name,
                        components_by_id,
                        &persistence_snapshot,
                        component_tablet_id,
                        NAME,
                    )
                    .await?;
                    let component_registry = ComponentRegistry::bootstrap(
                        table_registry.1.table_mapping(),
                        documents.into_iter().map(|d| d.parse()).try_collect()?,
                    )?;
                    Ok(WithSize(component_registry, size))
                }
                .boxed(),
            )
            .await?;
        Ok((components_last_modified, component_registry.0.clone()))
    }

    #[fastrace::trace]
    async fn load_schema_registry(
        &self,
        table_registry: (Timestamp, TableRegistry),
        index_registry: (Timestamp, IndexRegistry),
        component_registry: (Timestamp, ComponentRegistry),
    ) -> anyhow::Result<(Timestamp, SchemaRegistry)> {
        #[derive(Hash, PartialEq, Eq, Debug, Clone)]
        struct Key {
            instance_name: String,
            last_modified: Timestamp,
        }
        impl LruKey for Key {
            type Value = WithSize<SchemaRegistry>;
        }
        let table_mapping = table_registry.1.table_mapping();
        // Each component's namespace has a _schemas table.
        // Note there may be _schemas tables in other namespaces, but we don't care
        // about those (and also they're not necessarily loaded into memory yet).
        // This argument only applies because we're in the function runner, which
        // can only operate in components' namespaces -- internal database workers
        // like IndexWorker and SchemaWorker include schemas from all namespaces.
        let component_ids = component_registry
            .1
            .all_component_paths(&mut TransactionReadSet::new())
            .into_keys();
        let mut last_modified_ts = table_registry
            .0
            .max(index_registry.0)
            .max(component_registry.0);
        let mut schema_tables = vec![];
        const NAME: &str = "_schema_registry";
        for component_id in component_ids {
            let namespace = component_id.into();
            let schema_tablet =
                table_mapping.namespace(namespace).name_to_tablet()(SCHEMAS_TABLE.clone())?;
            let index_id = index_registry.1.must_get_by_id(schema_tablet)?.id;
            let schemas_last_modified = *self
                .backend_last_modified
                .get(&index_id)
                .context("_schemas not configured to be in-memory")?;
            last_modified_ts = last_modified_ts.max(schemas_last_modified);
            schema_tables.push((namespace, schema_tablet, index_id));
            log_funrun_index_cache_get(NAME, &self.instance_name);
        }
        let instance_name = self.instance_name.clone();
        let persistence_snapshot = self.persistence_snapshot.clone();
        let schema_registry = self
            .cache
            .get(
                Key {
                    instance_name: self.instance_name.clone(),
                    last_modified: last_modified_ts,
                },
                async move {
                    let mut size = 0;
                    let mut schema_docs = BTreeMap::new();
                    for (namespace, schema_tablet, index_id) in schema_tables {
                        let (component_documents, component_size) = load_unpacked_index(
                            &instance_name,
                            index_id,
                            &persistence_snapshot,
                            schema_tablet,
                            NAME,
                        )
                        .await?;
                        schema_docs.insert(
                            namespace,
                            component_documents
                                .into_iter()
                                .map(|d| d.parse())
                                .try_collect()?,
                        );
                        size += component_size;
                    }
                    let schema_registry = SchemaRegistry::bootstrap(schema_docs);
                    Ok(WithSize(schema_registry, size))
                }
                .boxed(),
            )
            .await?;
        Ok((last_modified_ts, schema_registry.0.clone()))
    }

    #[fastrace::trace]
    async fn load_registries(
        &self,
        bootstrap_metadata: BootstrapMetadata,
    ) -> anyhow::Result<(
        TableRegistry,
        SchemaRegistry,
        ComponentRegistry,
        IndexRegistry,
    )> {
        // This is unfortunate but we need this cascade of cached lookups
        // because we don't know what cache keys to use until the previous
        // registry has been fetched.
        let table_registry = self.load_table_registry(&bootstrap_metadata).await?;
        let index_registry = self
            .load_index_registry(table_registry.clone(), &bootstrap_metadata)
            .await?;
        let component_registry = self
            .load_component_registry(table_registry.clone(), index_registry.clone())
            .await?;
        let schema_registry = self
            .load_schema_registry(
                table_registry.clone(),
                index_registry.clone(),
                component_registry.clone(),
            )
            .await?;
        Ok((
            table_registry.1,
            schema_registry.1,
            component_registry.1,
            index_registry.1,
        ))
    }
}

impl<RT: Runtime> InMemoryIndexCache<RT> {
    /// Loads table and index registry from cache or persistence snapshot.
    #[fastrace::trace]
    pub(crate) async fn begin_tx(
        &self,
        identity: Identity,
        ts: RepeatableTimestamp,
        existing_writes: FunctionWrites,
        persistence: Arc<dyn PersistenceReader>,
        instance_name: String,
        in_memory_index_last_modified: BTreeMap<IndexId, Timestamp>,
        bootstrap_metadata: BootstrapMetadata,
        table_count_snapshot: Arc<dyn TableCountSnapshot>,
        text_index_snapshot: Arc<dyn TransactionTextSnapshot>,
        usage_tracker: FunctionUsageTracker,
        retention_validator: Arc<dyn RetentionValidator>,
    ) -> anyhow::Result<Transaction<RT>> {
        let _timer = begin_tx_timer();
        for (index_id, last_modified) in &in_memory_index_last_modified {
            anyhow::ensure!(
                *last_modified <= *ts,
                "Last modified timestamp {last_modified} for index {index_id} is ahead of \
                 transaction timestamp {}",
                *ts
            );
        }
        let repeatable_persistence =
            RepeatablePersistence::new(persistence.clone(), ts, retention_validator.clone());
        let persistence_snapshot =
            repeatable_persistence.read_snapshot(repeatable_persistence.upper_bound())?;

        let in_memory_indexes = FunctionRunnerInMemoryIndexes {
            cache: self.cache.clone(),
            instance_name,
            backend_last_modified: in_memory_index_last_modified,
            persistence_snapshot: persistence_snapshot.clone(),
        };
        let (table_registry, schema_registry, component_registry, index_registry) =
            in_memory_indexes
                .load_registries(bootstrap_metadata)
                .await?;
        let database_index_snapshot = DatabaseIndexSnapshot::new(
            index_registry.clone(),
            Arc::new(in_memory_indexes),
            table_registry.table_mapping().clone(),
            persistence_snapshot,
        );

        let mut tx = make_transaction(
            ts,
            identity,
            self.rt.clone(),
            table_registry,
            schema_registry,
            component_registry,
            index_registry,
            table_count_snapshot,
            database_index_snapshot,
            text_index_snapshot,
            retention_validator,
            virtual_system_mapping().clone(),
            usage_tracker,
        )?;
        tx.merge_writes(existing_writes.updates)?;
        Ok(tx)
    }
}

/// An [`InMemoryIndexCache`] that is bound to a single instance at a specific
/// timestamp. One instance is created at the beginning of each UDF's
/// transaction.
pub(crate) struct FunctionRunnerInMemoryIndexes<RT: Runtime> {
    pub(crate) cache: MultiTypeAsyncLru<RT>,
    pub(crate) instance_name: String,
    /// The last modified timestamp for each index at the beginning of the
    /// Transaction (i.e. as of `persistence_snapshot.timestamp()`).
    pub(crate) backend_last_modified: BTreeMap<IndexId, Timestamp>,
    pub(crate) persistence_snapshot: PersistenceSnapshot,
}

#[async_trait]
impl<RT: Runtime> InMemoryIndexes for FunctionRunnerInMemoryIndexes<RT> {
    async fn range(
        &self,
        index_id: IndexId,
        interval: &Interval,
        order: Order,
        tablet_id: TabletId,
        table_name: TableName,
    ) -> anyhow::Result<Option<Vec<(IndexKeyBytes, Timestamp, MemoryDocument)>>> {
        let Some(index_map) = self.get_or_load(index_id, tablet_id, table_name).await? else {
            return Ok(None);
        };
        let range = order
            .apply(
                index_map
                    .map
                    .range(interval)
                    .map(|(k, (ts, v))| (IndexKeyBytes(k.clone()), *ts, v.clone())),
            )
            .collect::<Vec<(IndexKeyBytes, Timestamp, MemoryDocument)>>();
        Ok(Some(range))
    }
}
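
The caching pattern in this file hinges on one detail: the last-modified timestamp of an index is part of the cache key (see IndexCacheKey and the registry Key structs above), so a cached snapshot never has to be invalidated in place. A write to the index surfaces as a newer timestamp, which is a different key; the stale entry simply stops being requested and ages out of the bounded LRU. Below is a minimal, self-contained sketch of that idea using only the Rust standard library. The names (ToyCache, IndexSnapshot, load_from_persistence) are hypothetical illustrations, not Convex APIs, and the toy map stands in for the real bounded, async MultiTypeAsyncLru shown above.

use std::collections::{BTreeMap, HashMap};

// Hypothetical cache key, mirroring IndexCacheKey above: the last-modified
// timestamp is part of the key, so entries are immutable once inserted.
#[derive(Hash, PartialEq, Eq, Clone, Debug)]
struct Key {
    instance_name: String,
    index_id: u64,
    last_modified: u64,
}

// Hypothetical cached value: index key bytes -> (timestamp, document bytes).
type IndexSnapshot = BTreeMap<Vec<u8>, (u64, Vec<u8>)>;

// Toy unbounded cache standing in for the real size-bounded async LRU.
struct ToyCache {
    entries: HashMap<Key, IndexSnapshot>,
}

impl ToyCache {
    fn new() -> Self {
        Self { entries: HashMap::new() }
    }

    // Get-or-load: if the key (including last_modified) is present, the cached
    // snapshot is still valid by construction; otherwise run the loader.
    fn get_or_load<F>(&mut self, key: Key, load: F) -> &IndexSnapshot
    where
        F: FnOnce() -> IndexSnapshot,
    {
        self.entries.entry(key).or_insert_with(load)
    }
}

// Hypothetical loader standing in for a persistence index scan.
fn load_from_persistence(last_modified: u64) -> IndexSnapshot {
    BTreeMap::from([(b"key1".to_vec(), (last_modified, b"doc1".to_vec()))])
}

fn main() {
    let mut cache = ToyCache::new();
    let key_v1 = Key {
        instance_name: "instance".to_string(),
        index_id: 42,
        last_modified: 100,
    };
    // First request at last_modified = 100 loads from persistence.
    let snapshot = cache.get_or_load(key_v1.clone(), || load_from_persistence(100));
    assert_eq!(snapshot.len(), 1);
    // A later write bumps last_modified to 120, which is a *different* key:
    // the old entry is never mutated or explicitly invalidated, it just stops
    // being requested and (in a real LRU) would eventually be evicted.
    let key_v2 = Key { last_modified: 120, ..key_v1 };
    let _snapshot_v2 = cache.get_or_load(key_v2, || load_from_persistence(120));
    assert_eq!(cache.entries.len(), 2);
}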
