Skip to main content
Glama

Convex MCP server

Official
by get-convex
mod.rs26.7 kB
//! The synchronous state machine for Convex. It's //! recommended to use the higher level [`ConvexClient`] unless you are building //! a framework. //! //! See docs for [`BaseConvexClient`]. use std::{ cmp, collections::{ BTreeMap, BTreeSet, VecDeque, }, }; use convex_sync_types::{ types::SerializedArgs, AuthenticationToken, CanonicalizedUdfPath, ClientMessage, IdentityVersion, QueryId, QuerySetModification, QuerySetVersion, SessionRequestSeqNumber, StateModification, StateVersion, Timestamp, UdfPath, }; use serde_json::json; use tokio::sync::oneshot; #[cfg(doc)] use crate::ConvexClient; use crate::{ convex_logs, sync::{ ReconnectProtocolReason, ServerMessage, }, value::Value, ConvexError, }; mod request_manager; use request_manager::{ RequestId, RequestManager, }; mod query_result; pub use query_result::{ FunctionResult, QueryResults, }; use self::request_manager::RequestType; #[derive(Clone, Eq, PartialEq, PartialOrd, Ord, Debug)] struct QueryToken(String); #[derive(Clone, Debug)] struct LocalQuery { id: QueryId, canonicalized_udf_path: CanonicalizedUdfPath, args: BTreeMap<String, Value>, num_subscribers: usize, // TODO: remove /// A unique index value for each subscription to this query. /// /// Must be incremented each time a new subscription is added, and never /// decremented. subscription_index: usize, } #[derive(Clone, Debug)] struct Query { result: FunctionResult, _udf_path: CanonicalizedUdfPath, _args: BTreeMap<String, Value>, } /// An identifier for a single subscriber to a query. 
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq, PartialOrd, Ord, Hash)] #[cfg_attr(test, derive(proptest_derive::Arbitrary))] pub struct SubscriberId(QueryId, usize); impl SubscriberId { #[cfg(test)] pub fn query_id(&self) -> QueryId { self.0 } } fn serialize_path_and_args(udf_path: UdfPath, args: BTreeMap<String, Value>) -> QueryToken { let json_path: String = udf_path.canonicalize().into(); let json_args: serde_json::Value = Value::Array(vec![Value::Object(args)]).into(); let json = json!({ "udfPath": json_path, "args": json_args, }); QueryToken(json.to_string()) } #[derive(Clone, Default)] struct LocalSyncState { next_query_id: QueryId, query_set_version: QuerySetVersion, query_set: BTreeMap<QueryToken, LocalQuery>, query_id_to_token: BTreeMap<QueryId, QueryToken>, latest_results: QueryResults, auth_token: AuthenticationToken, identity_version: IdentityVersion, } impl LocalSyncState { fn subscribe( &mut self, udf_path: UdfPath, args: BTreeMap<String, Value>, ) -> (Option<ClientMessage>, SubscriberId) { let canonicalized_udf_path = udf_path.clone().canonicalize(); let query_token = serialize_path_and_args(udf_path.clone(), args.clone()); if let Some(existing_entry) = self.query_set.get_mut(&query_token) { // This is a new subscription to an existing query. 
existing_entry.num_subscribers += 1; existing_entry.subscription_index += 1; let query_id = existing_entry.id; let subscription = SubscriberId(query_id, existing_entry.subscription_index); let prev = self.latest_results.subscribers.insert(subscription); assert!(prev.is_none(), "INTERNAL BUG: Subscriber ID already taken."); return (None, subscription); } let query_id = self.next_query_id; self.next_query_id = QueryId::new(self.next_query_id.get_id() + 1); let base_version = self.query_set_version; self.query_set_version += 1; let new_version = self.query_set_version; let add = QuerySetModification::Add(convex_sync_types::Query { query_id, udf_path, args: SerializedArgs::from_args(vec![Value::Object(args.clone()).into()]) .expect("Could not serialize query arguments"), journal: None, component_path: None, }); let message = ClientMessage::ModifyQuerySet { base_version, new_version, modifications: vec![add], }; let query = LocalQuery { id: query_id, canonicalized_udf_path, args, num_subscribers: 1, subscription_index: 0, }; self.query_set.insert(query_token.clone(), query); self.query_id_to_token.insert(query_id, query_token.clone()); let subscription = SubscriberId(query_id, 0); let prev = self.latest_results.subscribers.insert(subscription); assert!(prev.is_none(), "INTERNAL BUG: Subscriber ID already taken."); (Some(message), subscription) } fn remove_subscriber(&mut self, subscriber_id: SubscriberId) -> Option<ClientMessage> { let query_id = self .latest_results .subscribers .remove(&subscriber_id) .expect("INTERNAL BUG: Dropped unknown Subscriber ID") .0; let query_token = match self.query_token(query_id) { None => panic!("INTERNAL BUG: Unknown query id {query_id}"), Some(t) => t, }; let local_query = match self.query_set.get_mut(&query_token) { None => panic!("INTERNAL BUG: No query found for query token {query_token:?}",), Some(q) => q, }; // Update local state if local_query.num_subscribers > 1 { local_query.num_subscribers -= 1; return None; } 
self.query_set.remove(&query_token); self.query_id_to_token.remove(&query_id); let base_version = self.query_set_version; self.query_set_version += 1; let new_version = self.query_set_version; let remove = QuerySetModification::Remove { query_id }; Some(ClientMessage::ModifyQuerySet { base_version, new_version, modifications: vec![remove], }) } fn query_token(&self, query_id: QueryId) -> Option<QueryToken> { self.query_id_to_token.get(&query_id).cloned() } fn query_args(&self, query_id: QueryId) -> Option<BTreeMap<String, Value>> { Some( self.query_set .get(&self.query_token(query_id)?)? .args .clone(), ) } fn query_path(&self, query_id: QueryId) -> Option<CanonicalizedUdfPath> { Some( self.query_set .get(&self.query_token(query_id)?)? .canonicalized_udf_path .clone(), ) } fn set_auth(&mut self, token: AuthenticationToken) -> ClientMessage { self.auth_token = token.clone(); let base_version = self.identity_version; self.identity_version += 1; ClientMessage::Authenticate { base_version, token, } } fn restart(&mut self) -> Vec<ClientMessage> { let mut modifications = Vec::new(); for local_query in self.query_set.values() { let add = QuerySetModification::Add(convex_sync_types::Query { query_id: local_query.id, udf_path: local_query.canonicalized_udf_path.clone().into(), args: SerializedArgs::from_args(vec![ Value::Object(local_query.args.clone()).into() ]) .expect("Could not serialize query arguments"), journal: None, component_path: None, }); modifications.push(add) } self.query_set_version = 1; let query_set = ClientMessage::ModifyQuerySet { base_version: 0, new_version: 1, modifications, }; self.identity_version = 0; if self.auth_token == AuthenticationToken::None { return vec![query_set]; }; let authenticate = ClientMessage::Authenticate { base_version: 0, token: self.auth_token.clone(), }; self.identity_version += 1; vec![authenticate, query_set] } } #[derive(Debug)] struct RemoteQuerySet { version: StateVersion, remote_query_set: BTreeMap<QueryId, 
FunctionResult>, } impl RemoteQuerySet { fn new() -> Self { Self { version: StateVersion::initial(), remote_query_set: Default::default(), } } fn transition(&mut self, transition: ServerMessage) -> Result<(), ReconnectProtocolReason> { let ServerMessage::Transition { start_version, end_version, modifications, client_clock_skew: _, server_ts: _, } = transition else { panic!("not transition"); }; if start_version != self.version { tracing::error!( "INTERNAL BUG: Protocol Error start_version {:?} is different from self.version \ {:?}", start_version, self.version ); return Err("StartVersionMismatch".into()); } for modification in modifications { match modification { StateModification::QueryUpdated { query_id, value, log_lines, journal: _, } => { for log_line in log_lines.0 { convex_logs!("{}", log_line); } self.remote_query_set .insert(query_id, FunctionResult::Value(value)); }, StateModification::QueryFailed { query_id, error_message, log_lines, journal: _, error_data, } => { for log_line in log_lines.0 { convex_logs!("{}", log_line); } let function_result = match error_data { Some(v) => FunctionResult::ConvexError(ConvexError { message: error_message, data: v, }), None => FunctionResult::ErrorMessage(error_message), }; self.remote_query_set.insert(query_id, function_result); }, StateModification::QueryRemoved { query_id } => { self.remote_query_set.remove(&query_id); }, } } self.version = end_version; Ok(()) } } #[derive(Default, Debug)] struct OptimisticQueryResults { query_results: BTreeMap<QueryId, Query>, } impl OptimisticQueryResults { fn ingest_query_results_from_server( &mut self, server_query_results: BTreeMap<QueryId, Query>, _optimistic_updates_to_drop: BTreeSet<RequestId>, ) -> BTreeMap<QueryId, FunctionResult> { // TODO: use optimistic_updates_to_drop let old_query_results = self.query_results.clone(); self.query_results = server_query_results; let mut changed_queries = BTreeMap::new(); for (query_id, query) in self.query_results.iter() { let old_query = 
old_query_results.get(query_id); if match old_query { Some(old_query) => old_query.result != query.result, None => true, } { let result = query.result.clone(); changed_queries.insert(*query_id, result); } } changed_queries } fn query_result(&self, query_id: QueryId) -> Option<FunctionResult> { self.query_results.get(&query_id).map(|q| q.result.clone()) } } /// The synchronous state machine for the `ConvexClient`. It's recommended to /// use the higher level `ConvexClient` unless you are building a framework. /// /// This struct should be used instead of the `ConvexClient` when you want the /// ability to build consistent client views. For example, in order to use your /// own websocket manager or make a client compatible with another language /// (e.g. Swift or Python). /// /// For the latter use case, we strongly recommend you to take a look at the /// implementation of the `ConvexClient`. The recommended pattern to use an /// [`BaseConvexClient`] is to create a background thread to manage actions on /// queries/mutations and incoming websocket connections, and use that to /// advance the BaseConvexClient's state. /// /// ## Managing Convex State /// The main methods, [`subscribe`](Self::subscribe()), /// [`unsubscribe`](Self::unsubscribe()), and /// [`mutation`](Self::mutation()) directly correspond to its /// equivalent for the external [ConvexClient]. /// /// The only different method is [`get_query`](Self::get_query()), which /// returns the current value for a query given its query id. This method can be /// used to synchronously request the current value, as opposed to a stream of /// values in [`subscribe`](crate::ConvexClient::subscribe()). 
///
/// **Note: these methods have the side effect of
/// adding messages to be sent to the server, so you would need to flush all
/// outgoing messages by looping on
/// [`pop_next_message`](Self::pop_next_message()) after each call of the above
/// functions.**
///
/// ## Watching for consistent updates to queries
/// To watch for consistent changes in query values, you can add the following
/// code to the background thread:
/// ```no_run
/// use convex::base_client::BaseConvexClient;
/// use convex::Value;
/// use convex_sync_types::ServerMessage;
///
/// fn on_receive_server_message(mut base_client: BaseConvexClient, msg: ServerMessage<Value>) {
///     let res = base_client.receive_message(msg).expect("Base client error");
///     if let Some(latest_result_map) = res {
///         for (subscriber_id, function_result) in latest_result_map.iter() {
///             // Notify components of the updated_value
///         }
///     }
/// }
/// ```
///
/// ## Managing Web Socket States
/// To manage websocket messages, use
/// [`receive_message`](Self::receive_message()) (for incoming messages from the
/// server) and [`pop_next_message`](Self::pop_next_message()) (for outgoing
/// messages to send to the server). **The [`BaseConvexClient`] does not
/// send these messages, so you will have to regularly monitor if there are
/// messages to be sent by calling
/// [`pop_next_message`](Self::pop_next_message()).**
///
/// Additionally, when the websocket reconnects, you should call
/// [`resend_ongoing_queries_mutations`](Self::resend_ongoing_queries_mutations()) and loop on
/// [`pop_next_message`](Self::pop_next_message()) to resend requests to the
/// Server to resubscribe to queries and perform ongoing mutations.
/// /// #### [`pop_next_message`](Self::pop_next_message()) should be called after the following methods: /// - [`resend_ongoing_queries_mutations`](Self::resend_ongoing_queries_mutations()) /// - [`subscribe`](Self::unsubscribe()) /// - [`unsubscribe`](Self::unsubscribe()) /// - [`mutation`](Self::unsubscribe()) pub struct BaseConvexClient { state: LocalSyncState, remote_query_set: RemoteQuerySet, optimistic_query_results: OptimisticQueryResults, request_manager: RequestManager, next_request_id: SessionRequestSeqNumber, outgoing_message_queue: VecDeque<ClientMessage>, max_observed_timestamp: Option<Timestamp>, } impl BaseConvexClient { /// Construct a new [`BaseConvexClient`]. pub fn new() -> Self { let request_manager = RequestManager::new(); let state = LocalSyncState::default(); let remote_query_set = RemoteQuerySet::new(); let optimistic_query_results: OptimisticQueryResults = Default::default(); let next_request_id: SessionRequestSeqNumber = 0; BaseConvexClient { request_manager, state, remote_query_set, optimistic_query_results, next_request_id, outgoing_message_queue: VecDeque::new(), max_observed_timestamp: None, } } /// Update state to be subscribed to a query and add subscription request to /// the outgoing message queue. /// /// After calling this, it is highly recommended to loop on /// [`pop_next_message`](Self::pop_next_message()) to flush websocket /// messages to the server. pub fn subscribe(&mut self, udf_path: UdfPath, args: BTreeMap<String, Value>) -> SubscriberId { let (modification, subscription) = self.state.subscribe(udf_path, args); if let Some(modification) = modification { self.outgoing_message_queue.push_back(modification); } subscription } /// Update state to be unsubscribed to a query and add unsubscription /// request to the outgoing message queue. /// /// After calling this, it is highly recommended to loop on /// [`pop_next_message`](Self::pop_next_message()) to flush websocket /// messages to the server. 
pub fn unsubscribe(&mut self, subscriber_id: SubscriberId) {
    // A removal message exists only when the last subscriber of a query
    // went away; otherwise there is nothing to send.
    match self.state.remove_subscriber(subscriber_id) {
        Some(message) => self.outgoing_message_queue.push_back(message),
        None => {},
    }
}

