Skip to main content
Glama
lib.rs8.46 kB
use std::result; use bytes::Bytes; use edda_core::{ api_types::{ Container, ContentInfo, RequestId, SerializeContainer, SerializeError, new_change_set_request::{ NewChangeSetRequest, NewChangeSetRequestVCurrent, }, rebuild_request::{ RebuildRequest, RebuildRequestVCurrent, }, update_request::{ UpdateRequest, UpdateRequestVCurrent, }, }, nats, }; use si_data_nats::{ HeaderMap, NatsClient, async_nats::{ self, jetstream::context::PublishError, }, header, jetstream::{ self, Context, }, }; use si_events::{ ChangeSetId, WorkspacePk, WorkspaceSnapshotAddress, change_batch::ChangeBatchAddress, }; use telemetry::prelude::*; use telemetry_nats::propagation; use thiserror::Error; #[remain::sorted] #[derive(Debug, Error)] pub enum ClientError { #[error("error creating jetstream stream: {0}")] CreateStream(#[source] async_nats::jetstream::context::CreateStreamError), #[error("request publish error: {0}")] Publish(#[from] PublishError), #[error("error serializing request: {0}")] Serialize(#[from] SerializeError), } type Error = ClientError; type Result<T> = result::Result<T, ClientError>; pub type EddaClient = Client; #[derive(Clone, Debug)] pub struct Client { context: Context, } impl Client { pub async fn new(nats: NatsClient) -> Result<Self> { let context = jetstream::new(nats); // Ensure that the streams are already created let _ = nats::edda_tasks_jetstream_stream(&context) .await .map_err(Error::CreateStream)?; let _ = nats::edda_requests_jetstream_stream(&context) .await .map_err(Error::CreateStream)?; Ok(Self { context }) } /// Asynchronously request an index update from a workspace past snapshot to the current /// snapshot and return a [`RequestId`]. #[instrument( name = "edda.client.update_from_workspace_snapshot" level = "info", skip_all, fields ( si.workspace.id = %workspace_id, si.change_set.id = %change_set_id, edda.client.request.update.from_snapshot_address = ?from_snapshot_address, edda.client.request.update.to_snapshot_address = ?to_snapshot_address, edda.client.request.update.change_batch_address = ?change_batch_address ) )] pub async fn update_from_workspace_snapshot( &self, workspace_id: WorkspacePk, change_set_id: ChangeSetId, from_snapshot_address: WorkspaceSnapshotAddress, to_snapshot_address: WorkspaceSnapshotAddress, change_batch_address: ChangeBatchAddress, ) -> Result<RequestId> { let id = RequestId::new(); let request = UpdateRequest::new(UpdateRequestVCurrent { id, from_snapshot_address, to_snapshot_address, change_batch_address, }); let mut info = ContentInfo::from(&request); let (content_type, payload) = request.to_vec()?; info.content_type = content_type.into(); self.publish_inner( Some(ChangeSetLocator::new(workspace_id, change_set_id)), id, payload.into(), info, ) .await } #[instrument( name = "edda.client.rebuild_for_change_set" level = "info", skip_all, fields ( si.workspace.id = %workspace_id, si.change_set.id = %change_set_id, ) )] pub async fn rebuild_for_change_set( &self, workspace_id: WorkspacePk, change_set_id: ChangeSetId, ) -> Result<RequestId> { let id = RequestId::new(); let request = RebuildRequest::new(RebuildRequestVCurrent { id }); let mut info = ContentInfo::from(&request); let (content_type, payload) = request.to_vec()?; info.content_type = content_type.into(); self.publish_inner( Some(ChangeSetLocator::new(workspace_id, change_set_id)), id, payload.into(), info, ) .await } #[instrument( name = "edda.client.new_change_set" level = "info", skip_all, fields ( si.workspace.id = %workspace_id, si.change_set.id = %new_change_set_id, ) )] pub async fn new_change_set( &self, workspace_id: WorkspacePk, new_change_set_id: ChangeSetId, base_change_set_id: ChangeSetId, to_snapshot_address: WorkspaceSnapshotAddress, ) -> Result<RequestId> { let id = RequestId::new(); let request = NewChangeSetRequest::new(NewChangeSetRequestVCurrent { id, base_change_set_id, new_change_set_id, to_snapshot_address, }); let mut info = ContentInfo::from(&request); let (content_type, payload) = request.to_vec()?; info.content_type = content_type.into(); self.publish_inner( Some(ChangeSetLocator::new(workspace_id, new_change_set_id)), id, payload.into(), info, ) .await } pub async fn rebuild_for_deployment(&self) -> Result<RequestId> { let id = RequestId::new(); let request = RebuildRequest::new(RebuildRequestVCurrent { id }); let mut info = ContentInfo::from(&request); let (content_type, payload) = request.to_vec()?; info.content_type = content_type.into(); self.publish_inner(None, id, payload.into(), info).await } async fn publish_inner( &self, changeset_data: Option<ChangeSetLocator>, id: RequestId, payload: Bytes, info: ContentInfo<'_>, ) -> Result<RequestId> { // Cut down on the number of `String` allocations dealing with ids let mut wid_buf = [0; WorkspacePk::ID_LEN]; let mut csid_buf = [0; ChangeSetId::ID_LEN]; let (requests_subject, tasks_subject) = if let Some(ChangeSetLocator { workspace_pk, change_set_id: changeset_id, }) = changeset_data { let wid_ref = workspace_pk.array_to_str(&mut wid_buf); let csid_ref = changeset_id.array_to_str(&mut csid_buf); ( nats::subject::request_for_change_set( self.context.metadata().subject_prefix(), wid_ref, csid_ref, ), nats::subject::process_task_for_change_set( self.context.metadata().subject_prefix(), wid_ref, csid_ref, ), ) } else { ( nats::subject::request_for_deployment(self.context.metadata().subject_prefix()), nats::subject::process_task_for_deployment( self.context.metadata().subject_prefix(), ), ) }; let mut headers = HeaderMap::new(); propagation::inject_headers(&mut headers); info.inject_into_headers(&mut headers); headers.insert(header::NATS_MESSAGE_ID, id.to_string()); self.context .publish_with_headers(requests_subject, headers, payload) .await? .await?; // There is one more optional future here which is confirmation from the NATS server that // our publish was acked. However, the task stream will drop new messages that are // duplicates and this returns an error on the "ack future". Instead, we'll keep this as // fire and forget. self.context.publish(tasks_subject, vec![].into()).await?; Ok(id) } // TODO(fnichol): add method to be called from SDF where we get either a NATS k/v watch or some // `impl Future` that resolves when the index is updated (still using a k/v watch under the // hood) } struct ChangeSetLocator { workspace_pk: WorkspacePk, change_set_id: ChangeSetId, } impl ChangeSetLocator { fn new(workspace_pk: WorkspacePk, change_set_id: ChangeSetId) -> Self { Self { workspace_pk, change_set_id, } } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server