
Convex MCP server

Official
by get-convex
mod.rs (20.4 kB)
use std::{
    cmp,
    collections::{
        BTreeMap,
        HashSet,
    },
    future::Future,
    sync::Arc,
};

use common::{
    backoff::Backoff,
    bootstrap_model::index::{
        database_index::{
            DatabaseIndexState,
            IndexedFields,
        },
        IndexConfig,
        TabletIndexMetadata,
    },
    errors::report_error,
    knobs::{
        INDEX_BACKFILL_CONCURRENCY,
        INDEX_WORKERS_INITIAL_BACKOFF,
        INDEX_WORKERS_MAX_BACKOFF,
    },
    persistence::{
        Persistence,
        RetentionValidator,
    },
    runtime::Runtime,
    types::{
        IndexId,
        RepeatableTimestamp,
        TabletIndexName,
    },
};
use futures::FutureExt;
use hashlink::LinkedHashSet;
use keybroker::Identity;
use tokio::select;
use tokio_util::task::JoinMap;
use value::{
    DeveloperDocumentId,
    ResolvedDocumentId,
    TableNamespace,
    TabletId,
};

use crate::{
    bootstrap_model::index_backfills::IndexBackfillModel,
    database_index_workers::index_writer::{
        IndexSelector,
        IndexWriter,
    },
    metrics::{
        log_index_backfilled,
        log_num_indexes_to_backfill,
        tablet_index_backfill_timer,
    },
    system_tables::SystemIndex,
    Database,
    IndexTable,
    SystemMetadataModel,
    Transaction,
};

pub mod index_writer;

pub struct IndexWorker<RT: Runtime> {
    /// Index IDs that are currently being backfilled.
    in_progress_index_ids: HashSet<IndexId, ahash::RandomState>,
    /// The index backfill tasks.
    in_progress: JoinMap<Vec<IndexId>, anyhow::Result<()>>,
    /// Order-preserving HashSet that represents the order that pending index
    /// backfills will be processed. This does not include indexes that are
    /// `in_progress`.
    pending: LinkedHashSet<(IndexId, TabletId), ahash::RandomState>,
    /// Limit on the size of `in_progress`.
    max_concurrency: usize,
    metadata_mutex: Arc<tokio::sync::Mutex<()>>,
    database: Database<RT>,
    index_writer: IndexWriter<RT>,
    backoff: Backoff,
    runtime: RT,
    #[cfg(any(test, feature = "testing"))]
    pub should_terminate: bool,
}

impl<RT: Runtime> IndexWorker<RT> {
    #[allow(clippy::new_ret_no_self)]
    pub fn new(
        runtime: RT,
        persistence: Arc<dyn Persistence>,
        retention_validator: Arc<dyn RetentionValidator>,
        database: Database<RT>,
    ) -> impl Future<Output = ()> + Send {
        let reader = persistence.reader();
        let index_writer = IndexWriter::new(
            persistence.clone(),
            reader.clone(),
            retention_validator,
            runtime.clone(),
        );
        let mut worker = IndexWorker {
            in_progress_index_ids: Default::default(),
            in_progress: JoinMap::new(),
            pending: Default::default(),
            max_concurrency: *INDEX_BACKFILL_CONCURRENCY,
            metadata_mutex: Default::default(),
            database,
            index_writer,
            backoff: Backoff::new(*INDEX_WORKERS_INITIAL_BACKOFF, *INDEX_WORKERS_MAX_BACKOFF),
            runtime,
            #[cfg(any(test, feature = "testing"))]
            should_terminate: false,
        };
        tracing::info!("Starting IndexWorker");
        async move {
            loop {
                if let Err(e) = worker.run().await {
                    report_error(&mut e.context("IndexWorkerLoop died")).await;
                    let delay = worker.backoff.fail(&mut worker.runtime.rng());
                    tracing::error!(
                        "IndexWorker died, num_failures: {}. Backing off for {}ms",
                        worker.backoff.failures(),
                        delay.as_millis()
                    );
                    worker.runtime.wait(delay).await;
                }
            }
        }
    }
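    // Note that `new` returns the worker's driver future rather than the
    // worker itself: the caller spawns the future, and the loop above
    // restarts `run` with exponential backoff after any error, so a single
    // failed iteration never permanently stops index backfills.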
    /// Test-only variant that terminates when there are no more indexes to
    /// backfill.
    #[cfg(any(test, feature = "testing"))]
    pub fn new_terminating(
        runtime: RT,
        persistence: Arc<dyn Persistence>,
        retention_validator: Arc<dyn RetentionValidator>,
        database: Database<RT>,
    ) -> impl Future<Output = anyhow::Result<()>> + Send {
        let reader = persistence.reader();
        let index_writer = IndexWriter::new(
            persistence.clone(),
            reader.clone(),
            retention_validator,
            runtime.clone(),
        );
        let mut worker = IndexWorker {
            in_progress_index_ids: Default::default(),
            in_progress: JoinMap::new(),
            pending: Default::default(),
            max_concurrency: 10,
            metadata_mutex: Default::default(),
            database,
            index_writer,
            backoff: Backoff::new(*INDEX_WORKERS_INITIAL_BACKOFF, *INDEX_WORKERS_MAX_BACKOFF),
            runtime,
            should_terminate: true,
        };
        async move {
            loop {
                use errors::ErrorMetadataAnyhowExt;
                let r = worker.run().await;
                if let Err(ref e) = r
                    && e.is_occ()
                {
                    tracing::error!("IndexWorker loop failed: {e:?}");
                    continue;
                }
                if worker.in_progress.is_empty() && worker.pending.is_empty() {
                    return r;
                }
            }
        }
    }

    async fn run(&mut self) -> anyhow::Result<()> {
        // Get all the documents from the `_index` table.
        let mut tx = self.database.begin(Identity::system()).await?;
        // _index doesn't have `by_creation_time` index, and thus must use `by_id`.
        let index_documents = tx
            .query_system(TableNamespace::Global, &SystemIndex::<IndexTable>::by_id())?
            .all()
            .await?;
        let mut num_to_backfill = 0;
        for index_metadata in &index_documents {
            if let IndexConfig::Database { on_disk_state, .. } = &index_metadata.config {
                if matches!(on_disk_state, DatabaseIndexState::Backfilling(_)) {
                    let index_id = index_metadata.id().internal_id();
                    let tablet_id = *index_metadata.name.table();
                    if !self.in_progress_index_ids.contains(&index_id)
                        && !self.pending.contains(&(index_id, tablet_id))
                    {
                        self.pending.insert((index_id, tablet_id));
                    }
                    num_to_backfill += 1;
                }
            }
        }
        log_num_indexes_to_backfill(num_to_backfill);
        tracing::info!(
            "{num_to_backfill} database indexes to backfill @ {}",
            tx.begin_timestamp()
        );
        let token = tx.into_token()?;
        let subscription = self.database.subscribe(token).await?;

        #[cfg(any(test, feature = "testing"))]
        if self.should_terminate && self.in_progress.is_empty() && self.pending.is_empty() {
            return Ok(());
        }

        // Start new work if allowed by the concurrency limit
        while self.in_progress.len() < self.max_concurrency
            && let Some((index_id, tablet_id)) = self.pending.pop_front()
        {
            self.queue_index_backfill(index_id, tablet_id);
        }

        select! {
            biased;
            // Start by finding indexes that have finished backfilling
            res = self.in_progress.join_next(), if !self.in_progress.is_empty() => {
                let (index_ids, res) = res.expect("join_next cannot return None if nonempty");
                // First, make sure `in_progress_index_ids` is always consistent with `in_progress`
                for &index_id in &index_ids {
                    self.in_progress_index_ids.remove(&index_id);
                }
                // If backfill tasks are failing, return an error here so that we back off
                let () = res??;
                tracing::info!("Finished backfilling {index_ids:?}");
                // Return so that we possibly queue up more work
            }
            // Alternatively, wait for invalidation
            _ = subscription.wait_for_invalidation().fuse() => {
                self.backoff.reset();
            }
        }
        Ok(())
    }
    /// Spawns a task to process the next index backfill.
    fn queue_index_backfill(&mut self, index_id: IndexId, tablet_id: TabletId) {
        let mut index_ids = vec![index_id];
        // Since we're mainly limited by the speed of reading the table, let's
        // grab all the other pending indexes for this table at once
        self.pending.retain(|&(other_index_id, other_tablet_id)| {
            if other_tablet_id == tablet_id {
                index_ids.push(other_index_id);
                false
            } else {
                true
            }
        });
        // TODO: this allows more than one `backfill_tablet` task to run
        // simultaneously on the same table; it could be better to cancel old
        // tasks and restart them.
        for &index_id in &index_ids {
            self.in_progress_index_ids.insert(index_id);
        }
        self.in_progress.spawn(
            index_ids.clone(),
            Self::backfill_tablet(
                tablet_id,
                index_ids,
                self.database.clone(),
                self.index_writer.clone(),
                self.metadata_mutex.clone(),
            ),
        );
    }
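    // `backfill_tablet` below proceeds in three phases, each committed in its
    // own transaction:
    //   1. `begin_backfill`: read index metadata and skip indexes whose
    //      initial scan already completed (`retention_started == true`).
    //   2. `perform_backfill` + `begin_retention`: write index entries from a
    //      snapshot scan, then flip `retention_started` and run retention from
    //      the index's creation lower bound.
    //   3. `finish_backfill`: move the index to `Backfilled` (user indexes) or
    //      `Enabled` (indexes on system tables, and reserved system indexes on
    //      user tables).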
    async fn backfill_tablet(
        tablet_id: TabletId,
        index_ids: Vec<IndexId>,
        database: Database<RT>,
        index_writer: IndexWriter<RT>,
        metadata_mutex: Arc<tokio::sync::Mutex<()>>,
    ) -> anyhow::Result<()> {
        let _timer = tablet_index_backfill_timer();
        let mut backfills = BTreeMap::new();
        for index_id in &index_ids {
            let (index_name, retention_started) =
                Self::begin_backfill(*index_id, &database).await?;
            backfills.insert(*index_id, (index_name, retention_started));
        }
        let needs_backfill = backfills
            .iter()
            // If retention is already started, we're already done with the
            // initial step of the backfill.
            .filter(|(_, (_, retention_started))| !*retention_started)
            .map(|(index_id, (index_name, _))| (*index_id, index_name.clone()))
            .collect::<BTreeMap<_, _>>();
        if !needs_backfill.is_empty() {
            let ts = database.now_ts_for_reads();
            let snapshot = database.snapshot(ts)?;
            let table_mapping = snapshot.table_mapping();
            let table_name = &table_mapping.tablet_to_name()(tablet_id)?;
            tracing::info!(
                "Starting backfill of {} indexes for {table_name}: {needs_backfill:?}",
                needs_backfill.len()
            );
            let table_summary =
                snapshot.table_summary(table_mapping.tablet_namespace(tablet_id)?, table_name);
            let total_docs = table_summary.map(|summary| summary.num_values());
            let mut tx = database.begin_system().await?;
            let mut index_backfill_model = IndexBackfillModel::new(&mut tx);
            for index_id in needs_backfill.keys() {
                index_backfill_model
                    .initialize_backfill(*index_id, total_docs)
                    .await?;
            }
            database
                .commit_with_write_source(tx, "index_worker_backfill_initialization")
                .await?;
            let index_selector = IndexSelector::ManyIndexes {
                tablet_id,
                indexes: needs_backfill,
            };
            let index_registry = snapshot.index_registry;
            index_writer
                .perform_backfill(
                    ts,
                    &index_registry,
                    index_selector,
                    1,
                    Some(database.clone()),
                )
                .await?;
        }
        let mut min_begin_ts = None;
        let mut retention = BTreeMap::new();
        // The database currently does not allow concurrent writers to the
        // `_index` (or `_tables`) tables; see a TODO in
        // `Writes::record_reads_for_write`.
        // Since we run many `backfill_tablet` tasks concurrently, synchronize
        // here to avoid creating OCC conflicts with ourselves.
        let indexes_lock = metadata_mutex.lock().await;
        let mut tx = database.begin(Identity::system()).await?;
        for index_id in &index_ids {
            let (backfill_begin_ts, index_name, indexed_fields) =
                Self::begin_retention(&mut tx, *index_id).await?;
            min_begin_ts = min_begin_ts
                .map(|t| cmp::min(t, backfill_begin_ts))
                .or(Some(backfill_begin_ts));
            retention.insert(*index_id, (index_name, indexed_fields));
        }
        database
            .commit_with_write_source(tx, "index_worker_start_retention")
            .await?;
        drop(indexes_lock);
        if let Some(min_begin_ts) = min_begin_ts {
            tracing::info!(
                "Started running retention for {} indexes: {retention:?}",
                retention.len()
            );
            index_writer.run_retention(min_begin_ts, retention).await?;
        }
        let indexes_lock = metadata_mutex.lock().await;
        let mut tx = database.begin(Identity::system()).await?;
        for index_id in index_ids {
            Self::finish_backfill(&mut tx, index_id).await?;
        }
        database
            .commit_with_write_source(tx, "index_worker_finish_backfill")
            .await?;
        drop(indexes_lock);
        Ok(())
    }

    async fn begin_backfill(
        index_id: IndexId,
        database: &Database<RT>,
    ) -> anyhow::Result<(TabletIndexName, bool)> {
        let mut tx = database.begin(Identity::system()).await?;
        let index_table_id = tx.bootstrap_tables().index_id;
        // If we observe an index to be in `Backfilling` state at some `ts`, we
        // know that all documents written after `ts` will already be in the index.
        // The index may contain writes from before `ts` too, but that's okay. We'll
        // just overwrite them.
        let index_doc = tx
            .get(ResolvedDocumentId::new(
                index_table_id.tablet_id,
                DeveloperDocumentId::new(index_table_id.table_number, index_id),
            ))
            .await?
            .ok_or_else(|| anyhow::anyhow!("Index {index_id:?} no longer exists"))?;
        let index_metadata = TabletIndexMetadata::from_document(index_doc)?;
        // Assuming that the IndexWorker is the only writer of index state, we expect
        // the state to still be `Backfilling` here. If this assertion fails, we
        // somehow raced with another `IndexWorker`(!) or don't actually have the
        // database lease (!).
        let retention_started = match &index_metadata.config {
            IndexConfig::Database { on_disk_state, .. } => {
                let DatabaseIndexState::Backfilling(state) = on_disk_state else {
                    anyhow::bail!(
                        "IndexWorker started backfilling index {index_metadata:?} not in \
                         Backfilling state"
                    );
                };
                state.retention_started
            },
            _ => anyhow::bail!(
                "IndexWorker attempted to backfill an index {index_metadata:?} which wasn't a \
                 database index."
            ),
        };
        Ok((index_metadata.name.clone(), retention_started))
    }
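    // Like `begin_backfill` above, the two helpers below re-read the index
    // document through the transaction they are given, so a concurrent
    // metadata change (which should never happen while this worker holds the
    // lease) surfaces as an OCC conflict at commit time, or as one of the
    // `bail!`s below, rather than as a silent overwrite.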
    async fn begin_retention(
        tx: &mut Transaction<RT>,
        index_id: IndexId,
    ) -> anyhow::Result<(RepeatableTimestamp, TabletIndexName, IndexedFields)> {
        let index_table_id = tx.bootstrap_tables().index_id;
        let index_doc = tx
            .get(ResolvedDocumentId::new(
                index_table_id.tablet_id,
                DeveloperDocumentId::new(index_table_id.table_number, index_id),
            ))
            .await?
            .ok_or_else(|| anyhow::anyhow!("Index {index_id:?} no longer exists"))?;
        let mut index_metadata = TabletIndexMetadata::from_document(index_doc)?;
        // Assuming that the IndexWorker is the only writer of index state, we expect
        // the state to still be `Backfilling` here. If this assertion fails, we
        // somehow raced with another `IndexWorker`(!) or don't actually have the
        // database lease (!).
        let (index_ts, indexed_fields) = match &mut index_metadata.config {
            IndexConfig::Database {
                on_disk_state,
                spec,
            } => {
                let DatabaseIndexState::Backfilling(state) = on_disk_state else {
                    anyhow::bail!(
                        "IndexWorker started backfilling index {index_metadata:?} not in \
                         Backfilling state"
                    )
                };
                state.retention_started = true;
                (
                    tx.begin_timestamp()
                        .prior_ts(state.index_created_lower_bound)?,
                    spec.fields.clone(),
                )
            },
            _ => anyhow::bail!(
                "IndexWorker attempted to backfill an index {index_metadata:?} which wasn't a \
                 database index."
            ),
        };
        let name = index_metadata.name.clone();
        SystemMetadataModel::new_global(tx)
            .replace(index_metadata.id(), index_metadata.into_value().try_into()?)
            .await?;
        Ok((index_ts, name, indexed_fields))
    }

    async fn finish_backfill(tx: &mut Transaction<RT>, index_id: IndexId) -> anyhow::Result<()> {
        // Now that we're done, write that we've finished backfilling the index, sanity
        // checking that it wasn't written concurrently with our backfill.
        let index_table_id = tx.bootstrap_tables().index_id;
        let full_index_id = ResolvedDocumentId::new(
            index_table_id.tablet_id,
            DeveloperDocumentId::new(index_table_id.table_number, index_id),
        );
        let index_doc = tx
            .get(full_index_id)
            .await?
            .ok_or_else(|| anyhow::anyhow!("Index {index_id:?} no longer exists"))?;
        let mut index_metadata = TabletIndexMetadata::from_document(index_doc)?;
        let is_system_index_on_user_table = index_metadata.name.descriptor().is_reserved();
        let is_index_on_system_table = tx
            .table_mapping()
            .is_system_tablet(*index_metadata.name.table());
        match index_metadata.config {
            IndexConfig::Database {
                ref mut on_disk_state,
                ..
            } => {
                let DatabaseIndexState::Backfilling(ref backfilling_state) = *on_disk_state else {
                    anyhow::bail!(
                        "IndexWorker finished backfilling index {index_metadata:?} not in \
                         Backfilling state"
                    );
                };
                *on_disk_state = if is_system_index_on_user_table || is_index_on_system_table {
                    DatabaseIndexState::Enabled
                } else {
                    DatabaseIndexState::Backfilled {
                        staged: backfilling_state.staged,
                    }
                };
            },
            _ => anyhow::bail!(
                "IndexWorker finished backfilling index {index_metadata:?} which wasn't a \
                 database index"
            ),
        };
        let name = index_metadata.name.clone();
        SystemMetadataModel::new_global(tx)
            .replace(full_index_id, index_metadata.into_value().try_into()?)
            .await?;
        let table_name = tx.table_mapping().tablet_name(*name.table())?;
        tracing::info!("Finished backfill of index {}", name);
        if is_index_on_system_table || is_system_index_on_user_table {
            tracing::info!(
                "Finished backfill of system index {table_name}.{}",
                name.descriptor()
            );
        }
        log_index_backfilled();
        Ok(())
    }
}
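The core scheduling pattern above, a FIFO LinkedHashSet of pending work drained into a JoinMap capped at max_concurrency, is useful on its own. Below is a minimal, self-contained sketch of that pattern with hypothetical numeric task payloads standing in for index backfills; it is not Convex code, and it assumes tokio (with the macros, rt-multi-thread, and time features), tokio-util (with the rt feature), and hashlink as dependencies.

use std::time::Duration;

use hashlink::LinkedHashSet;
use tokio_util::task::JoinMap;

const MAX_CONCURRENCY: usize = 2;

#[tokio::main]
async fn main() {
    // FIFO, deduplicated queue of pending work, like `IndexWorker::pending`.
    let mut pending: LinkedHashSet<u32> = (1..=5).collect();
    // Running tasks keyed by work item, like `IndexWorker::in_progress`.
    let mut in_progress: JoinMap<u32, ()> = JoinMap::new();

    loop {
        // Start new work while under the concurrency limit.
        while in_progress.len() < MAX_CONCURRENCY {
            let Some(id) = pending.pop_front() else { break };
            in_progress.spawn(id, async move {
                // Hypothetical stand-in for `backfill_tablet`.
                tokio::time::sleep(Duration::from_millis(10 * u64::from(id))).await;
            });
        }
        // Wait for one task to finish, then loop around to refill the pool.
        match in_progress.join_next().await {
            Some((id, result)) => {
                result.expect("task panicked");
                println!("finished {id}");
            },
            // Nothing running and nothing pending: all work is done.
            None => break,
        }
    }
}

Bounding the pool with len() against a constant mirrors the worker's max_concurrency check, and join_next() returning None doubles as the termination signal, the same emptiness condition new_terminating polls before returning.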

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.