Skip to main content
Glama

Convex MCP server

Official
by get-convex
subscription.rs4.83 kB
use std::{ ops::Deref, pin::Pin, }; use futures::{ task, Stream, StreamExt, }; use tokio::sync::mpsc; use tokio_stream::wrappers::{ errors::BroadcastStreamRecvError, BroadcastStream, }; use crate::{ base_client::{ FunctionResult, QueryResults, SubscriberId, }, client::worker::{ ClientRequest, UnsubscribeRequest, }, }; #[cfg(doc)] use crate::{ ConvexClient, Value, }; /// This structure represents a single subscription to a query with args. /// For convenience, [`QuerySubscription`] also implements /// [`Stream`]<[`FunctionResult`]>, giving a stream of results to the query. /// /// It is returned by [`ConvexClient::subscribe`]. The subscription lives /// in the active query set for as long as this token stays in scope. /// /// For a consistent [`QueryResults`] of all your queries, use /// [`ConvexClient::watch_all()`] instead. pub struct QuerySubscription { pub(super) subscriber_id: SubscriberId, pub(super) request_sender: mpsc::UnboundedSender<ClientRequest>, pub(super) watch: BroadcastStream<QueryResults>, pub(super) initial: Option<FunctionResult>, } impl QuerySubscription { /// Returns an identifier for this subscription based on its query and args. /// This identifier can be used to find the result within a /// [`QuerySetSubscription`] as returned by [`ConvexClient::watch_all()`] pub fn id(&self) -> &SubscriberId { &self.subscriber_id } } impl std::fmt::Debug for QuerySubscription { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("QuerySubscription") .field("subscriber_id", &self.subscriber_id) .finish() } } impl Deref for QuerySubscription { type Target = SubscriberId; fn deref(&self) -> &SubscriberId { &self.subscriber_id } } impl Drop for QuerySubscription { fn drop(&mut self) { let _ = self .request_sender .send(ClientRequest::Unsubscribe(UnsubscribeRequest { subscriber_id: self.subscriber_id, })); } } impl Stream for QuerySubscription { type Item = FunctionResult; fn poll_next( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> task::Poll<Option<Self::Item>> { if let Some(initial) = self.initial.take() { return task::Poll::Ready(Some(initial)); } loop { return match self.watch.poll_next_unpin(cx) { // Ok to be lagged (skip intermediate values) - since Convex // only guarantees a newer value than the previous value. task::Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(_amt)))) => continue, task::Poll::Ready(Some(Ok(map))) => { let Some(value) = map.get(self.id()) else { // No result yet in the query result set. Keep polling. continue; }; task::Poll::Ready(Some(value.clone())) }, task::Poll::Ready(None) => task::Poll::Ready(None), task::Poll::Pending => task::Poll::Pending, }; } } } /// A subscription to a consistent view of multiple queries. /// /// [`QuerySetSubscription`] /// implements [`Stream`]<[`QueryResults`]>. /// Each item in the stream contains a consistent view /// of the results of all the queries in the query set. /// /// Queries can be added to the query set via [`ConvexClient::subscribe`]. /// Queries can be removed from the query set via dropping the /// [`QuerySubscription`] token returned by [`ConvexClient::subscribe`]. /// /// /// [`QueryResults`] is a copy-on-write mapping from [`SubscriberId`] to /// its latest result [`Value`]. pub struct QuerySetSubscription { watch: BroadcastStream<QueryResults>, } impl QuerySetSubscription { pub(super) fn new(watch: BroadcastStream<QueryResults>) -> Self { Self { watch } } } impl Stream for QuerySetSubscription { type Item = QueryResults; fn poll_next( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> task::Poll<Option<Self::Item>> { loop { return match self.watch.poll_next_unpin(cx) { // Ok to be lagged (skip intermediate values) - since Convex // only guarantees a newer value than the previous value. task::Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(_amt)))) => continue, task::Poll::Ready(Some(Ok(map))) => task::Poll::Ready(Some(map)), task::Poll::Ready(None) => task::Poll::Ready(None), task::Poll::Pending => task::Poll::Pending, }; } } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server