Skip to main content
Glama

Convex MCP server

Official
by get-convex
sync.rs12 kB
use std::collections::{ BTreeMap, BTreeSet, }; use anyhow::Context; use convex_fivetran_common::fivetran_sdk::{ self, update_response, value_type, Record, RecordType, UpdateResponse as FivetranUpdateResponse, ValueType, }; use futures::{ stream::BoxStream, StreamExt, }; use futures_async_stream::try_stream; use serde::{ Deserialize, Serialize, }; use value_type::Inner as FivetranValue; use crate::{ api_types::selection::Selection, convert::to_fivetran_row, convex_api::{ DocumentDeltasCursor, ListSnapshotCursor, SnapshotValue, Source, }, log::log, }; /// The value currently used for the `version` field of [`State`]. const CURSOR_VERSION: i64 = 1; /// Stores the current synchronization state of a destination. A state will be /// send (as JSON) to Fivetran every time we perform a checkpoint, and will be /// returned to us every time Fivetran calls the `update` method of the /// connector. #[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] #[serde(rename_all = "camelCase")] #[serde(deny_unknown_fields)] #[cfg_attr(test, derive(proptest_derive::Arbitrary))] pub struct State { /// The version of the connector that emitted this checkpoint. Could be used /// in the future to support backward compatibility with older state /// formats. pub version: i64, pub checkpoint: Checkpoint, /// If set, then we are tracking the full set of tables that the connector /// has every seen, so we are able to issue truncates the first time we /// see a table. /// /// Older versions of state.json do not have this field set. Once all /// state.json have this field, we can make this non-optional. /// /// The format of this string is `{table_name}` for the root component, /// or `{component_path}/{table_name}` for tables in other components. pub tables_seen: Option<BTreeSet<String>>, } impl State { pub fn create(checkpoint: Checkpoint, tables_seen: Option<BTreeSet<String>>) -> Self { Self { version: CURSOR_VERSION, checkpoint, tables_seen, } } } #[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] #[serde(deny_unknown_fields)] #[cfg_attr(test, derive(proptest_derive::Arbitrary))] pub enum Checkpoint { /// A checkpoint emitted during the initial synchonization. InitialSync { snapshot: i64, cursor: ListSnapshotCursor, }, /// A checkpoint emitted after an initial synchronzation has been completed. DeltaUpdates { cursor: DocumentDeltasCursor }, } /// A simplification of the messages sent to Fivetran in the `update` endpoint. #[derive(Debug)] pub enum UpdateMessage { Update { schema_name: Option<String>, table_name: String, op_type: RecordType, row: BTreeMap<String, FivetranValue>, }, Checkpoint(State), } /// Conversion of the simplified update message type to the actual gRPC type. impl From<UpdateMessage> for FivetranUpdateResponse { fn from(value: UpdateMessage) -> Self { FivetranUpdateResponse { operation: Some(match value { UpdateMessage::Update { schema_name, table_name, op_type, row, } => update_response::Operation::Record(Record { schema_name, table_name, r#type: op_type as i32, data: row .into_iter() .map(|(field_name, field_value)| { ( field_name, ValueType { inner: Some(field_value), }, ) }) .collect(), }), UpdateMessage::Checkpoint(checkpoint) => { let state_json = serde_json::to_string(&checkpoint) .expect("Couldn’t serialize a checkpoint"); update_response::Operation::Checkpoint(fivetran_sdk::Checkpoint { state_json }) }, }), } } } /// Returns the stream that the `update` endpoint emits. pub fn sync( source: impl Source + 'static, state: Option<State>, selection: Selection, ) -> BoxStream<'static, anyhow::Result<UpdateMessage>> { let Some(state) = state else { return initial_sync(source, None, Some(BTreeSet::new()), selection).boxed(); }; let State { version: _version, checkpoint, tables_seen, } = state; match checkpoint { Checkpoint::InitialSync { snapshot, cursor } => { initial_sync(source, Some((snapshot, cursor)), tables_seen, selection).boxed() }, Checkpoint::DeltaUpdates { cursor } => { delta_sync(source, cursor, tables_seen, selection).boxed() }, } } /// Performs (or resume) an initial synchronization. #[try_stream(ok = UpdateMessage, error = anyhow::Error)] async fn initial_sync( source: impl Source, mut checkpoint: Option<(i64, ListSnapshotCursor)>, mut tables_seen: Option<BTreeSet<String>>, selection: Selection, ) { let log_msg = if let Some((snapshot, _)) = checkpoint { format!("Resuming an initial sync from {source} at {snapshot}") } else { format!("Starting an initial sync from {source}") }; log(&log_msg); let snapshot = loop { let snapshot = checkpoint.as_ref().map(|c| c.0); let cursor = checkpoint.as_ref().map(|c| c.1.clone()); let res = source .list_snapshot(snapshot, cursor.clone(), selection.clone()) .await?; for value in res.values { if let Some(ref mut tables_seen) = tables_seen { // Issue truncates if we see a table for the first time. // Skip the behavior for legacy state.json - where tables_seen wasn't tracked. let table_seen_key = value.table_path_for_state(); if !tables_seen.contains(&table_seen_key) { tables_seen.insert(table_seen_key); yield UpdateMessage::Update { schema_name: Some(value.fivetran_schema_name()), table_name: value.table.clone(), op_type: RecordType::Truncate, row: BTreeMap::new(), }; } } yield UpdateMessage::Update { schema_name: Some(value.fivetran_schema_name()), table_name: value.table, op_type: RecordType::Upsert, row: to_fivetran_row(value.fields)?, }; } if res.has_more { let cursor = ListSnapshotCursor::from( res.cursor.context("Missing cursor when has_more was set")?, ); yield UpdateMessage::Checkpoint(State::create( Checkpoint::InitialSync { snapshot: res.snapshot, cursor: cursor.clone(), }, tables_seen.clone(), )); checkpoint = Some((res.snapshot, cursor)); } else { break res.snapshot; } }; let cursor = DocumentDeltasCursor::from(snapshot); yield UpdateMessage::Checkpoint(State::create( Checkpoint::DeltaUpdates { cursor }, tables_seen, )); log(&format!( "Initial sync from {source} successful at cursor {cursor}." )); } /// Synchronizes the changes that happened after an initial synchronization or /// delta synchronization has been completed. #[try_stream(ok = UpdateMessage, error = anyhow::Error)] async fn delta_sync( source: impl Source, cursor: DocumentDeltasCursor, mut tables_seen: Option<BTreeSet<String>>, selection: Selection, ) { log(&format!("Delta sync from {source} starting at {cursor}.")); let mut cursor = cursor; let mut has_more = true; while has_more { let response = source.document_deltas(cursor, selection.clone()).await?; for value in response.values { if let Some(ref mut tables_seen) = tables_seen { // Issue truncates if we see a table for the first time. // Skip the behavior for legacy state.json - where tables_seen wasn't tracked. let table_seen_key = value.table_path_for_state(); if !tables_seen.contains(&table_seen_key) { tables_seen.insert(table_seen_key); yield UpdateMessage::Update { schema_name: Some(value.fivetran_schema_name()), table_name: value.table.clone(), op_type: RecordType::Truncate, row: BTreeMap::new(), }; } } yield UpdateMessage::Update { schema_name: Some(value.fivetran_schema_name()), table_name: value.table, op_type: if value.deleted { RecordType::Delete } else { RecordType::Upsert }, row: to_fivetran_row(value.fields)?, }; } cursor = DocumentDeltasCursor::from(response.cursor); has_more = response.has_more; // It is safe to take a snapshot here, because document_deltas // guarantees that the state given by one call is consistent. yield UpdateMessage::Checkpoint(State::create( Checkpoint::DeltaUpdates { cursor }, tables_seen.clone(), )); } log(&format!( "Delta sync changes applied from {source}. Final cursor {cursor}" )); } #[cfg(test)] mod state_serialization_tests { use cmd_util::env::env_config; use proptest::prelude::*; use crate::sync::{ Checkpoint, State, }; proptest! { #![proptest_config(ProptestConfig { cases: 256 * env_config("CONVEX_PROPTEST_MULTIPLIER", 1), failure_persistence: None, ..ProptestConfig::default() })] #[test] fn state_json_roundtrips(value in any::<State>()) { let json = serde_json::to_string(&value).unwrap(); prop_assert_eq!(value, serde_json::from_str(&json).unwrap()); } } #[test] fn refuses_unknown_state_object() { assert!(serde_json::from_str::<State>("{\"a\": \"b\"}").is_err()); } #[test] fn refuses_unknown_checkpoint_object() { assert!(serde_json::from_str::<State>( "{ \"version\": 1, \"snapshot\": { \"NewState\": { \"cursor\": 42 } } }" ) .is_err()); } #[test] fn deserializes_v1_initial_sync_checkpoints() { assert_eq!( serde_json::from_str::<State>( "{ \"version\": 1, \"checkpoint\": { \"InitialSync\": { \"snapshot\": 42, \ \"cursor\": \"abc123\" } } }" ) .unwrap(), State { version: 1, checkpoint: Checkpoint::InitialSync { snapshot: 42, cursor: String::from("abc123").into(), }, tables_seen: None, }, ); } #[test] fn deserializes_v1_delta_update_checkpoints() { assert_eq!( serde_json::from_str::<State>( "{ \"version\": 1, \"checkpoint\": { \"DeltaUpdates\": { \"cursor\": 42 } } }" ) .unwrap(), State { version: 1, checkpoint: Checkpoint::DeltaUpdates { cursor: 42.into() }, tables_seen: None, }, ); } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server