Skip to main content
Glama

Convex MCP server

Official
by get-convex
text_test_utils.rs13.7 kB
use std::sync::Arc; use anyhow::Context; use common::{ bootstrap_model::index::{ text_index::{ FragmentedTextSegment, TextIndexSnapshot, TextIndexSnapshotData, TextIndexState, }, IndexConfig, IndexMetadata, TabletIndexMetadata, }, document::{ ParsedDocument, ResolvedDocument, }, pause::PauseController, persistence::PersistenceReader, query::{ Query, QueryOperator, QuerySource, Search, SearchFilterExpression, }, runtime::testing::TestRuntime, types::{ GenericIndexName, IndexDescriptor, IndexName, TabletIndexName, }, version::MIN_NPM_VERSION_FOR_FUZZY_SEARCH, }; use futures::try_join; use maplit::btreeset; use must_let::must_let; use search::{ searcher::InProcessSearcher, Searcher, SegmentTermMetadataFetcher, MAX_CANDIDATE_REVISIONS, }; use storage::Storage; use sync_types::Timestamp; use value::{ assert_obj, DeveloperDocumentId, FieldPath, ResolvedDocumentId, TableName, TableNamespace, }; use crate::{ bootstrap_model::index_backfills::IndexBackfillModel, search_index_workers::{ search_compactor::CompactionConfig, FlusherType, }, test_helpers::{ DbFixtures, DbFixturesArgs, }, text_index_worker::{ compactor::{ new_text_compactor_for_tests, TextIndexCompactor, }, flusher::{ backfill_text_indexes, FlusherBuilder, TextIndexFlusher, }, BuildTextIndexArgs, TextIndexMetadataWriter, }, Database, IndexBackfillMetadata, IndexModel, ResolvedQuery, TestFacingModel, Transaction, }; #[derive(Clone)] pub struct TextFixtures { pub rt: TestRuntime, pub storage: Arc<dyn Storage>, pub db: Database<TestRuntime>, pub reader: Arc<dyn PersistenceReader>, searcher: Arc<dyn Searcher>, segment_term_metadata_fetcher: Arc<dyn SegmentTermMetadataFetcher>, writer: TextIndexMetadataWriter<TestRuntime>, namespace: TableNamespace, config: CompactionConfig, } impl TextFixtures { pub async fn new(rt: TestRuntime) -> anyhow::Result<Self> { Self::new_with_config(rt, CompactionConfig::default()).await } pub async fn new_with_config( rt: TestRuntime, config: CompactionConfig, ) -> anyhow::Result<Self> { let in_process_searcher = InProcessSearcher::new(rt.clone()).await?; let DbFixtures { tp, db, search_storage, searcher, .. } = DbFixtures::new_with_args( &rt, DbFixturesArgs { searcher: Some(Arc::new(in_process_searcher.clone())), ..Default::default() }, ) .await?; let segment_term_metadata_fetcher = Arc::new(in_process_searcher); let writer = TextIndexMetadataWriter::new( rt.clone(), db.clone(), tp.reader(), search_storage.clone(), BuildTextIndexArgs { search_storage: search_storage.clone(), segment_term_metadata_fetcher: segment_term_metadata_fetcher.clone(), }, ); Ok(Self { rt, db, reader: tp.reader(), storage: search_storage, segment_term_metadata_fetcher, writer, namespace: TableNamespace::test_user(), searcher, config, }) } pub(crate) fn new_search_flusher_builder(&self) -> FlusherBuilder<TestRuntime> { FlusherBuilder::new( self.rt.clone(), self.db.clone(), self.reader.clone(), self.storage.clone(), self.segment_term_metadata_fetcher.clone(), self.writer.clone(), FlusherType::Backfill, ) } pub fn new_backfill_text_flusher(&self) -> TextIndexFlusher<TestRuntime> { self.new_search_flusher_builder().set_soft_limit(0).build() } pub fn new_live_text_flusher(&self) -> TextIndexFlusher<TestRuntime> { self.new_search_flusher_builder() .set_soft_limit(0) .set_live_flush() .build() } pub fn new_backfill_flusher_with_soft_limit(&self) -> TextIndexFlusher<TestRuntime> { self.new_search_flusher_builder() .set_soft_limit(2048) .build() } pub fn new_live_flusher_with_soft_limit(&self) -> TextIndexFlusher<TestRuntime> { self.new_search_flusher_builder() .set_soft_limit(2048) .set_live_flush() .build() } pub fn new_compactor(&self) -> TextIndexCompactor<TestRuntime> { new_text_compactor_for_tests( self.rt.clone(), self.db.clone(), self.reader.clone(), self.storage.clone(), self.searcher.clone(), self.segment_term_metadata_fetcher.clone(), self.config.clone(), ) } pub async fn enabled_text_index(&self) -> anyhow::Result<IndexData> { let index_data = self.backfilled_text_index().await?; let mut tx = self.db.begin_system().await?; IndexModel::new(&mut tx) .enable_index_for_testing(index_data.namespace, &index_data.index_name) .await?; self.db.commit(tx).await?; Ok(index_data) } pub async fn backfilled_text_index(&self) -> anyhow::Result<IndexData> { let index_data = self.insert_backfilling_text_index().await?; self.backfill().await?; Ok(index_data) } pub async fn index_backfill_progress( &self, index_id: DeveloperDocumentId, ) -> anyhow::Result<Option<Arc<ParsedDocument<IndexBackfillMetadata>>>> { let mut tx = self.db.begin_system().await?; IndexBackfillModel::new(&mut tx) .existing_backfill_metadata(index_id) .await } pub async fn backfill(&self) -> anyhow::Result<()> { backfill_text_indexes( self.rt.clone(), self.db.clone(), self.reader.clone(), self.storage.clone(), self.segment_term_metadata_fetcher.clone(), ) .await } pub async fn assert_backfilled(&self, index_name: &IndexName) -> anyhow::Result<Timestamp> { let mut tx = self.db.begin_system().await?; let new_metadata = IndexModel::new(&mut tx) .pending_index_metadata(self.namespace, index_name)? .context("Index missing or in an unexpected state")? .into_value(); must_let!(let IndexMetadata { config: IndexConfig::Text { on_disk_state: TextIndexState::Backfilled { snapshot: TextIndexSnapshot { ts, .. }, staged: _, }, .. }, .. } = new_metadata); Ok(ts) } pub async fn insert_backfilling_text_index(&self) -> anyhow::Result<IndexData> { let mut tx = self.db.begin_system().await?; let index_metadata = backfilling_text_index()?; let index_name = &index_metadata.name; let index_id = IndexModel::new(&mut tx) .add_application_index(self.namespace, index_metadata.clone()) .await?; let table_id = tx .table_mapping() .namespace(self.namespace) .id(index_name.table())? .tablet_id; self.db.commit(tx).await?; let resolved_index_name = TabletIndexName::new(table_id, index_name.descriptor().clone())?; Ok(IndexData { index_id, resolved_index_name, index_name: index_name.clone(), namespace: self.namespace, }) } pub async fn insert_backfilling_text_index_with_document(&self) -> anyhow::Result<IndexData> { let index_data = self.insert_backfilling_text_index().await?; let mut tx = self.db.begin_system().await?; add_document(&mut tx, index_data.index_name.table(), "A long text field").await?; self.db.commit(tx).await?; Ok(index_data) } pub async fn get_index_metadata( &self, index_name: GenericIndexName<TableName>, ) -> anyhow::Result<ParsedDocument<TabletIndexMetadata>> { let mut tx = self.db.begin_system().await?; let mut index_model = IndexModel::new(&mut tx); let mut metadata = index_model.enabled_index_metadata(self.namespace, &index_name)?; if metadata.is_none() { metadata = index_model.pending_index_metadata(self.namespace, &index_name)?; } let metadata = metadata.context("Index is neither pending nor enabled!")?; Ok(metadata) } pub async fn get_segments_metadata( &self, index_name: GenericIndexName<TableName>, ) -> anyhow::Result<Vec<FragmentedTextSegment>> { let metadata = self.get_index_metadata(index_name).await?; must_let!(let IndexConfig::Text { on_disk_state, .. } = &metadata.config); let snapshot = match on_disk_state { TextIndexState::Backfilling(_) => anyhow::bail!("Still backfilling!"), TextIndexState::Backfilled { snapshot, .. } | TextIndexState::SnapshottedAt(snapshot) => snapshot, }; must_let!(let TextIndexSnapshotData::MultiSegment(segments) = &snapshot.data); Ok(segments.clone()) } pub async fn replace_document( &self, doc_id: ResolvedDocumentId, text: &str, ) -> anyhow::Result<()> { let mut tx = self.db.begin_system().await?; let document = assert_obj!( "text" => text, "channel" => "#general", ); tx.replace_inner(doc_id, document).await?; self.db.commit(tx).await?; Ok(()) } pub async fn add_document(&self, text: &str) -> anyhow::Result<ResolvedDocumentId> { let table_name = TABLE_NAME.parse::<TableName>()?; let mut tx = self.db.begin_system().await?; let doc_id = add_document(&mut tx, &table_name, text).await?; self.db.commit(tx).await?; Ok(doc_id) } pub async fn enable_index( &self, index_name: &GenericIndexName<TableName>, ) -> anyhow::Result<()> { let mut tx = self.db.begin_system().await?; IndexModel::new(&mut tx) .enable_index_for_testing(self.namespace, index_name) .await?; self.db.commit(tx).await?; Ok(()) } pub async fn search( &self, index_name: GenericIndexName<TableName>, query_string: &str, ) -> anyhow::Result<Vec<ResolvedDocument>> { let mut tx = self.db.begin_system().await?; let filters = vec![SearchFilterExpression::Search( SEARCH_FIELD.parse()?, query_string.into(), )]; let search = Search { table: index_name.table().clone(), index_name, filters, }; let query = Query { source: QuerySource::Search(search), operators: vec![QueryOperator::Limit(MAX_CANDIDATE_REVISIONS)], }; let mut query_stream = ResolvedQuery::new_with_version( &mut tx, TableNamespace::test_user(), query, Some(MIN_NPM_VERSION_FOR_FUZZY_SEARCH.clone()), )?; let mut values = vec![]; while let Some(value) = query_stream.next(&mut tx, None).await? { values.push(value); } Ok(values) } pub async fn run_compaction_during_flush( &self, pause: PauseController, label: &'static str, ) -> anyhow::Result<()> { let flusher = self.new_live_text_flusher(); let hold_guard = pause.hold(label); let flush = flusher.step(); let compactor = self.new_compactor(); let compact_during_flush = async move { if let Some(pause_guard) = hold_guard.wait_for_blocked().await { let (metrics, _) = compactor.step().await?; for (_index_name, num_segments_compacted) in metrics { assert!(num_segments_compacted > 0); } pause_guard.unpause(); }; Ok::<(), anyhow::Error>(()) }; try_join!(flush, compact_during_flush)?; Ok(()) } } const TABLE_NAME: &str = "table"; const SEARCH_FIELD: &str = "text"; pub struct IndexData { pub index_id: ResolvedDocumentId, pub index_name: IndexName, pub resolved_index_name: TabletIndexName, pub namespace: TableNamespace, } pub fn backfilling_text_index() -> anyhow::Result<IndexMetadata<TableName>> { let table_name: TableName = TABLE_NAME.parse()?; let index_name = IndexName::new(table_name, IndexDescriptor::new("search_index")?)?; let search_field: FieldPath = SEARCH_FIELD.parse()?; let filter_field: FieldPath = "channel".parse()?; let metadata = IndexMetadata::new_backfilling_text_index( index_name, search_field, btreeset![filter_field], ); Ok(metadata) } pub async fn add_document( tx: &mut Transaction<TestRuntime>, table_name: &TableName, text: &str, ) -> anyhow::Result<ResolvedDocumentId> { let document = assert_obj!( "text" => text, "channel" => "#general", ); TestFacingModel::new(tx).insert(table_name, document).await }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server