Skip to main content
Glama

Convex MCP server

Official
by get-convex
tests.rs (29.2 kB)
use std::{ collections::BTreeMap, sync::Arc, }; use application::{ test_helpers::ApplicationTestExt, Application, }; use common::{ assert_obj, components::{ CanonicalizedComponentFunctionPath, ComponentId, ComponentPath, PublicFunctionPath, }, http::{ RequestDestination, ResolvedHostname, }, runtime::{ shutdown_and_join, Runtime, SpawnHandle, }, types::{ FunctionCaller, MemberId, SessionId, SessionRequestSeqNumber, Timestamp, }, value::{ assert_val, ConvexObject, ConvexValue, }, version::ClientVersion, RequestId, }; use errors::ErrorMetadataAnyhowExt; use isolate::test_helpers::TEST_SOURCE_ISOLATE_ONLY; use keybroker::{ testing::TestUserIdentity, Identity, KeyBroker, DEV_SECRET, }; use model::{ config::{ types::ConfigMetadata, ConfigModel, }, udf_config::types::UdfConfig, }; use must_let::must_let; use parking_lot::Mutex; use runtime::testing::TestRuntime; use sync_types::{ types::SerializedArgs, AuthenticationToken, ClientMessage, Query, QueryId, QuerySetModification, StateModification, UserIdentityAttributes, }; use tokio::sync::mpsc; use crate::{ worker::{ measurable_unbounded_channel, SingleFlightReceiver, }, ServerMessage, SyncWorker, SyncWorkerConfig, }; struct SyncTest<RT: Runtime> { pub rt: RT, pub kb: KeyBroker, application: Application<RT>, } impl<RT: Runtime> SyncTest<RT> { async fn new(rt: RT) -> anyhow::Result<Self> { let application = Application::new_for_tests(&rt).await?; let kb = application.key_broker().clone(); let application_ = application.clone(); // Populate UDFs. { // Only analyze isolate modules, so sync tests don't require prod // runtime. Remove the filter if you need to test node modules. 
let modules = TEST_SOURCE_ISOLATE_ONLY.clone(); let source_package = application.upload_package(&modules, None, None).await?; let udf_config = UdfConfig::new_for_test(&rt, "1000.0.0".parse()?); let analyze_results = application .analyze( udf_config.clone(), modules.clone(), source_package.clone(), BTreeMap::new(), BTreeMap::new(), ) .await??; let mut tx = application_.begin(Identity::system()).await?; ConfigModel::new(&mut tx, ComponentId::test_user()) .apply( ConfigMetadata::new(), modules, udf_config, Some(source_package), analyze_results, None, ) .await?; application_.commit_test(tx).await?; } Ok(Self { rt, kb, application, }) } fn new_worker(&self) -> anyhow::Result<TestSyncWorker<RT>> { let config = SyncWorkerConfig::default(); self.new_worker_with_config(config, None) } fn new_worker_with_config( &self, config: SyncWorkerConfig, max_observed_timestamp: Option<Timestamp>, ) -> anyhow::Result<TestSyncWorker<RT>> { let worker_failed = Arc::new(Mutex::new(None)); let (client_tx, client_rx) = mpsc::unbounded_channel(); let (server_tx, server_rx) = measurable_unbounded_channel(); let worker_failed_ = worker_failed.clone(); let api = Arc::new(self.application.clone()); let rt = self.rt.clone(); let future = async move { // TODO(CX-597): The panic in this future currently gets swallowed by // `futures::RemoteHandle`. 
if let Err(e) = SyncWorker::new( api, rt, ResolvedHostname { instance_name: String::new(), destination: RequestDestination::ConvexCloud, }, config, client_rx, server_tx, Box::new(|_session_id| ()), 0, ) .go() .await { worker_failed_.lock().replace(e); } }; let worker_handle = self.rt.spawn("sync_test", future); client_tx.send(( ClientMessage::Connect { session_id: SessionId::nil(), connection_count: 0, last_close_reason: "InitialConnect".to_string(), max_observed_timestamp, client_ts: None, }, self.rt.monotonic_now(), ))?; Ok(TestSyncWorker { rt: self.rt.clone(), worker_handle, worker_failed, tx: client_tx, rx: server_rx, }) } } struct TestSyncWorker<RT: Runtime> { rt: RT, tx: mpsc::UnboundedSender<(ClientMessage, tokio::time::Instant)>, rx: SingleFlightReceiver, worker_handle: Box<dyn SpawnHandle>, worker_failed: Arc<Mutex<Option<anyhow::Error>>>, } impl<RT: Runtime> TestSyncWorker<RT> { fn send(&self, message: ClientMessage) -> anyhow::Result<()> { self.tx.send((message, self.rt.monotonic_now()))?; Ok(()) } async fn receive(&mut self) -> anyhow::Result<ServerMessage> { let message = self .rx .next() .await .ok_or_else(|| { anyhow::anyhow!( "Sync worker unexpectedly hung up: {:?}", self.worker_failed.lock() ) })? .0; Ok(message) } async fn mutation( &mut self, path: &str, args: ConvexObject, request_id: SessionRequestSeqNumber, ) -> anyhow::Result<(ConvexValue, Timestamp)> { self.send(ClientMessage::Mutation { request_id, udf_path: path.parse()?, args: SerializedArgs::from_args(vec![args.into()])?, component_path: None, })?; must_let!(let ServerMessage::MutationResponse { request_id: outgoing_mutation_id, result, ts, .. 
} = self.receive().await?); // Check that the mutation ID on the outgoing message matches to // confirm we intercepted the right message assert_eq!(request_id, outgoing_mutation_id); Ok((result.unwrap().unpack(), ts.unwrap())) } async fn shutdown(self) -> anyhow::Result<()> { shutdown_and_join(self.worker_handle).await?; self.worker_failed.lock().take().map_or(Ok(()), Err) } fn with_worker_error<T>(&self, f: impl FnOnce(&Option<anyhow::Error>) -> T) -> T { let worker_failed = self.worker_failed.lock(); f(&worker_failed) } } #[convex_macro::test_runtime] async fn test_basic_account(rt: TestRuntime) -> anyhow::Result<()> { let test = SyncTest::new(rt).await?; let mut sync_worker = test.new_worker()?; let name1 = assert_val!("orinoco"); let name2 = assert_val!("tizoncito"); // 1. Initialize the two account balances. let (result, _) = sync_worker .mutation( "sync:initialize", assert_obj!("name" => name1.clone(), "balance" => 100.0), 0, ) .await?; // initialize doesn't explicitly return a result, so the result is Null assert_eq!(result, ConvexValue::Null); must_let!(let ServerMessage::Transition { .. } = sync_worker.receive().await?); let (result, initialization_ts) = sync_worker .mutation( "sync:initialize", assert_obj!("name" => name2.clone(), "balance" => 50.0), 1, ) .await?; assert_eq!(result, ConvexValue::Null); must_let!(let ServerMessage::Transition { .. } = sync_worker.receive().await?); // 2. Start a new subscription. let query = Query { query_id: QueryId::new(0), udf_path: "sync:accountBalance".parse()?, args: SerializedArgs::from_args(vec![assert_obj!("name" => name1.clone()).into()])?, journal: None, component_path: None, }; let msg = ClientMessage::ModifyQuerySet { base_version: 0, new_version: 1, modifications: vec![QuerySetModification::Add(query)], }; sync_worker.send(msg)?; must_let!(let ServerMessage::Transition { start_version: start, end_version: end, modifications, .. 
} = sync_worker.receive().await?); assert_eq!(start.query_set, 0); assert_eq!(end.query_set, 1); let ts0 = end.ts; assert!(Timestamp::MIN < ts0); assert!(ts0 >= initialization_ts); assert_eq!(modifications.len(), 1); must_let!(let StateModification::QueryUpdated { query_id, value, .. } = &modifications[0]); assert_eq!(*query_id, QueryId::new(0)); assert_eq!(value.unpack(), ConvexValue::from(100.0)); // 3. Mutate a single query and see that it gets updated. let (result, _) = sync_worker .mutation( "sync:deposit", assert_obj!("name" => name1.clone(), "balance" => 50.0), 2, ) .await?; assert_eq!(result, assert_val!("orinoco's balance is now 150")); must_let!(let ServerMessage::Transition { start_version: start, end_version: end, modifications, .. } = sync_worker.receive().await?); assert_eq!(start.query_set, 1); assert_eq!(end.query_set, 1); let ts1 = end.ts; assert!(ts0 < ts1); assert_eq!(modifications.len(), 1); must_let!(let StateModification::QueryUpdated { query_id, value, .. } = &modifications[0]); assert_eq!(*query_id, QueryId::new(0)); assert_eq!(value.unpack(), ConvexValue::from(150.0)); // 4. Add a new query. let query = Query { query_id: QueryId::new(1), udf_path: "sync:accountBalance".parse()?, args: SerializedArgs::from_args(vec![assert_obj!("name" => name2.clone()).into()])?, journal: None, component_path: None, }; let msg = ClientMessage::ModifyQuerySet { base_version: 1, new_version: 2, modifications: vec![QuerySetModification::Add(query)], }; sync_worker.send(msg)?; must_let!(let ServerMessage::Transition { start_version: start, end_version: end, modifications, .. } = sync_worker.receive().await?); assert_eq!(start.query_set, 1); assert_eq!(end.query_set, 2); let ts2 = end.ts; assert!(ts1 <= ts2); assert_eq!(modifications.len(), 1); must_let!(let StateModification::QueryUpdated { query_id, value, .. } = &modifications[0]); assert_eq!(*query_id, QueryId::new(1)); assert_eq!(value.unpack(), ConvexValue::from(50.0)); // 5. 
Do a transfer and see that the two queries get updated atomically. let (result, _) = sync_worker .mutation( "sync:transfer", assert_obj!("from" => name1, "to" => name2, "amount" => 25.0), 3, ) .await?; assert_eq!(result, ConvexValue::Null); must_let!(let ServerMessage::Transition { start_version: start, end_version: end, modifications, .. } = sync_worker.receive().await?); assert_eq!(start.query_set, 2); assert_eq!(end.query_set, 2); let ts3 = end.ts; assert!(ts2 < ts3); assert_eq!(modifications.len(), 2); must_let!(let StateModification::QueryUpdated { query_id, value, .. } = &modifications[0]); assert_eq!(*query_id, QueryId::new(0)); assert_eq!(value.unpack(), ConvexValue::from(125.0)); must_let!(let StateModification::QueryUpdated { query_id, value, .. } = &modifications[1]); assert_eq!(*query_id, QueryId::new(1)); assert_eq!(value.unpack(), ConvexValue::from(75.0)); // 5. Remove a query. let msg = ClientMessage::ModifyQuerySet { base_version: 2, new_version: 3, modifications: vec![QuerySetModification::Remove { query_id: QueryId::new(0), }], }; sync_worker.send(msg)?; must_let!(let ServerMessage::Transition { start_version: start, end_version: end, modifications, .. } = sync_worker.receive().await?); assert_eq!(start.query_set, 2); assert_eq!(end.query_set, 3); assert!(ts3 <= end.ts); assert_eq!(modifications.len(), 1); must_let!(let StateModification::QueryRemoved { query_id } = &modifications[0]); assert_eq!(*query_id, QueryId::new(0)); sync_worker.shutdown().await?; Ok(()) } // Regression test that makes sure queries can be removed even if they were just // added. The query should be removed from the in_progress_queries. #[convex_macro::test_runtime] async fn test_remove_in_progress_query(rt: TestRuntime) -> anyhow::Result<()> { let test = SyncTest::new(rt).await?; let mut sync_worker = test.new_worker()?; let name1: ConvexValue = assert_val!("orinoco"); // Start a new subscription, and then cancel it. 
let query_id = QueryId::new(0); let query = Query { query_id, udf_path: "sync:accountBalance".parse()?, args: SerializedArgs::from_args(vec![assert_obj!("name" => name1.clone()).into()])?, journal: None, component_path: None, }; let msg = ClientMessage::ModifyQuerySet { base_version: 0, new_version: 1, modifications: vec![ QuerySetModification::Add(query), QuerySetModification::Remove { query_id }, ], }; sync_worker.send(msg)?; must_let!(let ServerMessage::Transition { start_version: start, end_version: end, modifications, client_clock_skew: _, server_ts: _, } = sync_worker.receive().await?); assert_eq!(start.query_set, 0); assert_eq!(end.query_set, 1); assert!(Timestamp::MIN < end.ts); assert_eq!(modifications.len(), 1); must_let!(let StateModification::QueryRemoved { query_id } = &modifications[0]); assert_eq!(*query_id, QueryId::new(0)); sync_worker.shutdown().await?; Ok(()) } #[convex_macro::test_runtime] async fn test_query_failure(rt: TestRuntime) -> anyhow::Result<()> { let test = SyncTest::new(rt).await?; let mut sync_worker = test.new_worker()?; let query1 = Query { query_id: QueryId::new(0), udf_path: "sync:fail".parse()?, args: SerializedArgs::from_args(vec![assert_obj!("i" => ConvexValue::from(0.0)).into()])?, journal: None, component_path: None, }; let query2 = Query { query_id: QueryId::new(1), udf_path: "sync:fail".parse()?, args: SerializedArgs::from_args(vec![assert_obj!("i" => ConvexValue::from(3.0)).into()])?, journal: None, component_path: None, }; let msg = ClientMessage::ModifyQuerySet { base_version: 0, new_version: 1, modifications: vec![ QuerySetModification::Add(query1), QuerySetModification::Add(query2), ], }; sync_worker.send(msg)?; // First, we should successfully transition to both of these queries failing. must_let!(let ServerMessage::Transition { end_version, modifications, .. } = sync_worker.receive().await?); assert_eq!(modifications.len(), 2); must_let!(let StateModification::QueryFailed { query_id, error_message, .. 
} = &modifications[0]); assert_eq!(*query_id, QueryId::new(0)); assert!(error_message.contains("I can't go for that")); must_let!(let StateModification::QueryFailed { query_id, error_message, .. } = &modifications[1]); assert_eq!(*query_id, QueryId::new(1)); assert!(error_message.contains("But I won't do that")); // Add a successful query to the query set. let query3: Query = Query { query_id: QueryId::new(2), udf_path: "sync:succeed".parse()?, args: SerializedArgs::from_args(vec![])?, journal: None, component_path: None, }; let msg = ClientMessage::ModifyQuerySet { base_version: end_version.query_set, new_version: end_version.query_set + 1, modifications: vec![QuerySetModification::Add(query3)], }; sync_worker.send(msg)?; must_let!(let ServerMessage::Transition { end_version, modifications, .. } = sync_worker.receive().await?); assert_eq!(modifications.len(), 1); must_let!(let StateModification::QueryUpdated { query_id, value, .. } = &modifications[0]); assert_eq!(*query_id, QueryId::new(2)); assert_eq!(value.unpack(), ConvexValue::try_from("on my list")?); // Remove the two failing queries. let msg = ClientMessage::ModifyQuerySet { base_version: end_version.query_set, new_version: end_version.query_set + 1, modifications: vec![ QuerySetModification::Remove { query_id: QueryId::new(0), }, QuerySetModification::Remove { query_id: QueryId::new(1), }, ], }; sync_worker.send(msg)?; must_let!(let ServerMessage::Transition { modifications, .. 
} = sync_worker.receive().await?); assert_eq!(modifications.len(), 2); assert_eq!( modifications[0], StateModification::QueryRemoved { query_id: QueryId::new(0) } ); assert_eq!( modifications[1], StateModification::QueryRemoved { query_id: QueryId::new(1) } ); sync_worker.shutdown().await?; Ok(()) } #[convex_macro::test_runtime] async fn test_admin_auth(rt: TestRuntime) -> anyhow::Result<()> { // Normal path let test = SyncTest::new(rt).await?; let mut sync_worker = test.new_worker()?; let admin_key = test.kb.issue_admin_key(MemberId(1)); sync_worker.send(ClientMessage::Authenticate { token: AuthenticationToken::Admin(admin_key.as_string(), None), base_version: 0, })?; must_let!(let ServerMessage::Transition { start_version, end_version, .. } = sync_worker.receive().await?); assert_eq!(start_version.identity, 0); assert_eq!(end_version.identity, 1); sync_worker.shutdown().await?; Ok(()) } #[convex_macro::test_runtime] async fn test_admin_auth_bad_key(rt: TestRuntime) -> anyhow::Result<()> { let test = SyncTest::new(rt).await?; let mut sync_worker = test.new_worker()?; let bogus_admin_key = KeyBroker::new("bozo-fish-123", DEV_SECRET.try_into()?)?.issue_admin_key(MemberId(1)); sync_worker.send(ClientMessage::Authenticate { token: AuthenticationToken::Admin(bogus_admin_key.as_string(), None), base_version: 0, })?; let err = sync_worker.receive().await.unwrap_err(); assert!( err.to_string().contains( "Sync worker unexpectedly hung up: Some(The provided admin key was invalid for this \ instance" ), "{err}" ); sync_worker.with_worker_error(|e| assert!(e.as_ref().unwrap().is_auth_update_failed())); Ok(()) } #[convex_macro::test_runtime] async fn test_acting_auth_bad_key(rt: TestRuntime) -> anyhow::Result<()> { let test = SyncTest::new(rt).await?; let mut sync_worker = test.new_worker()?; let bogus_admin_key = KeyBroker::new("bozo-fish-123", DEV_SECRET.try_into()?)?.issue_admin_key(MemberId(1)); sync_worker.send(ClientMessage::Authenticate { token: 
AuthenticationToken::Admin( bogus_admin_key.as_string(), Some(UserIdentityAttributes::test()), ), base_version: 0, })?; let err = sync_worker.receive().await.unwrap_err(); assert!( err.to_string().contains( "Sync worker unexpectedly hung up: Some(The provided admin key was invalid for this \ instance" ), "{err}" ); sync_worker.with_worker_error(|e| assert!(e.as_ref().unwrap().is_auth_update_failed())); Ok(()) } #[convex_macro::test_runtime] async fn test_acting_auth(rt: TestRuntime) -> anyhow::Result<()> { let test = SyncTest::new(rt).await?; let mut sync_worker = test.new_worker()?; let admin_key = test.kb.issue_admin_key(MemberId(1)); sync_worker.send(ClientMessage::Authenticate { token: AuthenticationToken::Admin( admin_key.as_string(), Some(UserIdentityAttributes::test()), ), base_version: 0, })?; must_let!(let ServerMessage::Transition { start_version, end_version, .. } = sync_worker.receive().await?); assert_eq!(start_version.identity, 0); assert_eq!(end_version.identity, 1); sync_worker.shutdown().await?; Ok(()) } #[convex_macro::test_runtime] async fn test_idempotent_mutations(rt: TestRuntime) -> anyhow::Result<()> { // A test that confirms that sending the same mutation twice only causes // it to run once. let test = SyncTest::new(rt).await?; let mut sync_worker = test.new_worker()?; // Set up a balance of $0 for Alice let name = ConvexValue::try_from("Alice")?; sync_worker .mutation( "sync:initialize", assert_obj!("name" => name.clone(), "balance" => 0.0), 0, ) .await?; must_let!(let ServerMessage::Transition { end_version: v1, modifications, .. } = sync_worker.receive().await?); assert!(modifications.is_empty()); // Deposit $5 in a mutation let (result, _) = sync_worker .mutation( "sync:deposit", assert_obj!("name" => name.clone(), "balance" => 5.0), 1, ) .await?; assert_eq!(result, ConvexValue::try_from("Alice's balance is now 5")?); must_let!(let ServerMessage::Transition { start_version: v2, end_version: v3, modifications, .. 
} = sync_worker.receive().await?); assert_eq!(v1, v2); assert!(v2 < v3); assert!(modifications.is_empty()); // Rerun the same mutation (with the same mutation ID) and expect the same // result. let (result, _) = sync_worker .mutation( "sync:deposit", assert_obj!("name" => name.clone(), "balance" => 5.0), 1, ) .await?; assert_eq!(result, ConvexValue::try_from("Alice's balance is now 5")?); Ok(()) } #[convex_macro::test_runtime] async fn test_value_deduplication_success(rt: TestRuntime) -> anyhow::Result<()> { let test = SyncTest::new(rt).await?; let mut sync_worker = test.new_worker()?; let name = ConvexValue::try_from("Alice")?; let (result, _) = sync_worker .mutation( "sync:initialize", assert_obj!("name" => name.clone(), "balance" => 0.0), 0, ) .await?; assert_eq!(result, ConvexValue::Null); must_let!(let ServerMessage::Transition { .. } = sync_worker.receive().await?); let query = Query { query_id: QueryId::new(0), udf_path: "sync:discardQueryResults".parse()?, args: SerializedArgs::from_args(vec![ assert_obj!("throwError" => ConvexValue::from(false)).into(), ])?, journal: None, component_path: None, }; let msg = ClientMessage::ModifyQuerySet { base_version: 0, new_version: 1, modifications: vec![QuerySetModification::Add(query)], }; sync_worker.send(msg)?; must_let!(let ServerMessage::Transition { modifications, .. } = sync_worker.receive().await?); assert_eq!(modifications.len(), 1, "{modifications:?}"); must_let!(let StateModification::QueryUpdated { query_id, value, .. } = &modifications[0]); assert_eq!(*query_id, QueryId::new(0)); assert_eq!(value.unpack(), assert_val!("hi")); // Insert a new value into the "accounts" table, which will invalidate the query // but not change its result. let name = ConvexValue::try_from("Bob")?; sync_worker .mutation( "sync:initialize", assert_obj!("name" => name.clone(), "balance" => 0.0), 1, ) .await?; must_let!(let ServerMessage::Transition { modifications, .. 
} = sync_worker.receive().await?); assert!(modifications.is_empty()); Ok(()) } #[convex_macro::test_runtime] async fn test_value_deduplication_failure(rt: TestRuntime) -> anyhow::Result<()> { let test = SyncTest::new(rt).await?; let mut sync_worker = test.new_worker()?; let name = ConvexValue::try_from("Alice")?; let (result, _) = sync_worker .mutation( "sync:initialize", assert_obj!("name" => name.clone(), "balance" => 0.0), 0, ) .await?; assert_eq!(result, ConvexValue::Null); must_let!(let ServerMessage::Transition { .. } = sync_worker.receive().await?); let query = Query { query_id: QueryId::new(0), udf_path: "sync:discardQueryResults".parse()?, args: SerializedArgs::from_args(vec![ assert_obj!("throwError" => ConvexValue::from(true)).into(), ])?, journal: None, component_path: None, }; let msg = ClientMessage::ModifyQuerySet { base_version: 0, new_version: 1, modifications: vec![QuerySetModification::Add(query)], }; sync_worker.send(msg)?; must_let!(let ServerMessage::Transition { modifications, .. } = sync_worker.receive().await?); assert_eq!(modifications.len(), 1, "{modifications:?}"); must_let!(let StateModification::QueryFailed { query_id, error_message, .. } = &modifications[0]); assert_eq!(*query_id, QueryId::new(0)); assert!(error_message.contains("Uncaught Error: bye")); // Insert a new value into the "accounts" table, which will invalidate the query // but not change its result. let name = ConvexValue::try_from("Bob")?; sync_worker .mutation( "sync:initialize", assert_obj!("name" => name.clone(), "balance" => 0.0), 1, ) .await?; must_let!(let ServerMessage::Transition { modifications, .. 
} = sync_worker.receive().await?); assert!(modifications.is_empty()); Ok(()) } #[convex_macro::test_runtime] async fn test_udf_cache_out_of_order(rt: TestRuntime) -> anyhow::Result<()> { let test = SyncTest::new(rt).await?; let mut sync_worker = test.new_worker()?; let name = ConvexValue::try_from("Alice")?; let (result, _) = sync_worker .mutation( "sync:initialize", assert_obj!("name" => name.clone(), "balance" => 0.0), 0, ) .await?; assert_eq!(result, ConvexValue::Null); must_let!(let ServerMessage::Transition { .. } = sync_worker.receive().await?); let ts1 = *test.application.now_ts_for_reads(); sync_worker .mutation( "sync:deposit", assert_obj!("name" => name.clone(), "balance" => 5.0), 1, ) .await?; let ts2 = *test.application.now_ts_for_reads(); let path = CanonicalizedComponentFunctionPath { component: ComponentPath::test_user(), udf_path: "sync:accountBalance".parse()?, }; let result1 = test .application .read_only_udf_at_ts( RequestId::new(), PublicFunctionPath::Component(path.clone()), vec![assert_obj!("name" => name.clone()).into()], Identity::Unknown(None), ts2, None, FunctionCaller::SyncWorker(ClientVersion::unknown()), ) .await?; assert_eq!(result1.result?.unpack(), ConvexValue::from(5.0)); let result2 = test .application .read_only_udf_at_ts( RequestId::new(), PublicFunctionPath::Component(path), vec![assert_obj!("name" => name).into()], Identity::Unknown(None), ts1, None, FunctionCaller::SyncWorker(ClientVersion::unknown()), ) .await?; assert_eq!(result2.result?.unpack(), ConvexValue::from(0.0)); Ok(()) } #[convex_macro::test_runtime] async fn test_max_observed_timestamp(rt: TestRuntime) -> anyhow::Result<()> { let test = SyncTest::new(rt).await?; let config = SyncWorkerConfig::default(); let mut sync_worker = test.new_worker_with_config(config, Some(Timestamp::MAX))?; must_let!(let Err(err) = sync_worker.receive().await); assert!( format!("{err}") .contains("Sync worker unexpectedly hung up: Some(Client has observed a timestamp"), "{err}" ); Ok(()) 
}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.