/// Return the local value of a query.
///
/// Yields `None` when no result is stored for `query_id`.
pub fn get_query(&self, query_id: QueryId) -> Option<FunctionResult> {
    self.optimistic_query_results.query_result(query_id)
}

/// Track mutation and add mutation request to the outgoing message queue.
///
/// The returned receiver is the one handed back by the request manager for
/// this request.
///
/// After calling this, it is highly recommended to loop on
/// [`pop_next_message`](Self::pop_next_message()) to flush websocket
/// messages to the server.
pub fn mutation(
    &mut self,
    udf_path: UdfPath,
    args: BTreeMap<String, Value>,
) -> oneshot::Receiver<FunctionResult> {
    // Claim the next sequence number for this request.
    let request_id = self.next_request_id;
    self.next_request_id += 1;
    tracing::info!("Starting mutation {udf_path} with id {request_id}");

    let serialized_args = SerializedArgs::from_args(vec![Value::Object(args).into()])
        .expect("Failed to serialize arguments");
    let message = ClientMessage::Mutation {
        request_id,
        udf_path,
        args: serialized_args,
        component_path: None,
    };

    let tracked_id = RequestId::new(request_id);
    let result_receiver =
        self.request_manager
            .track_request(&message, tracked_id, RequestType::Mutation);
    self.outgoing_message_queue.push_back(message);
    result_receiver
}

