Skip to main content
Glama

Convex MCP server

Official
by get-convex
schema.rs74.8 kB
use std::{ collections::{ BTreeMap, BTreeSet, }, ops::Deref, str::FromStr, }; use anyhow::bail; use common::{ bootstrap_model::index::database_index::IndexedFields, schemas::{ validator::{ FieldValidator, ObjectValidator, Validator, }, DocumentSchema, IndexSchema, TableDefinition, }, types::IndexDescriptor, value::{ FieldPath, IdentifierFieldName, TableName, }, }; use convex_fivetran_common::fivetran_sdk::{ self, Column, DataType as FivetranDataType, }; use convex_fivetran_destination::{ api_types::{ FivetranFieldName, FivetranTableName, }, constants::{ FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR, FIVETRAN_SYNCED_INDEX_DESCRIPTOR, FIVETRAN_SYNC_INDEX_WITHOUT_SOFT_DELETE_FIELDS, FIVETRAN_SYNC_INDEX_WITH_SOFT_DELETE_FIELDS, ID_CONVEX_FIELD_NAME, ID_FIVETRAN_FIELD_NAME, METADATA_CONVEX_FIELD_NAME, SOFT_DELETE_CONVEX_FIELD_NAME, SOFT_DELETE_FIELD_PATH, SOFT_DELETE_FIVETRAN_FIELD_NAME, SYNCED_CONVEX_FIELD_NAME, SYNCED_FIVETRAN_FIELD_NAME, UNDERSCORED_COLUMNS_CONVEX_FIELD_NAME, }, }; use crate::{ error::{ DestinationError, MetadataFieldError, SuggestedIndex, SuggestedTable, TableSchemaError, }, log, }; #[derive(Clone, Debug)] pub struct FivetranTableColumn { pub data_type: FivetranDataType, pub in_primary_key: bool, } #[derive(Debug, derive_more::From, Clone)] pub struct FivetranTableSchema { pub name: FivetranTableName, pub columns: BTreeMap<FivetranFieldName, FivetranTableColumn>, } impl TryFrom<fivetran_sdk::Table> for FivetranTableSchema { type Error = DestinationError; fn try_from(table: fivetran_sdk::Table) -> Result<Self, Self::Error> { let table_name: FivetranTableName = table .name .parse() .map_err(|err| DestinationError::InvalidTableName(table.name, err))?; let columns = table .columns .into_iter() .map(|column| -> Result<_, _> { let data_type = column.r#type(); Ok(( column.name.parse().map_err(|err| { DestinationError::InvalidColumnName(column.name, table_name.clone(), err) })?, FivetranTableColumn { data_type, in_primary_key: column.primary_key, }, )) }) .try_collect()?; Ok(FivetranTableSchema { name: table_name, columns, }) } } #[derive(PartialEq, Eq)] enum Nullability { NonNullable, Nullable, } /// Generates a Convex validator matching the column in the Fivetran schema. /// /// This does not return the only possible validator for this column, but it /// will be the one that will be suggested if the user doesn’t have a /// matching Convex schema. fn suggested_validator(data_type: FivetranDataType, nullability: Nullability) -> Validator { // https://www.notion.so/convex-dev/Fivetran-Destination-Connector-Implementation-bc917ad7f68b483a93212d93dbbf7b0d?pvs=4#d9e675c1fe8b4c5bb54beb26b9f2b721 let non_nullable_validator = match data_type { FivetranDataType::Unspecified => Validator::Any, FivetranDataType::Boolean => Validator::Boolean, FivetranDataType::Short => Validator::Float64, FivetranDataType::Int => Validator::Float64, FivetranDataType::Long => Validator::Int64, FivetranDataType::Decimal => Validator::String, FivetranDataType::Float => Validator::Float64, FivetranDataType::Double => Validator::Float64, FivetranDataType::NaiveDate => Validator::String, FivetranDataType::NaiveTime => Validator::String, FivetranDataType::NaiveDatetime => Validator::String, FivetranDataType::UtcDatetime => Validator::Float64, FivetranDataType::Binary => Validator::Bytes, FivetranDataType::Xml => Validator::String, FivetranDataType::String => Validator::String, FivetranDataType::Json => Validator::Object(ObjectValidator(BTreeMap::new())), }; if nullability == Nullability::Nullable && data_type != FivetranDataType::Unspecified && data_type != FivetranDataType::Json { Validator::Union(vec![non_nullable_validator, Validator::Null]) } else { non_nullable_validator } } pub fn suggested_convex_table( table: fivetran_sdk::Table, ) -> Result<TableDefinition, DestinationError> { let schema = FivetranTableSchema::try_from(table)?; schema.suggested_convex_table() } impl FivetranTableSchema { fn suggested_convex_table(&self) -> anyhow::Result<TableDefinition, DestinationError> { let mut field_validators: BTreeMap<IdentifierFieldName, FieldValidator> = self .columns .iter() .filter(|(field_name, _)| { !field_name.is_fivetran_system_field() && !field_name.is_underscored_field() }) .map(|(field_name, column)| -> anyhow::Result<_, _> { let field_name = field_name.parse().map_err(|err| { DestinationError::UnsupportedColumnName( field_name.clone(), self.name.clone(), err, ) })?; Ok(( field_name, FieldValidator::required_field_type(suggested_validator( column.data_type, Nullability::Nullable, )), )) }) .try_collect()?; field_validators.insert( METADATA_CONVEX_FIELD_NAME.clone(), self.suggested_metadata_validator(), ); let document_type = Some(DocumentSchema::Union(vec![ObjectValidator( field_validators, )])); let table_name: TableName = self .name .parse() .map_err(|err| DestinationError::UnsupportedTableName(self.name.to_string(), err))?; let indexes = self.suggested_indexes().map_err(|err| { DestinationError::IncorrectSchemaForTableWithoutSuggestion(table_name.clone(), err) })?; Ok(TableDefinition { table_name, document_type, indexes, staged_db_indexes: Default::default(), text_indexes: Default::default(), staged_text_indexes: Default::default(), vector_indexes: Default::default(), staged_vector_indexes: Default::default(), }) } fn suggested_indexes( &self, ) -> anyhow::Result<BTreeMap<IndexDescriptor, IndexSchema>, TableSchemaError> { let indexes: Vec<IndexSchema> = vec![self.suggested_primary_key_index()?, self.sync_index()]; Ok(indexes .into_iter() .map(|index| (index.index_descriptor.clone(), index)) .collect()) } fn suggested_primary_key_index(&self) -> anyhow::Result<IndexSchema, TableSchemaError> { let mut primary_key_index_fields: Vec<FieldPath> = vec![]; if self.is_using_soft_deletes() { primary_key_index_fields.push(SOFT_DELETE_FIELD_PATH.clone()); } // We are here suggesting to index the columns in lexicographic order. This is // not the only possible primary key index, as the columns in the primary key // can be placed in an arbitrary order. for (name, column) in self.columns.iter() { if column.in_primary_key { let field_path: FieldPath = name .clone() .try_into() .map_err(|err| TableSchemaError::UnsupportedFieldName(name.clone(), err))?; primary_key_index_fields.push(field_path); } } let fields = IndexedFields::try_from(primary_key_index_fields) .map_err(TableSchemaError::UnsupportedPrimaryKey)?; Ok(IndexSchema { index_descriptor: FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR.clone(), fields, }) } fn sync_index(&self) -> IndexSchema { IndexSchema { index_descriptor: FIVETRAN_SYNCED_INDEX_DESCRIPTOR.clone(), fields: if self.is_using_soft_deletes() { FIVETRAN_SYNC_INDEX_WITH_SOFT_DELETE_FIELDS.clone() } else { FIVETRAN_SYNC_INDEX_WITHOUT_SOFT_DELETE_FIELDS.clone() }, } } /// Generates the recommended validator for the `fivetran` column of this /// table. /// /// The validator looks like: /// /// ```no_run /// fivetran: v.object({ /// synced: v.number(), /// id: v.string(), // only if the table has no natural primary key /// deleted: v.boolean(), // only if the table is using soft deletes /// columns: v.object({ // only if the (for instance `_field`) /// field: v.union(v.string(), v.null()), // (for instance) /// }), /// }), /// ``` /// /// See: https://github.com/fivetran/fivetran_sdk/blob/main/development-guide.md#system-columns fn suggested_metadata_validator(&self) -> FieldValidator { let mut fields = BTreeMap::new(); fields.insert( SYNCED_CONVEX_FIELD_NAME.clone(), FieldValidator::required_field_type(Validator::Float64), ); if let Some(column) = self.columns.get(&ID_FIVETRAN_FIELD_NAME) { fields.insert( ID_CONVEX_FIELD_NAME.clone(), FieldValidator::required_field_type(suggested_validator( column.data_type, Nullability::NonNullable, )), ); } if self.columns.contains_key(&SOFT_DELETE_FIVETRAN_FIELD_NAME) { fields.insert( SOFT_DELETE_CONVEX_FIELD_NAME.clone(), FieldValidator::required_field_type(Validator::Boolean), ); } let underscored_fields: BTreeMap<_, _> = self .columns .iter() .filter(|(name, _)| name.is_underscored_field()) .flat_map(|(name, column)| { name[1..].parse().ok().map(|name| { ( name, FieldValidator::required_field_type(suggested_validator( column.data_type, Nullability::Nullable, )), ) }) }) .collect(); if !underscored_fields.is_empty() { fields.insert( UNDERSCORED_COLUMNS_CONVEX_FIELD_NAME.clone(), FieldValidator::required_field_type(Validator::Object(ObjectValidator( underscored_fields, ))), ); } FieldValidator::required_field_type(Validator::Object(ObjectValidator(fields))) } fn validate_metadata_validator( &self, metadata_validator: &FieldValidator, ) -> Result<(), MetadataFieldError> { let Validator::Object(metadata_validator) = metadata_validator.validator() else { return Err(MetadataFieldError::InvalidMetadataFieldType); }; // Synced let expected_synced_validator = Some(FieldValidator::required_field_type(Validator::Float64)); if metadata_validator.0.get(SYNCED_CONVEX_FIELD_NAME.deref()) != expected_synced_validator.as_ref() { return Err(MetadataFieldError::InvalidSyncedField); } // Fivetran ID let expected_id_validator = self.columns.get(&ID_FIVETRAN_FIELD_NAME).map(|column| { FieldValidator::required_field_type(suggested_validator( column.data_type, Nullability::NonNullable, )) }); if metadata_validator.0.get(ID_CONVEX_FIELD_NAME.deref()) != expected_id_validator.as_ref() { return Err(MetadataFieldError::InvalidIdField); } // Soft delete let expected_soft_delete_validator = self .columns .contains_key(&SOFT_DELETE_FIVETRAN_FIELD_NAME) .then_some(FieldValidator::required_field_type(Validator::Boolean)); if metadata_validator .0 .get(SOFT_DELETE_CONVEX_FIELD_NAME.deref()) != expected_soft_delete_validator.as_ref() { return Err(MetadataFieldError::InvalidDeletedField); } // `fivetran.columns` in the Convex schema only contains existing columns for metadata_column_name in column_names_in_metadata(metadata_validator)? { if !self.columns.contains_key(&metadata_column_name) { return Err(MetadataFieldError::ColumnInMetadataNotInDataSource( metadata_column_name, )); } } // All non-system columns starting by _ in the Fivetran table exist in the // Convex schema with a type matching their original type let underscored_columns = self .columns .iter() .filter(|(field_name, _)| field_name.is_underscored_field()); for (field_name, column) in underscored_columns { let Some(columns_validator) = metadata_validator .0 .get(UNDERSCORED_COLUMNS_CONVEX_FIELD_NAME.deref()) else { return Err(MetadataFieldError::MissingColumnsField(field_name.clone())); }; let Validator::Object(columns_validator) = columns_validator.validator() else { return Err(MetadataFieldError::InvalidColumnsFieldType); }; let actual_validator = columns_validator .0 .get(&field_name[1..]) .ok_or_else(|| MetadataFieldError::MissingFieldInColumns(field_name.clone()))? .validator(); if !is_field_validator_valid(actual_validator, column.data_type) { return Err(MetadataFieldError::IncorrectColumnSpecification { field_name: field_name.clone(), actual_validator: actual_validator.clone(), expected_validator: suggested_validator( column.data_type, Nullability::NonNullable, ), }); } } Ok(()) } /// Validates that the columns in the Convex destination match the Fivetran /// schema. pub fn validate_destination_schema( &self, convex_table_schema: &DocumentSchema, ) -> Result<(), TableSchemaError> { // Ensure that there are no columns with forbidden names if self.columns.contains_key( &FivetranFieldName::from_str(&METADATA_CONVEX_FIELD_NAME) .expect("Expecting the name of the metadata field to also be valid in Fivetran"), ) { return Err(TableSchemaError::SourceTableHasFivetranField); } // Ensure that every destination column is in the source let DocumentSchema::Union(object_validator) = convex_table_schema else { return Err(TableSchemaError::DestinationHasAnySchema); }; let [object_validator] = &object_validator[..] else { return Err(TableSchemaError::DestinationHasMultipleSchemas); }; if let Some(missing_field) = object_validator.0.keys().find(|field_name| { let Ok(fivetran_field_name) = FivetranFieldName::from_str(&field_name.to_string()) else { return false; }; **field_name != *METADATA_CONVEX_FIELD_NAME && !self.columns.contains_key(&fivetran_field_name) }) { return Err(TableSchemaError::FieldMissingInSource( missing_field.clone(), )); } // Validate user columns for (fivetran_field_name, fivetran_column) in self.columns.iter().filter(|(field_name, _)| { !field_name.is_fivetran_system_field() && !field_name.is_underscored_field() }) { let convex_field_name: IdentifierFieldName = IdentifierFieldName::from_str(fivetran_field_name).map_err(|err| { TableSchemaError::UnsupportedFieldName(fivetran_field_name.clone(), err) })?; let actual_validator = object_validator .0 .get(&convex_field_name) .ok_or_else(|| TableSchemaError::MissingField { field_name: fivetran_field_name.clone(), suggested_validator: suggested_validator( fivetran_column.data_type, Nullability::Nullable, ), })? .validator(); if !is_field_validator_valid(actual_validator, fivetran_column.data_type) { return Err(TableSchemaError::NonmatchingFieldValidator { field_name: fivetran_field_name.clone(), actual_validator: actual_validator.clone(), expected_validator: suggested_validator( fivetran_column.data_type, Nullability::Nullable, ), fivetran_type: fivetran_column.data_type, }); } } // Validate the metadata column let Some(metadata_validator) = object_validator.0.get(&METADATA_CONVEX_FIELD_NAME.clone()) else { return Err(TableSchemaError::MissingMetadataColumn { suggested: self.suggested_metadata_validator(), }); }; self.validate_metadata_validator(metadata_validator) .map_err(|error| TableSchemaError::IncorrectMetadataColumn { error, actual: metadata_validator.clone(), suggested: self.suggested_metadata_validator(), })?; Ok(()) } fn is_using_soft_deletes(&self) -> bool { self.columns.contains_key(&SOFT_DELETE_FIVETRAN_FIELD_NAME) } pub fn validate_destination_indexes( &self, indexes: &BTreeMap<IndexDescriptor, IndexSchema>, ) -> Result<(), TableSchemaError> { let indexes_targets: BTreeMap<IndexDescriptor, IndexedFields> = indexes .clone() .values() .map(|index| (index.index_descriptor.clone(), index.fields.clone())) .collect(); // _fivetran_synced index let expected_index = if self.is_using_soft_deletes() { FIVETRAN_SYNC_INDEX_WITH_SOFT_DELETE_FIELDS.deref() } else { FIVETRAN_SYNC_INDEX_WITHOUT_SOFT_DELETE_FIELDS.deref() }; if !indexes_targets .values() .any(|fields| fields == expected_index) { return Err(if self.is_using_soft_deletes() { TableSchemaError::MissingSyncIndexWithSoftDeletes } else { TableSchemaError::MissingSyncIndex }); } // Primary key index let Some(primary_key_index_fields) = indexes_targets.get(&FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR) else { return Err(TableSchemaError::MissingPrimaryKeyIndex(SuggestedIndex( self.suggested_primary_key_index()?, ))); }; if !self.is_primary_key_index(primary_key_index_fields)? { return Err(TableSchemaError::WrongPrimaryKeyIndex(SuggestedIndex( self.suggested_primary_key_index()?, ))); } Ok(()) } /// Validates that a given index is a valid index for the Fivetran primary /// key. fn is_primary_key_index( &self, indexed_fields: &IndexedFields, ) -> anyhow::Result<bool, TableSchemaError> { let primary_key_columns: BTreeSet<FieldPath> = self .columns .iter() .filter(|(_, col)| col.in_primary_key) .map(|(name, _)| -> anyhow::Result<_, _> { let field_path: FieldPath = name .clone() .try_into() .map_err(|err| TableSchemaError::UnsupportedFieldName(name.clone(), err))?; Ok(field_path) }) .try_collect()?; let fields = indexed_fields.deref(); if self.is_using_soft_deletes() { // The index must start with _fivetran_deleted let Some(first_field) = fields.first() else { return Ok(false); }; if first_field != SOFT_DELETE_FIELD_PATH.deref() { return Ok(false); } } let fields_to_compare: BTreeSet<FieldPath> = fields .iter() .skip(if self.is_using_soft_deletes() { 1 } else { 0 }) .cloned() .collect(); Ok(fields_to_compare == primary_key_columns) } pub fn to_convex_table(&self) -> anyhow::Result<TableDefinition> { let table_name: TableName = self.name.parse()?; let mut object_schema = ObjectValidator(BTreeMap::new()); let mut metadata_object_schema = ObjectValidator(BTreeMap::new()); let mut underscored_columns_object_schema = ObjectValidator(BTreeMap::new()); for (field_name, column) in self.columns.iter() { // Handle system columns // Soft delete if field_name == &*SOFT_DELETE_FIVETRAN_FIELD_NAME { metadata_object_schema.0.insert( SOFT_DELETE_CONVEX_FIELD_NAME.clone(), FieldValidator::optional_field_type(Validator::Boolean), ); } // Fivetran pseudo-ID else if field_name == &*ID_FIVETRAN_FIELD_NAME { metadata_object_schema.0.insert( ID_CONVEX_FIELD_NAME.clone(), FieldValidator::optional_field_type(Validator::String), ); } // Synchronization timestamp else if field_name == &*SYNCED_FIVETRAN_FIELD_NAME { metadata_object_schema.0.insert( SYNCED_CONVEX_FIELD_NAME.clone(), FieldValidator::optional_field_type(Validator::Float64), ); } // Columns having a Fivetran name starting by _ else if let Some(field_name) = field_name.strip_prefix('_') { let field_name = field_name.parse()?; let column_type = column.data_type; let field_validator = FieldValidator::optional_field_type(recognize_convex_type(&column_type)?); underscored_columns_object_schema .0 .insert(field_name, field_validator); } // User columns else { let field_name = field_name.parse()?; let column_type = column.data_type; let field_validator = FieldValidator::optional_field_type(recognize_convex_type(&column_type)?); object_schema.0.insert(field_name, field_validator); } } metadata_object_schema.0.insert( UNDERSCORED_COLUMNS_CONVEX_FIELD_NAME.clone(), FieldValidator::required_field_type(Validator::Object( underscored_columns_object_schema, )), ); object_schema.0.insert( METADATA_CONVEX_FIELD_NAME.clone(), FieldValidator::required_field_type(Validator::Object(metadata_object_schema)), ); let indexes = self.suggested_indexes()?; let document_schema = DocumentSchema::Union(vec![object_schema]); Ok(TableDefinition { table_name, indexes, staged_db_indexes: BTreeMap::new(), text_indexes: BTreeMap::new(), staged_text_indexes: BTreeMap::new(), vector_indexes: BTreeMap::new(), staged_vector_indexes: BTreeMap::new(), document_type: Some(document_schema), }) } } fn column_names_in_metadata( metadata_validator: &ObjectValidator, ) -> Result<Vec<FivetranFieldName>, MetadataFieldError> { let Some(columns_validator) = metadata_validator .0 .get(UNDERSCORED_COLUMNS_CONVEX_FIELD_NAME.deref()) else { return Ok(Vec::new()); }; let Validator::Object(columns_validator) = columns_validator.validator() else { return Err(MetadataFieldError::InvalidColumnsFieldType); }; let column_names: Vec<_> = columns_validator .0 .keys() .map(|convex_field_name| { format!("_{convex_field_name}").parse().map_err(|err| { MetadataFieldError::UnsupportedColumnName(convex_field_name.clone(), err) }) }) .try_collect()?; Ok(column_names) } /// Validates that the table in the Convex schema is compatible with the source /// Fivetran table. /// /// For the same Fivetran table, there can be multiple valid Convex schemas. For /// instance, the fields in the primary key index can be in an arbitrary order. /// Also, fields in Convex can either be nullable (e.g. `v.union(v.string(), /// v.null())`) or not (e.g. `v.string()`). pub fn validate_destination_schema_table( fivetran_table: fivetran_sdk::Table, convex_table: &TableDefinition, ) -> Result<(), DestinationError> { let fivetran_table_name = FivetranTableName::from_str(&fivetran_table.name) .map_err(|err| DestinationError::InvalidTableName(fivetran_table.name.clone(), err))?; let table_name = TableName::from_str(&fivetran_table.name).map_err(|err| { DestinationError::UnsupportedTableName(fivetran_table_name.to_string(), err) })?; let fivetran_table_schema = FivetranTableSchema::try_from(fivetran_table)?; let Some(convex_table_schema) = &convex_table.document_type else { return Err(DestinationError::MissingTable( table_name, SuggestedTable(fivetran_table_schema.suggested_convex_table()?), )); }; fivetran_table_schema .validate_destination_schema(convex_table_schema) .map_err(|err| { fivetran_table_schema .suggested_convex_table() .map(|suggested_table| { DestinationError::IncorrectSchemaForTable( table_name.clone(), err, SuggestedTable(suggested_table), ) }) .unwrap_or_else(|e| e) })?; fivetran_table_schema .validate_destination_indexes(&convex_table.indexes) .map_err(|err| { fivetran_table_schema .suggested_convex_table() .map(|suggested_table| { DestinationError::IncorrectSchemaForTable( table_name.clone(), err, SuggestedTable(suggested_table), ) }) .unwrap_or_else(|e| e) })?; Ok(()) } pub fn is_field_validator_valid(actual_validator: &Validator, data_type: FivetranDataType) -> bool { let expected_validator = suggested_validator(data_type, Nullability::NonNullable); actual_validator == &expected_validator || actual_validator == &Validator::Union(vec![Validator::Null, expected_validator.clone()]) || actual_validator == &Validator::Union(vec![expected_validator, Validator::Null]) } /// Converts the given Convex schema table to a Fivetran table. This is used in /// the implementation of the `AlterTable` endpoint so that Fivetran can be /// aware of the current state of the Convex destination. pub fn to_fivetran_table( convex_table: &TableDefinition, ) -> anyhow::Result<fivetran_sdk::Table, DestinationError> { let fivetran_columns = to_fivetran_columns(convex_table)?; Ok(fivetran_sdk::Table { name: convex_table.table_name.to_string(), columns: fivetran_columns, }) } /// Returns the validator for the `fivetran` field of the given Convex table /// definition. /// /// Returns `None` if the `fivetran` field isn’t specified or is incorrectly /// specified. fn metadata_field_validator(validator: &ObjectValidator) -> Option<&ObjectValidator> { // System columns let field_validator = validator.0.get(&METADATA_CONVEX_FIELD_NAME.clone())?; let Validator::Object(metadata_object_validator) = field_validator.validator() else { return None; }; Some(metadata_object_validator) } fn user_columns(table_def: &TableDefinition, validator: &ObjectValidator) -> Vec<Column> { let primary_key_index = table_def .indexes .get(&FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR); if primary_key_index.is_none() { log(&format!( "The table {} in your Convex schema is missing a `by_primary_key` index, so Fivetran \ will not able to identify the columns of its primary key.", table_def.table_name )); } validator .0 .iter() .filter(|(field_name, _)| **field_name != *METADATA_CONVEX_FIELD_NAME) .flat_map(|(field_name, field_validator)| { let fivetran_data_type = recognize_fivetran_type(field_validator.validator()).ok(); if fivetran_data_type.is_none() { log(&format!( "The type of the field `field_name` in the table `{}` isn’t supported by \ Fivetran.", table_def.table_name )) } Some(fivetran_sdk::Column { name: field_name.to_string(), r#type: fivetran_data_type.unwrap_or(FivetranDataType::Unspecified) as i32, primary_key: primary_key_index.is_some_and(|primary_key_index| { primary_key_index .fields .contains(&FieldPath::for_root_field(field_name.clone())) }), params: None, }) }) .collect() } fn to_fivetran_columns( table_def: &TableDefinition, ) -> Result<Vec<fivetran_sdk::Column>, DestinationError> { let Some(DocumentSchema::Union(validators)) = &table_def.document_type else { return Err(DestinationError::DestinationHasAnySchema( table_def.table_name.clone(), )); }; let [validator] = &validators[..] else { return Err(DestinationError::DestinationHasMultipleSchemas( table_def.table_name.clone(), )); }; let mut columns: Vec<fivetran_sdk::Column> = Vec::new(); // System columns let metadata_validator = metadata_field_validator(validator); if let Some(metadata_validator) = metadata_validator { // Soft delete if metadata_validator .0 .contains_key(&SOFT_DELETE_CONVEX_FIELD_NAME.clone()) { columns.push(fivetran_sdk::Column { name: SOFT_DELETE_FIVETRAN_FIELD_NAME.to_string(), r#type: FivetranDataType::Boolean as i32, primary_key: false, params: None, }); } // Fivetran pseudo-ID if let Some(field_validator) = metadata_validator.0.get(&ID_CONVEX_FIELD_NAME.clone()) { let id_field_type = recognize_fivetran_type(field_validator.validator()).ok(); if id_field_type.is_none() { log(&format!( "The type of the field `convex.id` in the table `{}` isn’t supported by \ Fivetran.", table_def.table_name )) } columns.push(fivetran_sdk::Column { name: ID_FIVETRAN_FIELD_NAME.to_string(), r#type: id_field_type.unwrap_or(FivetranDataType::Unspecified) as i32, primary_key: true, params: None, }); } // Synchronization timestamp columns.push(fivetran_sdk::Column { name: SYNCED_FIVETRAN_FIELD_NAME.to_string(), r#type: FivetranDataType::UtcDatetime as i32, primary_key: false, params: None, }); // Columns having a Fivetran name starting by _ if let Some(columns_validator) = metadata_validator .0 .get(&UNDERSCORED_COLUMNS_CONVEX_FIELD_NAME.clone()) { if let Validator::Object(columns_validator) = columns_validator.validator() { let primary_key_index = table_def .indexes .get(&FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR); for (column_name, column_validator) in columns_validator.0.iter() { let field_path = FieldPath::new(vec![ METADATA_CONVEX_FIELD_NAME.clone(), UNDERSCORED_COLUMNS_CONVEX_FIELD_NAME.clone(), column_name.clone(), ]) .expect("A three-column field path is always valid"); columns.push(fivetran_sdk::Column { name: format!("_{column_name}"), r#type: recognize_fivetran_type(column_validator.validator()) .unwrap_or(FivetranDataType::Unspecified) as i32, primary_key: primary_key_index.is_some_and(|primary_key_index| { primary_key_index.fields.contains(&field_path) }), params: None, }); } }; } } // User columns columns.append(&mut user_columns(table_def, validator)); Ok(columns) } fn recognize_fivetran_type(validator: &Validator) -> anyhow::Result<FivetranDataType> { match validator { Validator::Float64 => Ok(FivetranDataType::Double), Validator::Int64 => Ok(FivetranDataType::Long), Validator::Boolean => Ok(FivetranDataType::Boolean), Validator::String => Ok(FivetranDataType::String), Validator::Bytes => Ok(FivetranDataType::Binary), Validator::Object(_) | Validator::Array(_) => Ok(FivetranDataType::Json), // Allow nullable types Validator::Union(validators) => match &validators[..] { [v] | [Validator::Null, v] | [v, Validator::Null] => recognize_fivetran_type(v), _ => bail!("Unsupported union"), }, Validator::Null | Validator::Literal(_) | Validator::Id(_) | Validator::Set(_) | Validator::Record(..) | Validator::Map(..) | Validator::Any => bail!("The type of this Convex column isn’t supported by Fivetran."), } } fn recognize_convex_type(data_type: &FivetranDataType) -> anyhow::Result<Validator> { let validator = match data_type { FivetranDataType::Double => Validator::Float64, FivetranDataType::Long => Validator::Int64, FivetranDataType::Boolean => Validator::Boolean, FivetranDataType::String => Validator::String, FivetranDataType::Binary => Validator::Bytes, FivetranDataType::Json => Validator::Object(ObjectValidator(BTreeMap::new())), _ => anyhow::bail!("The type of this Convex column isn’t supported by Fivetran."), }; Ok(Validator::Union(vec![validator, Validator::Null])) } #[cfg(test)] mod tests { use std::collections::{ BTreeMap, BTreeSet, HashSet, }; use cmd_util::env::env_config; use common::{ bootstrap_model::index::database_index::IndexedFields, object_validator, schemas::{ validator::{ FieldValidator, ObjectValidator, Validator, }, DocumentSchema, IndexSchema, TableDefinition, }, types::IndexDescriptor, value::FieldPath, }; use convex_fivetran_common::fivetran_sdk::{ self, Column, DataType as FivetranDataType, Table, }; use convex_fivetran_destination::constants::{ FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR, FIVETRAN_SYNCED_INDEX_DESCRIPTOR, }; use maplit::{ btreemap, btreeset, hashset, }; use must_let::must_let; use pretty_assertions::assert_eq; use proptest::prelude::*; use super::{ validate_destination_schema_table, FivetranTableColumn, FivetranTableSchema, }; use crate::{ error::DestinationError, schema::to_fivetran_table, testing::fivetran_table_strategy, }; fn fivetran_table( columns: BTreeMap<&str, FivetranDataType>, primary_key_columns: HashSet<&str>, ) -> fivetran_sdk::Table { for col_name in &primary_key_columns { if !columns.contains_key(col_name) { panic!("Unknown column `{col_name}` in the primary key"); } } Table { name: "my_table".into(), columns: columns .into_iter() .map(|(col_name, col_type)| Column { name: col_name.into(), r#type: col_type as i32, primary_key: primary_key_columns.contains(col_name), params: None, }) .collect(), } } fn fivetran_table_schema( columns: BTreeMap<&str, FivetranDataType>, primary_key_columns: BTreeSet<&str>, ) -> FivetranTableSchema { FivetranTableSchema { name: "my_table".parse().unwrap(), columns: columns .into_iter() .map(|(name, data_type)| { ( name.parse().unwrap(), FivetranTableColumn { data_type, in_primary_key: primary_key_columns.contains(name), }, ) }) .collect(), } } fn convex_table( fields: BTreeMap<&str, FieldValidator>, indexes: BTreeMap<&str, Vec<FieldPath>>, ) -> TableDefinition { TableDefinition { table_name: "table_name".parse().unwrap(), staged_db_indexes: Default::default(), text_indexes: Default::default(), staged_text_indexes: Default::default(), vector_indexes: Default::default(), staged_vector_indexes: Default::default(), document_type: Some(DocumentSchema::Union(vec![ObjectValidator( fields .into_iter() .map(|(field_name, field_validator)| { (field_name.parse().unwrap(), field_validator) }) .collect(), )])), indexes: convex_indexes(indexes), } } fn convex_indexes( indexes: BTreeMap<&str, Vec<FieldPath>>, ) -> BTreeMap<IndexDescriptor, IndexSchema> { indexes .into_iter() .map(|(index_name, index_fields)| { let index_descriptor = IndexDescriptor::new(index_name.to_string()).unwrap(); ( index_descriptor.clone(), IndexSchema { index_descriptor, fields: IndexedFields::try_from(index_fields).unwrap(), }, ) }) .collect() } #[test] fn it_allows_correct_convex_tables() -> anyhow::Result<()> { validate_destination_schema_table( fivetran_table( btreemap! { "id" => FivetranDataType::Long, "_fivetran_synced" => FivetranDataType::UtcDatetime, }, hashset! {"id"}, ), &convex_table( btreemap! { "id" => FieldValidator::required_field_type(Validator::Union(vec![ Validator::Null, Validator::Int64, ])), "fivetran" => FieldValidator::required_field_type(Validator::Object( object_validator!( "synced" => FieldValidator::required_field_type(Validator::Float64), ), )), }, btreemap! { FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR.as_str() => vec![ FieldPath::new(vec!["id".parse()?])?, ], FIVETRAN_SYNCED_INDEX_DESCRIPTOR.as_str() => vec![ FieldPath::new(vec![ "fivetran".parse()?, "synced".parse()?, ])?, FieldPath::new(vec!["_creationTime".parse()?])?, ], }, ), ) .unwrap(); Ok(()) } #[test] fn it_errors_when_a_field_has_an_incorrect_type() -> anyhow::Result<()> { must_let!( let Err( DestinationError::IncorrectSchemaForTable(_, _, _) ) = validate_destination_schema_table( fivetran_table( btreemap! { "id" => FivetranDataType::Long, "_fivetran_synced" => FivetranDataType::UtcDatetime, }, hashset! {"id"}, ), &convex_table( btreemap! { "id" => FieldValidator::required_field_type(Validator::Union(vec![ Validator::Null, Validator::Float64, // incorrect ])), "fivetran" => FieldValidator::required_field_type(Validator::Object( object_validator!( "synced" => FieldValidator::required_field_type(Validator::Float64), ), )), }, btreemap! { FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR.as_str() => vec![ FieldPath::new(vec!["id".parse()?])?, ], FIVETRAN_SYNCED_INDEX_DESCRIPTOR.as_str() => vec![ FieldPath::new(vec![ "fivetran".parse()?, "synced".parse()?, ])?, FieldPath::new(vec!["_creationTime".parse()?])?, ], }, ), ) ); Ok(()) } #[test] fn it_allows_convex_tables_when_a_field_isnt_nullable_in_convex() -> anyhow::Result<()> { validate_destination_schema_table( fivetran_table( btreemap! { "id" => FivetranDataType::Long, "_fivetran_synced" => FivetranDataType::UtcDatetime, }, hashset! {"id"}, ), &convex_table( btreemap! { "id" => FieldValidator::required_field_type(Validator::Int64), "fivetran" => FieldValidator::required_field_type(Validator::Object( object_validator!( "synced" => FieldValidator::required_field_type(Validator::Float64), ), )), }, btreemap! { FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR.as_str() => vec![ FieldPath::new(vec![ "id".parse()?, ])?, ], FIVETRAN_SYNCED_INDEX_DESCRIPTOR.as_str() => vec![ FieldPath::new(vec![ "fivetran".parse()?, "synced".parse()?, ])?, FieldPath::new(vec!["_creationTime".parse()?])?, ], }, ), ) .unwrap(); Ok(()) } #[test] fn it_allows_convex_tables_with_optional_fivetran_system_columns() -> anyhow::Result<()> { validate_destination_schema_table( fivetran_table( btreemap! { "name" => FivetranDataType::String, "_fivetran_synced" => FivetranDataType::UtcDatetime, "_fivetran_id" => FivetranDataType::String, "_fivetran_deleted" => FivetranDataType::Boolean, }, hashset! {"_fivetran_id"}, ), &convex_table( btreemap! { "name" => FieldValidator::required_field_type(Validator::String), "fivetran" => FieldValidator::required_field_type(Validator::Object( object_validator!( "synced" => FieldValidator::required_field_type(Validator::Float64), "id" => FieldValidator::required_field_type(Validator::String), "deleted" => FieldValidator::required_field_type(Validator::Boolean), ), )), }, btreemap! { FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR.as_str() => vec![ FieldPath::new(vec![ "fivetran".parse()?, "deleted".parse()?, ])?, FieldPath::new(vec![ "fivetran".parse()?, "id".parse()?, ])?, ], FIVETRAN_SYNCED_INDEX_DESCRIPTOR.as_str() => vec![ FieldPath::new(vec![ "fivetran".parse()?, "deleted".parse()?, ])?, FieldPath::new(vec![ "fivetran".parse()?, "synced".parse()?, ])?, FieldPath::new(vec!["_creationTime".parse()?])?, ], }, ), ) .unwrap(); Ok(()) } #[test] fn it_allows_tables_with_fivetran_columns_starting_by_underscore() -> anyhow::Result<()> { validate_destination_schema_table( fivetran_table( btreemap! { "_key" => FivetranDataType::String, "_nullable_field" => FivetranDataType::String, "_non_nullable_field" => FivetranDataType::String, "_fivetran_synced" => FivetranDataType::UtcDatetime, }, hashset! {"_key"}, ), &convex_table( btreemap! { "fivetran" => FieldValidator::required_field_type(Validator::Object( object_validator!( "synced" => FieldValidator::required_field_type(Validator::Float64), "columns" => FieldValidator::required_field_type(Validator::Object(object_validator!( "key" => FieldValidator::required_field_type(Validator::String), "nullable_field" => FieldValidator::required_field_type(Validator::String), "non_nullable_field" => FieldValidator::required_field_type(Validator::Union(vec![ Validator::String, Validator::Null, ])), ))), ), )), }, btreemap! { FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR.as_str() => vec![ FieldPath::new(vec![ "fivetran".parse()?, "columns".parse()?, "key".parse()?, ])?, ], FIVETRAN_SYNCED_INDEX_DESCRIPTOR.as_str() => vec![ FieldPath::new(vec![ "fivetran".parse()?, "synced".parse()?, ])?, FieldPath::new(vec!["_creationTime".parse()?])?, ], }, ), ) .unwrap(); Ok(()) } #[test] fn it_refuses_tables_where_a_fivetran_field_with_underscore_is_missing_in_convex( ) -> anyhow::Result<()> { validate_destination_schema_table( fivetran_table( btreemap! { "_field" => FivetranDataType::String, "_fivetran_synced" => FivetranDataType::UtcDatetime, }, hashset! {"_field"}, ), &convex_table( btreemap! { "fivetran" => FieldValidator::required_field_type(Validator::Object( object_validator!( "synced" => FieldValidator::required_field_type(Validator::Float64), ), )), }, btreemap! { FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR.as_str() => vec![ FieldPath::new(vec![ "fivetran".parse()?, "columns".parse()?, "field".parse()?, ])?, ], FIVETRAN_SYNCED_INDEX_DESCRIPTOR.as_str() => vec![ FieldPath::new(vec![ "fivetran".parse()?, "synced".parse()?, ])?, FieldPath::new(vec!["_creationTime".parse()?])?, ], }, ), ) .unwrap_err(); Ok(()) } #[test] fn it_refuses_tables_with_extraneous_columns_in_the_metadata_field() -> anyhow::Result<()> { validate_destination_schema_table( fivetran_table( btreemap! { "name" => FivetranDataType::String, "_fivetran_synced" => FivetranDataType::UtcDatetime, }, hashset! {"name"}, ), &convex_table( btreemap! { "name" => FieldValidator::required_field_type(Validator::String), "fivetran" => FieldValidator::required_field_type(Validator::Object( object_validator!( "synced" => FieldValidator::required_field_type(Validator::Float64), "columns" => FieldValidator::required_field_type(Validator::Object(object_validator!( "extraneous" => FieldValidator::required_field_type(Validator::String), ))), ), )), }, btreemap! { FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR.as_str() => vec![ FieldPath::new(vec![ "name".parse()?, ])?, ], FIVETRAN_SYNCED_INDEX_DESCRIPTOR.as_str() => vec![ FieldPath::new(vec![ "fivetran".parse()?, "synced".parse()?, ])?, FieldPath::new(vec!["_creationTime".parse()?])?, ], }, ), ) .unwrap_err(); Ok(()) } #[test] fn it_allows_convex_tables_with_multiple_columns_in_the_primary_key() -> anyhow::Result<()> { validate_destination_schema_table( fivetran_table( btreemap! { "a" => FivetranDataType::String, "b" => FivetranDataType::String, "c" => FivetranDataType::String, "_fivetran_deleted" => FivetranDataType::Boolean, "_fivetran_synced" => FivetranDataType::UtcDatetime, }, hashset! {"a", "b", "c"}, ), &convex_table( btreemap! { "a" => FieldValidator::required_field_type(Validator::String), "b" => FieldValidator::required_field_type(Validator::String), "c" => FieldValidator::required_field_type(Validator::String), "fivetran" => FieldValidator::required_field_type(Validator::Object( object_validator!( "synced" => FieldValidator::required_field_type(Validator::Float64), "deleted" => FieldValidator::required_field_type(Validator::Boolean), ), )), }, btreemap! { FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR.as_str() => vec![ // _fivetran_deleted must be the first field in the index FieldPath::new(vec![ "fivetran".parse()?, "deleted".parse()?, ])?, // The other fields can be in an arbitrary order FieldPath::new(vec!["b".parse()?])?, FieldPath::new(vec!["a".parse()?])?, FieldPath::new(vec!["c".parse()?])?, ], "sync_index_named_arbitrarily" => vec![ FieldPath::new(vec![ "fivetran".parse()?, "deleted".parse()?, ])?, FieldPath::new(vec![ "fivetran".parse()?, "synced".parse()?, ])?, FieldPath::new(vec!["_creationTime".parse()?])?, ], }, ), ) .unwrap(); Ok(()) } #[test] fn it_requires_two_system_indexes() -> anyhow::Result<()> { assert!(fivetran_table_schema( btreemap! { "id" => FivetranDataType::Long, "_fivetran_synced" => FivetranDataType::UtcDatetime, }, btreeset! {"id"}, ) .validate_destination_indexes(&convex_indexes(btreemap! { FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR.as_str() => vec![ FieldPath::new(vec!["id".parse()?])?, ], "my_sync_index" => vec![ FieldPath::new(vec![ "fivetran".parse()?, "synced".parse()?, ])?, FieldPath::new(vec!["_creationTime".parse()?])?, ], })) .is_ok()); Ok(()) } #[test] fn it_fails_if_a_required_index_is_missing() -> anyhow::Result<()> { let table_schema = fivetran_table_schema( btreemap! { "id" => FivetranDataType::Long, "_fivetran_synced" => FivetranDataType::UtcDatetime, }, btreeset! {"id"}, ); let primary_key_index = vec![FieldPath::new(vec!["id".parse()?])?]; let sync_index = vec![FieldPath::new(vec![ "fivetran".parse()?, "synced".parse()?, ])?]; assert!(table_schema .validate_destination_indexes(&convex_indexes(btreemap! {})) .is_err()); assert!(table_schema .validate_destination_indexes(&convex_indexes(btreemap! { FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR.as_str() => primary_key_index, })) .is_err()); assert!(table_schema .validate_destination_indexes(&convex_indexes(btreemap! { FIVETRAN_SYNCED_INDEX_DESCRIPTOR.as_str() => sync_index, })) .is_err()); Ok(()) } #[test] fn required_indexes_include_the_soft_delete_field_if_it_exists() -> anyhow::Result<()> { fivetran_table_schema( btreemap! { "id" => FivetranDataType::Long, "_fivetran_synced" => FivetranDataType::UtcDatetime, "_fivetran_deleted" => FivetranDataType::Boolean, }, btreeset! {"id"}, ) .validate_destination_indexes(&convex_indexes(btreemap! { FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR.as_str() => vec![ FieldPath::new(vec![ "fivetran".parse()?, "deleted".parse()?, ])?, FieldPath::new(vec!["id".parse()?])?, ], "my_sync_index" => vec![ FieldPath::new(vec![ "fivetran".parse()?, "deleted".parse()?, ])?, FieldPath::new(vec![ "fivetran".parse()?, "synced".parse()?, ])?, FieldPath::new(vec!["_creationTime".parse()?])?, ], })) .expect("Failed to validate indexes"); // The soft delete field must come before the other fields assert!(fivetran_table_schema( btreemap! { "id" => FivetranDataType::Long, "_fivetran_synced" => FivetranDataType::UtcDatetime, "_fivetran_deleted" => FivetranDataType::Boolean, }, btreeset! {"id"} ) .validate_destination_indexes(&convex_indexes(btreemap! { FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR.as_str() => vec![ FieldPath::new(vec![ "fivetran".parse()?, "deleted".parse()?, ])?, FieldPath::new(vec!["id".parse()?])?, ], "my_sync_index" => vec![ // Wrong FieldPath::new(vec![ "fivetran".parse()?, "synced".parse()?, ])?, FieldPath::new(vec![ "fivetran".parse()?, "deleted".parse()?, ])?, FieldPath::new(vec!["_creationTime".parse()?])?, ], })) .is_err()); Ok(()) } #[test] fn primary_key_columns_can_be_in_an_arbitrary_order_in_the_index() -> anyhow::Result<()> { let fivetran_table_schema = fivetran_table_schema( btreemap! { "a" => FivetranDataType::Long, "b" => FivetranDataType::Long, "c" => FivetranDataType::Long, "_fivetran_synced" => FivetranDataType::UtcDatetime, "_fivetran_deleted" => FivetranDataType::Boolean, }, btreeset! {"a", "b", "c"}, ); let sync_index = vec![ FieldPath::new(vec!["fivetran".parse()?, "deleted".parse()?])?, FieldPath::new(vec!["fivetran".parse()?, "synced".parse()?])?, FieldPath::new(vec!["_creationTime".parse()?])?, ]; assert!(fivetran_table_schema .validate_destination_indexes(&convex_indexes(btreemap! { FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR.as_str() => vec![ FieldPath::new(vec![ "fivetran".parse()?, "deleted".parse()?, ])?, FieldPath::new(vec!["b".parse()?])?, FieldPath::new(vec!["a".parse()?])?, FieldPath::new(vec!["c".parse()?])?, ], FIVETRAN_SYNCED_INDEX_DESCRIPTOR.as_str() => sync_index.clone(), })) .is_ok()); assert!(fivetran_table_schema .validate_destination_indexes(&convex_indexes(btreemap! { FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR.as_str() => vec![ FieldPath::new(vec![ "fivetran".parse()?, "deleted".parse()?, ])?, FieldPath::new(vec!["c".parse()?])?, FieldPath::new(vec!["b".parse()?])?, FieldPath::new(vec!["a".parse()?])?, ], FIVETRAN_SYNCED_INDEX_DESCRIPTOR.as_str() => sync_index.clone(), })) .is_ok()); // The _fivetran_deleted field must be first assert!(fivetran_table_schema .validate_destination_indexes(&convex_indexes(btreemap! { FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR.as_str() => vec![ FieldPath::new(vec!["c".parse()?])?, FieldPath::new(vec!["b".parse()?])?, FieldPath::new(vec!["a".parse()?])?, // Error FieldPath::new(vec![ "fivetran".parse()?, "deleted".parse()?, ])?, ], FIVETRAN_SYNCED_INDEX_DESCRIPTOR.as_str() => sync_index, })) .is_err()); Ok(()) } #[test] fn it_converts_convex_tables_to_fivetran_tables() -> anyhow::Result<()> { assert_eq!( to_fivetran_table(&convex_table( btreemap! { "id" => FieldValidator::required_field_type(Validator::Int64), "name" => FieldValidator::required_field_type(Validator::Union(vec![ Validator::Null, Validator::String, ])), "fivetran" => FieldValidator::required_field_type(Validator::Object( object_validator!( "synced" => FieldValidator::required_field_type(Validator::Float64), ), )), }, btreemap! { FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR.as_str() => vec![ FieldPath::new(vec!["id".parse()?])?, ], FIVETRAN_SYNCED_INDEX_DESCRIPTOR.as_str() => vec![ FieldPath::new(vec![ "fivetran".parse()?, "synced".parse()?, ])?, ], }, ))?, Table { name: "table_name".into(), columns: vec![ Column { name: "_fivetran_synced".to_string(), r#type: FivetranDataType::UtcDatetime as i32, primary_key: false, params: None, }, Column { name: "id".to_string(), r#type: FivetranDataType::Long as i32, primary_key: true, params: None, }, Column { name: "name".to_string(), r#type: FivetranDataType::String as i32, primary_key: false, params: None, }, ], } ); Ok(()) } #[test] fn it_converts_convex_tables_to_fivetran_tables_with_soft_deletes_and_fivetran_id( ) -> anyhow::Result<()> { assert_eq!( to_fivetran_table(&convex_table( btreemap! { "data" => FieldValidator::required_field_type(Validator::Bytes), "fivetran" => FieldValidator::required_field_type(Validator::Object( object_validator!( "synced" => FieldValidator::required_field_type(Validator::Float64), "id" => FieldValidator::required_field_type(Validator::String), "deleted" => FieldValidator::required_field_type(Validator::Boolean), ), )), }, btreemap! { FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR.as_str() => vec![ FieldPath::new(vec![ "fivetran".parse()?, "deleted".parse()?, ])?, FieldPath::new(vec!["id".parse()?])?, ], FIVETRAN_SYNCED_INDEX_DESCRIPTOR.as_str() => vec![ FieldPath::new(vec![ "fivetran".parse()?, "deleted".parse()?, ])?, FieldPath::new(vec![ "fivetran".parse()?, "synced".parse()?, ])?, ], }, ))?, Table { name: "table_name".into(), columns: vec![ Column { name: "_fivetran_deleted".to_string(), r#type: FivetranDataType::Boolean as i32, primary_key: false, params: None, }, Column { name: "_fivetran_id".to_string(), r#type: FivetranDataType::String as i32, primary_key: true, params: None, }, Column { name: "_fivetran_synced".to_string(), r#type: FivetranDataType::UtcDatetime as i32, primary_key: false, params: None, }, Column { name: "data".to_string(), r#type: FivetranDataType::Binary as i32, primary_key: false, params: None, }, ], } ); Ok(()) } #[test] fn it_converts_convex_tables_to_fivetran_tables_containing_columns_with_underscore( ) -> anyhow::Result<()> { assert_eq!( to_fivetran_table(&convex_table( btreemap! { "fivetran" => FieldValidator::required_field_type(Validator::Object( object_validator!( "synced" => FieldValidator::required_field_type(Validator::Float64), "columns" => FieldValidator::required_field_type(Validator::Object( object_validator!( "key" => FieldValidator::required_field_type(Validator::String), "nullable_field" => FieldValidator::required_field_type(Validator::String), "non_nullable_field" => FieldValidator::required_field_type(Validator::Union(vec![ Validator::String, Validator::Null, ])), ) )), ), )), }, btreemap! { FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR.as_str() => vec![ FieldPath::new(vec![ "fivetran".parse()?, "columns".parse()?, "key".parse()?, ])?, ], FIVETRAN_SYNCED_INDEX_DESCRIPTOR.as_str() => vec![ FieldPath::new(vec![ "fivetran".parse()?, "synced".parse()?, ])?, ], }, ))?, Table { name: "table_name".into(), columns: vec![ Column { name: "_fivetran_synced".to_string(), r#type: FivetranDataType::UtcDatetime as i32, primary_key: false, params: None, }, Column { name: "_key".to_string(), r#type: FivetranDataType::String as i32, primary_key: true, params: None, }, Column { name: "_non_nullable_field".to_string(), r#type: FivetranDataType::String as i32, primary_key: false, params: None, }, Column { name: "_nullable_field".to_string(), r#type: FivetranDataType::String as i32, primary_key: false, params: None, }, ], } ); Ok(()) } #[test] fn it_suggests_convex_tables() -> anyhow::Result<()> { let fivetran_table = fivetran_table_schema( btreemap! { "name" => FivetranDataType::String, "slug" => FivetranDataType::String, "_key" => FivetranDataType::String, "_fivetran_synced" => FivetranDataType::UtcDatetime, "_fivetran_deleted" => FivetranDataType::Boolean, "_fivetran_id" => FivetranDataType::String, }, btreeset! {"slug", "_fivetran_id", "_key"}, ); assert_eq!( fivetran_table.suggested_convex_table()?, TableDefinition { table_name: "my_table".parse()?, indexes: btreemap! { FIVETRAN_SYNCED_INDEX_DESCRIPTOR.clone() => IndexSchema { index_descriptor: FIVETRAN_SYNCED_INDEX_DESCRIPTOR.clone(), fields: vec![ "fivetran.deleted".parse()?, "fivetran.synced".parse()?, "_creationTime".parse()?, ].try_into()? }, FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR.clone() => IndexSchema { index_descriptor: FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR.clone(), fields: vec![ "fivetran.deleted".parse()?, "fivetran.id".parse()?, "fivetran.columns.key".parse()?, "slug".parse()?, ].try_into()? } }, staged_db_indexes: btreemap! {}, staged_text_indexes: btreemap! {}, staged_vector_indexes: btreemap! {}, document_type: Some(DocumentSchema::Union(vec![object_validator!( "name" => FieldValidator::required_field_type(Validator::Union(vec![ Validator::String, Validator::Null, ])), "slug" => FieldValidator::required_field_type(Validator::Union(vec![ Validator::String, Validator::Null, ])), "fivetran" => FieldValidator::required_field_type(Validator::Object(object_validator!( "synced" => FieldValidator::required_field_type(Validator::Float64), "deleted" => FieldValidator::required_field_type(Validator::Boolean), "id" => FieldValidator::required_field_type(Validator::String), "columns" => FieldValidator::required_field_type(Validator::Object(object_validator!( "key" => FieldValidator::required_field_type( Validator::Union(vec![ Validator::String, Validator::Null ]) ) ))), ))), )])), text_indexes: Default::default(), vector_indexes: Default::default(), }, ); Ok(()) } proptest! { #![proptest_config(ProptestConfig { cases: 256 * env_config("CONVEX_PROPTEST_MULTIPLIER", 1), failure_persistence: None, ..ProptestConfig::default() })] #[test] fn suggested_convex_schemas_are_always_valid(fivetran_table in fivetran_table_strategy()) { let schema: FivetranTableSchema = fivetran_table.clone().try_into()?; let suggested_convex_table = schema.suggested_convex_table()?; prop_assert!( validate_destination_schema_table(fivetran_table, &suggested_convex_table).is_ok() ); } } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server