Skip to main content
Glama
lib.rs3.48 kB
//! This crate provides a client for streaming data directly or eventually to a data warehouse. #![warn( bad_style, clippy::missing_panics_doc, clippy::panic, clippy::panic_in_result_fn, clippy::unwrap_in_result, clippy::unwrap_used, dead_code, improper_ctypes, missing_debug_implementations, missing_docs, no_mangle_generic_items, non_shorthand_field_patterns, overflowing_literals, path_statements, patterns_in_fns_without_body, unconditional_recursion, unreachable_pub, unused, unused_allocation, unused_comparisons, unused_parens, while_true )] use aws_sdk_firehose::{ operation::put_record::PutRecordError, primitives::Blob, types::Record, }; use si_aws_config::{ AwsConfig, AwsConfigError, }; use telemetry::prelude::*; use thiserror::Error; #[allow(missing_docs)] #[remain::sorted] #[derive(Debug, Error)] pub enum DataWarehouseStreamClientError { #[error("AWS Config Error error: {0}")] AwsConfig(#[from] AwsConfigError), #[error("firehose error: {0}")] Firehose(#[from] aws_sdk_firehose::Error), #[error("firehose build error: {0}")] FirehoseBuild(#[from] Box<aws_sdk_firehose::error::BuildError>), #[error("firehose put record error: {0}")] FirehosePutRecord(#[from] Box<aws_sdk_firehose::error::SdkError<PutRecordError>>), } impl From<aws_sdk_firehose::error::BuildError> for DataWarehouseStreamClientError { fn from(value: aws_sdk_firehose::error::BuildError) -> Self { Box::new(value).into() } } impl From<aws_sdk_firehose::error::SdkError<PutRecordError>> for DataWarehouseStreamClientError { fn from(value: aws_sdk_firehose::error::SdkError<PutRecordError>) -> Self { Box::new(value).into() } } type DataWarehouseStreamClientResult<T> = Result<T, DataWarehouseStreamClientError>; /// A client for communicating with a stream to a data warehouse. #[derive(Debug, Clone)] pub struct DataWarehouseStreamClient { delivery_stream_name: String, inner: Box<aws_sdk_firehose::Client>, } impl DataWarehouseStreamClient { /// Creates a new [client for communicating with a stream to a data warehouse](DataWarehouseStreamClient). #[instrument( name = "data_warehouse_stream_client.new", level = "info", skip(delivery_stream_name) )] pub async fn new( delivery_stream_name: impl Into<String>, ) -> DataWarehouseStreamClientResult<Self> { let config = AwsConfig::from_env().await?; let client = aws_sdk_firehose::Client::new(&config); Ok(Self { inner: Box::new(client), delivery_stream_name: delivery_stream_name.into(), }) } /// Publishes a message to a stream to a data warehouse. #[instrument( name = "data_warehouse_stream_client.publish", level = "debug", skip(raw_data) )] pub async fn publish(&self, raw_data: impl AsRef<[u8]>) -> DataWarehouseStreamClientResult<()> { let record = Record::builder() .data(Blob::new(raw_data.as_ref())) .build()?; let output = self .inner .put_record() .delivery_stream_name(&self.delivery_stream_name) .record(record) .send() .await?; debug!( ?output, "output from sending put record request to kinesis firehose stream" ); Ok(()) } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server