Convex MCP server

Official
by get-convex
client.rs (7.27 kB)
use std::{
    sync::Arc,
    time::{
        Duration,
        Instant,
    },
};

use common::{
    runtime::Runtime,
    types::UdfType,
};
use deno_core::ModuleSpecifier;
use serde_json::Value as JsonValue;
use sync_types::CanonicalizedUdfPath;
use tokio::sync::{
    mpsc::{
        self,
        error::TrySendError,
    },
    oneshot,
    Semaphore,
};
use value::{
    ConvexArray,
    ConvexValue,
};

use super::{
    environment::EnvironmentOutcome,
    FunctionId,
    PromiseId,
};
use crate::{
    environment::{
        action::TaskResponseEnum,
        AsyncOpRequest,
    },
    timeout::FunctionExecutionTime,
};

pub enum IsolateThreadRequest {
    RegisterModule {
        name: ModuleSpecifier,
        source: String,
        source_map: Option<String>,
        response: oneshot::Sender<anyhow::Result<Vec<ModuleSpecifier>>>,
    },
    EvaluateModule {
        name: ModuleSpecifier,
        response: oneshot::Sender<anyhow::Result<()>>,
    },
    StartFunction {
        udf_type: UdfType,
        udf_path: CanonicalizedUdfPath,
        arguments: ConvexArray,
        response: oneshot::Sender<anyhow::Result<(FunctionId, EvaluateResult)>>,
    },
    PollFunction {
        function_id: FunctionId,
        completions: Completions,
        response: oneshot::Sender<anyhow::Result<EvaluateResult>>,
    },
    Shutdown {
        response: oneshot::Sender<anyhow::Result<EnvironmentOutcome>>,
    },
}

pub enum EvaluateResult {
    Ready(ConvexValue),
    Pending(Pending),
}

#[derive(Debug)]
pub struct Pending {
    pub async_syscalls: Vec<PendingAsyncSyscall>,
    pub async_ops: Vec<PendingAsyncOp>,
    pub dynamic_imports: Vec<PendingDynamicImport>,
}

impl Pending {
    pub fn is_empty(&self) -> bool {
        self.async_syscalls.is_empty()
            && self.async_ops.is_empty()
            && self.dynamic_imports.is_empty()
    }
}

#[derive(Debug)]
pub struct Completions {
    pub async_syscalls: Vec<AsyncSyscallCompletion>,
    pub async_ops: Vec<AsyncOpCompletion>,
}

impl Completions {
    pub fn new() -> Self {
        Self {
            async_syscalls: vec![],
            async_ops: vec![],
        }
    }
}

#[derive(Debug)]
pub struct PendingAsyncSyscall {
    pub promise_id: PromiseId,
    pub name: String,
    pub args: JsonValue,
}

#[derive(Debug)]
pub struct AsyncSyscallCompletion {
    pub promise_id: PromiseId,
    pub result: anyhow::Result<String>,
}

#[derive(Debug)]
pub struct PendingAsyncOp {
    pub promise_id: PromiseId,
    pub request: AsyncOpRequest,
}

#[derive(Debug)]
pub struct AsyncOpCompletion {
    pub promise_id: PromiseId,
    pub result: anyhow::Result<TaskResponseEnum>,
}

#[derive(Debug)]
pub struct PendingDynamicImport {
    pub promise_id: PromiseId,
    pub specifier: ModuleSpecifier,
}

pub type QueryId = u32;

pub struct IsolateThreadClient<RT: Runtime> {
    rt: RT,
    sender: mpsc::Sender<IsolateThreadRequest>,
    user_timeout: Duration,
    user_time_remaining: Duration,
    semaphore: Arc<Semaphore>,
}

impl<RT: Runtime> IsolateThreadClient<RT> {
    pub fn new(
        rt: RT,
        sender: mpsc::Sender<IsolateThreadRequest>,
        user_timeout: Duration,
        semaphore: Arc<Semaphore>,
    ) -> Self {
        Self {
            rt,
            sender,
            user_timeout,
            user_time_remaining: user_timeout,
            semaphore,
        }
    }

    pub fn execution_time(&self) -> anyhow::Result<FunctionExecutionTime> {
        if self.user_time_remaining.is_zero() {
            anyhow::bail!("User time exhausted");
        }
        let elapsed = self.user_timeout - self.user_time_remaining;
        Ok(FunctionExecutionTime {
            elapsed,
            limit: self.user_timeout,
        })
    }

    pub async fn send<T>(
        &mut self,
        request: IsolateThreadRequest,
        rx: oneshot::Receiver<anyhow::Result<T>>,
    ) -> anyhow::Result<T> {
        if self.user_time_remaining.is_zero() {
            anyhow::bail!("User time exhausted");
        }

        // Use the semaphore to ensure that a bounded number of isolate
        // threads are executing at any point in time.
        let permit = self.semaphore.clone().acquire_owned().await?;

        // Start the user timer after we acquire the permit.
        let user_start = Instant::now();
        let user_timeout = self.rt.wait(self.user_time_remaining);

        if let Err(e) = self.sender.try_send(request) {
            match e {
                TrySendError::Full(_) => anyhow::bail!("Isolate thread queue is full"),
                TrySendError::Closed(_) => anyhow::bail!("Isolate thread was dropped"),
            }
        }
        let result = tokio::select! {
            result = rx => result,
            _ = user_timeout => {
                // XXX: We need to terminate the isolate handle here in
                // case user code is in an infinite loop.
                anyhow::bail!("User time exhausted");
            },
        };

        // Deduct the time spent in the isolate thread from our remaining user time.
        let user_elapsed = user_start.elapsed();
        self.user_time_remaining = self.user_time_remaining.saturating_sub(user_elapsed);

        // Drop the permit once we've received the response, allowing another
        // Tokio thread to talk to its V8 thread.
        drop(permit);

        result?
    }

    pub async fn register_module(
        &mut self,
        name: ModuleSpecifier,
        source: String,
        source_map: Option<String>,
    ) -> anyhow::Result<Vec<ModuleSpecifier>> {
        let (tx, rx) = oneshot::channel();
        self.send(
            IsolateThreadRequest::RegisterModule {
                name,
                source,
                source_map,
                response: tx,
            },
            rx,
        )
        .await
    }

    pub async fn evaluate_module(&mut self, name: ModuleSpecifier) -> anyhow::Result<()> {
        let (tx, rx) = oneshot::channel();
        self.send(
            IsolateThreadRequest::EvaluateModule { name, response: tx },
            rx,
        )
        .await
    }

    pub async fn start_function(
        &mut self,
        udf_type: UdfType,
        udf_path: CanonicalizedUdfPath,
        arguments: ConvexArray,
    ) -> anyhow::Result<(FunctionId, EvaluateResult)> {
        let (tx, rx) = oneshot::channel();
        self.send(
            IsolateThreadRequest::StartFunction {
                udf_type,
                udf_path,
                arguments,
                response: tx,
            },
            rx,
        )
        .await
    }

    pub async fn poll_function(
        &mut self,
        function_id: FunctionId,
        completions: Completions,
    ) -> anyhow::Result<EvaluateResult> {
        let (tx, rx) = oneshot::channel();
        self.send(
            IsolateThreadRequest::PollFunction {
                function_id,
                completions,
                response: tx,
            },
            rx,
        )
        .await
    }

    pub async fn shutdown(&mut self) -> anyhow::Result<EnvironmentOutcome> {
        let (tx, rx) = oneshot::channel();
        self.send(IsolateThreadRequest::Shutdown { response: tx }, rx)
            .await
    }
}
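
For orientation, here is a minimal sketch (not part of the repository) of how a caller might drive IsolateThreadClient: start a function, service the pending async syscalls, and keep polling until the isolate produces a ready value. The run_syscall helper is hypothetical, the sketch assumes FunctionId is a cheap Copy identifier (QueryId in this file is a u32), and it ignores async ops and dynamic imports for brevity.

// Sketch only: assumes the types above (IsolateThreadClient, EvaluateResult,
// Completions, AsyncSyscallCompletion, etc.) are in scope in the same module.
async fn drive_function<RT: Runtime>(
    client: &mut IsolateThreadClient<RT>,
    udf_type: UdfType,
    udf_path: CanonicalizedUdfPath,
    arguments: ConvexArray,
) -> anyhow::Result<ConvexValue> {
    let (function_id, mut result) = client
        .start_function(udf_type, udf_path, arguments)
        .await?;
    loop {
        let pending = match result {
            // The isolate finished: return the final value.
            EvaluateResult::Ready(value) => return Ok(value),
            EvaluateResult::Pending(pending) => pending,
        };
        // Service each pending syscall outside the V8 thread, then feed the
        // results back as completions on the next poll.
        let mut completions = Completions::new();
        for syscall in pending.async_syscalls {
            let outcome = run_syscall(&syscall.name, syscall.args).await;
            completions.async_syscalls.push(AsyncSyscallCompletion {
                promise_id: syscall.promise_id,
                result: outcome,
            });
        }
        // Async ops and dynamic imports would be handled similarly.
        result = client.poll_function(function_id, completions).await?;
    }
}

// Hypothetical stand-in: a real environment would dispatch on `name` and
// execute the syscall against the database/runtime.
async fn run_syscall(name: &str, args: JsonValue) -> anyhow::Result<String> {
    anyhow::bail!("syscall {name} not implemented (args: {args})")
}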

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'
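
The same lookup can be scripted. A minimal Rust sketch, assuming the reqwest (with the "json" feature), tokio, serde_json, and anyhow crates as dependencies, that fetches the listing shown by the curl command above:

// Fetch the server listing and pretty-print the JSON response.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let url = "https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend";
    let body: serde_json::Value = reqwest::get(url).await?.json().await?;
    println!("{body:#}");
    Ok(())
}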

If you have feedback or need assistance with the MCP directory API, please join our Discord server.