lib.rs (14.2 kB)
use cyclone_core::CycloneRequestable;
pub use cyclone_core::{
    ActionRunRequest,
    ActionRunResultSuccess,
    BeforeFunction,
    ComponentKind,
    ComponentView,
    ComponentViewWithGeometry,
    DebugRequest,
    DebugResultSuccess,
    FunctionResult,
    FunctionResultFailure,
    FunctionResultFailureErrorKind,
    KillExecutionRequest,
    ManagementFuncStatus,
    ManagementRequest,
    ManagementResultSuccess,
    OutputStream,
    ResolverFunctionComponent,
    ResolverFunctionRequest,
    ResolverFunctionResponseType,
    ResolverFunctionResultSuccess,
    ResourceStatus,
    SchemaVariantDefinitionRequest,
    SchemaVariantDefinitionResultSuccess,
    SensitiveContainer,
    ValidationRequest,
    ValidationResultSuccess,
};
use futures::{
    StreamExt,
    TryStreamExt,
};
use nats_std::header;
use nats_subscriber::{
    Subscriber,
    SubscriberError,
};
use serde::{
    Serialize,
    de::DeserializeOwned,
};
use si_data_nats::{
    NatsClient,
    Subject,
    jetstream,
};
use telemetry::prelude::*;
use telemetry_nats::propagation;
use thiserror::Error;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use veritech_core::{
    FINAL_MESSAGE_HEADER_KEY,
    GetNatsSubjectFor,
    reply_mailbox_for_output,
    reply_mailbox_for_result,
};
pub use veritech_core::{
    VeritechValueEncryptError,
    encrypt_value_tree,
};

#[remain::sorted]
#[derive(Error, Debug)]
pub enum ClientError {
    #[error("failed to serialize json message")]
    JSONSerialize(#[source] serde_json::Error),
    #[error("nats error")]
    Nats(#[from] si_data_nats::NatsError),
    #[error("no function result from cyclone; bug!")]
    NoResult,
    #[error("unable to publish message: {0:?}")]
    PublishingFailed(si_data_nats::Message),
    #[error("root connection closed")]
    RootConnectionClosed,
    #[error("subscriber error: {0}")]
    Subscriber(#[from] SubscriberError),
    #[error("tokio join error: {0}")]
    TokioJoinError(#[from] tokio::task::JoinError),
    #[error("transport error: {0}")]
    Transport(Box<dyn std::error::Error + Sync + Send + 'static>),
}

pub type ClientResult<T> = Result<T, ClientError>;

/// This _private_ enum helps dictate what NATS technology should be used in communicating with veritech.
enum RequestMode {
    /// Publish messages using core NATS to communicate with veritech.
    Core,
    /// Publish messages using NATS Jetstream to communicate with veritech.
    Jetstream,
}

#[derive(Clone, Debug)]
pub struct Client {
    nats: NatsClient,
    context: jetstream::Context,
}

impl Client {
    pub fn new(nats: NatsClient) -> Self {
        let context = jetstream::new(nats.clone());
        Self { nats, context }
    }

    fn nats_subject_prefix(&self) -> Option<&str> {
        self.nats.metadata().subject_prefix()
    }

    #[instrument(
        name = "veritech_client.execute_action_run",
        level = "info",
        skip_all,
        fields(
            si.change_set.id = change_set_id,
            si.workspace.id = workspace_id,
        ),
    )]
    pub async fn execute_action_run(
        &self,
        output_tx: mpsc::Sender<OutputStream>,
        request: &ActionRunRequest,
        workspace_id: &str,
        change_set_id: &str,
    ) -> ClientResult<FunctionResult<ActionRunResultSuccess>> {
        self.execute_jetstream_request(output_tx, request, workspace_id, change_set_id)
            .await
    }

    #[instrument(
        name = "veritech_client.execute_resolver_function",
        level = "info",
        skip_all,
        fields(
            si.change_set.id = change_set_id,
            si.workspace.id = workspace_id,
        ),
    )]
    pub async fn execute_resolver_function(
        &self,
        output_tx: mpsc::Sender<OutputStream>,
        request: &ResolverFunctionRequest,
        workspace_id: &str,
        change_set_id: &str,
    ) -> ClientResult<FunctionResult<ResolverFunctionResultSuccess>> {
        self.execute_jetstream_request(output_tx, request, workspace_id, change_set_id)
            .await
    }

    #[instrument(
        name = "veritech_client.execute_schema_variant_definition",
        level = "info",
        skip_all,
        fields(
            si.change_set.id = change_set_id,
            si.workspace.id = workspace_id,
        ),
    )]
    pub async fn execute_schema_variant_definition(
        &self,
        output_tx: mpsc::Sender<OutputStream>,
        request: &SchemaVariantDefinitionRequest,
        workspace_id: &str,
        change_set_id: &str,
    ) -> ClientResult<FunctionResult<SchemaVariantDefinitionResultSuccess>> {
        self.execute_jetstream_request(output_tx, request, workspace_id, change_set_id)
            .await
    }

    #[instrument(
        name = "veritech_client.execute_validation",
        level = "info",
        skip_all,
        fields(
            si.change_set.id = change_set_id,
            si.workspace.id = workspace_id,
        ),
    )]
    pub async fn execute_validation(
        &self,
        output_tx: mpsc::Sender<OutputStream>,
        request: &ValidationRequest,
        workspace_id: &str,
        change_set_id: &str,
    ) -> ClientResult<FunctionResult<ValidationResultSuccess>> {
        self.execute_jetstream_request(output_tx, request, workspace_id, change_set_id)
            .await
    }

    #[instrument(
        name = "veritech_client.execute_management",
        level = "info",
        skip_all,
        fields(
            si.change_set.id = change_set_id,
            si.workspace.id = workspace_id,
        ),
    )]
    pub async fn execute_management(
        &self,
        output_tx: mpsc::Sender<OutputStream>,
        request: &ManagementRequest,
        workspace_id: &str,
        change_set_id: &str,
    ) -> ClientResult<FunctionResult<ManagementResultSuccess>> {
        self.execute_jetstream_request(output_tx, request, workspace_id, change_set_id)
            .await
    }

    #[instrument(
        name = "veritech_client.execute_debug",
        level = "info",
        skip_all,
        fields(
            si.change_set.id = change_set_id,
            si.workspace.id = workspace_id,
        ),
    )]
    pub async fn execute_debug(
        &self,
        output_tx: mpsc::Sender<OutputStream>,
        request: &DebugRequest,
        workspace_id: &str,
        change_set_id: &str,
    ) -> ClientResult<FunctionResult<DebugResultSuccess>> {
        self.execute_jetstream_request(output_tx, request, workspace_id, change_set_id)
            .await
    }

    #[instrument(
        name = "veritech_client.kill_execution",
        level = "info",
        skip_all,
        fields()
    )]
    pub async fn kill_execution(
        &self,
        request: &KillExecutionRequest,
    ) -> ClientResult<FunctionResult<()>> {
        self.execute_request(
            request.nats_subject(self.nats_subject_prefix(), None, None),
            None,
            request,
            RequestMode::Core,
        )
        .await
    }

    async fn execute_jetstream_request<R>(
        &self,
        output_tx: mpsc::Sender<OutputStream>,
        request: &R,
        workspace_id: &str,
        change_set_id: &str,
    ) -> ClientResult<FunctionResult<R::Response>>
    where
        R: Serialize + CycloneRequestable + GetNatsSubjectFor,
        R::Response: DeserializeOwned,
    {
        self.execute_request(
            request.nats_subject(
                self.nats_subject_prefix(),
                Some(workspace_id),
                Some(change_set_id),
            ),
            Some(output_tx),
            request,
            RequestMode::Jetstream,
        )
        .await
    }

    async fn execute_request<R: Serialize + CycloneRequestable<Response: DeserializeOwned>>(
        &self,
        subject: Subject,
        output_tx: Option<mpsc::Sender<OutputStream>>,
        request: &R,
        request_mode: RequestMode,
    ) -> ClientResult<FunctionResult<R::Response>> {
        // Subscribe to responses and send the request. These unsubscribe when dropped.
        let (root_subscriber, mut result_subscriber, output_subscriber) =
            self.send_request(subject, request, request_mode).await?;

        // Forward output messages in the background
        let cancel_output = CancellationToken::new();
        if let Some(output_tx) = output_tx {
            let cancel_output = cancel_output.clone();
            tokio::spawn(cancel_output.run_until_cancelled_owned(forward_output(
                output_subscriber,
                output_tx,
                Span::current(),
            )));
        }

        tokio::select! {
            result = result_subscriber.try_next() => {
                let result = result?.ok_or(ClientError::NoResult)?;
                Span::current().follows_from(result.process_span);
                Ok(result.payload)
            }
            // Because the channel never responds on success, we have to await this simultaneously
            // with the result. If it *does* fail, we cancel the output forwarder as well.
            err = check_for_send_error(root_subscriber) => {
                cancel_output.cancel();
                Err(err)
            }
        }
    }

    async fn send_request<R: Serialize + CycloneRequestable<Response: DeserializeOwned>>(
        &self,
        subject: Subject,
        request: &R,
        request_mode: RequestMode,
    ) -> ClientResult<(
        si_data_nats::Subscriber,
        Subscriber<FunctionResult<R::Response>>,
        Subscriber<OutputStream>,
    )> {
        let msg = serde_json::to_vec(request).map_err(ClientError::JSONSerialize)?;
        let reply_mailbox_root = self.nats.new_inbox();

        // Subscribe first! So we're already listening when the messages come in.

        // Subscribe to the result
        let result_subscriber_subject = reply_mailbox_for_result(&reply_mailbox_root);
        trace!(
            messaging.destination = &result_subscriber_subject.as_str(),
            "subscribing for result messages"
        );
        let result_subscriber = Subscriber::create(result_subscriber_subject)
            .final_message_header_key(FINAL_MESSAGE_HEADER_KEY)
            .start(&self.nats)
            .await?;

        // Construct a subscriber stream for output messages
        let output_subscriber_subject = reply_mailbox_for_output(&reply_mailbox_root);
        trace!(
            messaging.destination = &output_subscriber_subject.as_str(),
            "subscribing for output messages"
        );
        let output_subscriber = Subscriber::create(output_subscriber_subject)
            .final_message_header_key(FINAL_MESSAGE_HEADER_KEY)
            .start(&self.nats)
            .await?;

        // Root reply mailbox will receive a reply if nobody is listening to the channel `subject`
        let root_subscriber = self.nats.subscribe(reply_mailbox_root.clone()).await?;

        // Submit the request message
        trace!(
            messaging.destination = &subject.as_str(),
            "publishing message"
        );

        // NOTE(nick,fletcher): based on the provided request mode, we will either communicate using core nats or
        // jetstream. We neither like nor endorse this behavior. This method should probably be broken up in the
        // future to cleanly separate core nats and jetstream use.
        match request_mode {
            RequestMode::Core => {
                self.nats
                    .publish_with_reply_and_headers(
                        subject,
                        reply_mailbox_root,
                        propagation::empty_injected_headers(),
                        msg.into(),
                    )
                    .await?
            }
            RequestMode::Jetstream => {
                let mut headers = propagation::empty_injected_headers();
                header::insert_reply_inbox(&mut headers, &reply_mailbox_root);

                self.context
                    .publish_with_headers(subject, headers, msg.into())
                    .await
                    // If `Err` then message failed to publish
                    .map_err(|err| ClientError::Transport(Box::new(err)))?
                    .await
                    // If `Err` then NATS server failed to ack
                    .map_err(|err| ClientError::Transport(Box::new(err)))?;
            }
        }

        Ok((root_subscriber, result_subscriber, output_subscriber))
    }
}

async fn check_for_send_error(mut root_subscriber: si_data_nats::Subscriber) -> ClientError {
    // Return an error if no one is listening.
    match root_subscriber.next().await {
        Some(msg) => {
            propagation::associate_current_span_from_headers(msg.headers());
            error!(
                subject = root_subscriber.subject(),
                msg = ?msg,
                "received an unexpected message or error on reply subject prefix"
            );
            ClientError::PublishingFailed(msg)
        }
        None => {
            error!(
                subject = root_subscriber.subject(),
                "reply subject prefix subscriber unexpectedly closed"
            );
            ClientError::RootConnectionClosed
        }
    }
}

async fn forward_output(
    mut output_subscriber: Subscriber<OutputStream>,
    output_tx: mpsc::Sender<OutputStream>,
    request_span: Span,
) {
    // Listen for messages
    while let Some(msg) = tokio::select! {
        next = output_subscriber.next() => next,
        // Exit early if the output channel is closed; no one cares about the output!
        _ = output_tx.closed() => {
            warn!("cancelling output forwarder: receiver closed early");
            None
        }
    } {
        match msg {
            Ok(output) => {
                output.process_span.follows_from(&request_span);
                if let Err(err) = output_tx.send(output.payload).await {
                    warn!(si.error.message = ?err, "output forwarder failed to send message on channel");
                }
            }
            Err(err) => {
                warn!(si.error.message = ?err, "output forwarder received an error on its subscriber");
            }
        }
    }
}
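For orientation, here is a rough usage sketch, not part of the file above. It assumes this listing is the root of the veritech_client crate, that the caller already holds a connected si_data_nats::NatsClient, and that a ResolverFunctionRequest has been built elsewhere. The run_resolver helper, its channel capacity of 64, and the drain loop are illustrative only.

use si_data_nats::NatsClient;
use tokio::sync::mpsc;
use veritech_client::{
    Client, ClientResult, FunctionResult, OutputStream, ResolverFunctionRequest,
    ResolverFunctionResultSuccess,
};

// Hypothetical helper (not part of the crate): execute a resolver function and
// drain its output stream while the request runs.
async fn run_resolver(
    nats: NatsClient,
    request: &ResolverFunctionRequest,
    workspace_id: &str,
    change_set_id: &str,
) -> ClientResult<FunctionResult<ResolverFunctionResultSuccess>> {
    let client = Client::new(nats);

    // The client forwards OutputStream messages onto this bounded channel while
    // the function executes; the capacity of 64 is arbitrary.
    let (output_tx, mut output_rx) = mpsc::channel::<OutputStream>(64);
    let drain = tokio::spawn(async move {
        while let Some(_output) = output_rx.recv().await {
            // A real caller would log or persist each output message here.
        }
    });

    let result = client
        .execute_resolver_function(output_tx, request, workspace_id, change_set_id)
        .await;

    // The forwarder drops its sender once the output stream ends, so the drain
    // task finishes on its own.
    let _ = drain.await;
    result
}

Draining the channel matters because the forwarding task sends on a bounded mpsc channel; if nothing receives, its send calls will wait once the buffer fills.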


MCP directory API

We provide all the information about MCP servers via our MCP directory API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.