Skip to main content
Glama
lib.rs (17 kB)
#![warn( clippy::unwrap_in_result, clippy::unwrap_used, clippy::panic, clippy::missing_panics_doc, clippy::panic_in_result_fn )] // TODO(fnichol): document all, then drop `missing_errors_doc` #![allow(clippy::missing_errors_doc)] use std::{ borrow::Cow, env, fmt::{ Debug, Display, }, ops::{ Deref, DerefMut, }, result::Result, sync::Arc, }; use async_trait::async_trait; pub use opentelemetry::{ self, trace::SpanKind, }; use thiserror::Error; use tokio::sync::{ Mutex, mpsc, oneshot, }; use tokio_util::sync::CancellationToken; pub use tracing; use tracing::warn; pub mod prelude { pub use tracing::{ self, Id as SpanId, Instrument, Level, Span, debug, debug_span, enabled, error, error_span, event, event_enabled, field::Empty, info, info_span, instrument, span, span_enabled, trace, trace_span, warn, warn_span, }; pub use super::{ MessagingOperation, SpanExt, SpanKind, SpanKindExt, current_span_for_instrument_at, }; } #[remain::sorted] #[derive(Clone, Copy, Debug)] pub enum OtelStatusCode { Error, Ok, // Unset is not currently used, although represents a valid state. #[allow(dead_code)] Unset, } impl OtelStatusCode { pub fn as_str(&self) -> &'static str { match self { Self::Error => "ERROR", Self::Ok => "OK", Self::Unset => "", } } } /// Represents valied states for OpenTelemetry's `messaging.operation` field. /// /// `messaging.operation` has the following list of well-known values. If one of them applies, then /// the respective value MUST be used, otherwise a custom value MAY be used. /// /// See: <https://opentelemetry.io/docs/specs/semconv/attributes-registry/messaging/> #[remain::sorted] #[derive(Clone, Copy, Debug)] pub enum MessagingOperation { /// A message is created. “Create” spans always refer to a single message and are used to /// provide a unique creation context for messages in batch publishing scenarios. Create, /// One or more messages are passed to a consumer. 
This operation refers to push-based /// scenarios, where consumer register callbacks which get called by messaging SDKs. Deliver, /// One or more messages are provided for publishing to an intermediary. If a single message is /// published, the context of the “Publish” span can be used as the creation context and no /// “Create” span needs to be created. Publish, /// One or more messages are requested by a consumer. This operation refers to pull-based /// scenarios, where consumers explicitly call methods of messaging SDKs to receive messages. Receive, } impl MessagingOperation { pub const CREATE_STR: &'static str = "create"; pub const DELIVER_STR: &'static str = "deliver"; pub const PUBLISH_STR: &'static str = "publish"; pub const RECEIVE_STR: &'static str = "receive"; pub fn as_str(&self) -> &'static str { match self { Self::Create => Self::CREATE_STR, Self::Deliver => Self::DELIVER_STR, Self::Publish => Self::PUBLISH_STR, Self::Receive => Self::RECEIVE_STR, } } } /// An extention trait for [`SpanKind`] providing string representations. pub trait SpanKindExt { /// Returns a static str representation. fn as_str(&self) -> &'static str; /// Returns an allocated string representation. 
fn to_string(&self) -> String { self.as_str().to_string() } } impl SpanKindExt for SpanKind { fn as_str(&self) -> &'static str { match self { SpanKind::Client => "client", SpanKind::Server => "server", SpanKind::Producer => "producer", SpanKind::Consumer => "consumer", SpanKind::Internal => "internal", } } } pub trait SpanExt { fn record_ok(&self); fn record_err<E>(&self, err: E) -> E where E: Debug + Display; // fn record_status<F, T, E>(&self, f: F) -> std::result::Result<T, E> // where // F: Fn() -> std::result::Result<T, E>, // E: Debug + Display, // { // match f() { // Ok(ok) => { // self.record_ok(); // Ok(ok) // } // Err(err) => Err(self.record_err(err)), // } // } } impl SpanExt for tracing::Span { fn record_ok(&self) { self.record("otel.status_code", OtelStatusCode::Ok.as_str()); } fn record_err<E>(&self, err: E) -> E where E: Debug + Display, { self.record("otel.status_code", OtelStatusCode::Error.as_str()); self.record("otel.status_message", err.to_string().as_str()); err } } /// A telemetry client trait which can update tracing verbosity. /// /// It is designed to be consumed by library authors without the need to depend on the entire /// binary/server infrastructure of tracing-rs/OpenTelemetry/etc. #[async_trait] pub trait TelemetryClient: Clone + Send + Sync + 'static { async fn set_verbosity(&mut self, updated: Verbosity) -> Result<(), ClientError>; async fn modify_verbosity(&mut self) -> Result<(), ClientError>; async fn set_custom_tracing( &mut self, directives: impl Into<String> + Send + 'async_trait, ) -> Result<(), ClientError>; } /// A telemetry type that can report its tracing level. #[async_trait] pub trait TelemetryLevel: Send + Sync { async fn is_debug_or_lower(&self) -> bool; } /// A telemetry client which holds handles to a process' tracing and OpenTelemetry setup. 
#[derive(Clone, Debug)] pub struct ApplicationTelemetryClient { service_name: Box<str>, app_modules: Arc<Vec<&'static str>>, interesting_modules: Arc<Vec<&'static str>>, never_modules: Arc<Vec<&'static str>>, tracing_level: Arc<Mutex<TracingLevel>>, update_telemetry_tx: mpsc::UnboundedSender<TelemetryCommand>, } impl ApplicationTelemetryClient { pub fn new( service_name: Box<str>, app_modules: Vec<&'static str>, interesting_modules: Vec<&'static str>, never_modules: Vec<&'static str>, tracing_level: TracingLevel, update_telemetry_tx: mpsc::UnboundedSender<TelemetryCommand>, ) -> Self { Self { service_name, app_modules: Arc::new(app_modules), interesting_modules: Arc::new(interesting_modules), never_modules: Arc::new(never_modules), tracing_level: Arc::new(Mutex::new(tracing_level)), update_telemetry_tx, } } pub fn service_name(&self) -> &str { &self.service_name } pub async fn set_verbosity_and_wait(&mut self, updated: Verbosity) -> Result<(), ClientError> { let (tx, rx) = oneshot::channel(); self.set_verbosity_inner(updated, Some(tx)).await?; if let Err(err) = rx.await { warn!(error = ?err, "sender already closed while waiting on verbosity change"); } Ok(()) } pub async fn modify_verbosity_and_wait(&mut self) -> Result<(), ClientError> { let (tx, rx) = oneshot::channel(); self.modify_verbosity_inner(Some(tx)).await?; if let Err(err) = rx.await { warn!( error = ?err, "sender already closed while waiting on verbosity increase change", ); } Ok(()) } pub async fn set_custom_tracing_and_wait( &mut self, directives: impl Into<String> + Send, ) -> Result<(), ClientError> { let (tx, rx) = oneshot::channel(); self.set_custom_tracing_inner(directives, Some(tx)).await?; if let Err(err) = rx.await { warn!(error = ?err, "sender already closed while waiting on custom tracing change"); } Ok(()) } async fn set_verbosity_inner( &mut self, updated: Verbosity, wait: Option<oneshot::Sender<()>>, ) -> Result<(), ClientError> { let mut guard = self.tracing_level.lock().await; let 
tracing_level = guard.deref_mut(); match tracing_level { TracingLevel::Verbosity { verbosity, .. } => { *verbosity = updated; } TracingLevel::Custom(_) => { *tracing_level = TracingLevel::new( updated, Some(self.app_modules.as_slice()), Some(self.interesting_modules.as_slice()), Some(self.never_modules.as_slice()), ); } } self.update_telemetry_tx .send(TelemetryCommand::TracingLevel { level: tracing_level.clone(), wait, })?; Ok(()) } async fn modify_verbosity_inner( &mut self, wait: Option<oneshot::Sender<()>>, ) -> Result<(), ClientError> { let guard = self.tracing_level.lock().await; match guard.deref() { TracingLevel::Verbosity { verbosity, .. } => { let updated = match verbosity.is_max() { true => Verbosity::InfoAll, false => verbosity.increase(), }; drop(guard); self.set_verbosity_inner(updated, wait).await } TracingLevel::Custom(_) => Err(ClientError::CustomHasNoVerbosity), } } async fn set_custom_tracing_inner( &mut self, directives: impl Into<String> + Send, wait: Option<oneshot::Sender<()>>, ) -> Result<(), ClientError> { let mut guard = self.tracing_level.lock().await; let tracing_level = guard.deref_mut(); let updated = TracingLevel::custom(directives); *tracing_level = updated; self.update_telemetry_tx .send(TelemetryCommand::TracingLevel { level: tracing_level.clone(), wait, })?; Ok(()) } } #[async_trait] impl TelemetryClient for ApplicationTelemetryClient { async fn set_verbosity(&mut self, updated: Verbosity) -> Result<(), ClientError> { self.set_verbosity_inner(updated, None).await } async fn modify_verbosity(&mut self) -> Result<(), ClientError> { self.modify_verbosity_inner(None).await } async fn set_custom_tracing( &mut self, directives: impl Into<String> + Send + 'async_trait, ) -> Result<(), ClientError> { self.set_custom_tracing_inner(directives, None).await } } #[async_trait] impl TelemetryLevel for ApplicationTelemetryClient { async fn is_debug_or_lower(&self) -> bool { self.tracing_level.lock().await.is_debug_or_lower() } } /// A 
"no-nothing" telemetry client suitable for test code. /// /// Note that it will respond to `is_debug_or_lower` if the `SI_TEST_VERBOSE` environment variable /// is set which may increase logging output in tests (typically useful when debugging). #[derive(Clone, Copy, Debug)] pub struct NoopClient; #[async_trait] impl TelemetryClient for NoopClient { async fn set_verbosity(&mut self, _updated: Verbosity) -> Result<(), ClientError> { Ok(()) } async fn modify_verbosity(&mut self) -> Result<(), ClientError> { Ok(()) } async fn set_custom_tracing( &mut self, _directives: impl Into<String> + Send + 'async_trait, ) -> Result<(), ClientError> { Ok(()) } } #[async_trait] impl TelemetryLevel for NoopClient { async fn is_debug_or_lower(&self) -> bool { #[allow(clippy::disallowed_methods)] // NoopClient is only used in testing and this // environment variable is prefixed with `SI_TEST_` to denote that it's only for testing // purposes. env::var("SI_TEST_VERBOSE").is_ok() } } #[remain::sorted] #[derive(Debug, Error)] pub enum ClientError { #[error("custom tracing level has no verbosity")] CustomHasNoVerbosity, #[error("error while updating tracing level")] UpdateTracingLevel(#[from] mpsc::error::SendError<TelemetryCommand>), } #[remain::sorted] #[derive(Debug)] pub enum TelemetryCommand { Shutdown(CancellationToken), TracingLevel { level: TracingLevel, wait: Option<oneshot::Sender<()>>, }, } #[remain::sorted] #[derive(Clone, Debug)] pub enum TracingLevel { Custom(String), Verbosity { verbosity: Verbosity, app_modules: Option<Vec<Cow<'static, str>>>, interesting_modules: Option<Vec<Cow<'static, str>>>, never_modules: Option<Vec<Cow<'static, str>>>, }, } impl TracingLevel { pub fn new( verbosity: Verbosity, app_modules: Option<impl IntoAppModules>, interesting_modules: Option<impl IntoAppModules>, never_modules: Option<impl IntoAppModules>, ) -> Self { Self::Verbosity { verbosity, app_modules: app_modules.map(IntoAppModules::into_app_modules), interesting_modules: 
interesting_modules.map(IntoAppModules::into_app_modules), never_modules: never_modules.map(IntoAppModules::into_app_modules), } } pub fn custom(directives: impl Into<String>) -> Self { Self::Custom(directives.into()) } pub fn is_debug_or_lower(&self) -> bool { match self { Self::Verbosity { verbosity, .. } => verbosity.is_debug_or_lower(), Self::Custom(string) => string.contains("debug") || string.contains("trace"), } } } #[derive(Copy, Clone, Debug, Eq, Hash, PartialEq, PartialOrd)] #[allow(clippy::enum_variant_names)] pub enum Verbosity { InfoAll, DebugAppInfoInterestingInfoAll, DebugAppDebugInterestingInfoAll, TraceAppDebugInterestingInfoAll, TraceAppTraceInterestingInfoAll, TraceAppTraceInterestingDebugAll, TraceAll, } impl Verbosity { #[must_use] pub fn increase(self) -> Self { self.as_u8().saturating_add(1).into() } #[must_use] pub fn decrease(self) -> Self { self.as_u8().saturating_sub(1).into() } fn is_debug_or_lower(&self) -> bool { !matches!(self, Self::InfoAll) } fn is_max(&self) -> bool { matches!(self, Self::TraceAll) } #[inline] fn as_u8(self) -> u8 { self.into() } } impl Default for Verbosity { fn default() -> Self { Self::InfoAll } } impl From<u8> for Verbosity { fn from(value: u8) -> Self { match value { 0 => Self::InfoAll, 1 => Self::DebugAppInfoInterestingInfoAll, 2 => Self::DebugAppDebugInterestingInfoAll, 3 => Self::TraceAppDebugInterestingInfoAll, 4 => Self::TraceAppTraceInterestingInfoAll, 5 => Self::TraceAppTraceInterestingDebugAll, _ => Self::TraceAll, } } } impl From<Verbosity> for u8 { fn from(value: Verbosity) -> Self { match value { Verbosity::InfoAll => 0, Verbosity::DebugAppInfoInterestingInfoAll => 1, Verbosity::DebugAppDebugInterestingInfoAll => 2, Verbosity::TraceAppDebugInterestingInfoAll => 3, Verbosity::TraceAppTraceInterestingInfoAll => 4, Verbosity::TraceAppTraceInterestingDebugAll => 5, Verbosity::TraceAll => 6, } } } pub trait IntoAppModules { fn into_app_modules(self) -> Vec<Cow<'static, str>>; } impl IntoAppModules for 
Vec<String> { fn into_app_modules(self) -> Vec<Cow<'static, str>> { self.into_iter().map(Cow::Owned).collect() } } impl IntoAppModules for Vec<&'static str> { fn into_app_modules(self) -> Vec<Cow<'static, str>> { self.into_iter().map(Cow::Borrowed).collect() } } impl IntoAppModules for &[&'static str] { fn into_app_modules(self) -> Vec<Cow<'static, str>> { self.iter().map(|e| Cow::Borrowed(*e)).collect() } } #[macro_export] macro_rules! current_span_for_instrument_at { ("error") => { if enabled!(Level::ERROR) { Span::current() } else { Span::none() } }; ("warn") => { if enabled!(Level::WARN) { Span::current() } else { Span::none() } }; ("info") => { if enabled!(Level::INFO) { Span::current() } else { Span::none() } }; ("debug") => { if enabled!(Level::DEBUG) { Span::current() } else { Span::none() } }; ("trace") => { if enabled!(Level::TRACE) { Span::current() } else { Span::none() } }; }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.