Skip to main content
Glama
lib.rs (8.52 kB)
//! NATS naming (stream, subjects, header keys) and request types shared by veritech code.

#![warn(
    clippy::unwrap_in_result,
    clippy::unwrap_used,
    clippy::panic,
    clippy::missing_panics_doc,
    clippy::panic_in_result_fn
)]
#![allow(
    clippy::missing_errors_doc,
    clippy::module_inception,
    clippy::module_name_repetitions
)]

use cyclone_core::{
    ActionRunRequest,
    DebugRequest,
    KillExecutionRequest,
    ManagementRequest,
    ResolverFunctionRequest,
    SchemaVariantDefinitionRequest,
    ValidationRequest,
};
use si_data_nats::{
    Subject,
    async_nats,
    jetstream,
};

mod crypto;

pub use crypto::{
    VeritechValueDecryptError,
    VeritechValueEncryptError,
    decrypt_value_tree,
    encrypt_value_tree,
};

/// Unprefixed name of the JetStream stream used as the veritech work queue.
const NATS_WORK_QUEUE_STREAM_NAME: &str = "VERITECH_REQUESTS";
/// Unprefixed subject filters bound to the work queue stream.
const NATS_WORK_QUEUE_STREAM_SUBJECTS: &[&str] = &["veritech.requests.>"];

/// Subject suffix identifying action run requests.
const NATS_ACTION_RUN_DEFAULT_SUBJECT_SUFFIX: &str = "actionrun";
/// Subject suffix identifying resolver function requests.
const NATS_RESOLVER_FUNCTION_DEFAULT_SUBJECT_SUFFIX: &str = "resolverfunction";
/// Subject suffix identifying schema variant definition requests.
const NATS_SCHEMA_VARIANT_DEFINITION_DEFAULT_SUBJECT_SUFFIX: &str = "schemavariantdefinition";
/// Subject suffix identifying validation requests.
const NATS_VALIDATION_DEFAULT_SUBJECT_SUFFIX: &str = "validation";
/// Subject suffix identifying management requests.
const NATS_MANAGEMENT_DEFAULT_SUBJECT_SUFFIX: &str = "management";
/// Subject suffix identifying debug requests.
const NATS_DEBUG_DEFAULT_SUBJECT_SUFFIX: &str = "debug";

/// Complete (unprefixed) subject for kill execution requests. Unlike the suffixes above it is
/// not nested under `veritech.requests`, so it does not match the work queue stream subjects.
const NATS_KILL_EXECUTION_DEFAULT_SUBJECT: &str = "veritech.meta.killexecution";

/// Wildcard subject with three trailing tokens after the `veritech.requests` prefix.
const INCOMING_SUBJECT: &str = "veritech.requests.*.*.*";
/// Leading tokens shared by every request subject published to the work queue.
const SUBJECT_PREFIX: &str = "veritech.requests";

/// Message header key `X-Final-Message`.
// NOTE(review): presumably marks the last message of a reply sequence; its use is not visible
// in this file -- confirm against the publishers/consumers.
pub const FINAL_MESSAGE_HEADER_KEY: &str = "X-Final-Message";

