Skip to main content
Glama
main.rs3.81 kB
use std::env; use futures::StreamExt; use si_data_nats::{ Message, NatsClient, NatsConfig, Subscriber, }; use telemetry::prelude::*; use tracing_subscriber::{ EnvFilter, Registry, fmt::{ self, format::FmtSpan, }, prelude::*, }; const TRACING_LOG_ENV_VAR: &str = "SI_LOG"; const DEFAULT_TRACING_DIRECTIVES: &str = "nats_subscribe=trace,si_data=trace,info"; #[tokio::main] async fn main() -> Result<(), Box<(dyn std::error::Error + 'static)>> { Registry::default() .with( EnvFilter::try_from_env(TRACING_LOG_ENV_VAR) .unwrap_or_else(|_| EnvFilter::new(DEFAULT_TRACING_DIRECTIVES)), ) .with( fmt::layer() .with_thread_ids(true) .with_thread_names(true) .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE), ) .try_init()?; run().await } #[instrument(name = "main", level = "info", skip_all)] async fn run() -> Result<(), Box<(dyn std::error::Error + 'static)>> { let subject = env::args() .nth(1) .expect("usage: nats-subscribe SUBJECT [MAX_READ]"); let max_read = env::args() .nth(2) .map(|i| i.parse::<u32>().expect("MAX_READ must be a positive int")); if let Some(max) = max_read { info!( "reading maximum of {} messages on subject '{}'", max, &subject ); } let config = NatsConfig::default(); let nats = NatsClient::new(&config).await?; let mut subscriber = nats.subscribe(subject).await?; let mut count = 0; while let Some(message) = subscriber.next().await { count += 1; process_message(message, count, &subscriber); if let Some(max) = max_read { if count >= max { debug!("hit max read, closing subscriber and ending"); subscriber.unsubscribe_after(0).await?; break; } } } info!("subscriber stream completed"); Ok(()) } #[instrument( skip_all, level = "debug", fields( messaging.client_id = sub.metadata().messaging_client_id(), messaging.destination.name = sub.metadata().messaging_destination_name(), messaging.message.body.size = message.payload().len(), messaging.nats.server.id = sub.metadata().messaging_nats_server_id(), messaging.nats.server.name = sub.metadata().messaging_nats_server_name(), messaging.nats.server.version = sub.metadata().messaging_nats_server_version(), messaging.operation = MessagingOperation::Receive.as_str(), messaging.system = sub.metadata().messaging_system(), messaging.url = sub.metadata().messaging_url(), network.peer.address = sub.metadata().network_peer_address(), network.protocol.name = sub.metadata().network_protocol_name(), network.protocol.version = sub.metadata().network_protocol_version(), network.transport = sub.metadata().network_transport(), otel.kind = SpanKind::Consumer.as_str(), // similar to an RPC operation otel.name = Empty, otel.status_code = Empty, otel.status_message = Empty, server.address = sub.metadata().server_address(), server.port = sub.metadata().server_port(), ) )] fn process_message(message: Message, count: u32, sub: &Subscriber) { let span = current_span_for_instrument_at!("debug"); span.follows_from(sub.span()); span.record( "otel.name", format!( "{} {}", message.subject(), MessagingOperation::Receive.as_str() ) .as_str(), ); let data = String::from_utf8_lossy(message.payload()); info!(message = ?message, data = data.as_ref(), count); }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server