Skip to main content
Glama

Convex MCP server

Official
by get-convex
vector_index_manager.rs22.5 kB
use std::sync::Arc; use common::{ bootstrap_model::index::{ vector_index::{ FragmentedVectorSegment, VectorIndexBackfillState, VectorIndexSnapshotData, VectorIndexState, }, IndexConfig, IndexMetadata, }, document::{ ParseDocument, ParsedDocument, ResolvedDocument, }, knobs::SEARCHLIGHT_CLUSTER_NAME, runtime::block_in_place, types::{ IndexId, Timestamp, WriteTimestamp, }, }; use errors::{ ErrorMetadata, ErrorMetadataAnyhowExt, }; use futures::{ future::BoxFuture, FutureExt, }; use imbl::{ ordmap::Entry, OrdMap, }; use indexing::index_registry::{ index_backfilling_error, Index, IndexRegistry, }; use storage::Storage; use value::{ InternalId, ResolvedDocumentId, }; use crate::{ memory_index::MemoryVectorIndex, metrics::{ self, finish_index_manager_update_timer, VectorIndexType, }, qdrant_index::QdrantSchema, query::{ InternalVectorSearch, VectorSearchQueryResult, }, searcher::VectorSearcher, CompiledVectorSearch, DocInVectorIndex, }; #[derive(Clone)] pub struct VectorIndexManager { pub indexes: IndexState, } #[derive(Clone)] pub enum IndexState { Bootstrapping(OrdMap<IndexId, VectorIndexState>), Ready(OrdMap<IndexId, (VectorIndexState, MemoryVectorIndex)>), } impl IndexState { pub fn insert( &mut self, id: InternalId, state: VectorIndexState, memory_index: MemoryVectorIndex, ) { match self { IndexState::Bootstrapping(ref mut indexes) => { indexes.insert(id, state); }, IndexState::Ready(ref mut indexes) => { indexes.insert(id, (state, memory_index)); }, }; } pub fn update( &mut self, id: &IndexId, new_vector_index_state: Option<VectorIndexState>, mutate_memory: impl FnOnce(&mut MemoryVectorIndex) -> anyhow::Result<()>, ) -> anyhow::Result<()> { match self { IndexState::Bootstrapping(indexes) => { if let Some(new_vector_index_state) = new_vector_index_state { anyhow::ensure!( indexes.insert(*id, new_vector_index_state).is_some(), format!("Missing vector index for {id}"), ); } }, IndexState::Ready(indexes) => match indexes.entry(*id) { Entry::Vacant(_) => anyhow::bail!("Missing vector index for {id}"), Entry::Occupied(ref mut e) => { if let Some(new_vector_index_state) = new_vector_index_state { e.get_mut().0 = new_vector_index_state; } mutate_memory(&mut e.get_mut().1)?; }, }, } Ok(()) } pub fn delete(&mut self, id: &InternalId) { match self { IndexState::Bootstrapping(ref mut indexes) => { indexes.remove(id); }, IndexState::Ready(ref mut indexes) => { indexes.remove(id); }, } } } fn get_vector_index_states( registry: &IndexRegistry, ) -> anyhow::Result<OrdMap<InternalId, VectorIndexState>> { let mut indexes = OrdMap::new(); for index in registry.all_vector_indexes() { let IndexConfig::Vector { spec: _, ref on_disk_state, } = index.config else { continue; }; indexes.insert(index.id().internal_id(), on_disk_state.clone()); } Ok(indexes) } impl VectorIndexManager { pub fn is_bootstrapping(&self) -> bool { matches!(self.indexes, IndexState::Bootstrapping(..)) } pub fn bootstrap_index_metadata(registry: &IndexRegistry) -> anyhow::Result<Self> { let vector_indexes_and_metadata = get_vector_index_states(registry)?; let indexes = IndexState::Bootstrapping(vector_indexes_and_metadata); Ok(Self { indexes }) } pub fn backfilled_and_enabled_index_sizes( &self, ) -> anyhow::Result<impl Iterator<Item = (IndexId, usize)> + '_> { Ok(self .require_ready_indexes()? .iter() .map(|(id, (_, idx))| (*id, idx.size()))) } pub fn num_transactions(&self, index_id: IndexId) -> anyhow::Result<Option<usize>> { let Some((_, index)) = self.require_ready_indexes()?.get(&index_id) else { return Ok(None); }; Ok(Some(index.num_transactions())) } fn require_ready_indexes( &self, ) -> anyhow::Result<&OrdMap<InternalId, (VectorIndexState, MemoryVectorIndex)>> { if let IndexState::Ready(ref indexes) = self.indexes { Ok(indexes) } else { anyhow::bail!(ErrorMetadata::feature_temporarily_unavailable( "VectorIndexesUnavailable", "Vector indexes are bootstrapping and not yet available for use", )) } } fn require_ready_index( &self, id: &InternalId, ) -> anyhow::Result<Option<&(VectorIndexState, MemoryVectorIndex)>> { Ok(self.require_ready_indexes()?.get(id)) } pub fn update( &mut self, index_registry: &IndexRegistry, deletion: Option<&ResolvedDocument>, insertion: Option<&ResolvedDocument>, ts: WriteTimestamp, ) -> anyhow::Result<DocInVectorIndex> { let timer = metrics::index_manager_update_timer(); let Some(id) = deletion.as_ref().or(insertion.as_ref()).map(|d| d.id()) else { finish_index_manager_update_timer(timer, metrics::IndexUpdateType::None); return Ok(DocInVectorIndex::Absent); }; if self.update_vector_index_metadata(id, index_registry, deletion, insertion, ts)? { finish_index_manager_update_timer(timer, metrics::IndexUpdateType::IndexMetadata); return Ok(DocInVectorIndex::Absent); } if let IndexState::Ready(..) = self.indexes { if self.update_vector_index_contents(id, index_registry, deletion, insertion, ts)? { finish_index_manager_update_timer(timer, metrics::IndexUpdateType::Document); return Ok(DocInVectorIndex::Present); } } finish_index_manager_update_timer(timer, metrics::IndexUpdateType::None); Ok(DocInVectorIndex::Absent) } fn update_vector_index_contents( &mut self, id: ResolvedDocumentId, index_registry: &IndexRegistry, deletion: Option<&ResolvedDocument>, insertion: Option<&ResolvedDocument>, ts: WriteTimestamp, ) -> anyhow::Result<bool> { let mut at_least_one_matching_index = false; for index in index_registry.vector_indexes_by_table(id.tablet_id) { let IndexConfig::Vector { ref spec, .. } = index.metadata.config else { continue; }; let qdrant_schema = QdrantSchema::new(spec); let old_value = deletion.as_ref().and_then(|d| qdrant_schema.index(d)); let new_value = insertion.as_ref().and_then(|d| qdrant_schema.index(d)); at_least_one_matching_index = at_least_one_matching_index || old_value.is_some() || new_value.is_some(); self.indexes.update(&index.id, None, |memory_index| { memory_index.update(id.internal_id(), ts, old_value, new_value) })?; } Ok(at_least_one_matching_index) } fn update_vector_index_metadata( &mut self, id: ResolvedDocumentId, index_registry: &IndexRegistry, deletion: Option<&ResolvedDocument>, insertion: Option<&ResolvedDocument>, ts: WriteTimestamp, ) -> anyhow::Result<bool> { if id.tablet_id != index_registry.index_table() { return Ok(false); } match (deletion, insertion) { (None, Some(insertion)) => { let metadata = IndexMetadata::try_from(insertion.value().clone().0)?; if let IndexConfig::Vector { ref on_disk_state, .. } = metadata.config { let VectorIndexState::Backfilling(state) = on_disk_state else { anyhow::bail!( "Inserted new search index that wasn't backfilling: {metadata:?}" ); }; let index = VectorIndexState::Backfilling(state.clone()); self.indexes.insert( insertion.id().internal_id(), index, MemoryVectorIndex::new(ts), ); metrics::log_index_created() } }, (Some(prev_version), Some(next_version)) => { let prev_metadata: ParsedDocument<IndexMetadata<_>> = prev_version.parse()?; let next_metadata: ParsedDocument<IndexMetadata<_>> = next_version.parse()?; let (old_snapshot, new_snapshot, staged) = match (&prev_metadata.config, &next_metadata.config) { ( IndexConfig::Vector { on_disk_state: VectorIndexState::Backfilling(VectorIndexBackfillState { .. }), .. }, IndexConfig::Vector { on_disk_state: VectorIndexState::Backfilling(VectorIndexBackfillState { staged, .. }), .. }, ) => (None, None, *staged), ( IndexConfig::Vector { on_disk_state: VectorIndexState::Backfilling { .. }, .. }, IndexConfig::Vector { on_disk_state: VectorIndexState::Backfilled { snapshot, staged }, .. }, ) => (None, Some(snapshot), *staged), ( IndexConfig::Vector { on_disk_state: VectorIndexState::Backfilled { snapshot: old_snapshot, .. }, .. }, IndexConfig::Vector { on_disk_state: VectorIndexState::SnapshottedAt(new_snapshot), .. }, ) => (Some(old_snapshot), Some(new_snapshot), false), ( IndexConfig::Vector { on_disk_state: VectorIndexState::Backfilled { snapshot: old_snapshot, .. }, .. }, IndexConfig::Vector { on_disk_state: VectorIndexState::Backfilled { snapshot: new_snapshot, staged, }, .. }, ) => (Some(old_snapshot), Some(new_snapshot), *staged), ( IndexConfig::Vector { on_disk_state: VectorIndexState::SnapshottedAt(old_snapshot), .. }, IndexConfig::Vector { on_disk_state: VectorIndexState::SnapshottedAt(new_snapshot), .. }, ) => (Some(old_snapshot), Some(new_snapshot), false), ( IndexConfig::Vector { on_disk_state: VectorIndexState::SnapshottedAt(old_snapshot), .. }, IndexConfig::Vector { on_disk_state: VectorIndexState::Backfilled { snapshot: new_snapshot, staged, }, .. }, ) => { anyhow::ensure!( old_snapshot == new_snapshot, "Snapshot mismatch when disabling vector index" ); anyhow::ensure!(staged, "Disabled vector index must be staged"); (Some(old_snapshot), Some(new_snapshot), *staged) }, (IndexConfig::Vector { .. }, _) | (_, IndexConfig::Vector { .. }) => { anyhow::bail!( "Invalid index type transition: {prev_metadata:?} to \ {next_metadata:?}" ); }, _ => (None, None, false), }; if let Some(new_snapshot) = new_snapshot { let is_newly_enabled = !prev_metadata.config.is_enabled() && next_metadata.config.is_enabled(); let is_updated_snapshot = if let Some(old_snapshot) = old_snapshot { old_snapshot.ts < new_snapshot.ts } else { true }; if is_newly_enabled || is_updated_snapshot { let is_next_index_enabled = next_metadata.config.is_enabled(); let updated_state = if is_next_index_enabled { VectorIndexState::SnapshottedAt(new_snapshot.clone()) } else { VectorIndexState::Backfilled { snapshot: new_snapshot.clone(), staged, } }; self.indexes.update( &id.internal_id(), Some(updated_state), |memory_index| memory_index.truncate(new_snapshot.ts.succ()?), )?; if !prev_metadata.into_value().config.is_enabled() && is_next_index_enabled { metrics::log_index_backfilled(); } else { metrics::log_index_advanced(); } } } }, (Some(deletion), None) => { let metadata: ParsedDocument<IndexMetadata<_>> = deletion.parse()?; if metadata.is_vector_index() { self.indexes.delete(&deletion.id().internal_id()); metrics::log_index_deleted(); } }, _ => panic!("Had neither a deletion nor insertion despite checking above"), } Ok(true) } pub async fn vector_search( &self, index: &Index, query: InternalVectorSearch, searcher: Arc<dyn VectorSearcher>, search_storage: Arc<dyn Storage>, ) -> anyhow::Result<Vec<VectorSearchQueryResult>> { let timer = metrics::search_timer(&SEARCHLIGHT_CLUSTER_NAME); let result: anyhow::Result<_> = try { let IndexConfig::Vector { ref spec, .. } = index.metadata.config else { anyhow::bail!(ErrorMetadata::bad_request( "IndexNotAVectorIndexError", format!( "Index {} is not a vector index", query.printable_index_name()? ) )); }; let Some((vector_index, memory_index)) = self.require_ready_index(&index.id())? else { anyhow::bail!("Vector index {:?} not available", index.id()); }; let qdrant_schema = QdrantSchema::new(spec); let VectorIndexState::SnapshottedAt(ref snapshot) = vector_index else { anyhow::bail!(index_backfilling_error(&query.printable_index_name()?)); }; let (disk_revisions, vector_index_type) = match snapshot.data { VectorIndexSnapshotData::Unknown(_) => { anyhow::bail!(index_backfilling_error(&query.printable_index_name()?)) }, VectorIndexSnapshotData::MultiSegment(ref segments) => ( self.multi_segment_search( query, searcher, segments, search_storage, qdrant_schema, memory_index, snapshot.ts, ) .await?, VectorIndexType::MultiSegment, ), }; (disk_revisions, vector_index_type) }; match result { Ok((disk_revisions, vector_index_type)) => { metrics::finish_search(timer, &disk_revisions, vector_index_type); Ok(disk_revisions) }, Err(e) => { if e.is_bad_request() { timer.finish_developer_error(); } Err(e) }, } } async fn multi_segment_search( &self, query: InternalVectorSearch, searcher: Arc<dyn VectorSearcher>, segments: &Vec<FragmentedVectorSegment>, search_storage: Arc<dyn Storage>, qdrant_schema: QdrantSchema, memory_index: &MemoryVectorIndex, ts: Timestamp, ) -> anyhow::Result<Vec<VectorSearchQueryResult>> { self.compile_search_and_truncate( query, qdrant_schema, memory_index, ts, |qdrant_schema, compiled_query, overfetch_delta| { async move { let timer = metrics::searchlight_client_execute_timer( VectorIndexType::MultiSegment, &SEARCHLIGHT_CLUSTER_NAME, ); let total_segments = segments.len(); let results = searcher .execute_multi_segment_vector_query( search_storage.clone(), segments .iter() .cloned() .map(|segment| segment.to_paths_proto()) .try_collect()?, qdrant_schema, compiled_query.clone(), overfetch_delta as u32, ) .await?; metrics::log_num_segments_searched_total(total_segments); metrics::finish_searchlight_client_execute(timer, &results); Ok(results) } .boxed() }, ) .await } async fn compile_search_and_truncate<'a>( &'a self, query: InternalVectorSearch, qdrant_schema: QdrantSchema, memory_index: &MemoryVectorIndex, ts: Timestamp, call_searchlight: impl FnOnce( QdrantSchema, CompiledVectorSearch, usize, ) -> BoxFuture<'a, anyhow::Result<Vec<VectorSearchQueryResult>>>, ) -> anyhow::Result<Vec<VectorSearchQueryResult>> { let compiled_query = qdrant_schema.compile(query)?; let updated_matches = memory_index.updated_matches(ts, &compiled_query)?; let overfetch_delta = updated_matches.len(); metrics::log_searchlight_overfetch_delta(overfetch_delta); let mut disk_revisions = call_searchlight(qdrant_schema, compiled_query.clone(), overfetch_delta).await?; block_in_place(|| { // Filter out revisions that are no longer latest. disk_revisions.retain(|r| !updated_matches.contains(&r.id)); let memory_revisions = memory_index.query(ts, &compiled_query)?; disk_revisions.extend(memory_revisions); let original_len = disk_revisions.len(); disk_revisions.sort_by(|a, b| a.cmp(b).reverse()); disk_revisions.truncate(compiled_query.limit as usize); metrics::log_num_discarded_revisions(original_len - disk_revisions.len()); anyhow::Ok(()) })?; Ok(disk_revisions) } pub fn total_in_memory_size(&self) -> usize { if let IndexState::Ready(ref indexes) = self.indexes { indexes .iter() .map(|(_, (_, memory_index))| memory_index.size()) .sum() } else { 0 } } pub fn in_memory_sizes(&self) -> Vec<(IndexId, usize)> { if let IndexState::Ready(ref indexes) = self.indexes { indexes.iter().map(|(id, (_, s))| (*id, s.size())).collect() } else { vec![] } } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server