Skip to main content
Glama

Convex MCP server

Official
by get-convex
worker.rs43.6 kB
use std::{ collections::BTreeMap, sync::{ atomic::{ AtomicUsize, Ordering, }, Arc, }, time::Duration, }; use ::metrics::StatusTimer; use application::{ api::{ ApplicationApi, ExecuteQueryTimestamp, SubscriptionClient, SubscriptionTrait, SubscriptionValidity, }, redaction::{ RedactedJsError, RedactedLogLines, }, RedactedActionError, RedactedMutationError, }; use common::{ components::{ CanonicalizedComponentFunctionPath, ComponentPath, ExportPath, }, fastrace_helpers::get_sampled_span, heap_size::HeapSize, http::ResolvedHostname, knobs::{ SEARCH_INDEXES_UNAVAILABLE_RETRY_DELAY, SYNC_MAX_SEND_TRANSITION_COUNT, }, runtime::{ try_join_buffer_unordered, Runtime, WithTimeout, }, types::{ FunctionCaller, UdfType, }, value::JsonPackedValue, version::ClientVersion, RequestId, }; use errors::{ ErrorMetadata, ErrorMetadataAnyhowExt, }; use fastrace::prelude::*; use futures::{ future::{ self, BoxFuture, Fuse, }, select_biased, stream::{ Buffered, FuturesUnordered, }, Future, FutureExt, StreamExt, }; use keybroker::Identity; use maplit::btreemap; use model::session_requests::types::SessionRequestIdentifier; use sync_types::{ ClientMessage, IdentityVersion, QueryId, QuerySetModification, QuerySetVersion, SerializedQueryJournal, SessionId, StateModification, StateVersion, Timestamp, UdfPath, }; use tokio::sync::{ mpsc, mpsc::error::{ SendError, TrySendError, }, }; use tokio_stream::wrappers::ReceiverStream; use crate::{ metrics::{ self, connect_timer, log_action_args_size, log_mutation_args_size, log_query_modification_args_size, modify_query_to_transition_timer, mutation_queue_timer, TypedClientEvent, }, state::SyncState, ServerMessage, }; // Buffer up to a thousand function and mutations executions. const OPERATION_QUEUE_BUFFER_SIZE: usize = 1000; const SYNC_WORKER_PROCESS_TIMEOUT: Duration = Duration::from_secs(60); #[derive(Clone, Debug)] pub struct SyncWorkerConfig { pub client_version: ClientVersion, } impl Default for SyncWorkerConfig { fn default() -> Self { Self { client_version: ClientVersion::unknown(), } } } /// Creates a channel which allows the sender to track the buffer size and /// opt-in to slow down if the buffer becomes too large. pub fn measurable_unbounded_channel() -> (SingleFlightSender, SingleFlightReceiver) { let buffer_size_bytes = Arc::new(AtomicUsize::new(0)); // The channel is used to send/receive "size reduced" notifications. let (size_reduced_tx, size_reduced_rx) = mpsc::channel(1); let (tx, rx) = mpsc::unbounded_channel(); ( SingleFlightSender { inner: tx, transition_count: buffer_size_bytes.clone(), count_reduced_rx: size_reduced_rx, }, SingleFlightReceiver { inner: rx, transition_count: buffer_size_bytes, size_reduced_tx, }, ) } /// Wrapper around UnboundedSender that counts Transition messages, /// allowing single-flighting, i.e. skipping transitions if the client is /// backlogged on receiving them. pub struct SingleFlightSender { inner: mpsc::UnboundedSender<(ServerMessage, tokio::time::Instant)>, transition_count: Arc<AtomicUsize>, count_reduced_rx: mpsc::Receiver<()>, } impl SingleFlightSender { pub fn send( &mut self, msg: (ServerMessage, tokio::time::Instant), ) -> Result<(), SendError<(ServerMessage, tokio::time::Instant)>> { if matches!(&msg.0, ServerMessage::Transition { .. }) { self.transition_count.fetch_add(1, Ordering::SeqCst); } self.inner.send(msg) } pub fn transition_count(&self) -> usize { self.transition_count.load(Ordering::SeqCst) } // Waits until a single message has been received implying the size of the // buffer have been reduced. Note that if multiple messages are received // between calls, this will fire only once. pub async fn message_consumed(&mut self) { self.count_reduced_rx.recv().await; } } pub struct SingleFlightReceiver { inner: mpsc::UnboundedReceiver<(ServerMessage, tokio::time::Instant)>, transition_count: Arc<AtomicUsize>, size_reduced_tx: mpsc::Sender<()>, } impl SingleFlightReceiver { pub async fn next(&mut self) -> Option<(ServerMessage, tokio::time::Instant)> { let result = self.inner.recv().await; if let Some(msg) = &result { if matches!(msg.0, ServerMessage::Transition { .. }) { self.transition_count.fetch_sub(1, Ordering::SeqCst); } // Don't block if channel is full. _ = self.size_reduced_tx.try_send(()); } result } pub fn try_next(&mut self) -> Option<(ServerMessage, tokio::time::Instant)> { let result = self.inner.try_recv().ok(); if let Some(msg) = &result { if matches!(msg.0, ServerMessage::Transition { .. }) { self.transition_count.fetch_sub(1, Ordering::SeqCst); } // Don't block if channel is full. _ = self.size_reduced_tx.try_send(()); } result } } const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(15); pub struct SyncWorker<RT: Runtime> { api: Arc<dyn ApplicationApi>, config: SyncWorkerConfig, rt: RT, state: SyncState, host: ResolvedHostname, rx: mpsc::UnboundedReceiver<(ClientMessage, tokio::time::Instant)>, tx: SingleFlightSender, // Queue of pending functions or mutations. For time being, we only execute // a single one since this is less error prone model for the developer. mutation_futures: Buffered<ReceiverStream<BoxFuture<'static, anyhow::Result<ServerMessage>>>>, mutation_sender: mpsc::Sender<BoxFuture<'static, anyhow::Result<ServerMessage>>>, action_futures: FuturesUnordered<BoxFuture<'static, anyhow::Result<ServerMessage>>>, transition_future: Option<Fuse<BoxFuture<'static, anyhow::Result<TransitionState>>>>, // Has an update been scheduled for the future? update_scheduled: bool, /// If we've seen a FeatureTemporarilyUnavailable error, wait for this /// Future to resolve before retrying unavailable_query_retry_future: Option<Fuse<BoxFuture<'static, ()>>>, /// Timers to track time between handling ModifyQuerySet message and sending /// the Transition with the update modify_query_to_transition_timers: BTreeMap<QuerySetVersion, StatusTimer>, on_connect: Option<(StatusTimer, Box<dyn FnOnce(SessionId) + Send>)>, partition_id: u64, /// The difference between the client's clock and the server's clock, in /// milliseconds. Includes latency between the client and server. client_clock_skew: Option<i64>, } enum QueryResult { Rerun { result: Result<JsonPackedValue, RedactedJsError>, log_lines: RedactedLogLines, journal: SerializedQueryJournal, }, /// Skip returning results of this query because search indexes or table /// summaries are unavailable TemporarilyUnavailable, Refresh, } struct TransitionState { udf_results: Vec<(QueryId, QueryResult, Box<dyn SubscriptionTrait>)>, state_modifications: BTreeMap<QueryId, StateModification<JsonPackedValue>>, current_version: StateVersion, new_version: StateVersion, timer: StatusTimer, temporarily_unavailable: bool, } impl<RT: Runtime> SyncWorker<RT> { pub fn new( api: Arc<dyn ApplicationApi>, rt: RT, host: ResolvedHostname, config: SyncWorkerConfig, rx: mpsc::UnboundedReceiver<(ClientMessage, tokio::time::Instant)>, tx: SingleFlightSender, on_connect: Box<dyn FnOnce(SessionId) + Send>, partition_id: u64, ) -> Self { let (mutation_sender, receiver) = mpsc::channel(OPERATION_QUEUE_BUFFER_SIZE); let mutation_futures = ReceiverStream::new(receiver).buffered(1); // Execute at most one operation at a time. SyncWorker { api, config, rt, state: SyncState::new(partition_id), host, rx, tx, mutation_futures, mutation_sender, action_futures: FuturesUnordered::new(), transition_future: None, update_scheduled: false, unavailable_query_retry_future: None, modify_query_to_transition_timers: BTreeMap::new(), on_connect: Some((connect_timer(partition_id), on_connect)), partition_id, client_clock_skew: None, } } fn schedule_update(&mut self) { self.update_scheduled = true; } fn schedule_unavailable_query_retry(&mut self) { if self.unavailable_query_retry_future.is_none() { let rt = self.rt.clone(); self.unavailable_query_retry_future = Some( async move { rt.wait(*SEARCH_INDEXES_UNAVAILABLE_RETRY_DELAY).await; } .boxed() .fuse(), ); } } /// Run the sync protocol worker, returning `Ok(())` on clean exit and `Err` /// if there's an exceptional protocol condition that should shutdown /// the WebSocket. pub async fn go(&mut self) -> anyhow::Result<()> { let mut ping_timeout = self.rt.wait(HEARTBEAT_INTERVAL); let mut pending = future::pending().boxed().fuse(); let mut unavailable_retry_pending = future::pending().boxed().fuse(); // Create a new subscription client for every sync socket. Thus we don't require // the subscription client to auto-recover on connection failures. let subscription_client: Arc<dyn SubscriptionClient> = self.api.subscription_client(&self.host).await?.into(); // Starts off as a future that is never ready, as there's no identity that may // expire. 'top: loop { let rt = self.rt.clone(); self.state.validate()?; let maybe_response = select_biased! { message = self.rx.recv().fuse() => { let (message, received_time) = match message { Some(m) => m, None => break 'top, }; self.handle_message(message).await?; let delay = self.rt.monotonic_now() - received_time; metrics::log_process_client_message_delay(self.partition_id, delay); None }, // TODO(presley): If I swap this with futures below, tests break. // We need to provide a guarantee that we can't transition to a // timestamp past a pending mutation or otherwise optimistic updates // might be flaky. To do that, we need to behave differently if we // have pending operation future or not. result = self.mutation_futures.next().fuse() => { let message = match result { Some(m) => m?, None => panic!("mutation_futures sender dropped prematurely"), }; self.schedule_update(); Some(message) }, result = self.action_futures.select_next_some() => { self.schedule_update(); Some(result?) }, result = self.state.next_invalidated_query().fuse() => { let _ = result?; self.schedule_update(); None }, transition_state = self.transition_future.as_mut().unwrap_or(&mut pending) => { self.transition_future = None; Some(self.finish_update_queries(transition_state?)?) }, _ = self.unavailable_query_retry_future .as_mut() .unwrap_or(&mut unavailable_retry_pending) => { tracing::info!("Scheduling an update to queries after a query failed because of async bootstrapping."); self.unavailable_query_retry_future = None; self.schedule_update(); None }, _ = self.tx.message_consumed().fuse() => { // Wake up if any message is consumed from the send buffer // in case update_scheduled is True. None } _ = ping_timeout => Some(ServerMessage::Ping {}), }; // If there is a message to return to the client, send it. if let Some(response) = maybe_response { assert!( !matches!(response, ServerMessage::FatalError { .. }) && !matches!(response, ServerMessage::AuthError { .. }), "fatal errors are returned above when handling special error types", ); // Break and exit cleanly if the websocket is dead. ping_timeout = self.rt.wait(HEARTBEAT_INTERVAL); let transition_heap_size = response.heap_size(); metrics::log_transition_size(self.partition_id, transition_heap_size); if self.tx.send((response, self.rt.monotonic_now())).is_err() { break 'top; } } // Send update unless the send channel already contains enough transitions, // and unless we are already computing an update. if self.update_scheduled && self.tx.transition_count() < *SYNC_MAX_SEND_TRANSITION_COUNT && self.transition_future.is_none() { // Always transition to the latest timestamp. In the future, // when we have Sync Worker running on the edge, we can remove this // call by making self.update_scheduled to be a Option<Timestamp>, // and set it accordingly based on the operation that triggered the // Transition. We would choose the latest timestamp available at // the edge for the initial sync. let target_ts = *self .api .latest_timestamp(&self.host, RequestId::new()) .await?; let new_transition_future = self.begin_update_queries(target_ts, subscription_client.clone())?; self.transition_future = Some( async move { rt.with_timeout( "update_queries", SYNC_WORKER_PROCESS_TIMEOUT, new_transition_future, ) .await } .boxed() .fuse(), ); self.update_scheduled = false; } } Ok(()) } pub fn identity_version(&self) -> IdentityVersion { self.state.current_version().identity } pub fn parse_admin_component_path( component_path: &str, udf_path: &UdfPath, identity: &Identity, ) -> anyhow::Result<CanonicalizedComponentFunctionPath> { let path = ComponentPath::deserialize(Some(component_path))?; anyhow::ensure!( path.is_root() || identity.is_admin() || identity.is_system(), "Only admin or system users can call functions on non-root components directly" ); let path = CanonicalizedComponentFunctionPath { component: path, udf_path: udf_path.clone().canonicalize(), }; Ok(path) } async fn handle_message(&mut self, message: ClientMessage) -> anyhow::Result<()> { let timer = metrics::handle_message_timer(self.partition_id, &message); match message { ClientMessage::Connect { session_id, last_close_reason, max_observed_timestamp, connection_count, client_ts, } => { if let Some((timer, on_connect)) = self.on_connect.take() { timer.finish(); on_connect(session_id); } if let Some(ts) = client_ts { self.client_clock_skew = Some(ts as i64 - self.rt.unix_timestamp().as_ms_since_epoch()? as i64); } self.state.set_session_id(session_id); if let Some(max_observed_timestamp) = max_observed_timestamp { let latest_timestamp = *self .api .latest_timestamp(&self.host, RequestId::new()) .await?; if max_observed_timestamp > latest_timestamp { // Unless there is a bug, this means the client have communicated // with a backend that have database writes we are not aware of. If // we serve the request, we will get a linearizability violation. // Instead error and report. It is possible we have to eventually turn // into a client error if there are bogus custom client implementations // but lets keep it as server one for now. metrics::log_linearizability_violation( self.partition_id, max_observed_timestamp.secs_since_f64(latest_timestamp), ); anyhow::bail!( "Client has observed a timestamp {max_observed_timestamp:?} ahead of \ the backend latest known timestamp {latest_timestamp:?}", ); } } metrics::log_connect(self.partition_id, last_close_reason, connection_count) }, ClientMessage::ModifyQuerySet { base_version, new_version, modifications, } => { let total_args_size = modifications .iter() .filter_map(|m| match m { QuerySetModification::Add(q) => Some(q.args.0.get().len()), QuerySetModification::Remove { .. } => None, }) .sum(); log_query_modification_args_size(self.partition_id, total_args_size); self.state .modify_query_set(base_version, new_version, modifications)?; self.schedule_update(); self.modify_query_to_transition_timers.insert( new_version, modify_query_to_transition_timer(self.partition_id), ); }, ClientMessage::Mutation { request_id, udf_path, args, component_path, } => { log_mutation_args_size(self.partition_id, args.0.get().len()); let identity = self.state.identity(self.rt.system_time())?; let mutation_identifier = self.state.session_id().map(|id| SessionRequestIdentifier { session_id: id, request_id, }); let server_request_id = match self.state.session_id() { Some(id) => RequestId::new_for_ws_session(id, request_id), None => RequestId::new(), }; let root = get_sampled_span( &self.host.instance_name, "sync-worker/mutation", &mut self.rt.rng(), btreemap! { "udf_type".into() => UdfType::Mutation.to_lowercase_string().into(), "udf_path".into() => udf_path.clone().into(), }, ); let rt = self.rt.clone(); let client_version = self.config.client_version.clone(); let timer = mutation_queue_timer(self.partition_id); let api = self.api.clone(); let host = self.host.clone(); let caller = FunctionCaller::SyncWorker(client_version); let mutation_queue_size = self.mutation_sender.max_capacity() - self.mutation_sender.capacity(); root.add_property(|| ("mutation_queue_size", mutation_queue_size.to_string())); let future = async move { rt.with_timeout("mutation", SYNC_WORKER_PROCESS_TIMEOUT, async move { timer.finish(); let result = match component_path { None => { api.execute_public_mutation( &host, server_request_id, identity, ExportPath::from(udf_path.canonicalize()), args, caller, mutation_identifier, Some(mutation_queue_size), ) .in_span(root) .await? }, Some(ref p) => { let path = Self::parse_admin_component_path(p, &udf_path, &identity)?; api.execute_admin_mutation( &host, server_request_id, identity, path, args, caller, mutation_identifier, Some(mutation_queue_size), ) .in_span(root) .await? }, }; let response = match result { Ok(udf_return) => ServerMessage::MutationResponse { request_id, result: Ok(udf_return.value), ts: Some(udf_return.ts), log_lines: udf_return.log_lines.into(), }, Err(RedactedMutationError { error, log_lines }) => { ServerMessage::MutationResponse { request_id, result: Err(error.into_error_payload()), ts: None, log_lines: log_lines.into(), } }, }; Ok(response) }) .await } .boxed(); self.mutation_sender.try_send(future).map_err(|err| { if matches!(err, TrySendError::Full(..)) { anyhow::anyhow!(ErrorMetadata::rate_limited( "TooManyConcurrentMutations", format!( "Too many concurrent mutations. Only up to \ {OPERATION_QUEUE_BUFFER_SIZE} pending mutations allowed on a \ single websocket." ), )) } else { anyhow::anyhow!("Failed to send to mutation channel: {err}") } })?; }, ClientMessage::Action { request_id, udf_path, args, component_path, } => { log_action_args_size(self.partition_id, args.0.get().len()); let identity = self.state.identity(self.rt.system_time())?; let api = self.api.clone(); let host = self.host.clone(); let client_version = self.config.client_version.clone(); let server_request_id = match self.state.session_id() { Some(id) => RequestId::new_for_ws_session(id, request_id), None => RequestId::new(), }; let root = get_sampled_span( &self.host.instance_name, "sync-worker/action", &mut self.rt.rng(), btreemap! { "udf_type".into() => UdfType::Action.to_lowercase_string().into(), "udf_path".into() => udf_path.clone().into(), }, ); let future = async move { let caller = FunctionCaller::SyncWorker(client_version); let result = match component_path { None => { api.execute_public_action( &host, server_request_id, identity, ExportPath::from(udf_path.canonicalize()), args, caller, ) .in_span(root) .await? }, Some(ref p) => { let path = Self::parse_admin_component_path(p, &udf_path, &identity)?; api.execute_admin_action( &host, server_request_id, identity, path, args, caller, ) .in_span(root) .await? }, }; let response = match result { Ok(udf_return) => ServerMessage::ActionResponse { request_id, result: Ok(udf_return.value), log_lines: udf_return.log_lines.into(), }, Err(RedactedActionError { error, log_lines }) => { ServerMessage::ActionResponse { request_id, result: Err(error.into_error_payload()), log_lines: log_lines.into(), } }, }; Ok(response) } .boxed(); anyhow::ensure!( self.action_futures.len() <= OPERATION_QUEUE_BUFFER_SIZE, "Inflight actions overloaded, max concurrency: {OPERATION_QUEUE_BUFFER_SIZE}" ); self.action_futures.push(future); }, ClientMessage::Authenticate { token: auth_token, base_version, } => { let identity_result = self .api .authenticate(&self.host, RequestId::new(), auth_token) .await; let identity = match identity_result { Ok(identity) => identity, Err(e) => { let short_msg = e.short_msg().to_string(); let msg = e.msg().to_string(); // If the auth token is invalid, we want to signal the client // that we tried to update the auth token but failed, which will // prompt the client to not try the same token again. return Err(ErrorMetadata::auth_update_failed(short_msg, msg).into()); }, }; self.state.modify_identity(identity, base_version)?; self.schedule_update(); }, ClientMessage::Event(client_event) => { tracing::info!( "Event with type {}: {}", client_event.event_type, client_event.event ); match TypedClientEvent::try_from(client_event) { Ok(typed_client_event) => match typed_client_event { TypedClientEvent::ClientConnect { marks } => { metrics::log_client_connect_timings(self.partition_id, marks) }, TypedClientEvent::ClientReceivedTransition { transition_transit_time, message_length, } => metrics::log_client_transition( self.partition_id, transition_transit_time, message_length, ), }, Err(_) => (), } }, }; timer.finish(); Ok(()) } fn begin_update_queries( &mut self, new_ts: Timestamp, subscriptions_client: Arc<dyn SubscriptionClient>, ) -> anyhow::Result<impl Future<Output = anyhow::Result<TransitionState>>> { let root = get_sampled_span( &self.host.instance_name, "sync-worker/update-queries", &mut self.rt.rng(), btreemap! { "udf_type".into() => UdfType::Query.to_lowercase_string().into(), }, ); let _guard = root.set_local_parent(); let timer = metrics::update_queries_timer(self.partition_id); let current_version = self.state.current_version(); let (modifications, new_query_version, pending_identity, new_identity_version) = self.state.take_modifications(); let mut identity_version = current_version.identity; if let Some(new_identity) = pending_identity { // If the identity version has changed, invalidate all existing tokens. // TODO(CX-737): Don't invalidate queries that don't examine auth state. // TODO(CX-737): Don't invalidate the queries if the User the is the same // only with refreshed token. This is a bit tricky because: // - We need to prove that query does not depend on token issue/expiration time. // - We need to make rpc to backend to compare the properties since Usher can't // validate auth tokens. Alternatively, we make Usher be able to validate tokens // long term. self.state.take_subscriptions(); self.state.insert_identity(new_identity); identity_version = new_identity_version; } let identity = self.state.identity(self.rt.system_time())?; // Step 1: Decide on a new target (query set version, identity version, ts) for // the system. let new_version = StateVersion { ts: new_ts, // We only bump the query set version when the client modifies the query set query_set: new_query_version, identity: identity_version, }; // Step 2: Add or remove queries from our query set. let mut state_modifications = BTreeMap::new(); for modification in modifications { match modification { QuerySetModification::Add(query) => { self.state.insert(query)?; }, QuerySetModification::Remove { query_id } => { self.state.remove(query_id)?; state_modifications .insert(query_id, StateModification::QueryRemoved { query_id }); }, } } // Step 3: Take all remaining subscriptions. let mut remaining_subscriptions = self.state.take_subscriptions(); // Step 4: Refresh subscriptions up to new_ts and run queries which // subscriptions are no longer current. let api = self.api.clone(); let need_fetch: Vec<_> = self.state.need_fetch().collect(); let host = self.host.clone(); let client_version = self.config.client_version.clone(); let partition_id = self.partition_id; Ok(async move { let future_results: anyhow::Result<Vec<_>> = try_join_buffer_unordered( "update_query", need_fetch.into_iter().map(move |query| { let api = api.clone(); let host = host.clone(); let identity_ = identity.clone(); let client_version = client_version.clone(); let current_subscription = remaining_subscriptions.remove(&query.query_id); let subscriptions_client = subscriptions_client.clone(); async move { LocalSpan::add_property(|| ("udf_path", query.udf_path.to_string())); let new_subscription = match current_subscription { Some(subscription) => { match subscription.extend_validity(new_ts).await? { SubscriptionValidity::Valid => Some(subscription), SubscriptionValidity::Invalid { invalid_ts } => { metrics::log_query_invalidated( partition_id, invalid_ts, new_ts, ); None }, } }, None => None, }; let (query_result, subscription) = match new_subscription { Some(subscription) => (QueryResult::Refresh, Some(subscription)), None => { // We failed to refresh the subscription or it was invalid to start // with. Rerun the query. let caller = FunctionCaller::SyncWorker(client_version); let ts = ExecuteQueryTimestamp::At(new_ts); // This query run might have been triggered due to invalidation // of a subscription. The sync worker is effectively the owner // of the query so we do not want to re-use the original query // request id. let request_id = RequestId::new(); let udf_return_result = match query.component_path { None => { api.execute_public_query( &host, request_id, identity_, ExportPath::from(query.udf_path.canonicalize()), query.args, caller, ts, query.journal, ) .await }, Some(ref p) => { let path = Self::parse_admin_component_path( p, &query.udf_path, &identity_, )?; api.execute_admin_query( &host, request_id, identity_, path, query.args, caller, ts, query.journal, ) .await }, }; match udf_return_result { Err(e) => { // TODO: use ErrorCode::FeatureTemporarilyUnavailable // instead if let Some(error) = e.downcast_ref::<ErrorMetadata>() && [ "SearchIndexesUnavailable", "TableSummariesUnavailable", ] .contains(&&*error.short_msg) { (QueryResult::TemporarilyUnavailable, None) } else { anyhow::bail!(e) } }, Ok(udf_return) => { let subscription = subscriptions_client .subscribe(udf_return.token) .await?; ( QueryResult::Rerun { result: udf_return.result, log_lines: udf_return.log_lines, journal: udf_return.journal, }, Some(subscription), ) }, } }, }; Ok::<_, anyhow::Error>((query.query_id, query_result, subscription)) } }), ) .await; let mut udf_results = vec![]; let mut temporarily_unavailable = false; for result in future_results? { let (query_id, result, maybe_subscription) = result; if matches!(result, QueryResult::TemporarilyUnavailable) { temporarily_unavailable = true; } if let Some(subscription) = maybe_subscription { udf_results.push((query_id, result, subscription)); } } Ok(TransitionState { udf_results, state_modifications, current_version, new_version, timer, temporarily_unavailable, }) } .in_span(root)) } fn finish_update_queries( &mut self, TransitionState { udf_results, mut state_modifications, current_version, new_version, timer, temporarily_unavailable, }: TransitionState, ) -> anyhow::Result<ServerMessage> { for (query_id, result, subscription) in udf_results { match result { QueryResult::Rerun { result, log_lines, journal, } => { let modification = self.state.complete_fetch( query_id, result, log_lines, journal, subscription, )?; let Some(modification) = modification else { continue; }; state_modifications.insert(query_id, modification); }, QueryResult::Refresh => { self.state.refill_subscription(query_id, subscription)?; }, QueryResult::TemporarilyUnavailable => { anyhow::bail!( "No QueryResult::TemporarilyUnavailable should have a udf result and \ subscription" ) }, } } if temporarily_unavailable { self.schedule_unavailable_query_retry(); } // Resubscribe for queries that don't have an active invalidation // future. self.state.fill_invalidation_futures()?; // Step 6: Send our transition to the client and update our version. self.state.advance_version(new_version)?; let transition = ServerMessage::Transition { start_version: current_version, end_version: new_version, modifications: state_modifications.into_values().collect(), client_clock_skew: self.client_clock_skew, server_ts: None, }; timer.finish(); metrics::log_query_set_size(self.partition_id, self.state.num_queries()); // Only retain timers for queries that haven't been updated yet. Finish the // timers for everything up through the new version. let finished_timers = self .modify_query_to_transition_timers .extract_if(.., |version, _| *version <= new_version.query_set); for (_, timer) in finished_timers { timer.finish(); } Ok(transition) } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server