Skip to main content
Glama

Convex MCP server

Official
by get-convex
convert.rs26.8 kB
use std::{ collections::BTreeMap, str::FromStr, }; use anyhow::{ bail, Context, }; use chrono::DateTime; use common::value::{ ConvexObject, ConvexValue, FieldName, }; use convex_fivetran_common::fivetran_sdk::value_type::Inner as FivetranValue; #[cfg(test)] use convex_fivetran_common::fivetran_sdk::DataType as FivetranDataType; use convex_fivetran_destination::constants::{ ID_CONVEX_FIELD_NAME, ID_FIVETRAN_FIELD_NAME, METADATA_CONVEX_FIELD_NAME, SOFT_DELETE_CONVEX_FIELD_NAME, SOFT_DELETE_FIVETRAN_FIELD_NAME, SYNCED_CONVEX_FIELD_NAME, SYNCED_FIVETRAN_FIELD_NAME, UNDERSCORED_COLUMNS_CONVEX_FIELD_NAME, }; use prost_types::Timestamp; use serde_json::Value as JsonValue; use crate::file_reader::{ FileRow, FivetranFileValue, }; fn fivetran_to_convex_value(value: FivetranValue) -> anyhow::Result<ConvexValue> { // https://www.notion.so/convex-dev/Fivetran-Destination-Connector-Implementation-bc917ad7f68b483a93212d93dbbf7b0d?pvs=4#b54c641656284be28451f4be06adf0ab Ok(match value { FivetranValue::Null(_) => ConvexValue::Null, FivetranValue::Bool(v) => ConvexValue::Boolean(v), FivetranValue::Short(v) => ConvexValue::Float64(v.into()), FivetranValue::Int(v) => ConvexValue::Float64(v.into()), FivetranValue::Long(v) => ConvexValue::Int64(v), FivetranValue::Float(v) => ConvexValue::Float64(v.into()), FivetranValue::Double(v) => ConvexValue::Float64(v), FivetranValue::NaiveDate(Timestamp { seconds, nanos }) => ConvexValue::String( DateTime::from_timestamp(seconds, nanos as u32) .context("Invalid datetime value")? .naive_utc() .date() .format("%Y-%m-%d") .to_string() .try_into()?, ), FivetranValue::NaiveTime(Timestamp { seconds, nanos }) => ConvexValue::String( DateTime::from_timestamp(seconds, nanos as u32) .context("Invalid datetime value")? .time() .format("%H:%M:%S%.f") .to_string() .try_into()?, ), FivetranValue::NaiveDatetime(Timestamp { seconds, nanos }) => ConvexValue::String( DateTime::from_timestamp(seconds, nanos as u32) .context("Invalid datetime value")? .naive_utc() .to_string() .try_into()?, ), FivetranValue::UtcDatetime(timestamp) => ConvexValue::Float64(timestamp_to_ms(timestamp)), FivetranValue::Decimal(v) => ConvexValue::String(v.try_into()?), FivetranValue::Binary(v) => ConvexValue::Bytes(v.try_into()?), FivetranValue::String(v) => ConvexValue::String(v.try_into()?), FivetranValue::Xml(v) => ConvexValue::String(v.try_into()?), FivetranValue::Json(v) => { let json_value = serde_json::from_str(&v) .context("Your data source contains a JSON value which isn’t valid.")?; json_to_convex_value(json_value, Level::Top).context("Your data source contains JSON data that isn’t supported by Convex. You can learn more about the values supported by Convex on https://docs.convex.dev/database/types")? }, }) } #[derive(PartialEq, Eq)] enum Level { Top, Nested, } /// Converts a JSON value to a Convex value. /// /// Unlike the default JsonValue → ConvexValue conversion, this conversion does /// not support internal representation for types such as binary. /// /// This conversion only supports arrays, objects and null at the top level. We /// do so because it forces JSON columns to be marked in the Convex schema with /// a type that Fivetran will recognize as representing a JSON column. fn json_to_convex_value(value: JsonValue, level: Level) -> anyhow::Result<ConvexValue> { Ok(match value { JsonValue::Null => ConvexValue::Null, JsonValue::Bool(b) => { anyhow::ensure!( level != Level::Top, "Booleans aren’t supported as top-level JSON values" ); ConvexValue::from(b) }, JsonValue::Number(n) => { anyhow::ensure!( level != Level::Top, "Numbers aren’t supported as top-level JSON values" ); let n = n .as_f64() .context("Arbitrary precision JSON integers unsupported")?; ConvexValue::from(n) }, JsonValue::String(s) => { anyhow::ensure!( level != Level::Top, "Strings aren’t supported as top-level JSON values" ); ConvexValue::try_from(s)? }, JsonValue::Array(arr) => { let mut out = Vec::with_capacity(arr.len()); for a in arr { out.push(json_to_convex_value(a, Level::Nested)?); } ConvexValue::try_from(out)? }, JsonValue::Object(map) => { let mut fields = BTreeMap::new(); for (key, value) in map { let field_name = FieldName::from_str(&key)?; fields.insert(field_name, json_to_convex_value(value, Level::Nested)?); } ConvexValue::try_from(fields)? }, }) } fn timestamp_to_ms(ts: Timestamp) -> f64 { ts.seconds as f64 * 1000.0 + ((ts.nanos as f64) / 1_000_000.0) } /// Converts a Fivetran file row to a Convex object. /// /// The values marked as unmodified in the Fivetran row are omitted from the /// Convex object. /// /// The Fivetran metadata columns (`_fivetran_synced`, `_fivetran_id`, and /// `_fivetran_deleted`) become nested attributes of the `fivetran` attribute in /// Convex. /// /// Other columns starting by an underscore become nested fields in /// `fivetran.columns`. This is done because their names are reserved in Convex, /// while such names are frequently used by Fivetran data sources. /// /// For instance, the following Convex row… /// ```no_run /// ┌──────────────┬─────┬─────────┬──────────────┬────────────────────────────────┬───────────────────┐ /// │ name │ age │ _secret │ _fivetran_id │ _fivetran_synced │ _fivetran_deleted │ /// ├──────────────┼─────┼─────────┼──────────────┼────────────────────────────────┼───────────────────┤ /// │ [Unmodified] │ 21 │ true │ 42 │ 2024-01-09T04:10:19.156057706Z │ false │ /// └──────────────┴─────┴─────────┴──────────────┴────────────────────────────────┴───────────────────┘ /// ``` /// /// …is converted to the following Convex object: /// ```no_run /// { /// age: 21, /// fivetran: { /// id: 42, /// synced: 1704773419156, /// deleted: false, /// columns: { /// secret: true /// } /// } /// } /// ``` impl TryInto<ConvexObject> for FileRow { type Error = anyhow::Error; fn try_into(self) -> Result<ConvexObject, Self::Error> { let mut row: BTreeMap<FieldName, ConvexValue> = BTreeMap::new(); let mut metadata: BTreeMap<FieldName, ConvexValue> = BTreeMap::new(); let mut underscore_columns: BTreeMap<FieldName, ConvexValue> = BTreeMap::new(); for (field_name, value) in self.0 { // Ignore unmodified values let FivetranFileValue::Value(value) = value else { continue; }; if field_name == *SYNCED_FIVETRAN_FIELD_NAME { let FivetranValue::UtcDatetime(timestamp) = value else { bail!("Unexpected value for _fivetran_synced: {:?}", value); }; metadata.insert( SYNCED_CONVEX_FIELD_NAME.clone().into(), ConvexValue::Float64(timestamp_to_ms(timestamp)), ); } else if field_name == *SOFT_DELETE_FIVETRAN_FIELD_NAME { metadata.insert( SOFT_DELETE_CONVEX_FIELD_NAME.clone().into(), fivetran_to_convex_value(value)?, ); } else if field_name == *ID_FIVETRAN_FIELD_NAME { metadata.insert( ID_CONVEX_FIELD_NAME.clone().into(), fivetran_to_convex_value(value)?, ); } else if let Some(field_name) = field_name.strip_prefix('_') { let field_name = FieldName::from_str(field_name) .context("Invalid field name in the source data")?; underscore_columns.insert(field_name, fivetran_to_convex_value(value)?); } else { let field_name = FieldName::from_str(&field_name) .context("Invalid field name in the source data")?; row.insert(field_name, fivetran_to_convex_value(value)?); } } if !underscore_columns.is_empty() { metadata.insert( UNDERSCORED_COLUMNS_CONVEX_FIELD_NAME.clone().into(), ConvexValue::Object(ConvexObject::try_from(underscore_columns)?), ); } row.insert( METADATA_CONVEX_FIELD_NAME.clone().into(), ConvexValue::Object(ConvexObject::try_from(metadata)?), ); ConvexObject::try_from(row) } } #[cfg(test)] pub fn fivetran_data_type(value: &FivetranValue) -> Option<FivetranDataType> { match value { FivetranValue::Null(_) => None, FivetranValue::Bool(_) => Some(FivetranDataType::Boolean), FivetranValue::Short(_) => Some(FivetranDataType::Short), FivetranValue::Int(_) => Some(FivetranDataType::Int), FivetranValue::Long(_) => Some(FivetranDataType::Long), FivetranValue::Float(_) => Some(FivetranDataType::Float), FivetranValue::Double(_) => Some(FivetranDataType::Double), FivetranValue::NaiveDate(_) => Some(FivetranDataType::NaiveDate), FivetranValue::NaiveTime(_) => Some(FivetranDataType::NaiveTime), FivetranValue::NaiveDatetime(_) => Some(FivetranDataType::NaiveDatetime), FivetranValue::UtcDatetime(_) => Some(FivetranDataType::UtcDatetime), FivetranValue::Decimal(_) => Some(FivetranDataType::Decimal), FivetranValue::Binary(_) => Some(FivetranDataType::Binary), FivetranValue::String(_) => Some(FivetranDataType::String), FivetranValue::Json(_) => Some(FivetranDataType::Json), FivetranValue::Xml(_) => Some(FivetranDataType::Xml), } } #[cfg(test)] fn timestamp_from_ms(ms_since_unix_epoch: f64) -> Timestamp { let ms_in_s = 1000.0; let ns_in_ms = 1_000_000.0; Timestamp { seconds: f64::div_euclid(ms_since_unix_epoch, ms_in_s) as i64, nanos: (ms_since_unix_epoch.rem_euclid(ms_in_s) * ns_in_ms) as i32, } } /// Converts a Convex value to the original Fivetran value value, given the type /// it was originally converted from. #[cfg(test)] fn roundtrip_converted_value( value: ConvexValue, original_type: FivetranDataType, ) -> anyhow::Result<FivetranValue> { use chrono::{ NaiveDate, NaiveDateTime, NaiveTime, }; Ok(match (value, original_type) { (ConvexValue::Boolean(v), FivetranDataType::Boolean) => FivetranValue::Bool(v), (ConvexValue::Float64(v), FivetranDataType::Short) => FivetranValue::Short(v as i32), (ConvexValue::Float64(v), FivetranDataType::Int) => FivetranValue::Int(v as i32), (ConvexValue::Int64(v), FivetranDataType::Long) => FivetranValue::Long(v), (ConvexValue::String(v), FivetranDataType::Decimal) => FivetranValue::Decimal(v.into()), (ConvexValue::Float64(v), FivetranDataType::Float) => FivetranValue::Float(v as f32), (ConvexValue::Float64(v), FivetranDataType::Double) => FivetranValue::Double(v), (ConvexValue::String(v), FivetranDataType::NaiveTime) => { let dt = NaiveDateTime::new( NaiveDate::default(), NaiveTime::parse_from_str(&v, "%H:%M:%S%.f")?, ) .and_utc(); FivetranValue::NaiveTime(Timestamp { seconds: dt.timestamp(), nanos: dt.timestamp_subsec_nanos() as i32, }) }, (ConvexValue::String(v), FivetranDataType::NaiveDate) => { let dt = NaiveDateTime::new( NaiveDate::parse_from_str(&v, "%Y-%m-%d")?, NaiveTime::default(), ) .and_utc(); FivetranValue::NaiveDate(Timestamp { seconds: dt.timestamp(), nanos: dt.timestamp_subsec_nanos() as i32, }) }, (ConvexValue::String(v), FivetranDataType::NaiveDatetime) => { let dt = NaiveDateTime::parse_from_str(&v, "%Y-%m-%d %H:%M:%S%.f")?.and_utc(); FivetranValue::NaiveDatetime(Timestamp { seconds: dt.timestamp(), nanos: dt.timestamp_subsec_nanos() as i32, }) }, (ConvexValue::Float64(v), FivetranDataType::UtcDatetime) => { FivetranValue::UtcDatetime(timestamp_from_ms(v)) }, (ConvexValue::Bytes(v), FivetranDataType::Binary) => FivetranValue::Binary(v.into()), (ConvexValue::String(v), FivetranDataType::Xml) => FivetranValue::Xml(v.into()), (ConvexValue::String(v), FivetranDataType::String) => FivetranValue::String(v.into()), (val, FivetranDataType::Json) => { let (ConvexValue::Object(_) | ConvexValue::Array(_)) = val else { bail!("Unexpected JSON value") }; FivetranValue::Json(val.json_serialize()?) }, (val, _) => { bail!("Unexpected value {val:?} for the given Fivetran type {original_type:?}",) }, }) } #[cfg(test)] mod tests { use std::str::FromStr; use cmd_util::env::env_config; use common::{ assert_obj, value::{ ConvexObject, ConvexValue, }, }; use convex_fivetran_common::fivetran_sdk::value_type::Inner as FivetranValue; use convex_fivetran_destination::api_types::FivetranFieldName; use maplit::btreemap; use proptest::prelude::*; use prost_types::Timestamp; use super::fivetran_to_convex_value; use crate::{ convert::{ fivetran_data_type, roundtrip_converted_value, timestamp_from_ms, }, file_reader::{ FileRow, FivetranFileValue, }, }; #[test] fn convert_file_row_into_convex_object() -> anyhow::Result<()> { let actual: ConvexObject = FileRow(btreemap! { FivetranFieldName::from_str("name")? => FivetranFileValue::Value(FivetranValue::String("Nicolas".to_string())), FivetranFieldName::from_str("null_attribute")? => FivetranFileValue::Value(FivetranValue::Null(true)), FivetranFieldName::from_str("unmodified_attribute")? => FivetranFileValue::Unmodified, FivetranFieldName::from_str("_fivetran_synced")? => FivetranFileValue::Value(FivetranValue::UtcDatetime(Timestamp { seconds: 1715700652, nanos: 563000000, })), }).try_into()?; let expected = assert_obj!( "name" => "Nicolas", "null_attribute" => ConvexValue::Null, // unmodified values are not included in the result "fivetran" => assert_obj!( "synced" => 1715700652563.0, ), ); assert_eq!(actual, expected); Ok(()) } #[test] fn convert_file_row_with_all_metadata_fields() -> anyhow::Result<()> { let actual: ConvexObject = FileRow(btreemap! { FivetranFieldName::from_str("name")? => FivetranFileValue::Value(FivetranValue::String("Nicolas".to_string())), FivetranFieldName::from_str("_fivetran_id")? => FivetranFileValue::Value(FivetranValue::Int(42)), FivetranFieldName::from_str("_fivetran_deleted")? => FivetranFileValue::Value(FivetranValue::Bool(false)), FivetranFieldName::from_str("_fivetran_synced")? => FivetranFileValue::Value(FivetranValue::UtcDatetime(Timestamp { seconds: 1715700652, nanos: 563000000, })), }).try_into()?; let expected = assert_obj!( "name" => "Nicolas", "fivetran" => assert_obj!( "id" => 42.0, "deleted" => false, "synced" => 1715700652563.0, ), ); assert_eq!(actual, expected); Ok(()) } #[test] fn convert_file_row_with_column_names_starting_with_underscore() -> anyhow::Result<()> { let actual: ConvexObject = FileRow(btreemap! { FivetranFieldName::from_str("_name")? => FivetranFileValue::Value(FivetranValue::String("Nicolas".to_string())), FivetranFieldName::from_str("_fivetran_synced")? => FivetranFileValue::Value(FivetranValue::UtcDatetime(Timestamp { seconds: 1715700652, nanos: 563000000, })), }).try_into()?; let expected = assert_obj!( // unmodified values are not included in the result "fivetran" => assert_obj!( "synced" => 1715700652563.0, "columns" => assert_obj!( "name" => "Nicolas", ), ), ); assert_eq!(actual, expected); Ok(()) } #[test] fn booleans_are_converted_directly() -> anyhow::Result<()> { assert_eq!( fivetran_to_convex_value(FivetranValue::Bool(false))?, ConvexValue::Boolean(false) ); assert_eq!( fivetran_to_convex_value(FivetranValue::Bool(true))?, ConvexValue::Boolean(true) ); Ok(()) } #[test] fn small_integer_types_are_converted_to_v_number() -> anyhow::Result<()> { assert_eq!( fivetran_to_convex_value(FivetranValue::Short(42))?, ConvexValue::Float64(42.0) ); assert_eq!( fivetran_to_convex_value(FivetranValue::Int(-4))?, ConvexValue::Float64(-4.0) ); Ok(()) } #[test] fn longs_are_converted_directly() -> anyhow::Result<()> { assert_eq!( fivetran_to_convex_value(FivetranValue::Long(i64::MAX))?, ConvexValue::Int64(i64::MAX) ); Ok(()) } #[test] fn decimals_are_represented_as_strings() -> anyhow::Result<()> { assert_eq!( fivetran_to_convex_value(FivetranValue::Decimal("123.456".to_string()))?, ConvexValue::String("123.456".try_into()?) ); Ok(()) } #[test] fn floats_are_converted_to_doubles() -> anyhow::Result<()> { assert_eq!( fivetran_to_convex_value(FivetranValue::Float(1.0))?, ConvexValue::Float64(1.0) ); Ok(()) } #[test] fn doubles_are_converted_directly() -> anyhow::Result<()> { assert_eq!( fivetran_to_convex_value(FivetranValue::Double(std::f64::consts::PI))?, ConvexValue::Float64(std::f64::consts::PI) ); Ok(()) } #[test] fn naive_date_is_converted_to_strings() -> anyhow::Result<()> { assert_eq!( fivetran_to_convex_value(FivetranValue::NaiveDate(Timestamp { seconds: 1196640000, nanos: 0 }))?, ConvexValue::String("2007-12-03".try_into()?) ); assert_eq!( fivetran_to_convex_value(FivetranValue::NaiveDate(Timestamp { seconds: 0, nanos: 0 }))?, ConvexValue::String("1970-01-01".try_into()?) ); Ok(()) } #[test] fn naive_time_is_converted_to_strings() -> anyhow::Result<()> { assert_eq!( fivetran_to_convex_value(FivetranValue::NaiveTime(Timestamp { seconds: 19 * 60 * 60 + 41 * 60 + 30, nanos: 0 }))?, ConvexValue::String("19:41:30".try_into()?) ); Ok(()) } #[test] fn naive_datetime_is_converted_to_strings() -> anyhow::Result<()> { assert_eq!( fivetran_to_convex_value(FivetranValue::NaiveDatetime(Timestamp { seconds: 1196676930, nanos: 0 }))?, ConvexValue::String("2007-12-03 10:15:30".try_into()?) ); assert_eq!( fivetran_to_convex_value(FivetranValue::NaiveDatetime(Timestamp { seconds: 1196676930, nanos: 1_000_000 }))?, ConvexValue::String("2007-12-03 10:15:30.001".try_into()?) ); assert_eq!( fivetran_to_convex_value(FivetranValue::NaiveDatetime(Timestamp { seconds: 0, nanos: 0 }))?, ConvexValue::String("1970-01-01 00:00:00".try_into()?) ); Ok(()) } #[test] fn utc_datetimes_are_converted_to_ms_timestamps() -> anyhow::Result<()> { assert_eq!( fivetran_to_convex_value(FivetranValue::UtcDatetime(Timestamp { seconds: 1196676930, nanos: 123_000_000, }))?, ConvexValue::Float64(1196676930.0 * 1000.0 + 123.0) ); Ok(()) } #[test] fn binary_is_converted_directly() -> anyhow::Result<()> { assert_eq!( fivetran_to_convex_value(FivetranValue::Binary(vec![0, 255, 1, 2, 3]))?, ConvexValue::Bytes(vec![0, 255, 1, 2, 3].try_into()?) ); Ok(()) } #[test] fn xml_is_converted_to_string() -> anyhow::Result<()> { assert_eq!( fivetran_to_convex_value(FivetranValue::Xml( "<?xml version=\"1.0\" encoding=\"UTF-8\"?><document/>".to_string() ))?, ConvexValue::String( "<?xml version=\"1.0\" encoding=\"UTF-8\"?><document/>".try_into()? ) ); Ok(()) } #[test] fn string_is_converted_directly() -> anyhow::Result<()> { assert_eq!( fivetran_to_convex_value(FivetranValue::String("Hello world".to_string()))?, ConvexValue::String("Hello world".try_into()?) ); Ok(()) } #[test] fn json_objects_are_converted() -> anyhow::Result<()> { assert_eq!( fivetran_to_convex_value(FivetranValue::Json( "{ \"null\": null, \"bool\": false, \"number\": 42, \"string\": \"Hello world\", \"array\": [-1], \"object\": { \"nested\": true } }" .to_string() ))?, ConvexValue::Object(assert_obj!( "null" => ConvexValue::Null, "bool" => false, "number" => 42.0, "string" => "Hello world", "array" => ConvexValue::Array(vec![ConvexValue::Float64(-1.0)].try_into()?), "object" => ConvexValue::Object(assert_obj!( "nested" => true, )), )), ); Ok(()) } #[test] fn json_arrays_are_converted() -> anyhow::Result<()> { assert_eq!( fivetran_to_convex_value(FivetranValue::Json("[1,2,3]".to_string()))?, ConvexValue::Array( vec![ ConvexValue::Float64(1.0), ConvexValue::Float64(2.0), ConvexValue::Float64(3.0), ] .try_into()? ) ); Ok(()) } #[test] fn other_json_values_are_not_converted() -> anyhow::Result<()> { fivetran_to_convex_value(FivetranValue::Json("42".to_string())).unwrap_err(); Ok(()) } #[test] fn json_conversions_can_fail() -> anyhow::Result<()> { fivetran_to_convex_value(FivetranValue::Json("{\"$reserved\": true}".to_string())) .unwrap_err(); Ok(()) } #[test] fn ms_to_timestamp_conversion() { assert_eq!( timestamp_from_ms(1234.0), Timestamp { seconds: 1, nanos: 234_000_000 } ); assert_eq!( timestamp_from_ms(-1.0), Timestamp { seconds: -1, nanos: 999_000_000 } ); } proptest! { #![proptest_config(ProptestConfig { cases: 256 * env_config("CONVEX_PROPTEST_MULTIPLIER", 1), failure_persistence: None, ..ProptestConfig::default() })] #[test] fn test_fivetran_to_convex_conversion_roundtrips(value in any::<FivetranValue>()) { // Skipping FivetranValue::Json since a JSON value might have different string // representation due to whitespace, and serde_json’s equality doesn’t match JavaScript // value equality (e.g. 0.0 != 0 when comparing the equality of JsonValue) if let FivetranValue::Null(_) | FivetranValue::Json(_) = value { return Ok(()); } let original_data_type = fivetran_data_type(&value) .expect("The original value has no data type"); let Ok(converted_value) = fivetran_to_convex_value(value.clone()) else { panic!("Can’t serialize the value {value:?}"); }; let roundtripped_value = roundtrip_converted_value(converted_value, original_data_type) .expect("Couldn’t roundtrip the value"); prop_assert_eq!( fivetran_data_type(&roundtripped_value) .expect("The roundtripped value has no data type"), original_data_type ); prop_assert_eq!(roundtripped_value, value); } } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server