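//! Test fixtures for text search indexes: creating backfilling indexes,
//! running flushers and compactors, and issuing search queries against a
//! test [`Database`].
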
use std::sync::Arc;
use anyhow::Context;
use common::{
bootstrap_model::index::{
text_index::{
FragmentedTextSegment,
TextIndexSnapshot,
TextIndexSnapshotData,
TextIndexState,
},
IndexConfig,
IndexMetadata,
TabletIndexMetadata,
},
document::{
ParsedDocument,
ResolvedDocument,
},
pause::PauseController,
persistence::PersistenceReader,
query::{
Query,
QueryOperator,
QuerySource,
Search,
SearchFilterExpression,
},
runtime::testing::TestRuntime,
types::{
GenericIndexName,
IndexDescriptor,
IndexName,
TabletIndexName,
},
version::MIN_NPM_VERSION_FOR_FUZZY_SEARCH,
};
use futures::try_join;
use maplit::btreeset;
use must_let::must_let;
use search::{
searcher::InProcessSearcher,
Searcher,
SegmentTermMetadataFetcher,
MAX_CANDIDATE_REVISIONS,
};
use storage::Storage;
use sync_types::Timestamp;
use value::{
assert_obj,
DeveloperDocumentId,
FieldPath,
ResolvedDocumentId,
TableName,
TableNamespace,
};
use crate::{
bootstrap_model::index_backfills::IndexBackfillModel,
search_index_workers::{
search_compactor::CompactionConfig,
FlusherType,
},
test_helpers::{
DbFixtures,
DbFixturesArgs,
},
text_index_worker::{
compactor::{
new_text_compactor_for_tests,
TextIndexCompactor,
},
flusher::{
backfill_text_indexes,
FlusherBuilder,
TextIndexFlusher,
},
BuildTextIndexArgs,
TextIndexMetadataWriter,
},
Database,
IndexBackfillMetadata,
IndexModel,
ResolvedQuery,
TestFacingModel,
Transaction,
};
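
/// Test fixture bundling a [`TestRuntime`], [`Database`], search storage, and
/// the searcher and metadata-writer plumbing needed to build, compact, and
/// query text indexes in tests.
///
/// A sketch of typical usage (hypothetical test body; exact assertions depend
/// on the scenario under test):
///
/// ```ignore
/// let fixtures = TextFixtures::new(rt).await?;
/// let IndexData { index_name, .. } = fixtures.enabled_text_index().await?;
/// fixtures.add_document("hello world").await?;
/// let results = fixtures.search(index_name, "hello").await?;
/// assert!(!results.is_empty());
/// ```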
#[derive(Clone)]
pub struct TextFixtures {
pub rt: TestRuntime,
pub storage: Arc<dyn Storage>,
pub db: Database<TestRuntime>,
pub reader: Arc<dyn PersistenceReader>,
searcher: Arc<dyn Searcher>,
segment_term_metadata_fetcher: Arc<dyn SegmentTermMetadataFetcher>,
writer: TextIndexMetadataWriter<TestRuntime>,
namespace: TableNamespace,
config: CompactionConfig,
}
impl TextFixtures {
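    /// Create fixtures with the default [`CompactionConfig`].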
pub async fn new(rt: TestRuntime) -> anyhow::Result<Self> {
Self::new_with_config(rt, CompactionConfig::default()).await
}
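
    /// Create fixtures backed by an in-process searcher and the given
    /// compaction config.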
pub async fn new_with_config(
rt: TestRuntime,
config: CompactionConfig,
) -> anyhow::Result<Self> {
let in_process_searcher = InProcessSearcher::new(rt.clone()).await?;
let DbFixtures {
tp,
db,
search_storage,
searcher,
..
} = DbFixtures::new_with_args(
&rt,
DbFixturesArgs {
searcher: Some(Arc::new(in_process_searcher.clone())),
..Default::default()
},
)
.await?;
let segment_term_metadata_fetcher = Arc::new(in_process_searcher);
let writer = TextIndexMetadataWriter::new(
rt.clone(),
db.clone(),
tp.reader(),
search_storage.clone(),
BuildTextIndexArgs {
search_storage: search_storage.clone(),
segment_term_metadata_fetcher: segment_term_metadata_fetcher.clone(),
},
);
Ok(Self {
rt,
db,
reader: tp.reader(),
storage: search_storage,
segment_term_metadata_fetcher,
writer,
namespace: TableNamespace::test_user(),
searcher,
config,
})
}
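
    /// Builder for a text index flusher wired to this fixture's database,
    /// storage, and metadata writer; defaults to [`FlusherType::Backfill`].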
pub(crate) fn new_search_flusher_builder(&self) -> FlusherBuilder<TestRuntime> {
FlusherBuilder::new(
self.rt.clone(),
self.db.clone(),
self.reader.clone(),
self.storage.clone(),
self.segment_term_metadata_fetcher.clone(),
self.writer.clone(),
FlusherType::Backfill,
)
}
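
    /// Backfill flusher with a soft limit of zero, so every step writes out a
    /// segment.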
pub fn new_backfill_text_flusher(&self) -> TextIndexFlusher<TestRuntime> {
self.new_search_flusher_builder().set_soft_limit(0).build()
}
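
    /// Live flusher with a soft limit of zero, so every step writes out a
    /// segment.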
pub fn new_live_text_flusher(&self) -> TextIndexFlusher<TestRuntime> {
self.new_search_flusher_builder()
.set_soft_limit(0)
.set_live_flush()
.build()
}
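
    /// Backfill flusher with a nonzero soft limit (2048), so small amounts of
    /// data are buffered rather than flushed immediately.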
pub fn new_backfill_flusher_with_soft_limit(&self) -> TextIndexFlusher<TestRuntime> {
self.new_search_flusher_builder()
.set_soft_limit(2048)
.build()
}
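
    /// Live flusher with a nonzero soft limit (2048), so small amounts of
    /// data are buffered rather than flushed immediately.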
pub fn new_live_flusher_with_soft_limit(&self) -> TextIndexFlusher<TestRuntime> {
self.new_search_flusher_builder()
.set_soft_limit(2048)
.set_live_flush()
.build()
}
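
    /// Compactor wired to this fixture's database, searcher, and compaction
    /// config.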
pub fn new_compactor(&self) -> TextIndexCompactor<TestRuntime> {
new_text_compactor_for_tests(
self.rt.clone(),
self.db.clone(),
self.reader.clone(),
self.storage.clone(),
self.searcher.clone(),
self.segment_term_metadata_fetcher.clone(),
self.config.clone(),
)
}
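
    /// Insert a text index, backfill it, and enable it.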
pub async fn enabled_text_index(&self) -> anyhow::Result<IndexData> {
let index_data = self.backfilled_text_index().await?;
let mut tx = self.db.begin_system().await?;
IndexModel::new(&mut tx)
.enable_index_for_testing(index_data.namespace, &index_data.index_name)
.await?;
self.db.commit(tx).await?;
Ok(index_data)
}
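
    /// Insert a text index and backfill it, leaving it in the backfilled
    /// (but not yet enabled) state.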
pub async fn backfilled_text_index(&self) -> anyhow::Result<IndexData> {
let index_data = self.insert_backfilling_text_index().await?;
self.backfill().await?;
Ok(index_data)
}
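
    /// Fetch the backfill progress metadata for `index_id`, if any exists.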
pub async fn index_backfill_progress(
&self,
index_id: DeveloperDocumentId,
) -> anyhow::Result<Option<Arc<ParsedDocument<IndexBackfillMetadata>>>> {
let mut tx = self.db.begin_system().await?;
IndexBackfillModel::new(&mut tx)
.existing_backfill_metadata(index_id)
.await
}
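
    /// Backfill all pending text indexes with a one-off backfill flusher.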
pub async fn backfill(&self) -> anyhow::Result<()> {
backfill_text_indexes(
self.rt.clone(),
self.db.clone(),
self.reader.clone(),
self.storage.clone(),
self.segment_term_metadata_fetcher.clone(),
)
.await
}
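
    /// Assert that `index_name` is in the backfilled state and return its
    /// snapshot timestamp.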
pub async fn assert_backfilled(&self, index_name: &IndexName) -> anyhow::Result<Timestamp> {
let mut tx = self.db.begin_system().await?;
let new_metadata = IndexModel::new(&mut tx)
.pending_index_metadata(self.namespace, index_name)?
.context("Index missing or in an unexpected state")?
.into_value();
must_let!(let IndexMetadata {
config: IndexConfig::Text {
on_disk_state: TextIndexState::Backfilled {
snapshot: TextIndexSnapshot { ts, .. },
staged: _,
},
..
},
..
} = new_metadata);
Ok(ts)
}
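
    /// Add a new text index on [`TABLE_NAME`] in the backfilling state and
    /// return its identifiers.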
pub async fn insert_backfilling_text_index(&self) -> anyhow::Result<IndexData> {
let mut tx = self.db.begin_system().await?;
let index_metadata = backfilling_text_index()?;
let index_name = &index_metadata.name;
let index_id = IndexModel::new(&mut tx)
.add_application_index(self.namespace, index_metadata.clone())
.await?;
let table_id = tx
.table_mapping()
.namespace(self.namespace)
.id(index_name.table())?
.tablet_id;
self.db.commit(tx).await?;
let resolved_index_name = TabletIndexName::new(table_id, index_name.descriptor().clone())?;
Ok(IndexData {
index_id,
resolved_index_name,
index_name: index_name.clone(),
namespace: self.namespace,
})
}
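
    /// Like [`Self::insert_backfilling_text_index`], but also inserts one
    /// document into the indexed table.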
pub async fn insert_backfilling_text_index_with_document(&self) -> anyhow::Result<IndexData> {
let index_data = self.insert_backfilling_text_index().await?;
let mut tx = self.db.begin_system().await?;
add_document(&mut tx, index_data.index_name.table(), "A long text field").await?;
self.db.commit(tx).await?;
Ok(index_data)
}
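
    /// Fetch the metadata for `index_name`, whether it is enabled or still
    /// pending.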
pub async fn get_index_metadata(
&self,
index_name: GenericIndexName<TableName>,
) -> anyhow::Result<ParsedDocument<TabletIndexMetadata>> {
let mut tx = self.db.begin_system().await?;
let mut index_model = IndexModel::new(&mut tx);
let mut metadata = index_model.enabled_index_metadata(self.namespace, &index_name)?;
if metadata.is_none() {
metadata = index_model.pending_index_metadata(self.namespace, &index_name)?;
}
let metadata = metadata.context("Index is neither pending nor enabled!")?;
Ok(metadata)
}
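
    /// Return the fragmented segments of the index's multi-segment snapshot,
    /// failing if the index is still backfilling.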
pub async fn get_segments_metadata(
&self,
index_name: GenericIndexName<TableName>,
) -> anyhow::Result<Vec<FragmentedTextSegment>> {
let metadata = self.get_index_metadata(index_name).await?;
must_let!(let IndexConfig::Text { on_disk_state, .. } = &metadata.config);
let snapshot = match on_disk_state {
TextIndexState::Backfilling(_) => anyhow::bail!("Still backfilling!"),
TextIndexState::Backfilled { snapshot, .. }
| TextIndexState::SnapshottedAt(snapshot) => snapshot,
};
must_let!(let TextIndexSnapshotData::MultiSegment(segments) = &snapshot.data);
Ok(segments.clone())
}
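
    /// Replace `doc_id`'s contents with a document containing `text` and the
    /// fixed `channel` filter field.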
pub async fn replace_document(
&self,
doc_id: ResolvedDocumentId,
text: &str,
) -> anyhow::Result<()> {
let mut tx = self.db.begin_system().await?;
let document = assert_obj!(
"text" => text,
"channel" => "#general",
);
tx.replace_inner(doc_id, document).await?;
self.db.commit(tx).await?;
Ok(())
}
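
    /// Insert a document containing `text` into [`TABLE_NAME`].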
pub async fn add_document(&self, text: &str) -> anyhow::Result<ResolvedDocumentId> {
let table_name = TABLE_NAME.parse::<TableName>()?;
let mut tx = self.db.begin_system().await?;
let doc_id = add_document(&mut tx, &table_name, text).await?;
self.db.commit(tx).await?;
Ok(doc_id)
}
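
    /// Enable `index_name` for testing.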
pub async fn enable_index(
&self,
index_name: &GenericIndexName<TableName>,
) -> anyhow::Result<()> {
let mut tx = self.db.begin_system().await?;
IndexModel::new(&mut tx)
.enable_index_for_testing(self.namespace, index_name)
.await?;
self.db.commit(tx).await?;
Ok(())
}
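
    /// Run a text search for `query_string` on [`SEARCH_FIELD`] via
    /// `index_name` and collect up to [`MAX_CANDIDATE_REVISIONS`] results.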
pub async fn search(
&self,
index_name: GenericIndexName<TableName>,
query_string: &str,
) -> anyhow::Result<Vec<ResolvedDocument>> {
let mut tx = self.db.begin_system().await?;
let filters = vec![SearchFilterExpression::Search(
SEARCH_FIELD.parse()?,
query_string.into(),
)];
let search = Search {
table: index_name.table().clone(),
index_name,
filters,
};
let query = Query {
source: QuerySource::Search(search),
operators: vec![QueryOperator::Limit(MAX_CANDIDATE_REVISIONS)],
};
let mut query_stream = ResolvedQuery::new_with_version(
&mut tx,
TableNamespace::test_user(),
query,
Some(MIN_NPM_VERSION_FOR_FUZZY_SEARCH.clone()),
)?;
let mut values = vec![];
while let Some(value) = query_stream.next(&mut tx, None).await? {
values.push(value);
}
Ok(values)
}
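
    /// Start a live flush, pause it at `label`, run a compaction step while
    /// the flush is blocked (asserting that segments were actually
    /// compacted), then unpause and let the flush finish.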
pub async fn run_compaction_during_flush(
&self,
pause: PauseController,
label: &'static str,
) -> anyhow::Result<()> {
let flusher = self.new_live_text_flusher();
let hold_guard = pause.hold(label);
let flush = flusher.step();
let compactor = self.new_compactor();
let compact_during_flush = async move {
if let Some(pause_guard) = hold_guard.wait_for_blocked().await {
let (metrics, _) = compactor.step().await?;
for (_index_name, num_segments_compacted) in metrics {
assert!(num_segments_compacted > 0);
}
pause_guard.unpause();
};
Ok::<(), anyhow::Error>(())
};
try_join!(flush, compact_during_flush)?;
Ok(())
}
}
const TABLE_NAME: &str = "table";
const SEARCH_FIELD: &str = "text";
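
/// Identifiers for a text index created by these fixtures.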
pub struct IndexData {
pub index_id: ResolvedDocumentId,
pub index_name: IndexName,
pub resolved_index_name: TabletIndexName,
pub namespace: TableNamespace,
}
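
/// Metadata for a backfilling text index named `search_index` on
/// [`TABLE_NAME`], searching [`SEARCH_FIELD`] and filtering on `channel`.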
pub fn backfilling_text_index() -> anyhow::Result<IndexMetadata<TableName>> {
let table_name: TableName = TABLE_NAME.parse()?;
let index_name = IndexName::new(table_name, IndexDescriptor::new("search_index")?)?;
let search_field: FieldPath = SEARCH_FIELD.parse()?;
let filter_field: FieldPath = "channel".parse()?;
let metadata = IndexMetadata::new_backfilling_text_index(
index_name,
search_field,
btreeset![filter_field],
);
Ok(metadata)
}
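
/// Insert a document with the given `text` and a fixed `"#general"` channel
/// into `table_name`.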
pub async fn add_document(
tx: &mut Transaction<TestRuntime>,
table_name: &TableName,
text: &str,
) -> anyhow::Result<ResolvedDocumentId> {
let document = assert_obj!(
"text" => text,
"channel" => "#general",
);
TestFacingModel::new(tx).insert(table_name, document).await
}