lib.rs
use std::result;

use futures::{
    StreamExt as _,
    future::BoxFuture,
};
use nats_std::header;
pub use pinga_core::{
    api_types,
    api_types::RequestId,
};
use pinga_core::{
    api_types::{
        Container,
        ContentInfo,
        HeaderMapParseMessageInfoError,
        Negotiate,
        NegotiateError,
        SerializeContainer,
        SerializeError,
        job_execution_request::{
            JobArgsVCurrent,
            JobExecutionRequest,
            JobExecutionRequestVCurrent,
        },
        job_execution_response::JobExecutionResponse,
    },
    nats,
};
use si_data_nats::{
    HeaderMap,
    Message,
    NatsClient,
    Subject,
    async_nats::{
        self,
        jetstream::context::PublishError,
    },
    jetstream::{
        self,
        Context,
    },
};
use si_events::{
    ActionId,
    AttributeValueId,
    ChangeSetId,
    ComponentId,
    DebugFuncJobStateId,
    ManagementPrototypeId,
    ViewId,
    WorkspacePk,
    ulid::CoreUlid,
};
use telemetry::prelude::*;
use telemetry_nats::propagation;
use thiserror::Error;

#[remain::sorted]
#[derive(Debug, Error)]
pub enum ClientError {
    #[error("error creating jetstream stream: {0}")]
    CreateStream(#[source] async_nats::jetstream::context::CreateStreamError),
    #[error("request publish error: {0}")]
    Publish(#[from] PublishError),
    #[error("error parsing reply headers: {0}")]
    ReplyHeadersParse(#[from] HeaderMapParseMessageInfoError),
    #[error("reply message is missing headers")]
    ReplyMissingHeaders,
    #[error("negotiate error deserializing reply: {0}")]
    ReplyNegotiate(#[from] NegotiateError),
    #[error("reply subscription closed before receiving reply message")]
    ReplySubscriptionClosed,
    #[error("error serializing request: {0}")]
    Serialize(#[from] SerializeError),
    #[error("reply subscribe error: {0}")]
    Subscribe(#[source] si_data_nats::Error),
}

type Error = ClientError;

type Result<T> = result::Result<T, ClientError>;

pub type PingaClient = Client;

/// A client which can submit job execution requests to Pinga services.
#[derive(Clone, Debug)]
pub struct Client {
    nats: NatsClient,
    context: Context,
}

impl Client {
    /// Builds and returns a new [`Client`].
    pub async fn new(nats: NatsClient) -> Result<Self> {
        let context = jetstream::new(nats.clone());

        // Ensure that the stream is already created
        let _ = nats::pinga_work_queue(&context)
            .await
            .map_err(Error::CreateStream)?;

        Ok(Self { nats, context })
    }

    /// Requests an action job execution and returns an awaitable response future.
    pub async fn await_action_job(
        &self,
        workspace_id: WorkspacePk,
        change_set_id: ChangeSetId,
        action_id: ActionId,
        is_job_blocking: bool,
    ) -> Result<(RequestId, BoxFuture<'static, Result<JobExecutionResponse>>)> {
        self.call_with_reply(
            workspace_id,
            change_set_id,
            JobArgsVCurrent::Action { action_id },
            is_job_blocking,
        )
        .await
    }

    /// Requests a dependent values update job execution and returns an awaitable response future.
    pub async fn await_dependent_values_update_job(
        &self,
        workspace_id: WorkspacePk,
        change_set_id: ChangeSetId,
        is_job_blocking: bool,
    ) -> Result<(RequestId, BoxFuture<'static, Result<JobExecutionResponse>>)> {
        self.call_with_reply(
            workspace_id,
            change_set_id,
            JobArgsVCurrent::DependentValuesUpdate,
            is_job_blocking,
        )
        .await
    }

    /// Requests a validation job execution and returns an awaitable response future.
    pub async fn await_validation_job(
        &self,
        workspace_id: WorkspacePk,
        change_set_id: ChangeSetId,
        attribute_value_ids: Vec<AttributeValueId>,
        is_job_blocking: bool,
    ) -> Result<(RequestId, BoxFuture<'static, Result<JobExecutionResponse>>)> {
        self.call_with_reply(
            workspace_id,
            change_set_id,
            JobArgsVCurrent::Validation {
                attribute_value_ids,
            },
            is_job_blocking,
        )
        .await
    }

    /// Requests a management job execution and returns an awaitable response future.
    #[allow(clippy::too_many_arguments)]
    pub async fn await_management_job(
        &self,
        workspace_id: WorkspacePk,
        change_set_id: ChangeSetId,
        component_id: ComponentId,
        prototype_id: ManagementPrototypeId,
        view_id: ViewId,
        request_ulid: Option<CoreUlid>,
        is_job_blocking: bool,
    ) -> Result<(RequestId, BoxFuture<'static, Result<JobExecutionResponse>>)> {
        self.call_with_reply(
            workspace_id,
            change_set_id,
            JobArgsVCurrent::ManagementFunc {
                component_id,
                prototype_id,
                view_id,
                request_ulid,
            },
            is_job_blocking,
        )
        .await
    }

    /// Requests a debug func job execution and returns an awaitable response future.
    pub async fn await_debug_job(
        &self,
        workspace_id: WorkspacePk,
        change_set_id: ChangeSetId,
        debug_func_job_state_id: DebugFuncJobStateId,
        is_job_blocking: bool,
    ) -> Result<(RequestId, BoxFuture<'static, Result<JobExecutionResponse>>)> {
        self.call_with_reply(
            workspace_id,
            change_set_id,
            JobArgsVCurrent::DebugFunc {
                debug_func_job_state_id,
            },
            is_job_blocking,
        )
        .await
    }

    /// Requests an action job execution and doesn't wait for a response.
    pub async fn dispatch_action_job(
        &self,
        workspace_id: WorkspacePk,
        change_set_id: ChangeSetId,
        action_id: ActionId,
        is_job_blocking: bool,
    ) -> Result<RequestId> {
        self.call_async(
            workspace_id,
            change_set_id,
            JobArgsVCurrent::Action { action_id },
            is_job_blocking,
            None,
        )
        .await
    }

    /// Requests a dependent values update job execution and doesn't wait for a response.
    pub async fn dispatch_dependent_values_update_job(
        &self,
        workspace_id: WorkspacePk,
        change_set_id: ChangeSetId,
        is_job_blocking: bool,
    ) -> Result<RequestId> {
        self.call_async(
            workspace_id,
            change_set_id,
            JobArgsVCurrent::DependentValuesUpdate,
            is_job_blocking,
            None,
        )
        .await
    }

    /// Requests a validation job execution and doesn't wait for a response.
    pub async fn dispatch_validation_job(
        &self,
        workspace_id: WorkspacePk,
        change_set_id: ChangeSetId,
        attribute_value_ids: Vec<AttributeValueId>,
        is_job_blocking: bool,
    ) -> Result<RequestId> {
        self.call_async(
            workspace_id,
            change_set_id,
            JobArgsVCurrent::Validation {
                attribute_value_ids,
            },
            is_job_blocking,
            None,
        )
        .await
    }

    /// Requests a management job execution and doesn't wait for a response.
    #[allow(clippy::too_many_arguments)]
    pub async fn dispatch_management_job(
        &self,
        workspace_id: WorkspacePk,
        change_set_id: ChangeSetId,
        component_id: ComponentId,
        prototype_id: ManagementPrototypeId,
        view_id: ViewId,
        request_ulid: Option<CoreUlid>,
        is_job_blocking: bool,
    ) -> Result<RequestId> {
        self.call_async(
            workspace_id,
            change_set_id,
            JobArgsVCurrent::ManagementFunc {
                component_id,
                prototype_id,
                view_id,
                request_ulid,
            },
            is_job_blocking,
            None,
        )
        .await
    }

    /// Requests a debug func job execution and doesn't wait for a response.
    pub async fn dispatch_debug_job(
        &self,
        workspace_id: WorkspacePk,
        change_set_id: ChangeSetId,
        debug_func_job_state_id: DebugFuncJobStateId,
        is_job_blocking: bool,
    ) -> Result<RequestId> {
        self.call_async(
            workspace_id,
            change_set_id,
            JobArgsVCurrent::DebugFunc {
                debug_func_job_state_id,
            },
            is_job_blocking,
            None,
        )
        .await
    }

    async fn call_async(
        &self,
        workspace_id: WorkspacePk,
        change_set_id: ChangeSetId,
        args: JobArgsVCurrent,
        is_job_blocking: bool,
        maybe_reply_inbox: Option<&Subject>,
    ) -> Result<RequestId> {
        let id = RequestId::new();

        let kind: &'static str = (&args).into();

        let request = JobExecutionRequest::new(JobExecutionRequestVCurrent {
            id,
            workspace_id,
            change_set_id,
            args,
            is_job_blocking,
        });

        // Cut down on the amount of `String` allocations dealing with ids
        let mut wid_buf = [0; WorkspacePk::ID_LEN];
        let mut csid_buf = [0; ChangeSetId::ID_LEN];

        let requests_subject = nats::subject::pinga_job(
            self.context.metadata().subject_prefix(),
            workspace_id.array_to_str(&mut wid_buf),
            change_set_id.array_to_str(&mut csid_buf),
            kind,
        );

        let mut info = ContentInfo::from(&request);
        let (content_type, payload) = request.to_vec()?;
        info.content_type = content_type.into();

        let mut headers = HeaderMap::new();
        propagation::inject_headers(&mut headers);
        info.inject_into_headers(&mut headers);
        header::insert_nats_msg_id(&mut headers, id.to_string());
        header::insert_maybe_reply_inbox(&mut headers, maybe_reply_inbox);

        self.context
            .publish_with_headers(requests_subject, headers, payload.into())
            .await?
            .await?;

        Ok(id)
    }

    async fn call_with_reply(
        &self,
        workspace_id: WorkspacePk,
        change_set_id: ChangeSetId,
        args: JobArgsVCurrent,
        is_job_blocking: bool,
    ) -> Result<(RequestId, BoxFuture<'static, Result<JobExecutionResponse>>)> {
        let reply_inbox: Subject = self.nats.new_inbox().into();

        trace!(
            messaging.destination = &reply_inbox.as_str(),
            "subscribing for reply message"
        );
        let mut subscription = self
            .nats
            .subscribe(reply_inbox.clone())
            .await
            .map_err(Error::Subscribe)?;
        subscription
            .unsubscribe_after(1)
            .await
            .map_err(Error::Subscribe)?;

        let id = self
            .call_async(
                workspace_id,
                change_set_id,
                args,
                is_job_blocking,
                Some(&reply_inbox),
            )
            .await?;

        let fut = Box::pin(async move {
            let reply = subscription
                .next()
                .await
                .ok_or(Error::ReplySubscriptionClosed)?;
            propagation::associate_current_span_from_headers(reply.headers());
            response_from_reply(reply)
        });

        Ok((id, fut))
    }
}

fn response_from_reply<T>(message: Message) -> Result<T>
where
    T: Negotiate,
{
    let headers = message.headers().ok_or(Error::ReplyMissingHeaders)?;
    let content_info = ContentInfo::try_from(headers)?;
    T::negotiate(&content_info, message.payload()).map_err(Into::into)
}
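For orientation, the sketch below shows how a caller might drive this client. It is an illustrative assumption rather than code from the repository: the crate path `pinga_client`, the pre-connected `NatsClient`, the id values, and the surrounding async runtime (e.g. Tokio) are all assumed; only the `Client` API itself is taken from the file above.

// Hypothetical usage sketch of the Client above. The `pinga_client` crate
// path and the id arguments are placeholders for illustration only.
use pinga_client::PingaClient;
use si_data_nats::NatsClient;
use si_events::{ActionId, ChangeSetId, WorkspacePk};

async fn run_action_job(
    nats: NatsClient,
    workspace_id: WorkspacePk,
    change_set_id: ChangeSetId,
    action_id: ActionId,
) -> Result<(), Box<dyn std::error::Error>> {
    // Building the client also ensures the Pinga work-queue stream exists.
    let client = PingaClient::new(nats).await?;

    // Fire-and-forget: publish the request and return its id immediately.
    let _request_id = client
        .dispatch_action_job(workspace_id, change_set_id, action_id, false)
        .await?;

    // Request/reply: the returned future resolves once Pinga publishes a
    // reply to the client's private inbox subject.
    let (_id, reply_fut) = client
        .await_action_job(workspace_id, change_set_id, action_id, false)
        .await?;
    let _response = reply_fut.await?;

    Ok(())
}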
