Skip to main content
Glama
lib.rs4.78 kB
use std::{ result, str::Utf8Error, }; use bytes::Bytes; use si_data_nats::{ NatsClient, Subject, async_nats::{ self, jetstream::{ consumer::StreamError, context::{ KeyValueErrorKind, RequestError, }, kv::{ self, }, stream::ConsumerError, }, }, jetstream, }; use si_id::{ ChangeSetId, WorkspacePk, }; use strum::AsRefStr; use thiserror::Error; mod deployment; mod kv_history; mod workspace; const NATS_KV_BUCKET_NAME: &str = "FRIGG"; #[remain::sorted] #[derive(Debug, Error)] pub enum Error { #[error("consumer error: {0}")] Consumer(#[from] ConsumerError), #[error("create error: {0}")] Create(#[from] kv::CreateError), #[error("error creating kv store: {0}")] CreateKeyValue(#[from] async_nats::jetstream::context::CreateKeyValueError), #[error("error deserializing kv value: {0}")] Deserialize(#[source] serde_json::Error), #[error("entry error when getting change set index")] EntryGetChangeSetIndex(#[source] kv::EntryError), #[error("entry error when getting deployment index")] EntryGetDeploymentIndex(#[source] kv::EntryError), #[error("entry error when performing get object raw bytes: {0}")] EntryGetObjectRawBytes(#[source] kv::EntryError), #[error("error getting kv store: {0}")] GetKeyValue(#[from] async_nats::jetstream::context::KeyValueError), #[error("index object not found at key: {0}")] IndexObjectNotFound(Subject), #[error("nats request error: {0}")] NatsRequest(#[from] RequestError), #[error("object kind was expected to be 'MvIndex' but was '{0}'")] NotIndexKind(String), #[error( "object listed in changeset index not found: workspace: {workspace_id}, change set: {change_set_id}, kind: {kind}, id: {id}" )] ObjectNotFoundForChangesetIndex { workspace_id: WorkspacePk, change_set_id: ChangeSetId, kind: String, id: String, }, #[error("object listed in deployment index not found: kind: {kind}, id: {id}")] ObjectNotFoundForDeploymentIndex { kind: String, id: String }, #[error("put error: {0}")] Put(#[from] kv::PutError), #[error("error serializing kv value: {0}")] Serialize(#[source] serde_json::Error), #[error("stream error: {0}")] Stream(#[from] StreamError), #[error("update error: {0}")] Update(#[from] kv::UpdateError), #[error("utf8 encoding error: {0}")] Utf8(#[from] Utf8Error), #[error("kv watch error: {0}")] Watch(#[from] async_nats::jetstream::kv::WatchError), } pub type FriggError = Error; type Result<T> = result::Result<T, Error>; #[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] pub struct KvRevision(u64); impl From<u64> for KvRevision { fn from(value: u64) -> Self { Self(value) } } #[remain::sorted] #[derive(AsRefStr, Debug, PartialEq)] #[strum(serialize_all = "snake_case")] enum Domain { Index, Object, } #[remain::sorted] #[derive(AsRefStr, Debug, PartialEq)] #[strum(serialize_all = "snake_case")] enum Scope { ChangeSet, Deployment, Workspace, } #[derive(Clone, Debug)] pub struct FriggStore { nats: NatsClient, store: kv::Store, } impl FriggStore { pub fn new(nats: NatsClient, store: kv::Store) -> Self { Self { nats, store } } async fn get_object_raw_bytes(&self, key: &Subject) -> Result<Option<(Bytes, KvRevision)>> { let Some(entry) = self .store .entry(key.to_string()) .await .map_err(Error::EntryGetObjectRawBytes)? else { return Ok(None); }; match entry.operation { kv::Operation::Delete | kv::Operation::Purge => Ok(None), kv::Operation::Put => Ok(Some((entry.value, entry.revision.into()))), } } } pub async fn frigg_kv(context: &jetstream::Context, prefix: Option<&str>) -> Result<kv::Store> { let bucket = nats_std::jetstream::prefixed(prefix, NATS_KV_BUCKET_NAME); let kv = match context.get_key_value(bucket.clone()).await { Ok(kv) => kv, Err(err) => match err.kind() { KeyValueErrorKind::GetBucket | KeyValueErrorKind::JetStream => { create_kv(context, bucket).await? } _ => return Err(err.into()), }, }; Ok(kv) } async fn create_kv(context: &jetstream::Context, bucket: String) -> Result<kv::Store> { context .create_key_value(kv::Config { bucket, description: "Frigg store data".to_owned(), ..Default::default() }) .await .map_err(Into::into) }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server