// NOTE(nick,fletcher): we can probably take this type formalization a step further, but this is
// essentially the "FuncRunId" from the "dal".
pub type ExecutionId = String; pub async fn veritech_work_queue( context: &jetstream::Context, prefix: Option<&str>, ) -> Result<async_nats::jetstream::stream::Stream, async_nats::jetstream::context::CreateStreamError> { let subjects: Vec<_> = NATS_WORK_QUEUE_STREAM_SUBJECTS .iter() .map(|suffix| nats_std::subject::prefixed(prefix, suffix).to_string()) .collect(); let stream = context .get_or_create_stream(async_nats::jetstream::stream::Config { name: nats_std::jetstream::prefixed(prefix, NATS_WORK_QUEUE_STREAM_NAME), description: Some("Veritech work queue of requests".to_owned()), retention: async_nats::jetstream::stream::RetentionPolicy::WorkQueue, discard: async_nats::jetstream::stream::DiscardPolicy::New, allow_direct: true, subjects, ..Default::default() }) .await?; Ok(stream) } pub fn reply_mailbox_for_output(reply_mailbox: &str) -> String { format!("{reply_mailbox}.output") } pub fn reply_mailbox_for_result(reply_mailbox: &str) -> String { format!("{reply_mailbox}.result") } pub trait GetNatsSubjectFor { fn subject_suffix(&self) -> &str; fn nats_subject( &self, prefix: Option<&str>, workspace_id: Option<&str>, change_set_id: Option<&str>, ) -> Subject { let subject_with_workspace_and_change_set = format!( "{SUBJECT_PREFIX}.{}.{}.{}", workspace_id.unwrap_or("NONE"), change_set_id.unwrap_or("NONE"), self.subject_suffix() ); nats_std::subject::prefixed(prefix, subject_with_workspace_and_change_set) } } impl GetNatsSubjectFor for ActionRunRequest { fn subject_suffix(&self) -> &str { NATS_ACTION_RUN_DEFAULT_SUBJECT_SUFFIX } } impl GetNatsSubjectFor for KillExecutionRequest { fn subject_suffix(&self) -> &str { NATS_KILL_EXECUTION_DEFAULT_SUBJECT } fn nats_subject( &self, prefix: Option<&str>, _workspace_id: Option<&str>, _change_set_id: Option<&str>, ) -> Subject { nats_std::subject::prefixed(prefix, self.subject_suffix()) } } impl GetNatsSubjectFor for ManagementRequest { fn subject_suffix(&self) -> &str { NATS_MANAGEMENT_DEFAULT_SUBJECT_SUFFIX } } impl 
GetNatsSubjectFor for ResolverFunctionRequest { fn subject_suffix(&self) -> &str { NATS_RESOLVER_FUNCTION_DEFAULT_SUBJECT_SUFFIX } } impl GetNatsSubjectFor for SchemaVariantDefinitionRequest { fn subject_suffix(&self) -> &str { NATS_SCHEMA_VARIANT_DEFINITION_DEFAULT_SUBJECT_SUFFIX } } impl GetNatsSubjectFor for ValidationRequest { fn subject_suffix(&self) -> &str { NATS_VALIDATION_DEFAULT_SUBJECT_SUFFIX } } impl GetNatsSubjectFor for DebugRequest { fn subject_suffix(&self) -> &str { NATS_DEBUG_DEFAULT_SUBJECT_SUFFIX } } #[derive(Debug, Clone)] pub enum VeritechRequest { ActionRun(ActionRunRequest), KillExecution(KillExecutionRequest), Management(Box<ManagementRequest>), Resolver(ResolverFunctionRequest), // Resolvers are JsAttribute functions SchemaVariantDefinition(SchemaVariantDefinitionRequest), Validation(ValidationRequest), Debug(DebugRequest), } #[derive(Debug, thiserror::Error)] pub enum VeritechRequestError { #[error("serde json error: {0}")] SerdeJson(#[from] serde_json::Error), #[error("subject suffix {0} not a recognized veritech request type")] UnknownSubjectSuffix(String), } impl VeritechRequest { pub fn from_subject_and_payload( subject: &str, payload: &[u8], ) -> Result<Self, VeritechRequestError> { Ok(match subject { NATS_ACTION_RUN_DEFAULT_SUBJECT_SUFFIX => { Self::ActionRun(serde_json::from_slice(payload)?) } NATS_KILL_EXECUTION_DEFAULT_SUBJECT => { Self::KillExecution(serde_json::from_slice(payload)?) } NATS_MANAGEMENT_DEFAULT_SUBJECT_SUFFIX => { Self::Management(Box::new(serde_json::from_slice(payload)?)) } NATS_RESOLVER_FUNCTION_DEFAULT_SUBJECT_SUFFIX => { Self::Resolver(serde_json::from_slice(payload)?) } NATS_SCHEMA_VARIANT_DEFINITION_DEFAULT_SUBJECT_SUFFIX => { Self::SchemaVariantDefinition(serde_json::from_slice(payload)?) } NATS_VALIDATION_DEFAULT_SUBJECT_SUFFIX => { Self::Validation(serde_json::from_slice(payload)?) 
} NATS_DEBUG_DEFAULT_SUBJECT_SUFFIX => Self::Debug(serde_json::from_slice(payload)?), _ => { return Err(VeritechRequestError::UnknownSubjectSuffix( subject.to_string(), )); } }) } pub fn subject_suffix(&self) -> &str { match self { VeritechRequest::ActionRun(action_run_request) => action_run_request.subject_suffix(), VeritechRequest::KillExecution(kill_execution_request) => { kill_execution_request.subject_suffix() } VeritechRequest::Management(management_request) => management_request.subject_suffix(), VeritechRequest::Resolver(resolver_function_request) => { resolver_function_request.subject_suffix() } VeritechRequest::SchemaVariantDefinition(schema_variant_definition_request) => { schema_variant_definition_request.subject_suffix() } VeritechRequest::Validation(validation_request) => validation_request.subject_suffix(), VeritechRequest::Debug(debug_request) => debug_request.subject_suffix(), } } pub fn execution_id(&self) -> &str { match self { VeritechRequest::ActionRun(action_run_request) => &action_run_request.execution_id, VeritechRequest::KillExecution(kill_execution_request) => { &kill_execution_request.execution_id } VeritechRequest::Management(management_request) => &management_request.execution_id, VeritechRequest::Resolver(resolver_function_request) => { &resolver_function_request.execution_id } VeritechRequest::SchemaVariantDefinition(schema_variant_definition_request) => { &schema_variant_definition_request.execution_id } VeritechRequest::Validation(validation_request) => &validation_request.execution_id, VeritechRequest::Debug(debug_request) => &debug_request.execution_id, } } } #[inline] pub fn incoming_subject(prefix: Option<&str>) -> Subject { nats_std::subject::prefixed(prefix, INCOMING_SUBJECT) }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.