Skip to main content
Glama
lib.rs (25.8 kB)
//! An executor which manages a Tokio runtime that is dedicated to a specific set of workloads. The //! futures and any spawned tasks will run on this runtime which can be purpose-tuned. //! //! The implementation of this crate comes from the [`executor`] crate in [InfluxData]'s //! [influxdb3_core] project which is collectively released under the [MIT] or [Apache v2.0] //! license. //! //! This implementation is based on the `executor` crate as of July 10, 2024: //! //! <https://github.com/influxdata/influxdb3_core/tree/78b4d56989410b30a3cc48020c1491405943b4ad/executor> //! //! # References //! //! - [The New Stack: Using Rustlang’s Async Tokio Runtime for CPU-Bound Tasks](https://thenewstack.io/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/) //! - [Rustacean Station: Rebuilding InfluxDB with Rust with Andrew Lamb](https://rustacean-station.org/episode/andrew-lamb/) //! - [`executor`] crate //! //! [Apache v2.0]: https://github.com/influxdata/influxdb3_core/blob/main/LICENSE-APACHE //! [InfluxData]: https://www.influxdata.com/ //! [MIT]: https://github.com/influxdata/influxdb3_core/blob/main/LICENSE-MIT //! [`executor`]: https://github.com/influxdata/influxdb3_core/tree/main/executor //! 
[influxdb3_core]: https://github.com/influxdata/influxdb3_core #![warn( clippy::unwrap_in_result, clippy::indexing_slicing, clippy::arithmetic_side_effects, clippy::unwrap_used, clippy::panic, clippy::missing_panics_doc, clippy::panic_in_result_fn, missing_docs )] use std::{ fmt, future::Future, sync::Arc, thread, time::Duration, }; use futures::{ FutureExt, TryFutureExt, future::{ BoxFuture, Shared, }, }; use parking_lot::RwLock; use thiserror::Error; use thread_priority::{ ThreadPriority, set_current_thread_priority, }; use tokio::{ runtime, sync::oneshot, task::JoinSet, }; use tokio_util::sync::CancellationToken; use tracing::warn; mod parent; pub use parent::{ register_current_runtime_as_parent, register_parent_runtime, spawn_on_parent, }; /// Runs futures (and any [`tokio::spawn`]ed tasks) on a seperate & dedicated Tokio runtime. /// /// Such Tokio runtimes can be tuned for specific workloads, priorities, thread counts, etc. /// /// # Task Scheduling /// /// The work performed by this executor (and thus on the underlying Tokio runtime) may be /// particular and specific, for example performing CPU-intensive work in an executor thereby /// preventing the slow down of the main Tokio runtime. If such work requires tasks to be spawned /// back on the original Tokio runtime (referred to here as the "parent" Tokio runtime), this can /// be accomplished by using [`spawn_on_parent`]. #[derive(Clone)] pub struct DedicatedExecutor { state: Arc<RwLock<State>>, } impl fmt::Debug for DedicatedExecutor { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("DedicatedExecutor").finish_non_exhaustive() } } impl DedicatedExecutor { /// Creates a [`DedicatedExecutor`] for work that is seperate and runtime-isolated from any /// other Tokio runtimes. /// /// # Implementation Notes /// /// The implementation uses a techique found on Stack Overflow to create a new Tokio runtime /// which may have been invoked *within* an existing Tokio runtime. 
/// /// See: https://stackoverflow.com/a/62536772 pub fn new( name: &str, mut tokio_rt_builder: runtime::Builder, thread_priority: impl Into<Option<u8>>, shutdown_timeout: Duration, ) -> Result<Self, DedicatedExecutorInitializeError> { let manaing_tokio_rt_thread_name = format!("{name}-dedicated-executor-manager"); let shutdown_token = CancellationToken::new(); let (shutdown_completed_tx, shutdown_completed_rx) = oneshot::channel(); let (handle_result_tx, handle_result_rx) = std::sync::mpsc::channel(); let parent_tokio_rt_handle = runtime::Handle::try_current().ok(); let executor_shutdown_token = shutdown_token.clone(); let maybe_thread_priority = match thread_priority.into() { Some(value) => Some(ThreadPriority::try_from(value).map_err(|_| { DedicatedExecutorInitializeError::new(ThreadPriorityParseError(value)) })?), None => None, }; let managing_tokio_rt_thread = thread::Builder::new() .name(manaing_tokio_rt_thread_name) .spawn(move || { // Register parent Tokio runtime for current thread parent::register_parent_runtime(parent_tokio_rt_handle.clone()); #[allow(clippy::blocks_in_conditions)] let tokio_rt = match tokio_rt_builder // Register parent Tokio runtime on new runtime threads .on_thread_start(move || { if let Some(thread_priority) = maybe_thread_priority { if let Err(err) = set_current_thread_priority(thread_priority) { warn!( error = ?err, ?thread_priority, "failed to set thread priority", ); } } parent::register_parent_runtime(parent_tokio_rt_handle.clone()); }) .build() { Ok(tokio_rt) => tokio_rt, Err(err) => { handle_result_tx.send(Err(err)).ok(); return; // Early return if we failed to build the Tokio runtime } }; tokio_rt.block_on(async move { // Send the [`runtime::Handle`] back to the constructor's thread if handle_result_tx .send(Ok(runtime::Handle::current())) .is_err() { return; // Early return if we failed to send back the handle } // Wait for a shutdown of the executor to be triggered executor_shutdown_token.cancelled().await; }); // Shutdown 
the Tokio runtime, waiting at most the given duration for all spawned // work to complete. tokio_rt.shutdown_timeout(shutdown_timeout); // Signal that executor shutdown has completed shutdown_completed_tx.send(()).ok(); }) // Failed to successfully spawn thread .map_err(DedicatedExecutorInitializeError::new)?; // Read the [`runtime::Handle`] from the managing thread let handle = handle_result_rx .recv() // RecvError .map_err(DedicatedExecutorInitializeError::new)? // Error from initializing inside thread .map_err(DedicatedExecutorInitializeError::new)?; let state = State { tokio_rt_handle: Some(handle), managing_tokio_rt_thread: Some(managing_tokio_rt_thread), shutdown_token, shutdown_completed: shutdown_completed_rx.map_err(Arc::new).boxed().shared(), }; Ok(Self { state: Arc::new(RwLock::new(state)), }) } /// Runs the [`Future`] (and any tasks it spawns) on the `DedicatedExecutor`. /// /// # Cancellation /// /// If the returned `Future` is dropped then the task is immediately aborted. #[allow(clippy::missing_panics_doc)] pub fn spawn<T>( &self, task: T, ) -> impl Future<Output = Result<T::Output, DedicatedExecutorError>> + use<T> where T: Future + Send + 'static, T::Output: Send + 'static, { let maybe_tokio_rt_handle = { let state = self.state.read(); state.tokio_rt_handle.clone() }; let Some(tokio_rt_handle) = maybe_tokio_rt_handle else { return futures::future::err(DedicatedExecutorError::WorkerGone).boxed(); }; // NOTE: we are using a [`JoinSet`] to benefit from its "cancel on drop" behavior: // // > When the JoinSet is dropped, all tasks in the JoinSet are immediately aborted. 
// See: https://docs.rs/tokio/latest/tokio/task/struct.JoinSet.html let mut join_set = JoinSet::new(); join_set.spawn_on(task, &tokio_rt_handle); async move { #[allow(clippy::expect_used)] // task spawned & immediately joined join_set .join_next() .await .expect("just spawned task; will not be none") .map_err(|err| match err.try_into_panic() { // Task had panicked Ok(err) => { let msg = if let Some(s) = err.downcast_ref::<String>() { s.clone() } else if let Some(s) = err.downcast_ref::<&str>() { s.to_string() } else { "unknown internal error".to_string() }; DedicatedExecutorError::TaskPanicked(msg) } // Not a panic, runtime has likely shut down Err(_) => DedicatedExecutorError::WorkerGone, }) } .boxed() } /// Triggers the shutdown of this executor and any clones. pub fn shutdown(&self) { let mut state = self.state.write(); state.tokio_rt_handle.take(); // Trigger the managing Tokio runtime's thread to shut down state.shutdown_token.cancel(); } /// Shuts down the executor and any clones. /// /// All subsequent tasks executions are stopped and the managing thread is await for its /// completion. /// /// NOTE: all clones of this `DedicatedExecutor` will be shut down as well. /// /// # Implementation Notes /// /// Only the first call to `join` will wait for the managing thread to complete. All subsequent /// calls to `join` will complete immediately. /// /// # Panic /// /// [`DedicatedExecutor`] implements shutdown on [`Drop`] (indirectly through dropping its /// internal state). You should rely on this behavior and *not* call `join` manually during /// [`Drop`] or panics as this may lead to another panic. For more detail, see: /// <https://github.com/rust-lang/futures-rs/issues/2575>. 
pub async fn join(&self) -> Result<(), DedicatedExecutorJoinError> { self.shutdown(); let shutdown_completed = { let state = self.state.read(); state.shutdown_completed.clone() }; shutdown_completed .await .map_err(|_| DedicatedExecutorJoinError) } } /// Error when running a spawned [`DedicatedExecutor`] task. #[remain::sorted] #[derive(Debug, Error)] pub enum DedicatedExecutorError { /// When a task panics #[error("task panicked: {0}")] TaskPanicked(String), /// When attempting to spawn a task and the executor has already shut down #[error("worker thread is gone, executor has likely shut down")] WorkerGone, } /// Error when initializing a [`DedicatedExecutor`]. #[derive(Debug, Error)] #[error("failed to initialize executor: {0}")] pub struct DedicatedExecutorInitializeError( #[source] Box<dyn std::error::Error + 'static + Sync + Send>, ); impl DedicatedExecutorInitializeError { /// Creates a new `DedicatedExecutorInitializeError`. pub fn new<E>(err: E) -> Self where E: std::error::Error + 'static + Sync + Send, { Self(Box::new(err)) } } /// Error when calling [`DedicatedExecutor::join`]. #[derive(Debug, Error)] #[error("error while awaiting shutdown; sender already closed")] pub struct DedicatedExecutorJoinError; /// Error when parsing a thread priority value #[derive(Debug, Error)] #[error("failed parse thread priority value: {0}")] struct ThreadPriorityParseError(u8); /// Interior state for [`DedicatedExecutor`]. struct State { /// Tokio Runtime handle. /// /// NOTE: value is `None` when runtime is shutting down. tokio_rt_handle: Option<runtime::Handle>, /// Managing thread is managing the Tokio runtime and can be joined during [`Drop`]. managing_tokio_rt_thread: Option<thread::JoinHandle<()>>, /// Token that when triggered will initiate a executor shutdown. shutdown_token: CancellationToken, /// Future that when ready signals that shutdown has completed. 
shutdown_completed: Shared<BoxFuture<'static, Result<(), Arc<oneshot::error::RecvError>>>>, } // NOTE: [`Drop`] should be implemented for [`State`] and *not* the [`DedicatedExecutor`] as the // the executor can be cloned, whereas there will only be one instance of [`State`] for all // executor clones. impl Drop for State { fn drop(&mut self) { if self.tokio_rt_handle.is_some() { warn!("a `DedicatedExecutor` was dropped without calling `shutdown()`"); self.tokio_rt_handle.take(); self.shutdown_token.cancel(); } // NOTE: ensure the thread is *not* panicking before polling the shared future // // See: https://github.com/rust-lang/futures-rs/issues/2575 if !thread::panicking() && self.shutdown_completed.clone().now_or_never().is_none() { warn!("a `DedicatedExecutor` was dropped without waiting for worker termination"); } // Join the thread but we don't about the result self.managing_tokio_rt_thread .take() .map(|thread| thread.join().ok()); } } #[cfg(test)] #[allow(clippy::panic, clippy::unwrap_used)] mod tests { use core::panic; use std::{ panic::panic_any, sync::Barrier, }; use tokio::sync::Barrier as AsyncBarrier; use super::*; const RUNTIME_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10); fn exec() -> DedicatedExecutor { exec_with_threads(1) } fn exec2() -> DedicatedExecutor { exec_with_threads(2) } fn exec_with_threads(threads: usize) -> DedicatedExecutor { let mut tokio_rt_builder = runtime::Builder::new_multi_thread(); tokio_rt_builder.worker_threads(threads); tokio_rt_builder.enable_all(); DedicatedExecutor::new( "Test DedicatedExecutor", tokio_rt_builder, None, RUNTIME_SHUTDOWN_TIMEOUT, ) .expect("failed to initialize runtime") } // Wait for the barrier and then return the `result` value async fn do_work(result: usize, barrier: Arc<Barrier>) -> usize { barrier.wait(); result } // Wait for the barrier and then return the `result` value async fn do_work_async(result: usize, barrier: Arc<AsyncBarrier>) -> usize { barrier.wait().await; result } async fn 
test_io_runtime_multi_thread_impl(executor: DedicatedExecutor) { let io_rt_thread_id = std::thread::current().id(); executor .spawn(async move { let rt_thread_id = std::thread::current().id(); let spawned_thread_id = parent::spawn_on_parent(async move { std::thread::current().id() }).await; assert_ne!(rt_thread_id, spawned_thread_id); assert_eq!(io_rt_thread_id, spawned_thread_id); }) .await .expect("task errored"); } #[tokio::test] async fn basic() { let barrier = Arc::new(Barrier::new(2)); let executor = exec(); let executor_task = executor.spawn(do_work(42, Arc::clone(&barrier))); // NOTE: the `executor_task` will never complete if it runs on the main Tokio thread (as // this test is not using the multi-threaded version of the runtime and the call // `barrier.wait()` blocks the Tokio thread) barrier.wait(); assert_eq!(executor_task.await.expect("task errored"), 42); executor.join().await.expect("join errored"); } #[tokio::test] async fn basic_clone() { let barrier = Arc::new(Barrier::new(2)); let executor = exec(); // Running task on a clone of the executor should work as normal let executor_task = executor.clone().spawn(do_work(42, Arc::clone(&barrier))); barrier.wait(); assert_eq!(executor_task.await.expect("task errored"), 42); executor.join().await.expect("join errored"); } #[tokio::test] async fn drop_empty_executor() { // Drop should not panic or fail on an exector not doing anything (i.e. 
"empty") exec(); } #[tokio::test] async fn drop_clone() { let barrier = Arc::new(Barrier::new(2)); let executor = exec(); // Clones should drop cleanly without dropping the executor drop(executor.clone()); let executor_task = executor.clone().spawn(do_work(42, Arc::clone(&barrier))); barrier.wait(); assert_eq!(executor_task.await.expect("task errored"), 42); executor.join().await.expect("join errored"); } #[tokio::test] #[should_panic(expected = "foo")] async fn just_panic() { struct Foobar(DedicatedExecutor); impl Drop for Foobar { fn drop(&mut self) { self.0.join().now_or_never(); } } let executor = exec(); let _foo = Foobar(executor); // This must not lead to a double-panic and `SIGILL` // // See: https://www.gnu.org/software/libc/manual/html_node/Program-Error-Signals.html panic!("foo"); } #[tokio::test] async fn multi_task() { let barrier = Arc::new(Barrier::new(3)); // Create a runtime with 2 threads let executor = exec2(); let executor_task1 = executor.spawn(do_work(21, Arc::clone(&barrier))); let executor_task2 = executor.spawn(do_work(42, Arc::clone(&barrier))); // Block main thread until completion of other 2 tasks barrier.wait(); assert_eq!(executor_task1.await.expect("task errored"), 21); assert_eq!(executor_task2.await.expect("task errored"), 42); executor.join().await.expect("join errored"); } #[tokio::test] async fn tokio_spawn() { let executor = exec2(); // Spawn a task that spawns another task to ensure that they both run on the compute // runtime let executor_task = executor.spawn(async move { // Spawn a seperate task let other_task = tokio::task::spawn(async { 25usize }); other_task.await.expect("join errored") }); // Validate that the inner task ran to completion and it did not panic assert_eq!(executor_task.await.expect("task errored"), 25); executor.join().await.expect("join errored"); } #[tokio::test] async fn panic_on_runtime_str() { let executor = exec(); let executor_task = executor.spawn(async move { if true { panic!("oh noes!"); } else { 
42 } }); match executor_task.await.unwrap_err() { DedicatedExecutorError::TaskPanicked(msg) => assert_eq!("oh noes!", msg), DedicatedExecutorError::WorkerGone => panic!("unexpected error"), } } #[tokio::test] async fn panic_on_runtime_string() { let executor = exec(); let executor_task = executor.spawn(async move { if true { panic!("{}, {}", 1, 2); } else { 42 } }); match executor_task.await.unwrap_err() { DedicatedExecutorError::TaskPanicked(msg) => assert_eq!("1, 2", msg), DedicatedExecutorError::WorkerGone => panic!("unexpected error"), } } #[tokio::test] async fn panic_on_runtime_other() { let executor = exec(); let executor_task = executor.spawn(async move { if true { panic_any(1); } else { 42 } }); match executor_task.await.unwrap_err() { DedicatedExecutorError::TaskPanicked(msg) => assert_eq!("unknown internal error", msg), DedicatedExecutorError::WorkerGone => panic!("unexpected error"), } } #[tokio::test] async fn executor_shutdown_while_running_task() { let barrier1 = Arc::new(Barrier::new(2)); let captured1 = Arc::clone(&barrier1); let barrier2 = Arc::new(Barrier::new(2)); let captured2 = Arc::clone(&barrier2); let executor = exec(); let executor_task = executor.spawn(async move { captured1.wait(); do_work(42, captured2).await }); barrier1.wait(); executor.shutdown(); // Block main thread until completion of the outstanding task barrier2.wait(); assert_eq!(executor_task.await.expect("task errored"), 42); executor.join().await.expect("join errored"); } #[tokio::test] async fn executor_submit_task_after_clone_shutdown() { let executor = exec(); // Shut down the clone, but not the `exec` executor.clone().join().await.expect("join errored"); // Simulate trying to submit a task once runtime has shutdown let executor_task = executor.spawn(async { 11 }); assert!(matches!( executor_task.await.unwrap_err(), DedicatedExecutorError::WorkerGone )); executor.join().await.expect("join errored"); } #[tokio::test] async fn executor_join() { let executor = exec(); // 
Ensure join doesn't hang executor.join().await.expect("join errored"); } #[tokio::test] async fn executor_join2() { let executor = exec(); // Ensure join doesn't hang executor.join().await.expect("join errored"); executor.join().await.expect("join errored"); } #[tokio::test] async fn executor_clone_join() { let executor = exec(); // Ensure join doesn't hang executor.clone().join().await.expect("join errored"); executor.clone().join().await.expect("join errored"); executor.join().await.expect("join errored"); } #[tokio::test] async fn drop_receiver() { // Create an empty executor let executor = exec(); // Create first blocked task let barrier1_pre = Arc::new(AsyncBarrier::new(2)); let barrier1_pre_captured = Arc::clone(&barrier1_pre); let barrier1_post = Arc::new(AsyncBarrier::new(2)); let barrier1_post_captured = Arc::clone(&barrier1_post); let executor_task1 = executor.spawn(async move { barrier1_pre_captured.wait().await; do_work_async(11, barrier1_post_captured).await }); barrier1_pre.wait().await; // Create first blocked task let barrier2_pre = Arc::new(AsyncBarrier::new(2)); let barrier2_pre_captured = Arc::clone(&barrier2_pre); let barrier2_post = Arc::new(AsyncBarrier::new(2)); let barrier2_post_captured = Arc::clone(&barrier2_post); let executor_task2 = executor.spawn(async move { barrier2_pre_captured.wait().await; do_work_async(22, barrier2_post_captured).await }); barrier2_pre.wait().await; // Cancel a task 1 drop(executor_task1); // Wait on cancellation, evidient by `barrier2_post` Arc count going from 2 to 1 (this // might take a short while) tokio::time::timeout(Duration::from_secs(1), async { loop { if Arc::strong_count(&barrier1_post) == 1 { return; } tokio::time::sleep(Duration::from_millis(10)).await } }) .await .expect("timeout reached"); // Unblock task 2 barrier2_post.wait().await; assert_eq!(executor_task2.await.expect("task errored"), 22); tokio::time::timeout(Duration::from_secs(1), async { loop { if Arc::strong_count(&barrier2_post) == 1 { 
return; } tokio::time::sleep(Duration::from_millis(10)).await } }) .await .expect("timeout reached"); executor.join().await.expect("join errored"); } #[tokio::test] async fn io_runtime_multi_thread() { let mut tokio_rt_builder = runtime::Builder::new_multi_thread(); tokio_rt_builder.worker_threads(1); let executor = DedicatedExecutor::new( "Test DedicatedExecutor", tokio_rt_builder, None, RUNTIME_SHUTDOWN_TIMEOUT, ) .expect("failed to initialize runtime"); test_io_runtime_multi_thread_impl(executor).await; } #[tokio::test] async fn io_runtime_current_thread() { let tokio_rt_builder = runtime::Builder::new_current_thread(); let executor = DedicatedExecutor::new( "Test DedicatedExecutor", tokio_rt_builder, None, RUNTIME_SHUTDOWN_TIMEOUT, ) .expect("failed to initialize runtime"); test_io_runtime_multi_thread_impl(executor).await; } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.