Convex MCP server

Official
by get-convex
prod.rs (7.63 kB)
//! Production implementation of the Runtime trait.
use std::{
    future::Future,
    pin::Pin,
    sync::LazyLock,
    task::{
        ready,
        Poll,
    },
    thread,
    time::{
        Instant,
        SystemTime,
    },
};

use async_trait::async_trait;
use common::{
    knobs::{
        RUNTIME_DISABLE_LIFO_SLOT,
        RUNTIME_STACK_SIZE,
        RUNTIME_WORKER_THREADS,
    },
    pause::PauseClient,
    runtime::{
        propagate_tracing,
        propagate_tracing_blocking,
        JoinError,
        Runtime,
        SpawnHandle,
        TokioSpawnHandle,
        GLOBAL_TASK_MANAGER,
    },
};
use futures::{
    future::FusedFuture,
    FutureExt,
};
use rand::RngCore;
use tokio::{
    runtime::{
        Builder,
        Handle as TokioRuntimeHandle,
        Runtime as TokioRuntime,
    },
    sync::oneshot,
    time::{
        sleep,
        Duration,
    },
};
use tokio_metrics_collector::TaskMonitor;

static INSTANT_EPOCH: LazyLock<Instant> = LazyLock::new(Instant::now);

pub struct ThreadHandle {
    cancel: Option<oneshot::Sender<()>>,
    done: Option<oneshot::Receiver<bool>>,
    handle: Option<std::thread::JoinHandle<()>>,
}

impl SpawnHandle for ThreadHandle {
    fn shutdown(&mut self) {
        if let Some(cancel) = self.cancel.take() {
            let _ = cancel.send(());
        }
    }

    fn poll_join(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), JoinError>> {
        if let Some(done) = &mut self.done {
            let result = ready!(Pin::new(done).poll(cx));
            self.done = None;
            match result {
                // The future was not canceled.
                Ok(false) => Poll::Ready(Ok(())),
                // The future was canceled by `.shutdown()`.
                Ok(true) => Poll::Ready(Err(JoinError::Canceled)),
                Err(_) => {
                    let join_r = self.handle.take().expect("Future completed twice?").join();
                    // Otherwise look at the result from `std::thread` to see if it panicked.
                    let join_err =
                        join_r.expect_err("Future didn't exit cleanly but didn't panic?");
                    Poll::Ready(Err(JoinError::Panicked(anyhow::anyhow!("{:?}", join_err))))
                },
            }
        } else {
            Poll::Ready(Ok(()))
        }
    }

    fn detach(mut self: Box<Self>) {
        self.cancel.take();
    }
}

impl Drop for ThreadHandle {
    fn drop(&mut self) {
        self.shutdown();
    }
}

impl ThreadHandle {
    fn spawn<Fut, F>(name: String, tokio_handle: TokioRuntimeHandle, f: F) -> Self
    where
        Fut: Future<Output = ()>,
        F: FnOnce() -> Fut + Send + 'static,
    {
        let (cancel_tx, cancel_rx) = oneshot::channel();
        let (done_tx, done_rx) = oneshot::channel();
        let thread_handle = thread::Builder::new()
            .stack_size(*RUNTIME_STACK_SIZE)
            .name(name)
            .spawn(move || {
                let _guard = tokio_handle.enter();
                let thread_body = async move {
                    let future = f();
                    tokio::pin!(future);
                    let was_canceled = tokio::select! {
                        r = cancel_rx => {
                            if r.is_ok() {
                                true
                            } else {
                                future.await;
                                false
                            }
                        },
                        _ = &mut future => false,
                    };
                    let _ = done_tx.send(was_canceled);
                };
                tokio_handle.block_on(thread_body);
            })
            .expect("Failed to start thread");
        ThreadHandle {
            handle: Some(thread_handle),
            cancel: Some(cancel_tx),
            done: Some(done_rx),
        }
    }
}

/// Runtime for running in production that sleeps for wallclock time, doesn't
/// mock out any functionality, etc.
#[derive(Clone)]
pub struct ProdRuntime {
    rt: TokioRuntimeHandle,
}

impl ProdRuntime {
    pub fn init_tokio() -> anyhow::Result<TokioRuntime> {
        assert!(
            TokioRuntimeHandle::try_current().is_err(),
            "Tried to create a `ProdRuntime` from within a Tokio context. Are you using \
             `#[tokio::main]` or `#[tokio::test]`?"
        );
        let mut tokio_builder = Builder::new_multi_thread();
        tokio_builder.thread_stack_size(*RUNTIME_STACK_SIZE);
        if *RUNTIME_WORKER_THREADS > 0 {
            tokio_builder.worker_threads(*RUNTIME_WORKER_THREADS);
        }
        if *RUNTIME_DISABLE_LIFO_SLOT {
            tokio_builder.disable_lifo_slot();
        }
        let tokio_rt = tokio_builder.enable_all().build()?;
        Ok(tokio_rt)
    }

    pub fn task_monitor(name: &'static str) -> TaskMonitor {
        GLOBAL_TASK_MANAGER.lock().get(name)
    }

    /// Create a new tokio-based runtime.
    /// Expected usage:
    /// ```rust
    /// use runtime::prod::ProdRuntime;
    /// fn main() -> anyhow::Result<()> {
    ///     let tokio = ProdRuntime::init_tokio()?;
    ///     let rt = ProdRuntime::new(&tokio);
    ///     rt.block_on("main", async {});
    ///     Ok(())
    /// }
    /// ```
    /// The `tokio_rt` should live for the duration of `main`.
    /// At the end of `main` its `Drop` will run and join all spawned futures,
    /// which should include all references to the `ProdRuntime` handle.
    /// If `ProdRuntime` is used after the associated `TokioRuntime` has been
    /// dropped, it will panic.
    pub fn new(tokio_rt: &TokioRuntime) -> Self {
        let handle = tokio_rt.handle().clone();
        Self { rt: handle }
    }

    pub fn block_on<F: Future>(&self, name: &'static str, f: F) -> F::Output {
        let monitor = GLOBAL_TASK_MANAGER.lock().get(name);
        self.rt.block_on(monitor.instrument(f))
    }
}

#[async_trait]
impl Runtime for ProdRuntime {
    fn wait(&self, duration: Duration) -> Pin<Box<dyn FusedFuture<Output = ()> + Send + 'static>> {
        Box::pin(sleep(duration).fuse())
    }

    fn spawn(
        &self,
        name: &'static str,
        f: impl Future<Output = ()> + Send + 'static,
    ) -> Box<dyn SpawnHandle> {
        let monitor = GLOBAL_TASK_MANAGER.lock().get(name);
        let handle = self.rt.spawn(propagate_tracing(monitor.instrument(f)));
        Box::new(TokioSpawnHandle::from(handle))
    }

    fn spawn_thread<Fut: Future<Output = ()>, F: FnOnce() -> Fut + Send + 'static>(
        &self,
        name: &str,
        f: F,
    ) -> Box<dyn SpawnHandle> {
        Box::new(ThreadHandle::spawn(
            name.to_owned(),
            self.rt.clone(),
            propagate_tracing_blocking(move || propagate_tracing(f())),
        ))
    }

    fn system_time(&self) -> SystemTime {
        SystemTime::now()
    }

    fn monotonic_now(&self) -> tokio::time::Instant {
        // Guarantee that all instants handed out are after `INSTANT_EPOCH`.
        LazyLock::force(&INSTANT_EPOCH);
        tokio::time::Instant::now()
    }

    fn rng(&self) -> Box<dyn RngCore> {
        // `rand`'s default RNG is designed to be cryptographically secure:
        // > The PRNG algorithm in StdRng is chosen to be efficient on the
        // > current platform, to be statistically strong and unpredictable
        // > (meaning a cryptographically secure PRNG).
        // (Source: https://docs.rs/rand/latest/rand/rngs/struct.StdRng.html)
        Box::new(rand::rng())
    }

    fn pause_client(&self) -> PauseClient {
        PauseClient::new()
    }
}
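Below is a minimal usage sketch exercising `block_on`, `spawn`, and `spawn_thread` together. It assumes the `runtime` and `common` crates from this repository plus `anyhow` are on the dependency path; the task names ("worker", "side_thread") and closure bodies are illustrative only, and joining is driven through `std::future::poll_fn` against the poll-based `poll_join` shown above.

use common::runtime::Runtime;
use runtime::prod::ProdRuntime;

fn main() -> anyhow::Result<()> {
    // Must run outside any existing Tokio context; `init_tokio` asserts
    // this, so `#[tokio::main]` cannot be used here.
    let tokio = ProdRuntime::init_tokio()?;
    let rt = ProdRuntime::new(&tokio);

    rt.block_on("main", async {
        // Spawned onto the shared Tokio pool, instrumented under the
        // task monitor named "worker".
        let mut worker = rt.spawn("worker", async {
            // ... background work ...
        });
        // Drive the poll-based join to completion, ignoring the result.
        let _ = std::future::poll_fn(|cx| worker.poll_join(cx)).await;

        // Runs on a dedicated OS thread (stack size *RUNTIME_STACK_SIZE),
        // with the Tokio handle entered so the future can await timers/IO.
        let mut side = rt.spawn_thread("side_thread", || async {
            // ... long-running work ...
        });
        // Request cancellation; a canceled thread reports
        // `JoinError::Canceled` from `poll_join`.
        side.shutdown();
        let _ = std::future::poll_fn(|cx| side.poll_join(cx)).await;
    });
    Ok(())
}

Note the design choice visible in `ThreadHandle::spawn`: cancellation is cooperative at the `tokio::select!` boundary, so a canceled future is dropped at its next yield point rather than interrupted mid-instruction, and the `done` channel distinguishes clean completion, cancellation, and panics.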
