Skip to main content
Glama
ws_event.rs11 kB
use std::num::ParseIntError; use serde::{ Deserialize, Serialize, }; use si_data_nats::NatsError; use si_data_pg::PgError; use si_events::Actor; use si_frontend_types as frontend_types; use thiserror::Error; use ulid::Ulid; use crate::{ ChangeSetId, DalContext, FuncError, PropId, SchemaVariantError, SecretCreatedPayload, SecretUpdatedPayload, TransactionsError, WorkspacePk, approval_requirement::{ ApprovalRequirementDefinitionCreatedPayload, ApprovalRequirementDefinitionRemovedPayload, IndividualApproverPayload, }, audit_logging::AuditLogsPublishedPayload, change_set::event::{ ChangeSetActorPayload, ChangeSetAppliedPayload, ChangeSetCreatedPayload, ChangeSetRenamePayload, ChangeSetStateChangePayload, }, component::{ ComponentCreatedPayload, ComponentDeletedPayload, ComponentSetPositionPayload, ComponentUpdatedPayload, ComponentUpgradedPayload, ConnectionDeletedPayload, ConnectionUpsertedPayload, }, diagram::view::{ ViewComponentsUpdatePayload, ViewDeletedPayload, ViewObjectCreatedPayload, ViewObjectRemovedPayload, ViewWsPayload, }, func::{ FuncWsEventCodeSaved, FuncWsEventFuncSummary, FuncWsEventGenerating, FuncWsEventPayload, runner::FuncRunLogUpdatedPayload, }, management::prototype::{ ManagementFuncExecutedPayload, ManagementOperationsCompletePayload, ManagementOperationsFailedPayload, ManagementOperationsInProgressPayload, }, module::ModulesUpdatedPayload, pkg::{ ImportWorkspaceVotePayload, WorkspaceActorPayload, WorkspaceImportApprovalActorPayload, }, prompt_override::PromptUpdatedPayload, qualification::QualificationCheckPayload, schema::variant::{ SchemaVariantClonedPayload, SchemaVariantDeletedPayload, SchemaVariantReplacedPayload, SchemaVariantSavedPayload, SchemaVariantUpdatedPayload, }, secret::SecretDeletedPayload, status::StatusUpdate, user::{ CursorPayload, OnlinePayload, UserWorkspaceFlagsPayload, }, }; #[remain::sorted] #[derive(Error, Debug)] pub enum WsEventError { #[error("func error: {0}")] Func(#[from] Box<FuncError>), #[error("nats txn error: {0}")] Nats(#[from] NatsError), #[error("no user in context")] NoUserInContext, #[error("no workspace in tenancy")] NoWorkspaceInTenancy, #[error("number parse string error: {0}")] ParseIntError(#[from] ParseIntError), #[error("pg error: {0}")] Pg(#[from] PgError), #[error("schema variant error: {0}")] SchemaVariant(#[from] SchemaVariantError), #[error("error serializing/deserializing json: {0}")] SerdeJson(#[from] serde_json::Error), #[error("transactions error: {0}")] Transactions(#[from] TransactionsError), #[error("workspace snapshot: {0}")] WorkspaceSnapshot(#[from] crate::WorkspaceSnapshotError), } pub type WsEventResult<T> = Result<T, WsEventError>; impl From<FuncError> for WsEventError { fn from(err: FuncError) -> Self { Box::new(err).into() } } #[remain::sorted] #[derive(Deserialize, Serialize, Debug, Clone, Eq, PartialEq)] #[serde(tag = "kind", content = "data")] #[allow(clippy::large_enum_variant)] pub enum WsPayload { ActionsListUpdated(ChangeSetId), ApprovalRequirementAddIndividualApprover(IndividualApproverPayload), ApprovalRequirementDefinitionCreated(ApprovalRequirementDefinitionCreatedPayload), ApprovalRequirementDefinitionRemoved(ApprovalRequirementDefinitionRemovedPayload), ApprovalRequirementRemoveIndividualApprover(IndividualApproverPayload), AsyncError(ErrorPayload), AsyncFinish(FinishPayload), AuditLogsPublished(AuditLogsPublishedPayload), ChangeSetAbandoned(ChangeSetActorPayload), ChangeSetApplied(ChangeSetAppliedPayload), ChangeSetApprovalStatusChanged(ChangeSetId), ChangeSetCanceled(ChangeSetId), ChangeSetCreated(ChangeSetCreatedPayload), ChangeSetRename(ChangeSetRenamePayload), ChangeSetStatusChanged(ChangeSetStateChangePayload), ChangeSetWritten(ChangeSetId), CheckedQualifications(QualificationCheckPayload), ComponentCreated(ComponentCreatedPayload), ComponentDeleted(ComponentDeletedPayload), ComponentUpdated(ComponentUpdatedPayload), ComponentUpgraded(ComponentUpgradedPayload), ConnectionDeleted(ConnectionDeletedPayload), ConnectionUpserted(ConnectionUpsertedPayload), Cursor(CursorPayload), FuncArgumentsSaved(FuncWsEventPayload), FuncCodeSaved(FuncWsEventCodeSaved), FuncCreated(FuncWsEventFuncSummary), FuncDeleted(FuncWsEventPayload), FuncGenerating(FuncWsEventGenerating), FuncRunLogUpdated(FuncRunLogUpdatedPayload), FuncSaved(FuncWsEventPayload), FuncUpdated(FuncWsEventFuncSummary), ImportWorkspaceVote(ImportWorkspaceVotePayload), ManagementFuncExecuted(ManagementFuncExecutedPayload), ManagementOperationsComplete(ManagementOperationsCompletePayload), ManagementOperationsFailed(ManagementOperationsFailedPayload), ManagementOperationsInProgress(ManagementOperationsInProgressPayload), ModuleImported(Vec<si_frontend_types::SchemaVariant>), ModulesUpdated(ModulesUpdatedPayload), Online(OnlinePayload), PromptUpdated(PromptUpdatedPayload), ResourceRefreshed(ComponentUpdatedPayload), SchemaVariantCloned(SchemaVariantClonedPayload), SchemaVariantCreated(frontend_types::SchemaVariant), SchemaVariantDeleted(SchemaVariantDeletedPayload), SchemaVariantReplaced(SchemaVariantReplacedPayload), SchemaVariantSaved(SchemaVariantSavedPayload), SchemaVariantUpdated(frontend_types::SchemaVariant), SchemaVariantUpdateFinished(SchemaVariantUpdatedPayload), SecretCreated(SecretCreatedPayload), SecretDeleted(SecretDeletedPayload), SecretUpdated(SecretUpdatedPayload), SetComponentPosition(ComponentSetPositionPayload), StatusUpdate(StatusUpdate), UserWorkspaceFlagsUpdated(UserWorkspaceFlagsPayload), ViewComponentsUpdate(ViewComponentsUpdatePayload), ViewCreated(ViewWsPayload), ViewDeleted(ViewDeletedPayload), ViewObjectCreated(ViewObjectCreatedPayload), ViewObjectRemoved(ViewObjectRemovedPayload), ViewUpdated(ViewWsPayload), WorkspaceImportBeginApprovalProcess(WorkspaceImportApprovalActorPayload), WorkspaceImportCancelApprovalProcess(WorkspaceActorPayload), } #[remain::sorted] #[derive(Clone, Deserialize, Serialize, Debug, PartialEq, Eq, Copy, Hash)] #[serde(rename_all = "camelCase", tag = "kind", content = "id")] pub enum StatusValueKind { Attribute(PropId), CodeGen, Internal, Qualification, } #[derive(Deserialize, Serialize, Debug, Clone, Eq, PartialEq)] pub struct WsEvent { version: i64, workspace_pk: WorkspacePk, change_set_id: Option<ChangeSetId>, actor: Option<Actor>, request_ulid: Option<ulid::Ulid>, payload: WsPayload, } impl WsEvent { pub async fn new_raw( workspace_pk: WorkspacePk, change_set_id: Option<ChangeSetId>, actor: Option<Actor>, request_ulid: Option<ulid::Ulid>, payload: WsPayload, ) -> WsEventResult<Self> { Ok(WsEvent { version: 1, workspace_pk, change_set_id, actor, request_ulid, payload, }) } pub async fn new(ctx: &DalContext, payload: WsPayload) -> WsEventResult<Self> { let workspace_pk = match ctx.tenancy().workspace_pk_opt() { Some(pk) => pk, None => { return Err(WsEventError::NoWorkspaceInTenancy); } }; let change_set_pk = ctx.change_set_id(); Self::new_raw( workspace_pk, Some(change_set_pk), Some(ctx.events_actor()), ctx.request_ulid(), payload, ) .await } pub async fn new_for_workspace(ctx: &DalContext, payload: WsPayload) -> WsEventResult<Self> { let workspace_pk = match ctx.tenancy().workspace_pk_opt() { Some(pk) => pk, None => { return Err(WsEventError::NoWorkspaceInTenancy); } }; Self::new_raw( workspace_pk, None, Some(ctx.events_actor()), ctx.request_ulid(), payload, ) .await } pub fn workspace_pk(&self) -> WorkspacePk { self.workspace_pk } pub fn set_workspace_pk(&mut self, workspace_pk: WorkspacePk) { self.workspace_pk = workspace_pk; } pub fn set_change_set_id(&mut self, change_set_id: Option<ChangeSetId>) { self.change_set_id = change_set_id; } pub fn change_set_id(&self) -> Option<ChangeSetId> { self.change_set_id } fn workspace_subject(&self) -> String { format!("si.workspace_pk.{}.event", self.workspace_pk) } /// Publishes the [`event`](Self) to the [`NatsTxn`](si_data_nats::NatsTxn). When the /// transaction is committed, the [`event`](Self) will be published for external use. pub async fn publish_on_commit(&self, ctx: &DalContext) -> WsEventResult<()> { ctx.txns() .await? .nats() .publish(self.workspace_subject(), &self) .await?; Ok(()) } /// Publishes the [`event`](Self) immediately to the Nats stream, without /// waiting for the transactions to commit. Care should be taken to avoid /// sending data to the frontend, such as object ids, that will only be /// valid if the transaction commits successfully. pub async fn publish_immediately(&self, ctx: &DalContext) -> WsEventResult<()> { ctx.txns() .await? .nats() .publish_immediately(self.workspace_subject(), &self) .await?; Ok(()) } } #[derive(Clone, Deserialize, Serialize, Debug, PartialEq, Eq)] #[serde(rename_all = "camelCase")] pub struct ErrorPayload { id: Ulid, error: String, } #[derive(Clone, Deserialize, Serialize, Debug, PartialEq, Eq)] #[serde(rename_all = "camelCase")] pub struct FinishPayload { id: Ulid, } impl WsEvent { pub async fn async_error(ctx: &DalContext, id: Ulid, error: String) -> WsEventResult<Self> { WsEvent::new(ctx, WsPayload::AsyncError(ErrorPayload { id, error })).await } pub async fn async_finish(ctx: &DalContext, id: Ulid) -> WsEventResult<Self> { WsEvent::new(ctx, WsPayload::AsyncFinish(FinishPayload { id })).await } pub async fn async_finish_workspace(ctx: &DalContext, id: Ulid) -> WsEventResult<Self> { WsEvent::new_for_workspace(ctx, WsPayload::AsyncFinish(FinishPayload { id })).await } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server