Skip to main content
Glama

Convex MCP server

Official
by get-convex
mod.rs35.9 kB
use std::{ collections::BTreeMap, convert::Infallible, sync::Arc, }; use convex_sync_types::{ AuthenticationToken, UdfPath, UserIdentityAttributes, }; #[cfg(doc)] use futures::Stream; use futures::StreamExt; use tokio::{ sync::{ broadcast, mpsc, oneshot, }, task::JoinHandle, }; use tokio_stream::wrappers::BroadcastStream; use url::Url; use self::worker::AuthenticateRequest; #[cfg(doc)] use crate::SubscriberId; use crate::{ base_client::{ BaseConvexClient, QueryResults, }, client::{ subscription::{ QuerySetSubscription, QuerySubscription, }, worker::{ worker, ActionRequest, ClientRequest, MutationRequest, SubscribeRequest, }, }, sync::{ web_socket_manager::WebSocketManager, SyncProtocol, WebSocketState, }, value::Value, FunctionResult, }; pub mod subscription; mod worker; const VERSION: Option<&str> = option_env!("CARGO_PKG_VERSION"); /// An asynchronous client to interact with a specific project to perform /// mutations and manage query subscriptions using [`tokio`]. /// /// The Convex client requires a deployment url, /// which can be found in the [dashboard](https://dashboard.convex.dev/) settings tab. /// /// ```no_run /// use convex::ConvexClient; /// use futures::StreamExt; /// /// #[tokio::main] /// async fn main() -> anyhow::Result<()> { /// let mut client = ConvexClient::new("https://cool-music-123.convex.cloud").await?; /// let mut sub = client.subscribe("listMessages", maplit::btreemap!{}).await?; /// while let Some(result) = sub.next().await { /// println!("{result:?}"); /// } /// Ok(()) /// } /// ``` /// /// The [`ConvexClient`] internally holds a connection and a [`tokio`] /// background task to manage it. It is advised that you create one and /// **reuse** it. You can safely clone with [`ConvexClient::clone()`] to share /// the connection and outstanding subscriptions. /// /// ## Examples /// For example code, please refer to the examples directory. pub struct ConvexClient { listen_handle: Option<Arc<JoinHandle<Infallible>>>, request_sender: mpsc::UnboundedSender<ClientRequest>, watch_receiver: broadcast::Receiver<QueryResults>, } /// Clone the [`ConvexClient`], sharing the connection and outstanding /// subscriptions. impl Clone for ConvexClient { fn clone(&self) -> Self { Self { listen_handle: self.listen_handle.clone(), request_sender: self.request_sender.clone(), watch_receiver: self.watch_receiver.resubscribe(), } } } /// Drop the [`ConvexClient`]. When the final reference to the [`ConvexClient`] /// is dropped, the connection is cleaned up. impl Drop for ConvexClient { fn drop(&mut self) { if let Ok(j_handle) = Arc::try_unwrap( self.listen_handle .take() .expect("INTERNAL BUG: listen handle should never be none"), ) { j_handle.abort() } } } impl ConvexClient { /// Constructs a new client for communicating with `deployment_url`. /// /// ```no_run /// # use convex::ConvexClient; /// # #[tokio::main] /// # async fn main() -> anyhow::Result<()> { /// let client = ConvexClient::new("https://cool-music-123.convex.cloud").await?; /// # Ok(()) /// # } /// ``` pub async fn new(deployment_url: &str) -> anyhow::Result<Self> { ConvexClient::new_from_builder(ConvexClientBuilder::new(deployment_url)).await } #[doc(hidden)] pub async fn new_from_builder(builder: ConvexClientBuilder) -> anyhow::Result<Self> { let client_id = builder .client_id .unwrap_or_else(|| format!("rust-{}", VERSION.unwrap_or("unknown"))); let ws_url = deployment_to_ws_url(builder.deployment_url.as_str().try_into()?)?; // Channels for the `listen` background thread let (response_sender, response_receiver) = mpsc::channel(1); let (request_sender, request_receiver) = mpsc::unbounded_channel(); // Listener for when each transaction completes let (watch_sender, watch_receiver) = broadcast::channel(1); let base_client = BaseConvexClient::new(); let protocol = WebSocketManager::open( ws_url, response_sender, builder.on_state_change, client_id.as_str(), ) .await?; let listen_handle = tokio::spawn(worker( response_receiver, request_receiver, watch_sender, base_client, protocol, )); let client = ConvexClient { listen_handle: Some(Arc::new(listen_handle)), request_sender, watch_receiver, }; Ok(client) } /// Subscribe to the results of query `name` called with `args`. /// /// Returns a [`QuerySubscription`] which implements [`Stream`]< /// [`FunctionResult`]>. A new value appears on the stream each /// time the query function produces a new result. /// /// The subscription is automatically unsubscribed when it is dropped. /// /// ```no_run /// # use convex::ConvexClient; /// # use futures::StreamExt; /// # #[tokio::main] /// # async fn main() -> anyhow::Result<()> { /// let mut client = ConvexClient::new("https://cool-music-123.convex.cloud").await?; /// let mut sub = client.subscribe("listMessages", maplit::btreemap!{}).await?; /// while let Some(result) = sub.next().await { /// println!("{result:?}"); /// } /// # Ok(()) /// # } pub async fn subscribe( &mut self, name: &str, args: BTreeMap<String, Value>, ) -> anyhow::Result<QuerySubscription> { let (tx, rx) = oneshot::channel(); let udf_path = name.parse()?; let request = SubscribeRequest { udf_path, args }; self.request_sender.send(ClientRequest::Subscribe( request, tx, self.request_sender.clone(), ))?; let res = rx.await?; Ok(res) } /// Make a oneshot request to a query `name` with `args`. /// /// Returns a [`FunctionResult`] representing the result of the query. /// /// This method is syntactic sugar for waiting for a single result on /// a subscription. /// It is equivalent to `client.subscribe(name, /// args).await?.next().unwrap()` /// /// ```no_run /// # use convex::ConvexClient; /// # use futures::StreamExt; /// # #[tokio::main] /// # async fn main() -> anyhow::Result<()> { /// let mut client = ConvexClient::new("https://cool-music-123.convex.cloud").await?; /// let result = client.query("listMessages", maplit::btreemap!{}).await?; /// println!("{result:?}"); /// # Ok(()) /// # } pub async fn query( &mut self, name: &str, args: BTreeMap<String, Value>, ) -> anyhow::Result<FunctionResult> { Ok(self .subscribe(name, args) .await? .next() .await .expect("INTERNAL BUG: Convex Client dropped prematurely.")) } /// Perform a mutation `name` with `args` and return a future /// containing the return value of the mutation once it completes. /// /// ```no_run /// # use convex::ConvexClient; /// # use futures::StreamExt; /// # #[tokio::main] /// # async fn main() -> anyhow::Result<()> { /// let mut client = ConvexClient::new("https://cool-music-123.convex.cloud").await?; /// let result = client.mutation("sendMessage", maplit::btreemap!{ /// "body".into() => "Let it be.".into(), /// "author".into() => "The Beatles".into(), /// }).await?; /// println!("{result:?}"); /// # Ok(()) /// # } pub async fn mutation( &mut self, name: &str, args: BTreeMap<String, Value>, ) -> anyhow::Result<FunctionResult> { let (tx, rx) = oneshot::channel(); let udf_path: UdfPath = name.parse()?; let request = MutationRequest { udf_path, args }; self.request_sender .send(ClientRequest::Mutation(request, tx))?; let res = rx.await?; Ok(res.await?) } /// Perform an action `name` with `args` and return a future /// containing the return value of the action once it completes. /// /// ```no_run /// # use convex::ConvexClient; /// # use futures::StreamExt; /// # #[tokio::main] /// # async fn main() -> anyhow::Result<()> { /// let mut client = ConvexClient::new("https://cool-music-123.convex.cloud").await?; /// let result = client.action("sendGif", maplit::btreemap!{ /// "body".into() => "Tatooine Sunrise.".into(), /// "author".into() => "Luke Skywalker".into(), /// }).await?; /// println!("{result:?}"); /// # Ok(()) /// # } pub async fn action( &mut self, name: &str, args: BTreeMap<String, Value>, ) -> anyhow::Result<FunctionResult> { let (tx, rx) = oneshot::channel(); let udf_path: UdfPath = name.parse()?; let request = ActionRequest { udf_path, args }; self.request_sender .send(ClientRequest::Action(request, tx))?; let res = rx.await?; Ok(res.await?) } /// Get a consistent view of the results of multiple queries (query set). /// /// Returns a [`QuerySetSubscription`] which /// implements [`Stream`]<[`QueryResults`]>. /// Each item in the stream contains a consistent view /// of the results of all the queries in the query set. /// /// Queries can be added to the query set via [`ConvexClient::subscribe`]. /// Queries can be removed from the query set via dropping the /// [`QuerySubscription`] token returned by [`ConvexClient::subscribe`]. /// /// /// [`QueryResults`] is a copy-on-write mapping from [`SubscriberId`] to /// its latest result [`Value`]. /// /// ```no_run /// # use convex::ConvexClient; /// # use futures::StreamExt; /// # #[tokio::main] /// # async fn main() -> anyhow::Result<()> { /// let mut client = ConvexClient::new("https://cool-music-123.convex.cloud").await?; /// let mut watch = client.watch_all(); /// let sub1 = client.subscribe("listMessages", maplit::btreemap!{ /// "channel".into() => 1.into(), /// }).await?; /// let sub2 = client.subscribe("listMessages", maplit::btreemap!{ /// "channel".into() => 1.into(), /// }).await?; /// # Ok(()) /// # } pub fn watch_all(&self) -> QuerySetSubscription { QuerySetSubscription::new(BroadcastStream::new(self.watch_receiver.resubscribe())) } /// Set auth for use when calling Convex functions. /// /// Set it with a token that you get from your auth provider via their login /// flow. If `None` is passed as the token, then auth is unset (logging /// out). pub async fn set_auth(&mut self, token: Option<String>) { let req = AuthenticateRequest { token: match token { None => AuthenticationToken::None, Some(token) => AuthenticationToken::User(token), }, }; self.request_sender .send(ClientRequest::Authenticate(req)) .expect("INTERNAL BUG: Worker has gone away"); } /// Set admin auth for use when calling Convex functions as a deployment /// admin. Not typically required. /// /// You can get a deploy_key from the Convex dashboard's deployment settings /// page. Deployment admins can act as users as part of their /// development flow to see how a function would act. #[doc(hidden)] pub async fn set_admin_auth( &mut self, deploy_key: String, acting_as: Option<UserIdentityAttributes>, ) { let req = AuthenticateRequest { token: AuthenticationToken::Admin(deploy_key, acting_as), }; self.request_sender .send(ClientRequest::Authenticate(req)) .expect("INTERNAL BUG: Worker has gone away"); } } fn deployment_to_ws_url(mut deployment_url: Url) -> anyhow::Result<Url> { let ws_scheme = match deployment_url.scheme() { "http" | "ws" => "ws", "https" | "wss" => "wss", scheme => anyhow::bail!("Unknown scheme {scheme}. Expected http or https."), }; deployment_url .set_scheme(ws_scheme) .expect("Scheme not supported"); deployment_url.set_path("api/sync"); Ok(deployment_url) } /// A builder for creating a [`ConvexClient`] with custom configuration. pub struct ConvexClientBuilder { deployment_url: String, client_id: Option<String>, on_state_change: Option<mpsc::Sender<WebSocketState>>, } impl ConvexClientBuilder { /// Create a new [`ConvexClientBuilder`] with the given deployment URL. pub fn new(deployment_url: &str) -> Self { Self { deployment_url: deployment_url.to_string(), client_id: None, on_state_change: None, } } /// Set a custom client ID for this client. pub fn with_client_id(mut self, client_id: &str) -> Self { self.client_id = Some(client_id.to_string()); self } /// Set a channel to be notified of changes to the WebSocket connection /// state. pub fn with_on_state_change(mut self, on_state_change: mpsc::Sender<WebSocketState>) -> Self { self.on_state_change = Some(on_state_change); self } /// Build the [`ConvexClient`] with the configured options. /// /// ```no_run /// # use convex::ConvexClientBuilder; /// # #[tokio::main] /// # async fn main() -> anyhow::Result<()> { /// let client = ConvexClientBuilder::new("https://cool-music-123.convex.cloud").build().await?; /// # Ok(()) /// # } /// ``` pub async fn build(self) -> anyhow::Result<ConvexClient> { ConvexClient::new_from_builder(self).await } } #[cfg(test)] pub mod tests { use std::{ str::FromStr, sync::Arc, time::Duration, }; use convex_sync_types::{ types::SerializedArgs, AuthenticationToken, ClientMessage, LogLinesMessage, Query, QueryId, QuerySetModification, SessionId, StateModification, StateVersion, UdfPath, UserIdentityAttributes, }; use futures::StreamExt; use maplit::btreemap; use pretty_assertions::assert_eq; use serde_json::json; use tokio::sync::{ broadcast, mpsc, }; use super::ConvexClient; use crate::{ base_client::FunctionResult, client::{ deployment_to_ws_url, worker::worker, BaseConvexClient, }, sync::{ testing::TestProtocolManager, ServerMessage, SyncProtocol, }, value::Value, QuerySubscription, }; impl ConvexClient { pub async fn with_test_protocol() -> anyhow::Result<(Self, TestProtocolManager)> { let _ = tracing_subscriber::fmt() .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) .try_init(); // Channels for the `listen` background thread let (response_sender, response_receiver) = mpsc::channel(1); let (request_sender, request_receiver) = mpsc::unbounded_channel(); // Listener for when each transaction completes let (watch_sender, watch_receiver) = broadcast::channel(1); let test_protocol = TestProtocolManager::open( "ws://test.com".parse()?, response_sender, None, "rust-0.0.1", ) .await?; let base_client = BaseConvexClient::new(); let listen_handle = tokio::spawn(worker( response_receiver, request_receiver, watch_sender, base_client, test_protocol.clone(), )); let client = ConvexClient { listen_handle: Some(Arc::new(listen_handle)), request_sender, watch_receiver, }; Ok((client, test_protocol)) } } fn fake_mutation_response(result: FunctionResult) -> (ServerMessage, ServerMessage) { let (transition_response, new_version) = fake_transition(StateVersion::initial(), vec![]); let mutation_response = ServerMessage::MutationResponse { request_id: 0, result: result.into(), ts: Some(new_version.ts), log_lines: LogLinesMessage(vec![]), }; (mutation_response, transition_response) } fn fake_action_response(result: FunctionResult) -> ServerMessage { ServerMessage::ActionResponse { request_id: 0, result: result.into(), log_lines: LogLinesMessage(vec![]), } } fn fake_transition( start_version: StateVersion, modifications: Vec<(QueryId, Value)>, ) -> (ServerMessage, StateVersion) { let end_version = StateVersion { ts: start_version.ts.succ().expect("Succ failed"), ..start_version }; ( ServerMessage::Transition { start_version, end_version, modifications: modifications .into_iter() .map(|(query_id, value)| StateModification::QueryUpdated { query_id, value, journal: None, log_lines: LogLinesMessage(vec![]), }) .collect(), client_clock_skew: None, server_ts: None, }, end_version, ) } #[tokio::test] async fn test_mutation() -> anyhow::Result<()> { let (mut client, mut test_protocol) = ConvexClient::with_test_protocol().await?; test_protocol.take_sent().await; let mut res = tokio::spawn(async move { client.mutation("incrementCounter", btreemap! {}).await }); test_protocol.wait_until_n_messages_sent(1).await; assert_eq!( test_protocol.take_sent().await, vec![ClientMessage::Mutation { request_id: 0, udf_path: UdfPath::from_str("incrementCounter")?, args: SerializedArgs::from_args(vec![json!({})])?, component_path: None, }] ); let mutation_result = FunctionResult::Value(Value::Null); let (mut_resp, transition) = fake_mutation_response(mutation_result.clone()); test_protocol.fake_server_response(mut_resp).await?; // Should not be ready until transition completes. tokio::time::timeout(Duration::from_millis(50), &mut res) .await .unwrap_err(); // Once transition is sent, it is ready. test_protocol.fake_server_response(transition).await?; assert_eq!(res.await??, mutation_result); Ok(()) } #[tokio::test] async fn test_mutation_error() -> anyhow::Result<()> { let (mut client, mut test_protocol) = ConvexClient::with_test_protocol().await?; test_protocol.take_sent().await; let res = tokio::spawn(async move { client.mutation("incrementCounter", btreemap! {}).await }); test_protocol.wait_until_n_messages_sent(1).await; test_protocol.take_sent().await; let mutation_result = FunctionResult::ErrorMessage("JEEPERS".into()); let (mut_resp, _transition) = fake_mutation_response(mutation_result.clone()); test_protocol.fake_server_response(mut_resp).await?; // Errors should be ready immediately (no transition needed) assert_eq!(res.await??, mutation_result); Ok(()) } #[tokio::test] async fn test_action() -> anyhow::Result<()> { let (mut client, mut test_protocol) = ConvexClient::with_test_protocol().await?; test_protocol.take_sent().await; let action_result = FunctionResult::Value(Value::Null); let server_message = fake_action_response(action_result.clone()); let res = tokio::spawn(async move { client.action("runAction:hello", btreemap! {}).await }); test_protocol.wait_until_n_messages_sent(1).await; assert_eq!( test_protocol.take_sent().await, vec![ClientMessage::Action { request_id: 0, udf_path: UdfPath::from_str("runAction:hello")?, args: SerializedArgs::from_args(vec![json!({})])?, component_path: None, }] ); test_protocol.fake_server_response(server_message).await?; assert_eq!(res.await??, action_result); Ok(()) } #[tokio::test] async fn test_auth() -> anyhow::Result<()> { let (mut client, test_protocol) = ConvexClient::with_test_protocol().await?; test_protocol.take_sent().await; // Set token client.set_auth(Some("myauthtoken".into())).await; test_protocol.wait_until_n_messages_sent(1).await; assert_eq!( test_protocol.take_sent().await, vec![ClientMessage::Authenticate { base_version: 0, token: AuthenticationToken::User("myauthtoken".into()), }] ); // Unset token client.set_auth(None).await; test_protocol.wait_until_n_messages_sent(1).await; assert_eq!( test_protocol.take_sent().await, vec![ClientMessage::Authenticate { base_version: 1, token: AuthenticationToken::None, }] ); // Set admin auth client.set_admin_auth("myadminauth".into(), None).await; test_protocol.wait_until_n_messages_sent(1).await; assert_eq!( test_protocol.take_sent().await, vec![ClientMessage::Authenticate { base_version: 2, token: AuthenticationToken::Admin("myadminauth".into(), None), }] ); // Set admin auth acting as user let acting_as = UserIdentityAttributes { name: Some("Barbara Liskov".into()), ..Default::default() }; client .set_admin_auth("myadminauth".into(), Some(acting_as.clone())) .await; test_protocol.wait_until_n_messages_sent(1).await; assert_eq!( test_protocol.take_sent().await, vec![ClientMessage::Authenticate { base_version: 3, token: AuthenticationToken::Admin("myadminauth".into(), Some(acting_as)), }] ); Ok(()) } #[tokio::test] async fn test_client_single_subscription() -> anyhow::Result<()> { let (mut client, mut test_protocol) = ConvexClient::with_test_protocol().await?; let mut subscription1 = client.subscribe("getValue1", btreemap! {}).await?; let query_id = subscription1.query_id(); assert_eq!( test_protocol.take_sent().await, vec![ ClientMessage::Connect { session_id: SessionId::nil(), connection_count: 0, last_close_reason: "InitialConnect".to_string(), max_observed_timestamp: None, client_ts: None, }, ClientMessage::ModifyQuerySet { base_version: 0, new_version: 1, modifications: vec![QuerySetModification::Add(Query { query_id, udf_path: "getValue1".parse()?, args: SerializedArgs::from_args(vec![json!({})])?, journal: None, component_path: None, })] }, ] ); test_protocol .fake_server_response( fake_transition( StateVersion::initial(), vec![(subscription1.query_id(), 10.into())], ) .0, ) .await?; assert_eq!( subscription1.next().await, Some(FunctionResult::Value(10.into())) ); assert_eq!( client.query("getValue1", btreemap! {}).await?, FunctionResult::Value(10.into()) ); drop(subscription1); test_protocol.wait_until_n_messages_sent(1).await; assert_eq!( test_protocol.take_sent().await, vec![ClientMessage::ModifyQuerySet { base_version: 1, new_version: 2, modifications: vec![QuerySetModification::Remove { query_id }], }] ); Ok(()) } #[tokio::test] async fn test_client_subscribe_unsubscribe_subscribe() -> anyhow::Result<()> { let (mut client, mut test_protocol) = ConvexClient::with_test_protocol().await?; let subscription1b: QuerySubscription; { // This subscription goes out of scope and unsubscribes at the end of this // block. The internal num_subscribers value gets decremented. let _ignored = client.subscribe("getValue1", btreemap! {}).await?; subscription1b = client.subscribe("getValue1", btreemap! {}).await?; } // In the buggy scenario, this subscription gets an ID via num_subscribers ID // that matches subscription1b. That triggers a panic. let subscription1c = client.subscribe("getValue1", btreemap! {}).await?; test_protocol.take_sent().await; let mut watch = client.watch_all(); test_protocol .fake_server_response( fake_transition(StateVersion::initial(), vec![(QueryId::new(0), 10.into())]).0, ) .await?; let results = watch.next().await.expect("Watch should have results"); assert_eq!( results.get(&subscription1b), Some(&FunctionResult::Value(10.into())) ); assert_eq!( results.get(&subscription1c), Some(&FunctionResult::Value(10.into())) ); Ok(()) } #[tokio::test] async fn test_client_consistent_view_watch() -> anyhow::Result<()> { let (mut client, mut test_protocol) = ConvexClient::with_test_protocol().await?; let subscription1 = client.subscribe("getValue1", btreemap! {}).await?; let subscription2a = client.subscribe("getValue2", btreemap! {}).await?; let subscription2b = client.subscribe("getValue2", btreemap! {}).await?; let subscription3 = client.subscribe("getValue3", btreemap! {}).await?; test_protocol.take_sent().await; let mut watch = client.watch_all(); test_protocol .fake_server_response( fake_transition( StateVersion::initial(), vec![(QueryId::new(0), 10.into()), (QueryId::new(1), 20.into())], ) .0, ) .await?; let results = watch.next().await.expect("Watch should have results"); assert_eq!( results.get(&subscription1), Some(&FunctionResult::Value(10.into())) ); assert_eq!( results.get(&subscription2a), Some(&FunctionResult::Value(20.into())) ); assert_eq!( results.get(&subscription2b), Some(&FunctionResult::Value(20.into())) ); assert_eq!(results.get(&subscription3), None); assert_eq!( results.iter().collect::<Vec<_>>(), vec![ (subscription1.id(), Some(&FunctionResult::Value(10.into()))), (subscription2a.id(), Some(&FunctionResult::Value(20.into()))), (subscription2b.id(), Some(&FunctionResult::Value(20.into()))), (subscription3.id(), None,), ] ); // Ideally a new watch should immediately give you results, but we don't have // that yet. Need to replace tokio::broadcast with something that buffers 1 // item. //let mut watch2 = client.watch(); //let results = watch.next().await.expect("Watch should have results"); //assert_eq!(results.len(), 3); Ok(()) } #[tokio::test] async fn test_drop_client() -> anyhow::Result<()> { let (mut client, _test_protocol) = ConvexClient::with_test_protocol().await?; let mut subscription1 = client.subscribe("getValue1", btreemap! {}).await?; drop(client); tokio::task::yield_now().await; assert!(subscription1.next().await.is_none()); drop(subscription1); Ok(()) } #[tokio::test] async fn test_client_separate_queries() -> anyhow::Result<()> { let (mut client, test_protocol) = ConvexClient::with_test_protocol().await?; // All three of these should be considered separate let subscription1 = client.subscribe("getValue1", btreemap! {}).await?; let subscription2 = client.subscribe("getValue2", btreemap! {}).await?; let subscription3 = client .subscribe("getValue2", btreemap! {"hello".into() => "world".into()}) .await?; assert_ne!(subscription1.query_id(), subscription2.query_id()); assert_ne!(subscription2.query_id(), subscription3.query_id()); assert_eq!( test_protocol.take_sent().await, vec![ ClientMessage::Connect { session_id: SessionId::nil(), connection_count: 0, last_close_reason: "InitialConnect".to_string(), max_observed_timestamp: None, client_ts: None, }, ClientMessage::ModifyQuerySet { base_version: 0, new_version: 1, modifications: vec![QuerySetModification::Add(Query { query_id: subscription1.query_id(), udf_path: "getValue1".parse()?, args: SerializedArgs::from_args(vec![json!({})])?, journal: None, component_path: None, })] }, ClientMessage::ModifyQuerySet { base_version: 1, new_version: 2, modifications: vec![QuerySetModification::Add(Query { query_id: subscription2.query_id(), udf_path: "getValue2".parse()?, args: SerializedArgs::from_args(vec![json!({})])?, journal: None, component_path: None, })] }, ClientMessage::ModifyQuerySet { base_version: 2, new_version: 3, modifications: vec![QuerySetModification::Add(Query { query_id: subscription3.query_id(), udf_path: "getValue2".parse()?, args: SerializedArgs::from_args(vec![json!({"hello": "world"})])?, journal: None, component_path: None, })] }, ] ); Ok(()) } #[tokio::test] async fn test_client_two_identical_queries() -> anyhow::Result<()> { let (mut client, mut test_protocol) = ConvexClient::with_test_protocol().await?; // These two should be considered the same query. let mut subscription1 = client.subscribe("getValue", btreemap! {}).await?; let mut subscription2 = client.subscribe("getValue", btreemap! {}).await?; assert_ne!(subscription1.subscriber_id, subscription2.subscriber_id); assert_eq!(subscription1.query_id(), subscription2.query_id()); let query_id = subscription1.query_id(); assert_eq!( test_protocol.take_sent().await, vec![ ClientMessage::Connect { session_id: SessionId::nil(), connection_count: 0, last_close_reason: "InitialConnect".to_string(), max_observed_timestamp: None, client_ts: None, }, ClientMessage::ModifyQuerySet { base_version: 0, new_version: 1, modifications: vec![QuerySetModification::Add(Query { query_id, udf_path: "getValue".parse()?, args: SerializedArgs::from_args(vec![json!({})])?, journal: None, component_path: None, })] }, ] ); let mut version = StateVersion::initial(); for i in 1..5 { let (transition, new_version) = fake_transition(version, vec![(query_id, i.into())]); test_protocol.fake_server_response(transition).await?; version = new_version; assert_eq!( subscription1.next().await, Some(FunctionResult::Value(i.into())) ); assert_eq!( subscription2.next().await, Some(FunctionResult::Value(i.into())) ); } // A new subscription should auto-initialize with the value if available let mut subscription3 = client.subscribe("getValue", btreemap! {}).await?; assert_eq!( subscription3.next().await, Some(FunctionResult::Value(4.into())), ); // Dropping sub1 and sub2 should still maintain subscription drop(subscription1); drop(subscription2); let (transition, _new_version) = fake_transition(version, vec![(query_id, 5.into())]); test_protocol.fake_server_response(transition).await?; assert_eq!( subscription3.next().await, Some(FunctionResult::Value(5.into())), ); Ok(()) } #[test] fn test_deployment_url() -> anyhow::Result<()> { assert_eq!( deployment_to_ws_url("http://flying-shark-123.convex.cloud".parse()?)?.to_string(), "ws://flying-shark-123.convex.cloud/api/sync", ); assert_eq!( deployment_to_ws_url("https://flying-shark-123.convex.cloud".parse()?)?.to_string(), "wss://flying-shark-123.convex.cloud/api/sync", ); assert_eq!( deployment_to_ws_url("ws://flying-shark-123.convex.cloud".parse()?)?.to_string(), "ws://flying-shark-123.convex.cloud/api/sync", ); assert_eq!( deployment_to_ws_url("wss://flying-shark-123.convex.cloud".parse()?)?.to_string(), "wss://flying-shark-123.convex.cloud/api/sync", ); assert_eq!( deployment_to_ws_url("ftp://flying-shark-123.convex.cloud".parse()?) .unwrap_err() .to_string(), "Unknown scheme ftp. Expected http or https.", ); Ok(()) } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server