table.rs•30 kB
use std::{
collections::BTreeSet,
sync::LazyLock,
};
use anyhow::Context;
use async_recursion::async_recursion;
use common::{
bootstrap_model::{
index::{
database_index::IndexedFields,
index_validation_error,
DeveloperIndexMetadata,
IndexMetadata,
TabletIndexMetadata,
INDEX_TABLE,
},
tables::{
TableMetadata,
TableState,
TABLES_TABLE,
},
},
document::{
ParseDocument,
ParsedDocument,
},
interval::Interval,
query::{
Order,
Query,
},
runtime::Runtime,
types::{
GenericIndexName,
TableName,
TabletIndexName,
},
value::TabletIdAndTableNumber,
};
use errors::ErrorMetadata;
use value::{
DeveloperDocumentId,
FieldPath,
ResolvedDocumentId,
TableNamespace,
TableNumber,
TabletId,
};
use crate::{
defaults::bootstrap_system_tables,
system_tables::{
SystemIndex,
SystemTable,
},
table_summary::table_summary_bootstrapping_error,
IndexModel,
ResolvedQuery,
SchemaModel,
SystemMetadataModel,
Transaction,
};
/// Each instance is limited to a certain number of user tables.
pub const MAX_USER_TABLES: usize = 10000;
/// Reserve the first 10K for system tables. User tables start at 10K
/// Older instances (before ~12/2023) may have lower numbered user tables.
pub const NUM_RESERVED_SYSTEM_TABLE_NUMBERS: u32 = 10000;
/// Reserve the first 512 table numbers for legacy tables, before we created
/// default system table numbers and reserving numbers for system tables.
/// Instances created before ~01/2024 may have lower numbered user or system
/// tables, but instances created after will have all tables >512.
pub const NUM_RESERVED_LEGACY_TABLE_NUMBERS: u32 = 512;
pub static TABLES_BY_NAME_INDEX: LazyLock<SystemIndex<TablesTable>> =
LazyLock::new(|| SystemIndex::new("by_name", [&NAME_FIELD_PATH]).unwrap());
pub static NAME_FIELD_PATH: LazyLock<FieldPath> =
LazyLock::new(|| "name".parse().expect("Invalid built-in field"));
pub struct TablesTable;
impl SystemTable for TablesTable {
type Metadata = TableMetadata;
fn table_name() -> &'static TableName {
&TABLES_TABLE
}
fn indexes() -> Vec<SystemIndex<Self>> {
vec![TABLES_BY_NAME_INDEX.clone()]
}
}
pub struct TableModel<'a, RT: Runtime> {
tx: &'a mut Transaction<RT>,
}
impl<'a, RT: Runtime> TableModel<'a, RT> {
pub fn new(tx: &'a mut Transaction<RT>) -> Self {
Self { tx }
}
/// Returns the number of documents in the table, up-to-date with the
/// current transaction.
pub async fn count(
&mut self,
namespace: TableNamespace,
table: &TableName,
) -> anyhow::Result<Option<u64>> {
let count = if let Some(tablet_id) = self
.tx
.table_mapping()
.namespace(namespace)
.id_if_exists(table)
{
self.count_tablet(tablet_id).await?
} else {
Some(0)
};
Ok(count)
}
/// Returns the number of documents in the table, up-to-date with the
/// current transaction.
pub async fn must_count(
&mut self,
namespace: TableNamespace,
table: &TableName,
) -> anyhow::Result<u64> {
self.count(namespace, table).await?.ok_or_else(|| {
table_summary_bootstrapping_error(Some("Table count unavailable while bootstrapping"))
})
}
pub async fn count_tablet(&mut self, tablet_id: TabletId) -> anyhow::Result<Option<u64>> {
// Add read dependency on the entire table.
// But we haven't explicitly read the documents, so don't record_read_documents.
self.tx.reads.record_indexed_directly(
TabletIndexName::by_id(tablet_id),
IndexedFields::by_id(),
Interval::all(),
)?;
// Get table count at the beginning of the transaction, then add the delta from
// the transaction so far.
let snapshot_count = self.tx.count_snapshot.count(tablet_id).await?;
let Some(snapshot_count) = snapshot_count else {
return Ok(None);
};
let transaction_delta = self.tx.table_count_deltas.get(&tablet_id).unwrap_or(&0);
let result = if *transaction_delta < 0 {
snapshot_count
.checked_sub(transaction_delta.unsigned_abs())
.context("Count underflow")
} else {
snapshot_count
.checked_add(*transaction_delta as u64)
.context("Count overflow")
};
Ok(Some(result?))
}
pub async fn must_count_tablet(&mut self, tablet_id: TabletId) -> anyhow::Result<u64> {
self.count_tablet(tablet_id).await?.ok_or_else(|| {
table_summary_bootstrapping_error(Some("Table count unavailable while bootstrapping"))
})
}
pub(crate) fn doc_table_id_to_name(
&mut self,
doc: ParsedDocument<TabletIndexMetadata>,
) -> anyhow::Result<ParsedDocument<DeveloperIndexMetadata>> {
doc.map(|metadata| metadata.map_table(&self.tx.table_mapping().tablet_to_name()))
}
pub(crate) fn doc_table_name_to_id(
&mut self,
namespace: TableNamespace,
doc: ParsedDocument<DeveloperIndexMetadata>,
) -> anyhow::Result<ParsedDocument<TabletIndexMetadata>> {
doc.map(|metadata| {
metadata.map_table(
&self
.tx
.table_mapping()
.namespace(namespace)
.name_to_tablet(),
)
})
}
pub fn table_exists(&mut self, namespace: TableNamespace, table: &TableName) -> bool {
self.tx
.table_mapping()
.namespace(namespace)
.name_exists(table)
}
pub fn count_user_tables(&mut self) -> usize {
self.tx.table_mapping().iter_active_user_tables().count()
}
pub async fn delete_active_table(
&mut self,
namespace: TableNamespace,
table_name: TableName,
) -> anyhow::Result<()> {
if !self.table_exists(namespace, &table_name) {
return Ok(());
}
SchemaModel::new(self.tx, namespace)
.enforce_table_deletion(table_name.clone())
.await?;
let table_id_and_number = self
.tx
.table_mapping()
.namespace(namespace)
.id(&table_name)?;
self.delete_table_by_id_bypassing_schema_enforcement(table_id_and_number.tablet_id)
.await
}
pub async fn delete_hidden_table(&mut self, tablet_id: TabletId) -> anyhow::Result<()> {
let table_metadata = self.get_table_metadata(tablet_id).await?;
// We don't need to validate hidden table with the schema.
anyhow::ensure!(table_metadata.state == TableState::Hidden);
self.delete_table_by_id_bypassing_schema_enforcement(tablet_id)
.await
}
pub async fn delete_table(&mut self, tablet_id: TabletId) -> anyhow::Result<()> {
self.delete_table_by_id_bypassing_schema_enforcement(tablet_id)
.await
}
async fn delete_table_by_id_bypassing_schema_enforcement(
&mut self,
tablet_id: TabletId,
) -> anyhow::Result<()> {
for index in IndexModel::new(self.tx)
.all_indexes_on_table(tablet_id)
.await?
{
let index_id = index.id();
SystemMetadataModel::new_global(self.tx)
.delete(index_id)
.await?;
}
let table_metadata = self.get_table_metadata(tablet_id).await?;
let table_doc_id = table_metadata.id();
let table_metadata = table_metadata.into_value();
let updated_table_metadata = TableMetadata {
name: table_metadata.name,
number: table_metadata.number,
state: TableState::Deleting,
namespace: table_metadata.namespace,
};
SystemMetadataModel::new_global(self.tx)
.replace(table_doc_id, updated_table_metadata.try_into()?)
.await?;
Ok(())
}
pub async fn get_table_metadata(
&mut self,
tablet_id: TabletId,
) -> anyhow::Result<ParsedDocument<TableMetadata>> {
let table_doc_id = self.tx.bootstrap_tables().table_resolved_doc_id(tablet_id);
self.tx
.get(table_doc_id)
.await?
.context(format!("Couldn't find table metadata for {tablet_id}"))?
.parse()
}
#[cfg(any(test, feature = "testing"))]
pub async fn table_is_empty(
&mut self,
namespace: TableNamespace,
table: &TableName,
) -> anyhow::Result<bool> {
Ok(self.must_count(namespace, table).await? == 0)
}
// Checks both _tables and _virtual_tables to find a non-conflicting table
// number
pub async fn next_user_table_number(
&mut self,
namespace: TableNamespace,
) -> anyhow::Result<TableNumber> {
self.next_table_number(false, namespace).await
}
pub async fn next_system_table_number(
&mut self,
namespace: TableNamespace,
) -> anyhow::Result<TableNumber> {
self.next_table_number(true, namespace).await
}
async fn next_table_number(
&mut self,
is_system: bool,
namespace: TableNamespace,
) -> anyhow::Result<TableNumber> {
let occupied_table_numbers = {
let mut occupied_table_numbers = BTreeSet::new();
let tables_query = Query::full_table_scan(TABLES_TABLE.clone(), Order::Asc);
let mut query_stream =
ResolvedQuery::new(self.tx, TableNamespace::Global, tables_query)?;
while let Some(table_metadata) = query_stream.next(self.tx, None).await? {
let parsed_metadata: ParsedDocument<TableMetadata> = table_metadata.parse()?;
if parsed_metadata.namespace == namespace {
occupied_table_numbers.insert(parsed_metadata.number);
}
}
occupied_table_numbers
};
let mut candidate_table_number = TableNumber::try_from(if is_system {
NUM_RESERVED_LEGACY_TABLE_NUMBERS
} else {
NUM_RESERVED_SYSTEM_TABLE_NUMBERS
})?
.increment()?;
while occupied_table_numbers.contains(&candidate_table_number) {
candidate_table_number = candidate_table_number.increment()?;
}
Ok(candidate_table_number)
}
/// Checks for conflicts when replacing table, e.g. snapshot import.
/// A table with the same name can be replaced with a different table
/// number, but if a different table has the same table number then we have
/// a problem.
fn check_can_overwrite(
&mut self,
namespace: TableNamespace,
table: &TableName,
table_number: Option<TableNumber>,
tables_affected_in_import: &BTreeSet<(TableNamespace, TableName)>,
) -> anyhow::Result<()> {
let Some(table_number) = table_number else {
return Ok(());
};
let Some(existing_table_by_number) = self
.tx
.table_mapping()
.namespace(namespace)
.name_by_number_if_exists(table_number)
.cloned()
else {
// No existing table with this number.
return Ok(());
};
if let Some(existing_virtual_table) = self
.tx
.virtual_system_mapping()
.system_to_virtual_table(&existing_table_by_number)
{
let existing_system_table = self
.tx
.virtual_system_mapping()
.virtual_to_system_table(existing_virtual_table)?;
if existing_system_table == table {
// Setting physical table to have the same table number as its virtual table is
// allowed.
return Ok(());
}
anyhow::bail!(ErrorMetadata::bad_request(
"TableConflict",
format!("New table {table} has IDs that conflict with existing system table")
));
}
// Don't mess with creating conflicts with bootstrap system tables.
anyhow::ensure!(
bootstrap_system_tables()
.iter()
.all(|t| *t.table_name() != existing_table_by_number),
"Conflict with bootstrap system table {existing_table_by_number}",
);
if existing_table_by_number == *table {
// Overwriting in-place, same table name and number.
return Ok(());
}
if tables_affected_in_import.contains(&(namespace, existing_table_by_number.clone())) {
// Overwriting would create a table number conflict with an
// existing table, but that existing table is also being
// overwritten.
return Ok(());
}
if existing_table_by_number.is_system() {
anyhow::bail!(ErrorMetadata::bad_request(
"TableConflict",
format!(
"New table `{table}` has IDs that conflict with existing internal table. \
Consider importing this table without `_id` fields or import into a new \
deployment."
)
));
}
anyhow::bail!(ErrorMetadata::bad_request(
"TableConflict",
format!(
"New table {table} has IDs that conflict with existing table \
{existing_table_by_number}"
)
));
}
pub async fn activate_table(
&mut self,
tablet_id: TabletId,
table_name: &TableName,
table_number: TableNumber,
tables_affected_in_import: &BTreeSet<(TableNamespace, TableName)>,
) -> anyhow::Result<u64> {
let mut documents_deleted = 0;
let table_metadata = self.get_table_metadata(tablet_id).await?;
match table_metadata.state {
TableState::Active => return Ok(0),
TableState::Deleting => anyhow::bail!("cannot activate {table_name} which is deleting"),
TableState::Hidden => {},
}
let namespace = table_metadata.namespace;
self.check_can_overwrite(
namespace,
table_name,
Some(table_number),
tables_affected_in_import,
)?;
if self.table_exists(namespace, table_name) {
let existing_table_by_name = self
.tx
.table_mapping()
.namespace(namespace)
.id(table_name)?;
documents_deleted += self.must_count(namespace, table_name).await?;
self.delete_table_by_id_bypassing_schema_enforcement(existing_table_by_name.tablet_id)
.await?
}
let table_metadata = TableMetadata::new_with_state(
namespace,
table_name.clone(),
table_number,
TableState::Active,
);
let tables_table_id = self.tables_table_id()?;
let table_doc_id = ResolvedDocumentId::new(
tables_table_id.tablet_id,
DeveloperDocumentId::new(tables_table_id.table_number, tablet_id.0),
);
SystemMetadataModel::new_global(self.tx)
.replace(table_doc_id, table_metadata.try_into()?)
.await?;
Ok(documents_deleted)
}
fn tables_table_id(&mut self) -> anyhow::Result<TabletIdAndTableNumber> {
self.tx
.table_mapping()
.namespace(TableNamespace::Global)
.name_to_id()(TABLES_TABLE.clone())
}
#[async_recursion]
pub async fn insert_table_metadata(
&mut self,
namespace: TableNamespace,
table: &TableName,
) -> anyhow::Result<()> {
// Don't implicitly create table metadata for system tables.
if table.is_system() {
return Ok(());
}
self._insert_table_metadata(namespace, table, None, TableState::Active)
.await?;
Ok(())
}
pub async fn insert_table_for_import(
&mut self,
namespace: TableNamespace,
table: &TableName,
table_number: Option<TableNumber>,
tables_affected_in_import: &BTreeSet<(TableNamespace, TableName)>,
) -> anyhow::Result<TabletIdAndTableNumber> {
anyhow::ensure!(
bootstrap_system_tables()
.iter()
.all(|t| t.table_name() != table),
"Conflict with bootstrap system table {table}",
);
let existing_table_by_name = self
.tx
.table_mapping()
.namespace(namespace)
.id_and_number_if_exists(table)
.map(|id| id.table_number);
let table_number = table_number.or(existing_table_by_name);
self.check_can_overwrite(namespace, table, table_number, tables_affected_in_import)?;
self._insert_table_metadata(namespace, table, table_number, TableState::Hidden)
.await
}
async fn _insert_table_metadata(
&mut self,
namespace: TableNamespace,
table: &TableName,
table_number: Option<TableNumber>,
state: TableState,
) -> anyhow::Result<TabletIdAndTableNumber> {
if state == TableState::Active && self.table_exists(namespace, table) {
let table_id = self.tx.table_mapping().namespace(namespace).id(table)?;
anyhow::ensure!(table_number.is_none() || table_number == Some(table_id.table_number));
Ok(table_id)
} else {
anyhow::ensure!(
self.count_user_tables() < MAX_USER_TABLES,
index_validation_error::too_many_tables(MAX_USER_TABLES)
);
let table_number = if let Some(table_number) = table_number {
anyhow::ensure!(
state == TableState::Hidden
|| !self
.tx
.table_mapping()
.namespace(namespace)
.table_number_exists()(table_number),
ErrorMetadata::bad_request(
"InvalidId",
format!("conflict trying to create {table} with number {table_number}")
)
);
table_number
} else {
self.next_user_table_number(namespace).await?
};
let table_metadata =
TableMetadata::new_with_state(namespace, table.clone(), table_number, state);
let table_doc_id = SystemMetadataModel::new_global(self.tx)
.insert_metadata(&TABLES_TABLE, table_metadata.try_into()?)
.await?;
let tablet_id = TabletId(table_doc_id.internal_id());
// Add the system defined indexes for the newly created table. Since the newly
// created table is empty, we can start these indexes as `Enabled`.
let metadata = IndexMetadata::new_enabled(
GenericIndexName::by_id(tablet_id),
IndexedFields::by_id(),
);
SystemMetadataModel::new_global(self.tx)
.insert_metadata(&INDEX_TABLE, metadata.try_into()?)
.await?;
let metadata = IndexMetadata::new_enabled(
GenericIndexName::by_creation_time(tablet_id),
IndexedFields::creation_time(),
);
SystemMetadataModel::new_global(self.tx)
.insert_metadata(&INDEX_TABLE, metadata.try_into()?)
.await?;
Ok(TabletIdAndTableNumber {
tablet_id,
table_number,
})
}
}
#[cfg(any(test, feature = "testing"))]
pub async fn insert_table_metadata_for_test(
&mut self,
namespace: TableNamespace,
table: &TableName,
) -> anyhow::Result<TabletIdAndTableNumber> {
self._insert_table_metadata(namespace, table, None, TableState::Active)
.await
}
}
#[cfg(test)]
mod tests {
use std::str::FromStr;
use common::{
bootstrap_model::schema::{
SchemaMetadata,
SchemaState,
},
db_schema,
document::ParseDocument,
object_validator,
schemas::{
validator::{
FieldValidator,
Validator,
},
DatabaseSchema,
DocumentSchema,
},
};
use must_let::must_let;
use runtime::testing::TestRuntime;
use value::{
TableName,
TableNamespace,
};
use crate::{
bootstrap_model::table::NUM_RESERVED_SYSTEM_TABLE_NUMBERS,
test_helpers::new_tx,
SchemaModel,
TableModel,
Transaction,
};
#[convex_macro::test_runtime]
async fn delete_table_with_missing_table_does_nothing(rt: TestRuntime) -> anyhow::Result<()> {
let mut tx = new_tx(rt).await?;
let mut model = TableModel::new(&mut tx);
model
.delete_active_table(TableNamespace::test_user(), TableName::from_str("missing")?)
.await
}
#[convex_macro::test_runtime]
async fn insert_table_metadata_inserts_table(rt: TestRuntime) -> anyhow::Result<()> {
let mut tx = new_tx(rt).await?;
let mut model = TableModel::new(&mut tx);
let table_name = TableName::from_str("my_table")?;
model
.insert_table_metadata(TableNamespace::test_user(), &table_name)
.await?;
assert!(model.table_exists(TableNamespace::test_user(), &table_name));
Ok(())
}
#[convex_macro::test_runtime]
async fn delete_table_with_valid_table_deletes_it(rt: TestRuntime) -> anyhow::Result<()> {
let mut tx = new_tx(rt).await?;
let mut model = TableModel::new(&mut tx);
let table_name = TableName::from_str("my_table")?;
model
.insert_table_metadata(TableNamespace::test_user(), &table_name)
.await?;
model
.delete_active_table(TableNamespace::test_user(), table_name.clone())
.await?;
assert!(!model.table_exists(TableNamespace::test_user(), &table_name));
Ok(())
}
#[convex_macro::test_runtime]
async fn delete_table_with_table_not_in_active_schema_deletes_it(
rt: TestRuntime,
) -> anyhow::Result<()> {
let mut tx = new_tx(rt).await?;
set_active_schema(&mut tx, db_schema!()).await?;
let mut model = TableModel::new(&mut tx);
let table_name = TableName::from_str("my_table")?;
model
.insert_table_metadata(TableNamespace::test_user(), &table_name)
.await?;
model
.delete_active_table(TableNamespace::test_user(), table_name.clone())
.await?;
assert!(!model.table_exists(TableNamespace::test_user(), &table_name));
Ok(())
}
#[convex_macro::test_runtime]
async fn delete_table_with_other_table_in_active_schema_deletes_it(
rt: TestRuntime,
) -> anyhow::Result<()> {
let mut tx = new_tx(rt).await?;
set_active_schema(&mut tx, db_schema!("other_table" => DocumentSchema::Any)).await?;
let mut model = TableModel::new(&mut tx);
let table_name = TableName::from_str("my_table")?;
model
.insert_table_metadata(TableNamespace::test_user(), &table_name)
.await?;
model
.delete_active_table(TableNamespace::test_user(), table_name.clone())
.await?;
assert!(!model.table_exists(TableNamespace::test_user(), &table_name));
Ok(())
}
#[convex_macro::test_runtime]
async fn delete_table_with_table_in_active_schema_fails(rt: TestRuntime) -> anyhow::Result<()> {
let mut tx = new_tx(rt).await?;
set_active_schema(&mut tx, db_schema!("my_table" => DocumentSchema::Any)).await?;
let mut model = TableModel::new(&mut tx);
let table_name = TableName::from_str("my_table")?;
model
.insert_table_metadata(TableNamespace::test_user(), &table_name)
.await?;
let result = model
.delete_active_table(TableNamespace::test_user(), table_name.clone())
.await;
let error = result.unwrap_err();
assert!(
error
.to_string()
.contains("Failed to delete table \"my_table\" because it appears in the schema"),
"{error}"
);
Ok(())
}
#[convex_macro::test_runtime]
async fn delete_table_with_table_in_active_schema_as_reference_fails(
rt: TestRuntime,
) -> anyhow::Result<()> {
let mut tx = new_tx(rt).await?;
set_active_schema(&mut tx, db_schema!("other_table" =>
DocumentSchema::Union(vec![object_validator!(
"field" => FieldValidator::required_field_type(Validator::Id("my_table".parse()?))
)])
)).await?;
let mut model = TableModel::new(&mut tx);
let table_name = TableName::from_str("my_table")?;
model
.insert_table_metadata(TableNamespace::test_user(), &table_name)
.await?;
let result = model
.delete_active_table(TableNamespace::test_user(), table_name.clone())
.await;
let error = result.unwrap_err();
assert!(
error.to_string().contains(
"Failed to delete table \"my_table\" because `v.id(\"my_table\")` appears in the \
schema of table \"other_table\""
),
"{error}"
);
Ok(())
}
#[convex_macro::test_runtime]
async fn delete_table_with_table_in_validated_schema_fails_schema(
rt: TestRuntime,
) -> anyhow::Result<()> {
let mut tx = new_tx(rt).await?;
let mut schema_model = SchemaModel::new_root_for_test(&mut tx);
let (schema_id, _state) = schema_model
.submit_pending(db_schema!("my_table" => DocumentSchema::Any))
.await?;
schema_model.mark_validated(schema_id).await?;
let mut model = TableModel::new(&mut tx);
let table_name = TableName::from_str("my_table")?;
model
.insert_table_metadata(TableNamespace::test_user(), &table_name)
.await?;
model
.delete_active_table(TableNamespace::test_user(), table_name.clone())
.await?;
let mut schema_model = SchemaModel::new_root_for_test(&mut tx);
assert!(schema_model
.get_by_state(SchemaState::Validated)
.await?
.is_none());
let schema = ParseDocument::<SchemaMetadata>::parse(tx.get(schema_id).await?.unwrap())?;
must_let!(let SchemaState::Failed { error, table_name } = &schema.state);
assert_eq!(table_name, &Some("my_table".to_string()));
assert!(
error
.to_string()
.contains("Failed to delete table \"my_table\" because it appears in the schema"),
"{error}"
);
Ok(())
}
#[convex_macro::test_runtime]
async fn test_next_table_number(rt: TestRuntime) -> anyhow::Result<()> {
let mut tx = new_tx(rt).await?;
let mut model = TableModel::new(&mut tx);
let namespace = TableNamespace::test_user();
let next_table_number: u32 = model.next_user_table_number(namespace).await?.into();
assert_eq!(
(NUM_RESERVED_SYSTEM_TABLE_NUMBERS + 1) as usize,
next_table_number as usize
);
let table_name = TableName::from_str("my_table")?;
model.insert_table_metadata(namespace, &table_name).await?;
let new_next_table_number: u32 = model.next_user_table_number(namespace).await?.into();
assert_eq!(next_table_number + 1, new_next_table_number);
Ok(())
}
#[convex_macro::test_runtime]
async fn test_next_table_number_conflict(rt: TestRuntime) -> anyhow::Result<()> {
let mut tx = new_tx(rt).await?;
tx.create_system_table_testing(TableNamespace::test_user(), &"_system1".parse()?, None)
.await?;
TableModel::new(&mut tx)
.insert_table_metadata_for_test(TableNamespace::test_user(), &"users".parse()?)
.await?;
tx.create_system_table_testing(TableNamespace::test_user(), &"_system2".parse()?, None)
.await?;
let table_mapping = tx.table_mapping().namespace(TableNamespace::test_user());
let system_table_number1: u32 = table_mapping.id(&"_system1".parse()?)?.table_number.into();
let system_table_number2: u32 = table_mapping.id(&"_system2".parse()?)?.table_number.into();
let user_table_number: u32 = table_mapping.id(&"users".parse()?)?.table_number.into();
assert!(user_table_number > NUM_RESERVED_SYSTEM_TABLE_NUMBERS);
assert!(system_table_number1 < NUM_RESERVED_SYSTEM_TABLE_NUMBERS);
assert!(system_table_number2 < NUM_RESERVED_SYSTEM_TABLE_NUMBERS);
assert!(system_table_number1 + 1 == system_table_number2);
Ok(())
}
async fn set_active_schema(
tx: &mut Transaction<TestRuntime>,
schema: DatabaseSchema,
) -> anyhow::Result<()> {
let mut schema_model = SchemaModel::new_root_for_test(tx);
let (schema_id, _state) = schema_model.submit_pending(schema).await?;
schema_model.mark_validated(schema_id).await?;
schema_model.mark_active(schema_id).await?;
Ok(())
}
}