Skip to main content
Glama
pool_noodle.rs (18.1 kB)
//! [`pool_noodle`] implementations. //! //! --------------------------------------------------------------------- //! --------------------------------------------------------------------- //! ---------------------------:::::::::::::::::::::--------------------- //! ---------------------:::::::::::::::::::::-------::::::-------------- //! -----------------:::::::::::::::::::==========------::::::----------- //! ----#*+#-----::::::::::::::::::::::::---:::--==++++-:::::::::-------- //! ---+#%@@#----::::::::::::::::::===========++++***#*=::::::::::::----- //! --=+*@@@@@*::::::::::::::::::::========++++****###*=::::::::::::::--- //! --=@@@@@@@@%+::::::::::::::::::-=======+++++**###%#+:::::::::::::::-- //! ----=@@@@@@@@%=:::::::::::::::::=========++++*###%#+-:::::::::::::::: //! ------#@@@@@@@@%-:::::::::::::::-....:===:....:*#%#+-:::::::::::::::: //! -----:::#@@@@@@@@%-::::::::::::..:-=.::+.:=**:-==%#*=:::::::::::::::: //! ----::::::%@@@@@@@@%=::::::::::.-#%@@--=.#%@@#*#*%%*=:::::::::::::::: //! ---:::::::::@@@@@@@@@%=::::::..:-*@@***#+-+####*%%%#+:::::::::::::::: //! -:::::::::::%@@@@@@@@@@%#:.......+***#%%###**#%%%%%#+:::::::::::::::: //! :::::::::::*#**#@@@@@@@@@%:......=+*#%%%%%@%#%%%%%%#*-::::::::::::::: //! ::::::::::#@*+**%@@@@@@@@@=......:++*******####%%%%%*=::::::::::::::: //! :::::::::::=%##%@@@#%@%@@+........++++******###%%%%%*=::::::::::::::: //! :::::::::::=+*#@@%#%%%%@*.........++++++*****###%%%%#+.:::::::::::::: //! ::::::::::=++===+*%*===+=.........++++++*****####%%%#**=::::::::::::: //! :::::::::===++++*#@*+++**=........-+++++******###%%%%#%###*-::::::::: //! ::::::::-++===*%@@@@%%%##:........++++++******###%%%%%%%###**+::::::: //! :::::::::*%***%@@@@@%%%#:.......=+=+++++******####%%%%#%###****+::::: //! ::::::::::*@@@@@@@%%%%%#......-+==+++*********####%%%%+:.=##****+-::: //! :::::::::::::%@@@-%%%%%#*:.:++++++************####%%%%+::::-***+**-:: //! --::::::::::::::::-#%%%###********#-**********####%%%#*.:::::**++**-: //! 
--::::::::::::::::::*#%%%#######*:..**********####%%%#*.:::::=*++**=: //! ---:::::::::::::::::::-#%%###*-.....**********#####%%##::::::=*++**+: //! ---::::::::::::::::::::::::::.......+*********#####%%%#-:::::+++**#=: //! ---::::::::::::::::::::::::::::.....-********######%%%#+::::*++**##:: //! ----:::::::::::::::::::::::::::::::::#*******######%%%#*****++**##=:: //! ------::::::::::::::::::::::::::::::.********######%%%%##**+***#%=::: //! ---------::::::::::::::::::::::::::::********######%%%%%*****##%-:::: //! ---------::::::::::::::::::::::::::::+*******######%%%%%#**##%=:::--- //! ----------:::::::::::::::::::::::::::-#*****#######%%%%%-:+#=::::---- //! :------------:::::::::::::::::::::::::##****#######%%%%%+:::::::----- //! =:----------::::::::::::::::::::::::::*#***########%%%%%*::::::------ use std::{ fmt::Display, result, sync::Arc, }; use crossbeam_queue::ArrayQueue; use telemetry_utils::metric; use tokio::{ sync::{ Mutex, Semaphore, mpsc::{ self, Receiver, Sender, }, }, time::{ Duration, sleep, timeout, }, }; use tokio_util::sync::CancellationToken; use tracing::{ debug, info, warn, }; use crate::{ Instance, Spec, errors::PoolNoodleError, lifeguard::LifeGuard, task::{ PoolNoodleTask, PoolNoodleTaskType, }, }; type Result<T, E> = result::Result<T, PoolNoodleError<E>>; #[derive(Clone, Debug)] /// Configuration object for setting up pool noodle pub struct PoolNoodleConfig<S> { /// Verify instances can be started and stopped before starting the pool management tasks pub check_health: bool, /// Max number of worker threads to run at once. 
Defaults to available_parallelism() or 16 pub max_concurrency: u32, /// Maximum number of instances to manage at once pub pool_size: u32, /// Number of attempts to get from the pool before giving up with 10 ms between attempts pub retry_limit: u32, /// Shuts down the pool management tasks pub shutdown_token: CancellationToken, /// The spec for the type of instance to manage pub spec: S, } impl<S> Default for PoolNoodleConfig<S> where S: Spec + Default, { fn default() -> Self { Self { check_health: false, max_concurrency: 1000, pool_size: 100, retry_limit: 120, // * 100ms between tries, we will try for 2 minutes before giving up shutdown_token: CancellationToken::new(), spec: S::default(), } } } /// Pool Noodle is a tool for ensuring that we maintain a bare minimum number of Firecracker Jails /// for function execution. We wrap it in an Arc Mutex so we can update the queues it manages /// across threads. #[derive(Debug)] pub struct PoolNoodle<I, S: Spec>(Arc<PoolNoodleInner<I, S>>); impl<I, E, S> Clone for PoolNoodle<I, S> where I: Instance<Error = E> + Send + Sync + 'static, S: Spec<Error = E, Instance = I> + Send + Sync + 'static, E: Send, { fn clone(&self) -> Self { PoolNoodle(self.0.clone()) } } impl<I, E, S> PoolNoodle<I, S> where I: Instance<Error = E> + Send + Sync + 'static, S: Spec<Error = E, Instance = I> + Clone + Send + Sync + 'static, E: Send + Sync + Display + 'static, { /// Creates a new instance of PoolNoodle pub async fn new(config: PoolNoodleConfig<S>) -> Self { PoolNoodle(Arc::new(PoolNoodleInner::new(config))) } /// do the thing pub fn run(&mut self) -> Result<(), E> { if self.inner().check_health { if let Some(err) = futures::executor::block_on(timeout(Duration::from_secs(60), self.check_health())) .err() { return Err(PoolNoodleError::UnhealthyTimeout(err)); } } let inner = self.inner(); tokio::spawn(async move { let mut q = inner.queue_rx.lock().await; let semaphore = Arc::new(Semaphore::new(inner.max_concurrency as usize)); loop { 
tokio::select! { _ = inner.shutdown_token.cancelled() => { debug!("main loop received cancellation"); break; } Some(task_type) = q.recv() => { let inner = inner.clone(); let permit = semaphore.clone().acquire_owned().await; tokio::spawn(async move { inner.handle_task(task_type).await; drop(permit); }); } } } }); // start by cleaning jails just to make sure let inner = self.inner(); tokio::spawn(async move { for id in 1..=inner.pool_size { inner.push_clean_task_to_work_queue(id).await; } }); Ok(()) } fn inner(&self) -> Arc<PoolNoodleInner<I, S>> { Arc::clone(&self.0) } /// This will attempt to get a ready, healthy instance from the pool. /// If there are no instances, it will give the main loop a chance to fill the pool and try /// again. It will throw an error if there are no available instances after enough retries. pub async fn get(&self) -> Result<LifeGuard<I, E, S>, E> { metric!(counter.pool_noodle.get_requests = 1); let inner = self.inner(); let max_retries = self.inner().retry_limit; // Set the maximum number of retries let mut retries = 0; loop { if retries >= max_retries { metric!(counter.pool_noodle.get_requests = -1); return Err(PoolNoodleError::ExecutionPoolStarved); } match inner.ready_queue.pop() { Some(mut instance) => { metric!(counter.pool_noodle.ready = -1); // Try to ensure the item is healthy match &mut instance.ensure_healthy().await { Ok(_) => { metric!(counter.pool_noodle.get_requests = -1); metric!(counter.pool_noodle.active = 1); return Ok(LifeGuard::new( Some(instance), inner.queue_tx.clone(), inner.spec.clone(), )); } Err(_) => { debug!("PoolNoodle: not healthy, cleaning up and getting a new one."); drop(instance); } } } _ => { retries += 1; debug!( "Failed to get from pool, retry ({} of {})", retries, max_retries ); sleep(Duration::from_millis(100)).await; } } } } async fn check_health(&mut self) -> Result<(), E> { info!("verifying instance lifecycle health"); let id = 0; let mut task = PoolNoodleTask::new(None, id, 
self.inner().spec.clone()); info!("cleaning..."); task.clean().await?; info!("preparing..."); task.prepare().await?; info!("spawning..."); let mut i = task.spawn().await?; info!("checking..."); i.ensure_healthy() .await .map_err(|err| PoolNoodleError::Unhealthy(err))?; info!("terminating..."); task.set_instance(Some(i)); task.terminate().await?; self.inner() .spec .clean(id) .await .map_err(|err| PoolNoodleError::InstanceClean(err))?; info!("instance lifecycle is good!"); Ok(()) } } #[derive(Debug)] pub(crate) struct PoolNoodleInner<I, S> where S: Spec, { check_health: bool, max_concurrency: u32, pool_size: u32, ready_queue: ArrayQueue<I>, retry_limit: u32, shutdown_token: CancellationToken, spec: S, queue_rx: Mutex<Receiver<PoolNoodleTaskType<I, S>>>, queue_tx: Sender<PoolNoodleTaskType<I, S>>, } impl<I, E, S> PoolNoodleInner<I, S> where I: Instance<Error = E> + Send + Sync + 'static, S: Spec<Error = E, Instance = I> + Clone + Send + Sync + 'static, E: Send + Display + 'static, { fn new(config: PoolNoodleConfig<S>) -> Self { info!( "creating a pool of size {} with concurrency of {} ", config.pool_size, config.max_concurrency ); let (queue_tx, queue_rx) = mpsc::channel(config.pool_size as usize); Self { check_health: config.check_health, max_concurrency: config.max_concurrency, pool_size: config.pool_size, ready_queue: ArrayQueue::new(config.pool_size as usize), retry_limit: config.retry_limit, shutdown_token: config.shutdown_token, spec: config.spec, queue_rx: queue_rx.into(), queue_tx, } } async fn handle_task(self: Arc<Self>, task_type: PoolNoodleTaskType<I, S>) { match task_type { PoolNoodleTaskType::Clean(task) => self.handle_clean(task).await, PoolNoodleTaskType::Drop(task) => self.handle_drop(task).await, PoolNoodleTaskType::Prepare(task) => self.handle_prepare(task).await, } } // clean instances with backoff to handle intermittent failures. 
If an instance fails enough it // will be abandoned async fn handle_clean(&self, task: PoolNoodleTask<I, S>) { metric!(counter.pool_noodle.task.clean = -1); let id = task.id(); let max_retries = 5; let mut attempts = 0; loop { match task.clean().await { Ok(_) => { self.push_prepare_task_to_work_queue(id).await; break; } Err(e) => { if attempts >= max_retries { warn!( "Failed to clean instance {} after {} attempts. Abandoning this instance", id, max_retries ); break; } warn!("PoolNoodle: failed to clean instance: {}", id); warn!("{}", e); warn!("Trying again, {} of {}", attempts, max_retries); attempts += 1; tokio::time::sleep(Duration::from_millis(100 * (attempts * attempts))).await; } } } } async fn handle_drop(&self, task: PoolNoodleTask<I, S>) { metric!(counter.pool_noodle.task.drop = -1); let id = task.id(); match task.terminate().await { Ok(_) => { self.push_clean_task_to_work_queue(id).await; } Err(e) => { warn!("PoolNoodle: failed to drop instance: {}", id); warn!("{}", e); } } } async fn handle_prepare(&self, task: PoolNoodleTask<I, S>) { metric!(counter.pool_noodle.task.prepare = -1); let id = task.id(); match &task.prepare().await { Ok(_) => match task.spawn().await { Ok(instance) => { self.push_to_ready_queue(instance).await; } Err(e) => { warn!("PoolNoodle: failed to start instance: {}", id); warn!("{}", e); self.push_clean_task_to_work_queue(id).await; } }, Err(e) => { warn!("PoolNoodle: failed to ready instance: {}", id); warn!("{}", e); self.push_clean_task_to_work_queue(id).await; } } } async fn push_clean_task_to_work_queue(&self, id: u32) { let task = PoolNoodleTaskType::Clean(PoolNoodleTask::new(None, id, self.spec.clone())); if self.queue_tx.send(task).await.is_err() { warn!("failed to push instance to clean: {}", id); }; metric!(counter.pool_noodle.task.clean = 1); } async fn push_prepare_task_to_work_queue(&self, id: u32) { let task = PoolNoodleTaskType::Prepare(PoolNoodleTask::new(None, id, self.spec.clone())); if 
self.queue_tx.send(task).await.is_err() { warn!("failed to push instance to prepare: {}", id); }; metric!(counter.pool_noodle.task.prepare = 1); } async fn push_to_ready_queue(&self, instance: I) { let id = instance.id(); if self.ready_queue.push(instance).is_err() { warn!("failed to push to ready queue: {}", id); } metric!(counter.pool_noodle.ready = 1); } } #[cfg(test)] mod tests { use std::fmt::{ self, Formatter, }; use async_trait::async_trait; use derive_builder::Builder; use tokio::time::{ Duration, sleep, }; use super::*; use crate::instance::SpecBuilder; pub struct DummyInstance {} #[derive(Clone)] pub struct DummyInstanceSpec {} #[async_trait] impl Spec for DummyInstanceSpec { type Instance = DummyInstance; type Error = DummyInstanceError; async fn clean(&self, _id: u32) -> result::Result<(), Self::Error> { Ok(()) } async fn prepare(&self, _id: u32) -> result::Result<(), Self::Error> { Ok(()) } async fn setup(&mut self) -> result::Result<(), Self::Error> { Ok(()) } async fn spawn(&self, _id: u32) -> result::Result<Self::Instance, Self::Error> { Ok(DummyInstance {}) } } #[derive(Builder, Default, Clone)] pub struct DummyInstanceBuilder {} impl SpecBuilder for DummyInstanceBuilder { type Spec = DummyInstanceSpec; type Error = DummyInstanceError; fn build(&self) -> result::Result<Self::Spec, Self::Error> { Ok(DummyInstanceSpec {}) } } #[derive(Debug)] pub struct DummyInstanceError {} impl Display for DummyInstanceError { fn fmt(&self, _f: &mut Formatter) -> fmt::Result { Ok(()) } } #[async_trait] impl Instance for DummyInstance { type SpecBuilder = DummyInstanceBuilder; type Error = DummyInstanceError; async fn terminate(&mut self) -> result::Result<(), Self::Error> { Ok(()) } async fn ensure_healthy(&mut self) -> result::Result<(), Self::Error> { Ok(()) } fn id(&self) -> u32 { 0 } } #[tokio::test] async fn pool_noodle_lifecycle() { let shutdown_token = CancellationToken::new(); let spec = DummyInstanceSpec {}; let config = PoolNoodleConfig { check_health: 
false, max_concurrency: 10, pool_size: 3, retry_limit: 3, shutdown_token: shutdown_token.clone(), spec, }; let mut pool = PoolNoodle::new(config).await; pool.run().expect("failed to start"); // give the pool time to create some instances sleep(Duration::from_millis(500)).await; // go get an instance let mut instance = pool.get().await.expect("should be able to get an instance"); instance.ensure_healthy().await.expect("failed healthy"); drop(instance); let a = pool.get().await.expect("should be able to get an instance"); let b = pool.get().await.expect("should be able to get an instance"); let c = pool.get().await.expect("should be able to get an instance"); drop(a); drop(b); drop(c); shutdown_token.cancel(); assert!(pool.get().await.is_err()); } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.