lib.rs
//! This crate provides metrics for how loaded the tokio reactor is.

#![warn(
    bad_style,
    clippy::missing_panics_doc,
    clippy::panic,
    clippy::panic_in_result_fn,
    clippy::unwrap_in_result,
    clippy::unwrap_used,
    dead_code,
    improper_ctypes,
    missing_debug_implementations,
    missing_docs,
    no_mangle_generic_items,
    non_shorthand_field_patterns,
    overflowing_literals,
    path_statements,
    patterns_in_fns_without_body,
    unconditional_recursion,
    unreachable_pub,
    unused,
    unused_allocation,
    unused_comparisons,
    unused_parens,
    while_true
)]

use std::{
    io,
    thread,
    time::{
        Duration,
        Instant,
    },
};

use telemetry::prelude::*;
use telemetry_utils::metric;
use tokio::{
    runtime::Handle,
    sync::mpsc::{
        self,
        Receiver,
        Sender,
        error::TrySendError,
    },
};
use tokio_util::sync::CancellationToken;

/// How frequently to emit metrics about the tokio reactor.
const DEFAULT_MEASUREMENT_FREQUENCY: Duration = Duration::from_secs(30);

/// The time before we record a warning that the watchdog hasn't received a response.
const DEFAULT_WARN_THRESHOLD: Duration = Duration::from_millis(200);

/// The maximum delay between being cancelled and the watchdog thread actually stopping.
/// The smaller this value, the more the watchdog will "spin" the thread, which could take
/// work away from other threads.
const MAX_CANCELLATION_DELAY: Duration = Duration::from_millis(200);

/// Spawn a new TokioWatchdog for the main tokio runtime.
///
/// # Metrics
///
/// - `count.tokio_watchdog.hang`: Whether or not a hang is currently happening.
/// - `histogram.tokio_watchdog.request_time_nanos`: The tokio reactor lag (time it took
///   for the watchdog receiver to wake when a new request was sent)
/// - `histogram.tokio_watchdog.global_queue_depth`: The depth of the tokio global queue
///   (should not generally have anything in it unless the system is heavily loaded)
/// - `histogram.tokio_watchdog.num_workers`: The number of workers in the tokio runtime
/// - `histogram.tokio_watchdog.num_alive_tasks`: The number of tokio tasks that are currently alive
pub fn spawn(
    bin_name: impl Into<Box<str>>,
    cancellation_token: CancellationToken,
) -> io::Result<thread::JoinHandle<()>> {
    TokioWatchdog::new(bin_name, cancellation_token).spawn()
}

/// Spawn a new TokioWatchdog for a specific tokio runtime.
///
/// # Metrics
///
/// - `count.tokio_watchdog.hang`: Whether or not a hang is currently happening.
/// - `histogram.tokio_watchdog.request_time_nanos`: The tokio reactor lag (time it took
///   for the watchdog receiver to wake when a new request was sent)
/// - `histogram.tokio_watchdog.global_queue_depth`: The depth of the tokio global queue
///   (should not generally have anything in it unless the system is heavily loaded)
/// - `histogram.tokio_watchdog.num_workers`: The number of workers in the tokio runtime
/// - `histogram.tokio_watchdog.num_alive_tasks`: The number of tokio tasks that are currently alive
pub fn spawn_for_runtime(
    runtime_name: impl Into<Box<str>>,
    handle: Handle,
    cancellation_token: CancellationToken,
) -> io::Result<thread::JoinHandle<()>> {
    TokioWatchdog::new_for_runtime(runtime_name, handle, cancellation_token).spawn()
}

/// Watchdog for measuring the delay in the tokio reactor.
#[derive(Debug)]
pub struct TokioWatchdog {
    runtime_name: Box<str>,
    handle: Handle,
    measurement_frequency: Duration,
    warn_threshold: Duration,
    cancellation_token: CancellationToken,
}

/// Used as an error to signal that the watchdog was cancelled.
enum WatchdogError {
    Cancelled,
    InternalError,
}

impl TokioWatchdog {
    /// Create a new TokioWatchdog for the current Tokio runtime.
    pub fn new(runtime_name: impl Into<Box<str>>, cancellation_token: CancellationToken) -> Self {
        Self::new_for_runtime(runtime_name, Handle::current(), cancellation_token)
    }

    /// Create a new TokioWatchdog for a specific Tokio runtime.
    pub fn new_for_runtime(
        runtime_name: impl Into<Box<str>>,
        handle: Handle,
        cancellation_token: CancellationToken,
    ) -> Self {
        Self {
            runtime_name: runtime_name.into(),
            handle,
            measurement_frequency: DEFAULT_MEASUREMENT_FREQUENCY,
            warn_threshold: DEFAULT_WARN_THRESHOLD,
            cancellation_token,
        }
    }

    /// Set the frequency at which we measure the delay in the tokio reactor.
    pub fn with_measurement_frequency(self, measurement_frequency: Duration) -> Self {
        Self {
            measurement_frequency,
            ..self
        }
    }

    /// Set the threshold after which an unanswered request is logged as a warning.
    pub fn with_warn_threshold(self, warn_threshold: Duration) -> Self {
        Self {
            warn_threshold,
            ..self
        }
    }

    /// Start a thread which instruments the delay in the tokio reactor.
    ///
    /// The thread periodically sends to a tokio channel and measures how long it takes before the
    /// tokio task receives the message.
    ///
    /// Returns a join handle for the watchdog thread; cancel the CancellationToken passed at
    /// construction to stop the watchdog.
    pub fn spawn(self) -> io::Result<thread::JoinHandle<()>> {
        if self.warn_threshold.is_zero() {
            warn!(
                runtime = self.runtime_name,
                "warn threshold must be above 0"
            );
            return Err(io::Error::other("invalid watchdog configuration"));
        }
        if self.measurement_frequency <= self.warn_threshold {
            warn!(
                runtime = self.runtime_name,
                "measurement frequency must be longer than warn threshold"
            );
            return Err(io::Error::other("invalid watchdog configuration"));
        }

        self.spawn_watchdog_loop()
    }

    fn spawn_watchdog_loop(self) -> io::Result<thread::JoinHandle<()>> {
        thread::Builder::new()
            .name(format!("tokio watchdog for {}", self.runtime_name))
            .spawn(move || {
                // Throw away the error that tells us whether it was cancelled
                let _ = self.watchdog_loop();
            })
    }

    // This blocks forever (at least until cancellation happens)
    fn watchdog_loop(self) -> Result<(), WatchdogError> {
        // Set up an async task in tokio that will receive a request and respond as quickly as
        // possible.
        let tx_request = self.spawn_responder_loop();

        debug!(
            runtime = self.runtime_name,
            measurement_frequency_secs = self.measurement_frequency.as_secs_f64(),
            warn_threshold_secs = self.warn_threshold.as_secs_f64(),
            "tokio watchdog started",
        );

        let mut sent_at = Instant::now();
        loop {
            self.send_tokio_metrics();

            // Sleep thread until it's time to measure again
            self.blocking_sleep(self.measurement_frequency.saturating_sub(sent_at.elapsed()))?;

            // Send the request, including the time the request was sent so the receiver
            // can figure out the delay.
            sent_at = Instant::now();
            match tx_request.try_send(sent_at) {
                Ok(()) => {}
                // It should be impossible for the channel to be full. The fact that we looped
                // back around means we waited for the receiver to pick up the last request.
                Err(TrySendError::Full(_)) => {
                    error!("tokio watchdog internal error: channel is full");
                    return Err(WatchdogError::InternalError);
                }
                Err(TrySendError::Closed(_)) => {
                    error!("tokio watchdog shutting down because responder closed early");
                    return Err(WatchdogError::InternalError);
                }
            }

            // If the request isn't processed within the warn_threshold, we'll log a warning.
            self.blocking_sleep(self.warn_threshold)?;
            if Self::not_received_yet(&tx_request) {
                // It's taking forever. Mark it as a hang!
                warn!(
                    runtime = self.runtime_name,
                    elapsed = sent_at.elapsed().as_millis(),
                    capacity = tx_request.capacity(),
                    is_closed = tx_request.is_closed(),
                    "tokio watchdog hang detected"
                );
                metric!(counter.tokio_watchdog.hang = 1);

                // Wait for the responder to pick up the work (at which point the hang is over).
                // If we don't do this, the next iteration of the loop will have to handle
                // "channel is full" anyway, so may as well do it here.
                // TODO waiting until the responder picks up will delay us emitting tokio
                // metrics. We should cancel and maybe even restart the responder if we take
                // too long (or something like that).
                while Self::not_received_yet(&tx_request) {
                    self.blocking_sleep(MAX_CANCELLATION_DELAY)?;
                }

                // The responder finally picked up the work; we know we're not hanging now.
                info!(
                    runtime = self.runtime_name,
                    hang_nanos = sent_at.elapsed().as_nanos(),
                    "tokio watchdog hang ended"
                );
                metric!(counter.tokio_watchdog.hang = -1);
            }
        }
    }

    /// true if the request channel still has a value that hasn't been received, *and*
    /// the receiver is alive to receive it.
    fn not_received_yet(tx_request: &Sender<Instant>) -> bool {
        tx_request.capacity() == 0 && !tx_request.is_closed()
    }

    /// Blocks the thread for the given duration without invoking the tokio reactor.
    /// Will cancel the sleep early if the watchdog is cancelled.
    fn blocking_sleep(&self, mut duration: Duration) -> Result<(), WatchdogError> {
        // Sleep for small increments, wake up, and check for cancellation
        while duration > MAX_CANCELLATION_DELAY && self.check_cancelled()? {
            thread::sleep(MAX_CANCELLATION_DELAY);
            duration -= MAX_CANCELLATION_DELAY;
        }
        if self.check_cancelled()? && duration > Duration::ZERO {
            thread::sleep(duration);
        }
        Ok(())
    }

    // Check if we've been cancelled and toss an error if so.
    // Returns true if we haven't been cancelled, so it can be used in while loops.
    fn check_cancelled(&self) -> Result<bool, WatchdogError> {
        if self.cancellation_token.is_cancelled() {
            return Err(WatchdogError::Cancelled);
        }
        Ok(true)
    }

    fn spawn_responder_loop(&self) -> Sender<Instant> {
        let (tx_request, rx_request) = mpsc::channel::<Instant>(1);
        self.handle.spawn(Self::responder_loop(
            rx_request,
            self.runtime_name.to_string(),
        ));
        tx_request
    }

    /// Loop that receives requests on the tokio threads and records a metric of how long
    /// they took
    async fn responder_loop(mut rx_request: Receiver<Instant>, runtime_name: String) {
        loop {
            match rx_request.recv().await {
                Some(start) => {
                    // Record the metric
                    let request_time_nanos = start.elapsed().as_nanos() as u64;
                    metric!(
                        histogram.tokio_watchdog.request_time_nanos = request_time_nanos,
                        runtime = runtime_name
                    );
                }
                None => {
                    debug!(
                        "tokio watchdog responder task shutting down because watchdog tx closed"
                    );
                    return;
                }
            };
        }
    }

    fn send_tokio_metrics(&self) {
        let metrics = self.handle.metrics();
        metric!(
            histogram.tokio_watchdog.global_queue_depth = metrics.global_queue_depth(),
            runtime = self.runtime_name
        );
        metric!(
            histogram.tokio_watchdog.num_workers = metrics.num_workers(),
            runtime = self.runtime_name
        );
        metric!(
            histogram.tokio_watchdog.num_alive_tasks = metrics.num_alive_tasks(),
            runtime = self.runtime_name
        );
    }
}
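
The doc comments above describe the public surface (`spawn`, `spawn_for_runtime`, and the `TokioWatchdog` builder) but the file ships no usage example. The sketch below is a minimal, hedged illustration only: the crate path `tokio_watchdog` is assumed (the actual package name in the systeminit/si workspace may differ), and the service name and tuning values are placeholders, not taken from this file.

use std::time::Duration;

use tokio_util::sync::CancellationToken;

fn main() -> std::io::Result<()> {
    let runtime = tokio::runtime::Runtime::new()?;
    let shutdown = CancellationToken::new();

    // Build a watchdog for the runtime by handing it the runtime's handle; the name is only
    // used as a metric label. (Crate path `tokio_watchdog` and the name "my-service" are
    // assumptions for this sketch.)
    let watchdog = tokio_watchdog::TokioWatchdog::new_for_runtime(
        "my-service",
        runtime.handle().clone(),
        shutdown.clone(),
    )
    // Optional tuning; defaults are 30s / 200ms. The measurement frequency must be longer
    // than the warn threshold or spawn() returns an error.
    .with_measurement_frequency(Duration::from_secs(10))
    .with_warn_threshold(Duration::from_millis(100))
    .spawn()?;

    runtime.block_on(async {
        // From inside the runtime, the shorthand is:
        //   tokio_watchdog::spawn("my-service", shutdown.clone())?;
        // which picks up Handle::current().

        // ... run the application here ...
    });

    // Cancel the token and join the watchdog thread; blocking_sleep checks for cancellation
    // at least every MAX_CANCELLATION_DELAY, so the thread exits promptly after cancel().
    shutdown.cancel();
    let _ = watchdog.join();
    Ok(())
}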
