Skip to main content
Glama

Convex MCP server

Official
by get-convex
file_reader.rs (33.4 kB)
use std::collections::BTreeMap; use anyhow::{ anyhow, Context, }; use async_compression::tokio::bufread::{ GzipDecoder, ZstdDecoder, }; use chrono::{ DateTime, NaiveDate, NaiveDateTime, NaiveTime, Timelike, }; use common::async_compat::TokioAsyncReadCompatExt; use convex_fivetran_common::fivetran_sdk::{ value_type::Inner as FivetranValue, Compression as FivetranFileCompression, DataType as FivetranDataType, FileParams, }; use convex_fivetran_destination::api_types::FivetranFieldName; use futures::{ stream::BoxStream, StreamExt, }; use prost_types::Timestamp; use tokio::{ fs::File, io::{ self, BufReader, }, }; use crate::{ aes::{ Aes256Key, AesDecryptor, }, schema::FivetranTableSchema, }; #[derive(Debug, PartialEq)] pub enum FivetranFileValue { Value(FivetranValue), Unmodified, } #[derive(Debug, PartialEq)] pub struct FileRow(pub BTreeMap<FivetranFieldName, FivetranFileValue>); pub struct FivetranReaderParams { unmodified_string: String, null_string: String, } impl From<FileParams> for FivetranReaderParams { fn from(value: FileParams) -> Self { Self { unmodified_string: value.unmodified_string, null_string: value.null_string, } } } /// See https://github.com/fivetran/fivetran_sdk/blob/main/development-guide.md#encryption pub enum FivetranFileEncryption { None, Aes { key: Aes256Key }, } pub async fn create_csv_deserializer( file_path: &str, compression: FivetranFileCompression, encryption: FivetranFileEncryption, ) -> anyhow::Result<csv_async::AsyncDeserializer<impl futures::AsyncRead + Unpin + Send>> { let reader = BufReader::new(File::open(file_path).await.context("Couldn't open file")?); // Decryption let reader: Box<dyn io::AsyncRead + Unpin + Send> = match encryption { FivetranFileEncryption::None => Box::new(reader), FivetranFileEncryption::Aes { key } => Box::new(AesDecryptor::new(reader, key).await?), }; // Decompression let reader: Box<dyn io::AsyncRead + Unpin + Send> = match compression { FivetranFileCompression::Off => reader, 
FivetranFileCompression::Zstd => Box::new(ZstdDecoder::new(BufReader::new(reader))), FivetranFileCompression::Gzip => Box::new(GzipDecoder::new(BufReader::new(reader))), }; let deserializer = csv_async::AsyncDeserializer::from_reader(Box::new(reader.compat())); if !deserializer.has_headers() { anyhow::bail!("Missing file headers"); } Ok(deserializer) } pub fn read_rows<'a, R>( deserializer: &'a mut csv_async::AsyncDeserializer<R>, params: &'a FivetranReaderParams, schema: &'a FivetranTableSchema, ) -> BoxStream<'a, anyhow::Result<FileRow>> where R: futures::AsyncRead + Unpin + Send, { deserializer .deserialize::<BTreeMap<String, String>>() .map(|result| match result { Ok(field_map) => Ok(FileRow( field_map .into_iter() .map(|(field, value)| -> anyhow::Result<_> { let field: FivetranFieldName = field.parse().context("Couldn't parse field")?; let file_value = if value == params.unmodified_string { FivetranFileValue::Unmodified } else if value == params.null_string { FivetranFileValue::Value(FivetranValue::Null(true)) } else { let column = schema .columns .get(&field) .context("Column not in schema")? .to_owned(); FivetranFileValue::Value( try_parse_fivetran_value(value.clone(), column.data_type).context( format!( "Couldn't parse field {} value {} as {:?} from file", field, value, column.data_type, ), )?, ) }; Ok((field, file_value)) }) .try_collect()?, )), Err(csv_error) => Err(anyhow!(csv_error).context("Can’t deserialize the CSV file")), }) .boxed() } /// Parses a Fivetran value in the format /// [used in Fivetran CSV files](https://github.com/fivetran/fivetran_sdk/blob/main/development-guide.md#examples-of-data-types). 
fn try_parse_fivetran_value( value: String, target_type: FivetranDataType, ) -> anyhow::Result<FivetranValue> { Ok(match target_type { FivetranDataType::Unspecified => { anyhow::bail!("Can’t parse a value to an unspecified type") }, FivetranDataType::Boolean => FivetranValue::Bool(value.parse()?), FivetranDataType::Short => FivetranValue::Short(value.parse()?), FivetranDataType::Int => FivetranValue::Int(value.parse()?), FivetranDataType::Long => FivetranValue::Long(value.parse()?), FivetranDataType::Float => FivetranValue::Float(value.parse()?), FivetranDataType::Double => FivetranValue::Double(value.parse()?), FivetranDataType::NaiveDate => FivetranValue::NaiveDate(Timestamp { seconds: NaiveDateTime::new( NaiveDate::parse_from_str(&value, "%Y-%m-%d")?, NaiveTime::default(), ) .and_utc() .timestamp(), nanos: 0, }), FivetranDataType::NaiveTime => { let dt = NaiveDateTime::new( NaiveDate::default(), NaiveTime::parse_from_str(&value, "%H:%M:%S%.f")?, ) .and_utc(); FivetranValue::NaiveTime(Timestamp { seconds: dt.timestamp(), nanos: dt.timestamp_subsec_nanos() as i32, }) }, FivetranDataType::NaiveDatetime => { let dt = NaiveDateTime::parse_from_str(&value, "%Y-%m-%dT%H:%M:%S%.f")?.and_utc(); FivetranValue::NaiveDatetime(Timestamp { seconds: dt.timestamp(), nanos: dt.timestamp_subsec_nanos() as i32, }) }, FivetranDataType::UtcDatetime => { let date_time = DateTime::parse_from_rfc3339(&value)?; FivetranValue::UtcDatetime(Timestamp { seconds: date_time.timestamp(), nanos: date_time.nanosecond() as i32, }) }, FivetranDataType::Binary => FivetranValue::Binary(base64::decode(value)?), FivetranDataType::Decimal => FivetranValue::Decimal(value), FivetranDataType::Xml => FivetranValue::Xml(value), FivetranDataType::String => FivetranValue::String(value), FivetranDataType::Json => FivetranValue::Json(value), }) } #[cfg(test)] fn to_csv_string_representation(value: &FivetranValue) -> Option<String> { match value { FivetranValue::Null(_) => None, FivetranValue::Bool(v) => 
Some(v.to_string()), FivetranValue::Short(v) => Some(v.to_string()), FivetranValue::Int(v) => Some(v.to_string()), FivetranValue::Long(v) => Some(v.to_string()), FivetranValue::Float(v) => Some(v.to_string()), FivetranValue::Double(v) => Some(v.to_string()), FivetranValue::NaiveDate(Timestamp { seconds, nanos }) => { DateTime::from_timestamp(*seconds, *nanos as u32) .map(|dt| dt.naive_utc().date().format("%Y-%m-%d").to_string()) }, FivetranValue::NaiveTime(Timestamp { seconds, nanos }) => { DateTime::from_timestamp(*seconds, *nanos as u32) .map(|dt| dt.time().format("%H:%M:%S%.f").to_string()) }, FivetranValue::NaiveDatetime(Timestamp { seconds, nanos }) => { DateTime::from_timestamp(*seconds, *nanos as u32) .map(|dt| dt.naive_utc().format("%Y-%m-%dT%H:%M:%S%.f").to_string()) }, FivetranValue::UtcDatetime(Timestamp { seconds, nanos }) => { DateTime::from_timestamp(*seconds, *nanos as u32) .map(|dt| dt.format("%Y-%m-%dT%H:%M:%S%.fZ").to_string()) }, FivetranValue::Binary(bytes) => Some(base64::encode(bytes)), FivetranValue::Decimal(v) => Some(v.clone()), FivetranValue::String(v) => Some(v.clone()), FivetranValue::Json(v) => Some(v.clone()), FivetranValue::Xml(v) => Some(v.clone()), } } #[cfg(test)] mod tests { use std::{ collections::BTreeMap, env, str::FromStr, }; use cmd_util::env::env_config; use convex_fivetran_common::fivetran_sdk::{ value_type::Inner as FivetranValue, Compression as FivetranFileCompression, DataType as FivetranDataType, }; use convex_fivetran_destination::api_types::{ FivetranFieldName, FivetranTableName, }; use futures::StreamExt; use maplit::btreemap; use proptest::prelude::*; use prost_types::Timestamp; use tokio::{ fs::File, io::AsyncReadExt, }; use crate::{ aes::Aes256Key, convert::fivetran_data_type, file_reader::{ create_csv_deserializer, read_rows, to_csv_string_representation, try_parse_fivetran_value, FileRow, FivetranFileEncryption, FivetranFileValue, FivetranReaderParams, }, schema::{ FivetranTableColumn, FivetranTableSchema, }, }; 
fn fixture_path(fixture_name: &str) -> String { String::from( env::current_dir() .unwrap() .join("src") .join("tests") .join("fixtures") .join(fixture_name) .to_str() .unwrap(), ) } fn fivetran_table_schema( fields: BTreeMap<&'static str, FivetranDataType>, ) -> FivetranTableSchema { FivetranTableSchema { name: FivetranTableName::from_str("my_table").unwrap(), columns: fields .into_iter() .map(|(name, data_type)| { ( FivetranFieldName::from_str(name).unwrap(), FivetranTableColumn { data_type, in_primary_key: false, }, ) }) .collect(), } } #[tokio::test] async fn test_parse_a_fivetran_csv_file() -> anyhow::Result<()> { let params = FivetranReaderParams { unmodified_string: String::from("unmod-NcK9NIjPUutCsz4mjOQQztbnwnE1sY3"), null_string: String::from("magic-nullvalue"), }; let schema = fivetran_table_schema(btreemap! { "id" => FivetranDataType::Long, "title" => FivetranDataType::String, "magic_number" => FivetranDataType::Int, "_fivetran_deleted" => FivetranDataType::Boolean, "_fivetran_synced" => FivetranDataType::UtcDatetime, }); let mut deserializer = create_csv_deserializer( &fixture_path("books_update.csv"), FivetranFileCompression::Off, FivetranFileEncryption::None, ) .await?; let rows = read_rows(&mut deserializer, &params, &schema); assert_eq!( rows.collect::<Vec<_>>() .await .into_iter() .collect::<Result<Vec<_>, _>>()?, vec![ FileRow(btreemap! { FivetranFieldName::from_str("id")? => FivetranFileValue::Value(FivetranValue::Long(3)), FivetranFieldName::from_str("title")? => FivetranFileValue::Unmodified, FivetranFieldName::from_str("magic_number")? => FivetranFileValue::Value(FivetranValue::Int(15)), FivetranFieldName::from_str("_fivetran_deleted")? => FivetranFileValue::Value(FivetranValue::Bool(false)), FivetranFieldName::from_str("_fivetran_synced")? => FivetranFileValue::Value(FivetranValue::UtcDatetime(Timestamp { seconds: 1707436799, nanos: 999_999_999, })), }), FileRow(btreemap! { FivetranFieldName::from_str("id")? 
=> FivetranFileValue::Value(FivetranValue::Long(2)), FivetranFieldName::from_str("title")? => FivetranFileValue::Value(FivetranValue::String("The empire strikes back".into())), FivetranFieldName::from_str("magic_number")? => FivetranFileValue::Unmodified, FivetranFieldName::from_str("_fivetran_deleted")? => FivetranFileValue::Value(FivetranValue::Bool(false)), FivetranFieldName::from_str("_fivetran_synced")? => FivetranFileValue::Value(FivetranValue::UtcDatetime(Timestamp { seconds: 1707436800, nanos: 0, })), }), FileRow(btreemap! { FivetranFieldName::from_str("id")? => FivetranFileValue::Value(FivetranValue::Long(99)), FivetranFieldName::from_str("title")? => FivetranFileValue::Value(FivetranValue::Null(true)), FivetranFieldName::from_str("magic_number")? => FivetranFileValue::Value(FivetranValue::Int(99)), FivetranFieldName::from_str("_fivetran_deleted")? => FivetranFileValue::Value(FivetranValue::Bool(false)), FivetranFieldName::from_str("_fivetran_synced")? => FivetranFileValue::Value(FivetranValue::UtcDatetime(Timestamp { seconds: 1704773419, nanos: 156057706, })), }), ], ); Ok(()) } #[tokio::test] async fn test_backslash_in_csv_files_are_not_escape_chars() -> anyhow::Result<()> { // See https://github.com/fivetran/fivetran_sdk/blob/main/development-guide.md#csv let params = FivetranReaderParams { unmodified_string: String::from("unmod"), null_string: String::from("null"), }; let schema = fivetran_table_schema(btreemap! { "path" => FivetranDataType::String, }); let mut deserializer = create_csv_deserializer( &fixture_path("backslash.csv"), FivetranFileCompression::Off, FivetranFileEncryption::None, ) .await?; let rows = read_rows(&mut deserializer, &params, &schema); assert_eq!( rows.collect::<Vec<_>>() .await .into_iter() .collect::<Result<Vec<_>, _>>()?, vec![FileRow(btreemap! { FivetranFieldName::from_str("path")? 
=> FivetranFileValue::Value(FivetranValue::String("C:\\Program Files\\".to_string())), }),], ); Ok(()) } #[tokio::test] async fn test_parse_an_uncompressed_fivetran_csv_file() -> anyhow::Result<()> { let params = FivetranReaderParams { unmodified_string: String::from("unmod-NcK9NIjPUutCsz4mjOQQztbnwnE1sY3"), null_string: String::from("null-m8yilkvPsNulehxl2G6pmSQ3G3WWdLP"), }; let schema = fivetran_table_schema(btreemap! { "id" => FivetranDataType::Long, "title" => FivetranDataType::String, "magic_number" => FivetranDataType::Int, "_fivetran_deleted" => FivetranDataType::Boolean, "_fivetran_synced" => FivetranDataType::UtcDatetime, }); let mut deserializer = create_csv_deserializer( &fixture_path("books_delete.csv"), FivetranFileCompression::Off, FivetranFileEncryption::None, ) .await?; let rows = read_rows(&mut deserializer, &params, &schema); assert_eq!( rows.collect::<Vec<_>>() .await .into_iter() .collect::<Result<Vec<_>, _>>()?, vec![FileRow(btreemap! { FivetranFieldName::from_str("id")? => FivetranFileValue::Value(FivetranValue::Long(1)), FivetranFieldName::from_str("title")? => FivetranFileValue::Value(FivetranValue::Null(true)), FivetranFieldName::from_str("magic_number")? => FivetranFileValue::Value(FivetranValue::Null(true)), FivetranFieldName::from_str("_fivetran_deleted")? => FivetranFileValue::Value(FivetranValue::Bool(true)), FivetranFieldName::from_str("_fivetran_synced")? => FivetranFileValue::Value(FivetranValue::UtcDatetime(Timestamp { seconds: 0, nanos: 0, })), }),], ); Ok(()) } #[tokio::test] async fn test_parse_a_zstd_compressed_fivetran_csv_file() -> anyhow::Result<()> { let params = FivetranReaderParams { unmodified_string: String::from("unmod-NcK9NIjPUutCsz4mjOQQztbnwnE1sY3"), null_string: String::from("null-m8yilkvPsNulehxl2G6pmSQ3G3WWdLP"), }; let schema = fivetran_table_schema(btreemap! 
{ "id" => FivetranDataType::Long, "title" => FivetranDataType::String, "magic_number" => FivetranDataType::Int, "_fivetran_deleted" => FivetranDataType::Boolean, "_fivetran_synced" => FivetranDataType::UtcDatetime, }); let mut deserializer = create_csv_deserializer( &fixture_path("books_delete.csv.zst"), FivetranFileCompression::Zstd, FivetranFileEncryption::None, ) .await?; let rows = read_rows(&mut deserializer, &params, &schema); assert_eq!( rows.collect::<Vec<_>>() .await .into_iter() .collect::<Result<Vec<_>, _>>()?, vec![FileRow(btreemap! { FivetranFieldName::from_str("id")? => FivetranFileValue::Value(FivetranValue::Long(1)), FivetranFieldName::from_str("title")? => FivetranFileValue::Value(FivetranValue::Null(true)), FivetranFieldName::from_str("magic_number")? => FivetranFileValue::Value(FivetranValue::Null(true)), FivetranFieldName::from_str("_fivetran_deleted")? => FivetranFileValue::Value(FivetranValue::Bool(true)), FivetranFieldName::from_str("_fivetran_synced")? => FivetranFileValue::Value(FivetranValue::UtcDatetime(Timestamp { seconds: 0, nanos: 0, })), }),], ); Ok(()) } #[tokio::test] async fn test_parse_an_encrypted_file() -> anyhow::Result<()> { let mut key = Aes256Key::default(); File::open(fixture_path("books_batch_1_insert.csv.zst.aes.key")) .await? .read_exact(&mut key.0) .await?; let params = FivetranReaderParams { unmodified_string: String::from(""), null_string: String::from(""), }; let schema = fivetran_table_schema(btreemap! 
{ "id" => FivetranDataType::Long, "title" => FivetranDataType::String, "magic_number" => FivetranDataType::Int, "_fivetran_deleted" => FivetranDataType::Boolean, "_fivetran_synced" => FivetranDataType::UtcDatetime, }); let mut deserializer = create_csv_deserializer( &fixture_path("books_batch_1_insert.csv.zst.aes"), FivetranFileCompression::Zstd, FivetranFileEncryption::Aes { key }, ) .await?; let rows = read_rows(&mut deserializer, &params, &schema); assert_eq!( rows.collect::<Vec<_>>() .await .into_iter() .collect::<Result<Vec<_>, _>>()?, vec![ FileRow(btreemap! { FivetranFieldName::from_str("id")? => FivetranFileValue::Value(FivetranValue::Long(1)), FivetranFieldName::from_str("title")? => FivetranFileValue::Value(FivetranValue::String("The Hitchhiker's Guide to the Galaxy".into())), FivetranFieldName::from_str("magic_number")? => FivetranFileValue::Value(FivetranValue::Int(42)), FivetranFieldName::from_str("_fivetran_deleted")? => FivetranFileValue::Value(FivetranValue::Bool(false)), FivetranFieldName::from_str("_fivetran_synced")? => FivetranFileValue::Value(try_parse_fivetran_value("2024-01-08T18:52:07.754685370Z".into(), FivetranDataType::UtcDatetime)?), }), FileRow(btreemap! { FivetranFieldName::from_str("id")? => FivetranFileValue::Value(FivetranValue::Long(2)), FivetranFieldName::from_str("title")? => FivetranFileValue::Value(FivetranValue::String("The Lord of the Rings".into())), FivetranFieldName::from_str("magic_number")? => FivetranFileValue::Value(FivetranValue::Int(1)), FivetranFieldName::from_str("_fivetran_deleted")? => FivetranFileValue::Value(FivetranValue::Bool(false)), FivetranFieldName::from_str("_fivetran_synced")? 
=> FivetranFileValue::Value(try_parse_fivetran_value("2024-01-08T18:52:07.754685370Z".into(), FivetranDataType::UtcDatetime)?), }), ], ); Ok(()) } #[tokio::test] async fn test_parse_a_gzip_compressed_fivetran_csv_file() -> anyhow::Result<()> { let params = FivetranReaderParams { unmodified_string: String::from("unmod-NcK9NIjPUutCsz4mjOQQztbnwnE1sY3"), null_string: String::from("null-m8yilkvPsNulehxl2G6pmSQ3G3WWdLP"), }; let schema = fivetran_table_schema(btreemap! { "id" => FivetranDataType::Long, "title" => FivetranDataType::String, "magic_number" => FivetranDataType::Int, "_fivetran_deleted" => FivetranDataType::Boolean, "_fivetran_synced" => FivetranDataType::UtcDatetime, }); let mut deserializer = create_csv_deserializer( &fixture_path("books_delete.csv.gz"), FivetranFileCompression::Gzip, FivetranFileEncryption::None, ) .await?; let rows = read_rows(&mut deserializer, &params, &schema); assert_eq!( rows.collect::<Vec<_>>() .await .into_iter() .collect::<Result<Vec<_>, _>>()?, vec![FileRow(btreemap! { FivetranFieldName::from_str("id")? => FivetranFileValue::Value(FivetranValue::Long(1)), FivetranFieldName::from_str("title")? => FivetranFileValue::Value(FivetranValue::Null(true)), FivetranFieldName::from_str("magic_number")? => FivetranFileValue::Value(FivetranValue::Null(true)), FivetranFieldName::from_str("_fivetran_deleted")? => FivetranFileValue::Value(FivetranValue::Bool(true)), FivetranFieldName::from_str("_fivetran_synced")? => FivetranFileValue::Value(FivetranValue::UtcDatetime(Timestamp { seconds: 0, nanos: 0, })), }),], ); Ok(()) } #[tokio::test] async fn test_parsing_fails_when_a_column_is_missing_from_the_schema() -> anyhow::Result<()> { let params = FivetranReaderParams { unmodified_string: String::from("unmod-NcK9NIjPUutCsz4mjOQQztbnwnE1sY3"), null_string: String::from("magic-nullvalue"), }; let schema = fivetran_table_schema(btreemap! 
{ "id" => FivetranDataType::Short, "title" => FivetranDataType::String, // magic_number missing "_fivetran_deleted" => FivetranDataType::Boolean, "_fivetran_synced" => FivetranDataType::UtcDatetime, }); let mut deserializer = create_csv_deserializer( &fixture_path("books_update.csv"), FivetranFileCompression::Off, FivetranFileEncryption::None, ) .await?; let rows = read_rows(&mut deserializer, &params, &schema); assert!(rows .collect::<Vec<_>>() .await .into_iter() .collect::<Result<Vec<_>, _>>() .is_err()); Ok(()) } #[tokio::test] async fn test_parsing_fails_when_a_column_is_incorrect_in_the_schema() -> anyhow::Result<()> { let params = FivetranReaderParams { unmodified_string: String::from("unmod-NcK9NIjPUutCsz4mjOQQztbnwnE1sY3"), null_string: String::from("magic-nullvalue"), }; let schema = fivetran_table_schema(btreemap! { "id" => FivetranDataType::Short, "title" => FivetranDataType::Int, // ! "magic_number" => FivetranDataType::Int, "_fivetran_deleted" => FivetranDataType::Boolean, "_fivetran_synced" => FivetranDataType::UtcDatetime, }); let mut deserializer = create_csv_deserializer( &fixture_path("books_update.csv"), FivetranFileCompression::Off, FivetranFileEncryption::None, ) .await?; let rows = read_rows(&mut deserializer, &params, &schema); assert!(rows .collect::<Vec<_>>() .await .into_iter() .collect::<Result<Vec<_>, _>>() .is_err()); Ok(()) } #[test] fn test_parse_boolean() { assert_eq!( try_parse_fivetran_value("true".into(), FivetranDataType::Boolean).unwrap(), FivetranValue::Bool(true) ); assert_eq!( try_parse_fivetran_value("false".into(), FivetranDataType::Boolean).unwrap(), FivetranValue::Bool(false) ); } #[test] fn test_parse_short() { assert_eq!( try_parse_fivetran_value("-32768".into(), FivetranDataType::Short).unwrap(), FivetranValue::Short(-32768) ); assert_eq!( try_parse_fivetran_value("123".into(), FivetranDataType::Short).unwrap(), FivetranValue::Short(123) ); assert_eq!( try_parse_fivetran_value("32767".into(), 
FivetranDataType::Short).unwrap(), FivetranValue::Short(32767) ); } #[test] fn test_parse_int() { assert_eq!( try_parse_fivetran_value("-2147483648".into(), FivetranDataType::Int).unwrap(), FivetranValue::Int(-2147483648) ); assert_eq!( try_parse_fivetran_value("456".into(), FivetranDataType::Int).unwrap(), FivetranValue::Int(456) ); assert_eq!( try_parse_fivetran_value("2147483647".into(), FivetranDataType::Int).unwrap(), FivetranValue::Int(2147483647) ); } #[test] fn test_parse_long() { assert_eq!( try_parse_fivetran_value("-9223372036854775808".into(), FivetranDataType::Long) .unwrap(), FivetranValue::Long(i64::MIN) ); assert_eq!( try_parse_fivetran_value("789".into(), FivetranDataType::Long).unwrap(), FivetranValue::Long(789) ); assert_eq!( try_parse_fivetran_value("9223372036854775807".into(), FivetranDataType::Long).unwrap(), FivetranValue::Long(i64::MAX) ); } #[test] fn test_parse_decimal() { assert_eq!( try_parse_fivetran_value( "3.1415926535897932384626433832".into(), FivetranDataType::Decimal ) .unwrap(), FivetranValue::Decimal("3.1415926535897932384626433832".into()) ); } #[test] fn test_parse_float() { assert_eq!( try_parse_fivetran_value("1.23".into(), FivetranDataType::Float).unwrap(), FivetranValue::Float(1.23) ); assert_eq!( try_parse_fivetran_value("3.4028236E+24".into(), FivetranDataType::Float).unwrap(), FivetranValue::Float(3.402_823_6E24) ); } #[test] fn test_parse_double() { assert_eq!( try_parse_fivetran_value("4.56".into(), FivetranDataType::Double).unwrap(), FivetranValue::Double(4.56) ); assert_eq!( try_parse_fivetran_value("-2.2250738585072014E-308".into(), FivetranDataType::Double) .unwrap(), FivetranValue::Double(-2.2250738585072014E-308) ); } #[test] fn test_parse_naive_date() { assert_eq!( try_parse_fivetran_value("2007-12-03".into(), FivetranDataType::NaiveDate).unwrap(), FivetranValue::NaiveDate(Timestamp { seconds: 1196640000, nanos: 0, }) ); } #[test] fn test_parse_naive_time() { assert_eq!( 
try_parse_fivetran_value("19:41:30".into(), FivetranDataType::NaiveTime).unwrap(), FivetranValue::NaiveTime(Timestamp { seconds: 19 * 60 * 60 + 41 * 60 + 30, nanos: 0, }) ); assert_eq!( try_parse_fivetran_value("19:41:30.500".into(), FivetranDataType::NaiveTime).unwrap(), FivetranValue::NaiveTime(Timestamp { seconds: 19 * 60 * 60 + 41 * 60 + 30, nanos: 500_000_000, }) ); } #[test] fn test_parse_naive_datetime() { assert_eq!( try_parse_fivetran_value( "2007-12-03T10:15:30".into(), FivetranDataType::NaiveDatetime ) .unwrap(), FivetranValue::NaiveDatetime(Timestamp { seconds: 1196676930, nanos: 0, }) ); assert_eq!( try_parse_fivetran_value( "1970-01-01T00:00:00".into(), FivetranDataType::NaiveDatetime ) .unwrap(), FivetranValue::NaiveDatetime(Timestamp { seconds: 0, nanos: 0, }) ); assert_eq!( try_parse_fivetran_value( "1970-01-01T00:00:00.123".into(), FivetranDataType::NaiveDatetime ) .unwrap(), FivetranValue::NaiveDatetime(Timestamp { seconds: 0, nanos: 123_000_000, }) ); } #[test] fn test_parse_utc_datetime() { assert_eq!( try_parse_fivetran_value( "2007-12-03T10:15:30.123Z".into(), FivetranDataType::UtcDatetime ) .unwrap(), FivetranValue::UtcDatetime(Timestamp { seconds: 1196676930, nanos: 123_000_000, }) ); } #[test] fn test_parse_binary() { assert_eq!( try_parse_fivetran_value("SGVsbG8gd29ybGQ=".to_string(), FivetranDataType::Binary) .unwrap(), FivetranValue::Binary(b"Hello world".to_vec()) ); } #[test] fn test_parse_xml() { assert_eq!( try_parse_fivetran_value("<tag></tag>".into(), FivetranDataType::Xml).unwrap(), FivetranValue::Xml("<tag></tag>".into()) ); } #[test] fn test_parse_string() { assert_eq!( try_parse_fivetran_value("Hello world".into(), FivetranDataType::String).unwrap(), FivetranValue::String("Hello world".into()) ); } #[test] fn test_parse_json() { assert_eq!( try_parse_fivetran_value("{\"key\": \"value\"}".into(), FivetranDataType::Json) .unwrap(), FivetranValue::Json("{\"key\": \"value\"}".into()) ); } proptest! 
{ #![proptest_config(ProptestConfig { cases: 256 * env_config("CONVEX_PROPTEST_MULTIPLIER", 1), failure_persistence: None, ..ProptestConfig::default() })] #[test] fn test_fivetran_csv_conversion_roundtrips(value in any::<FivetranValue>()) { if let FivetranValue::Null(_) = value { return Ok(()); } let value_type = fivetran_data_type(&value).unwrap(); let as_str = to_csv_string_representation(&value).unwrap(); prop_assert_eq!(value, try_parse_fivetran_value(as_str, value_type).unwrap()); } } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.