Skip to main content
Glama
message.rs9.6 kB
use std::{ error, fmt, ops, }; use async_nats::{ HeaderMap, StatusCode, Subject, }; use bytes::Bytes; mod extensions; pub use extensions::Extensions; use crate::response::{ IntoResponse, Response, }; #[derive(Clone)] pub struct Message<T> { inner: T, extensions: Extensions, } impl<T> Message<T> where T: MessageHead, { #[inline] pub(crate) fn new(inner: T) -> Self { Self { inner, extensions: Extensions::new(), } } #[inline] pub(crate) fn new_with_extensions(inner: T, extensions: Extensions) -> Self { Self { inner, extensions } } #[inline] pub fn from_parts(head: Head, payload: Bytes) -> Result<Self, FromPartsError> { let (inner, extensions) = T::from_head_and_payload(head, payload)?; Ok(Self::new_with_extensions(inner, extensions)) } #[inline] pub fn subject(&self) -> &Subject { self.inner.subject() } #[inline] pub fn reply(&self) -> Option<&Subject> { self.inner.reply() } #[inline] pub fn headers(&self) -> Option<&HeaderMap> { self.inner.headers() } #[inline] pub fn status(&self) -> Option<StatusCode> { self.inner.status() } #[inline] pub fn description(&self) -> Option<&str> { self.inner.description() } #[inline] pub fn length(&self) -> usize { self.inner.length() } #[inline] pub fn payload_length(&self) -> usize { self.inner.payload_length() } #[inline] pub fn extensions(&self) -> &Extensions { &self.extensions } #[inline] pub fn extensions_mut(&mut self) -> &mut Extensions { &mut self.extensions } pub fn head(&self) -> HeadRef<'_> { HeadRef { subject: self.subject(), reply: self.reply(), headers: self.headers(), status: self.status(), description: self.description(), length: self.length(), payload_length: self.payload_length(), extensions: self.extensions(), } } #[inline] pub fn into_inner(self) -> T { self.inner } #[inline] pub fn split(self) -> (T, Extensions) { (self.inner, self.extensions) } #[inline] pub fn into_parts(self) -> (Head, Bytes) { let (mut head, payload) = self.inner.into_head_and_payload(); head.extensions.extend(self.extensions); (head, payload) } } impl<T: fmt::Debug> fmt::Debug for Message<T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Message") .field("parts", &self.extensions) .field("inner", &self.inner) .finish() } } impl<T> ops::Deref for Message<T> { type Target = T; fn deref(&self) -> &Self::Target { &self.inner } } impl<T> ops::DerefMut for Message<T> { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.inner } } impl<T> From<T> for Message<T> where T: MessageHead, { fn from(value: T) -> Self { Self::new(value) } } #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub struct FromPartsError(&'static str); impl fmt::Display for FromPartsError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "failed to build message from parts: {}", self.0) } } impl error::Error for FromPartsError {} impl IntoResponse for FromPartsError { fn into_response(self) -> Response { Response::default_internal_server_error() } } pub trait MessageHead { /// Subject to which message is published to. fn subject(&self) -> &Subject; /// Optional reply subject to which response can be published by a subscriber or consumer. fn reply(&self) -> Option<&Subject>; /// Optional headers. fn headers(&self) -> Option<&HeaderMap>; /// Optional Status of the message. Used mostly for internal handling. fn status(&self) -> Option<StatusCode>; /// Optional status description. fn description(&self) -> Option<&str>; /// Length of message in bytes. fn length(&self) -> usize; /// Length of message's payload in bytes. fn payload_length(&self) -> usize; fn from_head_and_payload( head: Head, payload: Bytes, ) -> Result<(Self, Extensions), FromPartsError> where Self: Sized; fn into_head_and_payload(self) -> (Head, Bytes); } impl MessageHead for async_nats::Message { fn subject(&self) -> &Subject { &self.subject } fn reply(&self) -> Option<&Subject> { self.reply.as_ref() } fn headers(&self) -> Option<&HeaderMap> { self.headers.as_ref() } fn status(&self) -> Option<StatusCode> { self.status } fn description(&self) -> Option<&str> { self.description.as_deref() } fn length(&self) -> usize { self.length } fn payload_length(&self) -> usize { self.payload.len() } fn from_head_and_payload( head: Head, payload: Bytes, ) -> Result<(Self, Extensions), FromPartsError> { let Head { subject, reply, headers, status, description, length, extensions, } = head; Ok(( Self { subject, reply, payload, headers, status, description, length, }, extensions, )) } fn into_head_and_payload(self) -> (Head, Bytes) { let Self { subject, reply, payload, headers, status, description, length, } = self; ( Head { subject, reply, headers, status, description, length, extensions: Extensions::new(), }, payload, ) } } impl MessageHead for async_nats::jetstream::Message { fn subject(&self) -> &Subject { &self.message.subject } fn reply(&self) -> Option<&Subject> { self.message.reply.as_ref() } fn headers(&self) -> Option<&HeaderMap> { self.message.headers.as_ref() } fn status(&self) -> Option<StatusCode> { self.message.status } fn description(&self) -> Option<&str> { self.description.as_deref() } fn length(&self) -> usize { self.message.length } fn payload_length(&self) -> usize { self.message.payload.len() } fn from_head_and_payload( head: Head, payload: Bytes, ) -> Result<(Self, Extensions), FromPartsError> { let Head { subject, reply, headers, status, description, length, mut extensions, } = head; let message = async_nats::Message { subject, reply, payload, headers, status, description, length, }; let context = extensions .remove::<async_nats::jetstream::Context>() .ok_or(FromPartsError( "jetstream context not found in message head extensions", ))?; Ok((Self { message, context }, extensions)) } fn into_head_and_payload(self) -> (Head, Bytes) { let Self { message, context } = self; let async_nats::Message { subject, reply, payload, headers, status, description, length, } = message; let mut extensions = Extensions::new(); extensions.insert(context); ( Head { subject, reply, headers, status, description, length, extensions, }, payload, ) } } #[derive(Clone, Debug)] pub struct Head { /// Subject to which message is published to. pub subject: Subject, /// Optional reply subject to which response can be published by a subscriber or consumer. pub reply: Option<Subject>, /// Optional headers. pub headers: Option<HeaderMap>, /// Optional Status of the message. Used mostly for internal handling. pub status: Option<StatusCode>, /// Optional status description. pub description: Option<String>, /// Length of message in bytes. pub length: usize, /// The message's extensions pub extensions: Extensions, } #[derive(Clone, Debug)] pub struct HeadRef<'a> { /// Subject to which message is published to. pub subject: &'a Subject, /// Optional reply subject to which response can be published by a subscriber or consumer. pub reply: Option<&'a Subject>, /// Optional headers. pub headers: Option<&'a HeaderMap>, /// Optional Status of the message. Used mostly for internal handling. pub status: Option<StatusCode>, /// Optional status description. pub description: Option<&'a str>, /// Length of message in bytes. pub length: usize, /// Length of message's payload in bytes. pub payload_length: usize, /// The message's extensions pub extensions: &'a Extensions, }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server