Skip to main content
Glama

Convex MCP server

Official
by get-convex
airbyte_import.rs5.99 kB
use std::collections::BTreeSet; use common::{ components::ComponentPath, knobs::TRANSACTION_MAX_NUM_USER_WRITES, query::{ Order, Query, }, }; use database::{ ResolvedQuery, TableModel, }; use keybroker::Identity; use maplit::btreemap; use runtime::testing::TestRuntime; use value::{ assert_obj, TableName, TableNamespace, }; use crate::{ airbyte_import::{ AirbyteRecord, PrimaryKey, ValidatedAirbyteStream, }, test_helpers::ApplicationTestExt, Application, }; #[convex_macro::test_runtime] async fn test_clear_tables_and_import(rt: TestRuntime) -> anyhow::Result<()> { let application = Application::new_for_tests(&rt).await?; let table: TableName = "table".parse()?; // Insert a bunch of empty messages over multiple transactions via streaming // import let mut records = vec![]; let rows_to_insert = *TRANSACTION_MAX_NUM_USER_WRITES + 5; for _ in 0..rows_to_insert { records.push(AirbyteRecord::new(table.clone(), false, assert_obj!())); } let streams = btreemap! { table.clone() => ValidatedAirbyteStream::Append }; let rows_inserted = application .import_airbyte_records(&Identity::system(), records, streams) .await?; assert_eq!(rows_inserted, rows_to_insert as u64); // clear_tables should deleted the same number of documents as import inserted let deleted_docs = application .clear_tables( &Identity::system(), vec![(ComponentPath::root(), table.clone())], ) .await?; assert_eq!(deleted_docs, rows_inserted); // Check the table is empty after clearing it let mut tx = application.begin(Identity::system()).await?; assert!( TableModel::new(&mut tx) .table_is_empty(TableNamespace::test_user(), &table) .await? ); assert!(TableModel::new(&mut tx).table_exists(TableNamespace::test_user(), &table)); Ok(()) } #[convex_macro::test_runtime] async fn test_delete_tables_and_import(rt: TestRuntime) -> anyhow::Result<()> { let application = Application::new_for_tests(&rt).await?; let table: TableName = "table".parse()?; // Insert a bunch of empty messages over multiple transactions via streaming // import let mut records = vec![]; let rows_to_insert = *TRANSACTION_MAX_NUM_USER_WRITES + 5; for _ in 0..rows_to_insert { records.push(AirbyteRecord::new(table.clone(), false, assert_obj!())); } let streams = btreemap! { table.clone() => ValidatedAirbyteStream::Append }; let rows_inserted = application .import_airbyte_records(&Identity::system(), records, streams) .await?; assert_eq!(rows_inserted, rows_to_insert as u64); // delete_tables should deleted the same number of documents as streaming import // inserted let table_namespace = TableNamespace::test_user(); let deleted_docs = application .delete_tables(&Identity::system(), vec![table.clone()], table_namespace) .await?; assert_eq!(deleted_docs, rows_inserted); // Check the table is empty after clearing it let mut tx = application.begin(Identity::system()).await?; assert!( TableModel::new(&mut tx) .table_is_empty(table_namespace, &table) .await? ); assert!(!TableModel::new(&mut tx).table_exists(table_namespace, &table)); Ok(()) } #[convex_macro::test_runtime] async fn test_dedup_import(rt: TestRuntime) -> anyhow::Result<()> { let application = Application::new_for_tests(&rt).await?; let table: TableName = "table".parse()?; let primary_key = PrimaryKey::try_from(vec![vec!["primary_key".to_string()]])?; let indexes = btreemap! {table.clone() => primary_key.clone()}; application .add_primary_key_indexes(&Identity::system(), indexes) .await?; // Make sure the indexes are enabled. Backfill would happen asynchronously, but // because the table is empty, it should be done here. application .wait_for_primary_key_indexes_ready(Identity::system(), BTreeSet::from([table.clone()])) .await?; let objects = vec![ // Updated value for key1 ( assert_obj!("field1" => "value1", "primary_key" => "key1"), false, ), ( assert_obj!("field1" => "value2", "primary_key" => "key1"), false, ), // No changes to key2 ( assert_obj!("field1" => "value1", "primary_key" => "key2"), false, ), // Deleted value for key3 ( assert_obj!("field1" => "value1", "primary_key" => "key3"), false, ), (assert_obj!("primary_key" => "key3"), true), ]; let records = objects .iter() .cloned() .map(|(obj, deleted)| AirbyteRecord::new(table.clone(), deleted, obj)) .collect(); let streams = btreemap! { table.clone() => ValidatedAirbyteStream::Dedup(primary_key) }; let count = application .import_airbyte_records(&Identity::system(), records, streams) .await?; assert_eq!(count, objects.len() as u64); let mut tx = application.begin(Identity::system()).await?; let mut query_stream = ResolvedQuery::new( &mut tx, TableNamespace::test_user(), Query::full_table_scan(table, Order::Asc), )?; let mut objects_in_table = vec![]; while let Some(doc) = query_stream.next(&mut tx, Some(3)).await? { objects_in_table.push(doc.into_value()); } let objects = &objects[1..objects.len() - 2]; assert_eq!(objects.len(), objects_in_table.len()); for (i, (obj, _)) in objects.iter().enumerate() { let obj_in_table = &*objects_in_table[i]; for field in ["field1", "primary_key"] { assert_eq!(obj.get(field), obj_in_table.get(field)) } } Ok(()) }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server