Skip to main content
Glama

Convex MCP server

Official
by get-convex
mod.rs8.38 kB
use std::time::Duration; use common::{ errors::JsError, runtime::{ Runtime, SpawnHandle, }, value::{ ConvexObject, ConvexValue, }, }; use js_protocol::SyncMutationStatus; use runtime::testing::TestRuntime; use serde::Serialize; use sync_types::{ Timestamp, UdfPath, }; use tokio::sync::{ mpsc, oneshot, }; use uuid::Uuid; use super::server::ServerThread; mod environment; mod go; mod js_protocol; mod state; pub type QueryToken = String; pub type SyncQuerySubscriptionId = String; pub type MutationId = String; #[derive(Debug, Clone, Serialize)] #[serde(rename_all = "camelCase")] pub struct MutationInfo { pub mutation_path: String, pub opt_update_args: ConvexObject, pub server_args: ConvexObject, } pub enum JsClientThreadRequest { AddQuery { udf_path: UdfPath, args: ConvexObject, sender: oneshot::Sender<QueryToken>, }, QueryResult { token: QueryToken, sender: oneshot::Sender<Option<ConvexValue>>, }, RemoveQuery { token: QueryToken, sender: oneshot::Sender<()>, }, RunMutation { udf_path: UdfPath, args: ConvexObject, sender: oneshot::Sender<Result<ConvexValue, JsError>>, }, AddSyncQuery { id: String, name: String, args: ConvexObject, sender: oneshot::Sender<()>, }, SyncQueryResult { id: SyncQuerySubscriptionId, sender: oneshot::Sender<Option<Result<ConvexValue, JsError>>>, }, RemoveSyncQuery { id: SyncQuerySubscriptionId, sender: oneshot::Sender<()>, }, RequestSyncMutation { id: String, mutation_info: MutationInfo, sender: oneshot::Sender<()>, }, GetSyncMutationStatus { id: MutationId, sender: oneshot::Sender<Option<SyncMutationStatus>>, }, MaxObservedTimestamp { sender: oneshot::Sender<Option<Timestamp>>, }, DisconnectNetwork { sender: oneshot::Sender<bool>, }, ReconnectNetwork { sender: oneshot::Sender<bool>, }, } #[derive(Clone)] pub struct JsClientThread { rt: TestRuntime, tx: mpsc::UnboundedSender<JsClientThreadRequest>, } impl JsClientThread { pub fn new(rt: TestRuntime, server: ServerThread) -> (Self, Box<dyn SpawnHandle>) { let (tx, rx) = mpsc::unbounded_channel(); let rt_ = rt.clone(); let handle = rt.spawn_thread("js_thread", move || async move { Self::go(rt_, server, rx).await.expect("JsThread failed"); }); (Self { rt, tx }, handle) } pub async fn add_query( &self, udf_path: UdfPath, args: ConvexObject, ) -> anyhow::Result<QueryToken> { let (sender, receiver) = oneshot::channel(); self.tx.send(JsClientThreadRequest::AddQuery { udf_path, args, sender, })?; Ok(receiver.await?) } pub async fn query_result(&self, token: QueryToken) -> anyhow::Result<Option<ConvexValue>> { let (sender, receiver) = oneshot::channel(); self.tx .send(JsClientThreadRequest::QueryResult { token, sender })?; Ok(receiver.await?) } pub async fn remove_query(&self, token: QueryToken) -> anyhow::Result<()> { let (sender, receiver) = oneshot::channel(); self.tx .send(JsClientThreadRequest::RemoveQuery { token, sender })?; Ok(receiver.await?) } pub async fn run_mutation( &self, udf_path: UdfPath, args: ConvexObject, ) -> anyhow::Result<Result<ConvexValue, JsError>> { let (sender, receiver) = oneshot::channel(); self.tx.send(JsClientThreadRequest::RunMutation { udf_path, args, sender, })?; Ok(receiver.await?) } pub async fn add_sync_query( &self, name: &str, args: ConvexObject, ) -> anyhow::Result<SyncQuerySubscriptionId> { let (sender, receiver) = oneshot::channel(); let id = Uuid::new_v4().to_string(); self.tx.send(JsClientThreadRequest::AddSyncQuery { id: id.clone(), name: name.to_string(), args, sender, })?; receiver.await?; Ok(id) } pub async fn sync_query_result( &self, id: SyncQuerySubscriptionId, ) -> anyhow::Result<Option<Result<ConvexValue, JsError>>> { let (sender, receiver) = oneshot::channel(); self.tx .send(JsClientThreadRequest::SyncQueryResult { id, sender })?; Ok(receiver.await?) } pub async fn remove_sync_query(&self, id: SyncQuerySubscriptionId) -> anyhow::Result<()> { let (sender, receiver) = oneshot::channel(); self.tx .send(JsClientThreadRequest::RemoveSyncQuery { id, sender })?; Ok(receiver.await?) } pub async fn request_sync_mutation( &self, mutation_info: MutationInfo, ) -> anyhow::Result<MutationId> { let (sender, receiver) = oneshot::channel(); let id = Uuid::new_v4().to_string(); self.tx.send(JsClientThreadRequest::RequestSyncMutation { id: id.clone(), mutation_info, sender, })?; receiver.await?; Ok(id) } pub async fn get_sync_mutation_status( &self, id: MutationId, ) -> anyhow::Result<Option<SyncMutationStatus>> { let (sender, receiver) = oneshot::channel(); self.tx .send(JsClientThreadRequest::GetSyncMutationStatus { id, sender })?; Ok(receiver.await?) } pub async fn max_observed_timestamp(&self) -> anyhow::Result<Option<Timestamp>> { let (sender, receiver) = oneshot::channel(); self.tx .send(JsClientThreadRequest::MaxObservedTimestamp { sender })?; Ok(receiver.await?) } pub async fn wait_for_server_ts(&self, ts: Timestamp) -> anyhow::Result<()> { while self.max_observed_timestamp().await? < Some(ts) { // Since we virtualize time, the actual duration we sleep here doesn't matter // much. self.rt.wait(Duration::from_secs(1)).await; } Ok(()) } pub async fn wait_for_sync_mutation_reflected_locally( &self, id: MutationId, ) -> anyhow::Result<()> { while !matches!( self.get_sync_mutation_status(id.clone()).await?, Some(SyncMutationStatus::ReflectedLocallyButWaitingForNetwork) | Some(SyncMutationStatus::Reflected) ) { // Since we virtualize time, the actual duration we sleep here doesn't matter // much. self.rt.wait(Duration::from_secs(1)).await; } Ok(()) } pub async fn wait_for_sync_mutation_reflected(&self, id: MutationId) -> anyhow::Result<()> { while !matches!( self.get_sync_mutation_status(id.clone()).await?, Some(SyncMutationStatus::Reflected) ) { // Since we virtualize time, the actual duration we sleep here doesn't matter // much. self.rt.wait(Duration::from_secs(1)).await; } Ok(()) } pub async fn wait_for_sync_mutation_reflected_on_network( &self, id: MutationId, ) -> anyhow::Result<()> { while !matches!( self.get_sync_mutation_status(id.clone()).await?, Some(SyncMutationStatus::ReflectedOnNetworkButNotLocally) | Some(SyncMutationStatus::Reflected) ) { // Since we virtualize time, the actual duration we sleep here doesn't matter // much. self.rt.wait(Duration::from_secs(1)).await; } Ok(()) } pub async fn disconnect_network(&self) -> anyhow::Result<bool> { let (sender, receiver) = oneshot::channel(); self.tx .send(JsClientThreadRequest::DisconnectNetwork { sender })?; Ok(receiver.await?) } pub async fn reconnect_network(&self) -> anyhow::Result<bool> { let (sender, receiver) = oneshot::channel(); self.tx .send(JsClientThreadRequest::ReconnectNetwork { sender })?; Ok(receiver.await?) } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server