Skip to main content
Glama

Convex MCP server

Official
by get-convex
server.rs7.33 kB
use std::{ sync::Arc, time::Duration, }; use application::{ api::ApplicationApi, RedactedMutationError, RedactedMutationReturn, }; use common::{ http::{ RequestDestination, ResolvedHostname, }, runtime::{ JoinSet, Runtime, SpawnHandle, }, types::FunctionCaller, value::ConvexObject, RequestId, }; use keybroker::Identity; use rand_distr::{ Distribution, Geometric, }; use runtime::testing::TestRuntime; use sync::{ worker::{ measurable_unbounded_channel, SingleFlightSender, }, ServerMessage, SyncWorker, SyncWorkerConfig, }; use sync_types::{ types::SerializedArgs, ClientMessage, Timestamp, UdfPath, }; use tokio::{ sync::{ mpsc, oneshot, }, time::Instant, }; pub enum ServerRequest { Subscribe { incoming: mpsc::UnboundedReceiver<(ClientMessage, Instant)>, outgoing: SingleFlightSender, }, Mutation { udf_path: UdfPath, args: ConvexObject, result: oneshot::Sender<Result<RedactedMutationReturn, RedactedMutationError>>, }, LatestTimestamp { result: oneshot::Sender<Timestamp>, }, } #[derive(Clone)] pub struct ServerThread { rt: TestRuntime, tx: mpsc::UnboundedSender<ServerRequest>, expected_delay_duration: Option<Duration>, } impl ServerThread { pub fn new( rt: TestRuntime, application: Arc<dyn ApplicationApi>, expected_delay_duration: Option<Duration>, ) -> (Self, Box<dyn SpawnHandle>) { let (tx, rx) = mpsc::unbounded_channel(); let rt_clone = rt.clone(); let handle = rt.spawn("ServerThread", async move { Self::go(rt_clone, application, rx) .await .expect("Server thread crashed") }); ( Self { rt, tx, expected_delay_duration, }, handle, ) } pub fn connect( &self, ) -> anyhow::Result<( mpsc::UnboundedSender<(ClientMessage, Instant)>, mpsc::UnboundedReceiver<(ServerMessage, Instant)>, )> { let (client_tx, client_rx) = mpsc::unbounded_channel(); let (server_tx, mut server_rx) = measurable_unbounded_channel(); self.tx.send(ServerRequest::Subscribe { incoming: client_rx, outgoing: server_tx, })?; let (faulty_client_tx, mut faulty_client_rx) = mpsc::unbounded_channel(); let (faulty_server_tx, faulty_server_rx) = mpsc::unbounded_channel(); let delay_distribution = match self.expected_delay_duration { Some(duration) => Some(Geometric::new(1.0 / duration.as_secs_f64())?), None => None, }; let rt = self.rt.clone(); tokio::task::spawn(async move { while let Some(msg) = faulty_client_rx.recv().await { if let Some(delay_distribution) = delay_distribution { let delay = delay_distribution.sample(&mut *rt.rng()); tokio::time::sleep(Duration::from_secs(delay)).await; } if client_tx.send(msg).is_err() { tracing::debug!("Server receiver closed"); return; } } tracing::debug!("Client sender closed"); }); let rt = self.rt.clone(); tokio::task::spawn(async move { while let Some(msg) = server_rx.next().await { if let Some(delay_distribution) = delay_distribution { let delay = delay_distribution.sample(&mut *rt.rng()); tokio::time::sleep(Duration::from_secs(delay)).await; } if faulty_server_tx.send(msg).is_err() { tracing::debug!("Client receiver closed"); return; } } tracing::debug!("Server sender closed"); }); Ok((faulty_client_tx, faulty_server_rx)) } pub async fn mutation( &self, udf_path: UdfPath, args: ConvexObject, ) -> anyhow::Result<Result<RedactedMutationReturn, RedactedMutationError>> { let (tx, rx) = oneshot::channel(); self.tx.send(ServerRequest::Mutation { udf_path, args, result: tx, })?; Ok(rx.await?) } pub async fn latest_timestamp(&self) -> anyhow::Result<Timestamp> { let (tx, rx) = oneshot::channel(); self.tx .send(ServerRequest::LatestTimestamp { result: tx })?; Ok(rx.await?) } async fn go( rt: TestRuntime, api: Arc<dyn ApplicationApi>, mut rx: mpsc::UnboundedReceiver<ServerRequest>, ) -> anyhow::Result<()> { let host = ResolvedHostname { instance_name: "suadero".to_string(), destination: RequestDestination::ConvexCloud, }; let mut join_set = JoinSet::new(); loop { tokio::select! { Some(next) = join_set.join_next() => { if let Err(e) = next { tracing::error!("Sync worker failed: {e:?}"); } } request = rx.recv() => { let Some(request) = request else { tracing::info!("Server thread shutting down..."); return Ok(()); }; match request { ServerRequest::Subscribe { incoming, outgoing } => { tracing::info!("Received subscribe..."); let mut w = SyncWorker::new( api.clone(), rt.clone(), host.clone(), SyncWorkerConfig::default(), incoming, outgoing, Box::new(|_session_id| ()), 0, ); join_set.spawn("sync_worker", async move { w.go().await }); }, ServerRequest::Mutation { udf_path, args, result } => { let res = api.execute_public_mutation( &host, RequestId::new(), Identity::system(), udf_path.canonicalize().into(), SerializedArgs::from_args(vec![args.into()])?, FunctionCaller::Test, None, None, ).await?; let _ = result.send(res); }, ServerRequest::LatestTimestamp { result } => { let res = api.latest_timestamp(&host, RequestId::new()).await?; let _ = result.send(*res); }, } } } } } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server