/// Track action and add action request to the outgoing message queue.
///
/// After calling this, it is highly recommended to loop on
/// [`pop_next_message`](Self::pop_next_message()) to flush websocket
/// messages to the server.
pub fn action( &mut self, udf_path: UdfPath, args: BTreeMap<String, Value>, ) -> oneshot::Receiver<FunctionResult> { let request_id = self.next_request_id; self.next_request_id = request_id + 1; tracing::info!("Starting action {udf_path:?} with id {request_id:?}"); let message = ClientMessage::Action { request_id, udf_path, args: SerializedArgs::from_args(vec![Value::Object(args).into()]).unwrap(), component_path: None, }; let result_receiver = self.request_manager.track_request( &message, RequestId::new(request_id), RequestType::Action, ); self.outgoing_message_queue.push_back(message); result_receiver } /// Set auth on the sync protocol. pub fn set_auth(&mut self, token: AuthenticationToken) { let message = self.state.set_auth(token); self.outgoing_message_queue.push_back(message); } /// Pop the next message from the outgoing message queue. /// /// Note that this does not *send* the message because the Internal client /// has no awareness of websockets. After popping the next message, it is /// the caller's responsibility to actually send it. pub fn pop_next_message(&mut self) -> Option<ClientMessage> { self.outgoing_message_queue.pop_front() } fn observe_timestamp(&mut self, ts: Timestamp) { if let Some(max_observed_timestamp) = self.max_observed_timestamp { self.max_observed_timestamp = Some(cmp::max(ts, max_observed_timestamp)); } else { self.max_observed_timestamp = Some(ts); } } /// Returns the maximum timestamp observed by the client. pub fn max_observed_timestamp(&self) -> Option<Timestamp> { self.max_observed_timestamp } /// Given a message from a Server, update the base state accordingly. pub fn receive_message( &mut self, message: ServerMessage, ) -> Result<Option<QueryResults>, ReconnectProtocolReason> { match message { ServerMessage::Transition { end_version, .. 
} => { self.observe_timestamp(end_version.ts); self.remote_query_set.transition(message)?; let completed_requests = self .request_manager .remove_and_notify_completed(end_version.ts); let changed_query_ids = self.on_query_result_changes(completed_requests)?; for (id, result) in changed_query_ids { self.state.latest_results.results.insert(id, result); } return Ok(Some(self.state.latest_results.clone())); }, ServerMessage::MutationResponse { request_id, result, ts, log_lines, } => { for log_line in log_lines.0 { convex_logs!("{}", log_line); } if let Some(ts) = ts { self.observe_timestamp(ts); } let request_id = RequestId::new(request_id); self.request_manager.update_request( &request_id, RequestType::Mutation, result.into(), ts, )?; }, ServerMessage::AuthError { error_message, base_version, .. } => { tracing::error!( "AuthError: {error_message} for identity version {base_version:?}. Restarting \ protocol." ); return Err(format!( "AuthError: {error_message} for identity version {base_version:?}" )); }, ServerMessage::FatalError { error_message } => { tracing::error!("FatalError: {error_message}. Restarting protocol."); return Err(format!("FatalError: {error_message}")); }, ServerMessage::ActionResponse { request_id, result, log_lines, } => { for log_line in log_lines.0 { convex_logs!("{}", log_line); } let request_id = RequestId::new(request_id); self.request_manager.update_request( &request_id, RequestType::Action, result.into(), None, )?; }, ServerMessage::Ping => { // Do nothing }, } Ok(None) } /// Grab a snapshot of the latest query results to all subscribed queries. pub fn latest_results(&self) -> &QueryResults { &self.state.latest_results } /// Resend all subscribed queries and ongoing mutations. Should be used once /// the websocket closes and reconnects. 
pub fn resend_ongoing_queries_mutations(&mut self) { let state_restart_messages = self.state.restart(); let mut ongoing_mutation_messages = self.request_manager.restart(); self.remote_query_set = RemoteQuerySet::new(); for state_restart_message in state_restart_messages { self.outgoing_message_queue.push_back(state_restart_message); } self.outgoing_message_queue .append(&mut ongoing_mutation_messages); } fn on_query_result_changes( &mut self, completed_requests: BTreeSet<RequestId>, ) -> Result<BTreeMap<QueryId, FunctionResult>, ReconnectProtocolReason> { let remote_query_results = &self.remote_query_set.remote_query_set; let mut query_id_to_value = BTreeMap::new(); for (query_id, result) in remote_query_results.iter() { let Some(_udf_path) = self.state.query_path(*query_id) else { // It's possible that we've already unsubscribed to this query but // the server hasn't learned about that yet. If so, ignore this one. continue; }; let _args = self .state .query_args(*query_id) .expect("INTERNAL BUG: Query args exist, but not query path."); query_id_to_value.insert( *query_id, Query { result: result.clone(), _udf_path, _args, }, ); } Ok(self .optimistic_query_results .ingest_query_results_from_server(query_id_to_value, completed_requests)) } fn local_query_result(&self, query_id: QueryId) -> Option<FunctionResult> { self.optimistic_query_results.query_result(query_id) } } /// Macro used for piping UDF logs to a custom formatter that exposes /// just the log content, without any additional Rust metadata. #[macro_export] macro_rules! convex_logs { (target: $target:expr, $($arg:tt)+) => { tracing::event!(target: "convex_logs", tracing::Level::DEBUG, $($arg)+); // Additional custom behavior can be added here }; ($($arg:tt)+) => { tracing::event!(target: "convex_logs", tracing::Level::DEBUG, $($arg)+); // Additional custom behavior can be added here }; }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.