Skip to main content
Glama

Convex MCP server

Official
by get-convex
timeout.rs8.44 kB
use std::{ sync::Arc, time::Duration, }; use async_broadcast::broadcast; use common::runtime::{ Runtime, SpawnHandle, }; use futures::{ future, future::Either, select_biased, Future, FutureExt, }; use parking_lot::Mutex; use crate::{ metrics, termination::{ ContextHandle, TerminationReason, }, }; /// A `Timeout` is an asynchronous background job that terminates an /// `IsolateHandle` after some time has passed. The holder of a `Timeout` can /// temporarily pause the termination countdown with [`Timeout::pause`] and then /// resume time tracking by dropping the returned guard. /// /// If the higher level operation succeeds, call `Timeout::finish` to cancel the /// background job and prevent it from terminating the isolate. pub struct Timeout<RT: Runtime> { handle: Box<dyn SpawnHandle>, inner: Arc<Mutex<TimeoutInner<RT>>>, done_rx: async_broadcast::Receiver<()>, } struct TimeoutInner<RT: Runtime> { rt: RT, start: tokio::time::Instant, timeout: Option<Duration>, // How long has the timeout been in the paused state? pause_elapsed: Duration, max_time_paused: Option<Duration>, state: TimeoutState, } impl<RT: Runtime> TimeoutInner<RT> { fn termination_reason_or_wait( &mut self, timeout: Duration, ) -> Result<Option<TerminationReason>, impl Future<Output = ()> + 'static> { let initial_deadline = self.start + timeout; match self.state { TimeoutState::Running => { // Extend the deadline by the time spent paused. let deadline = initial_deadline + self.pause_elapsed; let now = self.rt.monotonic_now(); if now >= deadline { metrics::log_user_timeout(); return Ok(Some(TerminationReason::UserTimeout(timeout))); } // Wait on our current deadline to pass. // TODO: Cancel the timer on `Timeout::finish` so we don't keep an // `IsolateHandle` alive for the wait duration. Err(Either::Left(self.rt.wait(deadline - now))) }, TimeoutState::Paused { ref mut pause_done, ref pause_start, } => { let expired = if let Some(max_time_paused) = self.max_time_paused { let total_pause_elapsed = self.pause_elapsed + pause_start.elapsed(); if total_pause_elapsed >= max_time_paused { metrics::log_system_timeout(); return Ok(Some(TerminationReason::SystemTimeout(max_time_paused))); } Either::Left(self.rt.wait(max_time_paused - total_pause_elapsed)) } else { Either::Right(future::pending()) }; let mut pause_done = pause_done.clone(); Err(Either::Right(async move { let _ = future::select(expired, pause_done.recv()).await; })) }, TimeoutState::Finished => Ok(None), } } } #[derive(Debug)] enum TimeoutState { Running, Paused { pause_start: tokio::time::Instant, pause_done: async_broadcast::Receiver<()>, }, Finished, } impl<RT: Runtime> Drop for Timeout<RT> { fn drop(&mut self) { self.finish(); } } // We default to counting everything as user time but we exempt async syscalls // from the user timeout and count them as system time instead. impl<RT: Runtime> Timeout<RT> { pub fn new( rt: RT, handle: ContextHandle, timeout: Option<Duration>, max_time_paused: Option<Duration>, ) -> Self { let start = rt.monotonic_now(); let inner = TimeoutInner { rt: rt.clone(), start, timeout, pause_elapsed: Duration::ZERO, max_time_paused, state: TimeoutState::Running, }; let inner = Arc::new(Mutex::new(inner)); let (done_tx, done_rx) = broadcast(1); let handle = rt.spawn("isolate_timeout", Self::go(handle, inner.clone(), done_tx)); Self { handle, inner, done_rx, } } // Returns a future that resolves when the background timeout thread has // completed. This can either happen if the isolate has been terminated // due to timeout or the Timeout has been dropped due to an error. pub fn wait_until_completed(&self) -> impl Future<Output = ()> + 'static { let mut done_rx = self.done_rx.clone(); async move { let _ = done_rx.recv().await; } } /// Runs the future until completely or until the timeout has expired. /// Returns an error in the latter case. pub fn with_timeout<'a, T>( &self, f: impl Future<Output = T> + 'a, ) -> impl Future<Output = anyhow::Result<T>> + 'a { let completed = self.wait_until_completed(); async move { select_biased! { result = f.fuse() => Ok(result), // NOTE: When the background thread returns, it either means we have // terminated due to timeout or have been dropped. Either way, it // is ok to stop executing. The exact error we throw here doesn't // matter since we know the isolate layer overrides the error if there // is a termination reason. _ = completed.fuse() => anyhow::bail!("Timed out"), } } } pub fn pause(&mut self) -> PauseGuard<'_, RT> { let (tx, rx) = broadcast(1); let pause_start = { let mut inner = self.inner.lock(); let TimeoutState::Running = inner.state else { panic!("Overlapping calls to timeout.pause()"); }; let pause_start = inner.rt.monotonic_now(); inner.state = TimeoutState::Paused { pause_done: rx, pause_start, }; pause_start }; PauseGuard { timeout: self, pause_start, pause_done: Some(tx), } } pub fn finish(&mut self) { { let mut inner = self.inner.lock(); if matches!(inner.state, TimeoutState::Finished) { return; } inner.state = TimeoutState::Finished; } self.handle.shutdown(); } async fn go( handle: ContextHandle, inner: Arc<Mutex<TimeoutInner<RT>>>, done_tx: async_broadcast::Sender<()>, ) { let timeout = match inner.lock().timeout { None => return, Some(timeout) => timeout, }; let termination_reason = loop { let future = match inner.lock().termination_reason_or_wait(timeout) { Ok(termination_reason) => break termination_reason, Err(future) => future, }; future.await; }; if let Some(reason) = termination_reason { handle.terminate(reason); } let _ = done_tx.try_broadcast(()); } pub fn get_function_execution_time(&self) -> FunctionExecutionTime { let inner = self.inner.lock(); let elapsed = inner.rt.monotonic_now() - inner.start - inner.pause_elapsed; let limit = inner.timeout.unwrap_or(Duration::ZERO); FunctionExecutionTime { elapsed, limit } } } pub struct FunctionExecutionTime { pub elapsed: Duration, pub limit: Duration, } pub struct PauseGuard<'a, RT: Runtime> { timeout: &'a mut Timeout<RT>, pause_start: tokio::time::Instant, pause_done: Option<async_broadcast::Sender<()>>, } impl<RT: Runtime> PauseGuard<'_, RT> { pub fn resume(self) { drop(self); } } impl<RT: Runtime> Drop for PauseGuard<'_, RT> { fn drop(&mut self) { let Some(tx) = self.pause_done.take() else { return; }; { let mut inner = self.timeout.inner.lock(); assert!(matches!(inner.state, TimeoutState::Paused { .. })); inner.pause_elapsed += self.pause_start.elapsed(); inner.state = TimeoutState::Running; } let _ = tx.try_broadcast(()); } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server