Skip to main content
Glama
lib.rs11 kB
//! This crate contains functionality for setting up and communicating with the audit database. #![warn( bad_style, clippy::missing_panics_doc, clippy::panic, clippy::panic_in_result_fn, clippy::unwrap_in_result, clippy::unwrap_used, dead_code, improper_ctypes, missing_debug_implementations, missing_docs, no_mangle_generic_items, non_shorthand_field_patterns, overflowing_literals, path_statements, patterns_in_fns_without_body, unconditional_recursion, unreachable_pub, unused, unused_allocation, unused_comparisons, unused_parens, while_true )] use std::str::FromStr; use chrono::{ DateTime, Utc, }; use serde::{ Deserialize, Serialize, }; use si_data_pg::{ PgError, PgPoolError, PgRow, }; use si_events::{ Actor, AuthenticationMethod, ChangeSetId, UserPk, WorkspacePk, audit_log::{ AuditLogKind, AuditLogMetadata, }, ulid, }; use telemetry::prelude::*; use thiserror::Error; mod config; mod context; mod migrate; pub use config::{ AuditDatabaseConfig, DBNAME, default_pg_pool_config, }; pub use context::{ AuditDatabaseContext, AuditDatabaseContextError, }; pub use migrate::{ AuditDatabaseMigrationError, migrate, }; #[allow(missing_docs)] #[remain::sorted] #[derive(Error, Debug)] pub enum AuditDatabaseError { #[error("chrono parse error: {0}")] ChronoParse(#[from] chrono::ParseError), #[error("pg error: {0}")] Pg(#[from] PgError), #[error("pg pool error: {0}")] PgPool(#[from] PgPoolError), #[error("serde json error: {0}")] SerdeJson(#[from] serde_json::Error), #[error("ulid decode error: {0}")] UlidDecode(#[from] ulid::DecodeError), } type Result<T> = std::result::Result<T, AuditDatabaseError>; /// A row in the audit logs table of the audit database. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct AuditLogRow { /// Indicates the workspace that the row belongs to. pub workspace_id: WorkspacePk, /// The [kind](AuditLogKind) of the [`AuditLog`] (converted into a string because enum discriminants are not /// serializable). pub kind: String, /// The timestamp that can be used in ISO RFC 3339 format. pub timestamp: DateTime<Utc>, /// The title of the [`AuditLog`]. It will likely be combined with the `entity_type` to make a full display name. pub title: String, /// The identifier of the change set, which will only be empty for actions taken outside of the workspace. pub change_set_id: Option<ChangeSetId>, /// The identifier of the user. If this is empty, it is the system user. pub user_id: Option<UserPk>, /// The entity name. pub entity_name: Option<String>, /// The entity type. pub entity_type: Option<String>, /// Serialized version of [`AuditLogMetadata`](si_events::audit_log::AuditLogMetadata), which is an /// untagged version of the specific [`AuditLogKind`](si_events::audit_log::AuditLogKind). pub metadata: Option<serde_json::Value>, /// Serialized version of Actor Metadata as string pub authentication_method: AuthenticationMethod, } impl AuditLogRow { /// Inserts a new row into the audit logs table of the audit database. #[allow(clippy::too_many_arguments)] #[instrument( name = "audit_log.database.insert", level = "debug", skip_all, fields( si.workspace.id = %workspace_id, ), )] pub async fn insert( context: &AuditDatabaseContext, workspace_id: WorkspacePk, kind: AuditLogKind, timestamp: String, change_set_id: Option<ChangeSetId>, actor: Actor, entity_name: Option<String>, authentication_method: AuthenticationMethod, ) -> Result<()> { let kind_as_string = kind.to_string(); let user_id = match actor { Actor::System => None, Actor::User(user_id) => Some(user_id), }; let metadata = AuditLogMetadata::from(kind); let (title, entity_type) = metadata.title_and_entity_type(); let serialized_metadata = serde_json::to_value(metadata)?; let serialized_authentication_method = serde_json::to_value(authentication_method)?; let timestamp: DateTime<Utc> = timestamp.parse()?; context .pg_pool() .get() .await? .query_one( "INSERT INTO audit_logs ( workspace_id, kind, timestamp, title, change_set_id, user_id, entity_name, entity_type, metadata, authentication_method ) VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10 ) RETURNING *", &[ &workspace_id.to_string(), &kind_as_string, &timestamp, &title, &change_set_id.map(|id| id.to_string()), &user_id.map(|id| id.to_string()), &entity_name, &entity_type, &serialized_metadata, &serialized_authentication_method, ], ) .await?; Ok(()) } /// Lists rows of the audit logs table in the audit database. #[instrument( name = "audit_log.database.list", level = "debug", skip_all, fields( si.workspace.id = %workspace_id, ), )] pub async fn list( context: &AuditDatabaseContext, workspace_id: WorkspacePk, change_set_ids: Vec<ChangeSetId>, size: usize, sort_ascending: bool, ) -> Result<(Vec<Self>, bool)> { let size = size as i64; let change_set_ids: Vec<String> = change_set_ids.iter().map(|id| id.to_string()).collect(); let client = context.pg_pool().get().await?; // Determine if we can load more. let row = client .query_one( "SELECT COUNT(*) from audit_logs WHERE workspace_id = $1 AND change_set_id = ANY($2)", &[&workspace_id, &change_set_ids], ) .await?; let count: i64 = row.try_get("count")?; let can_load_more = count > size; // Perform the main query. let query = if sort_ascending { "SELECT * from audit_logs WHERE workspace_id = $1 AND change_set_id = ANY($2) ORDER BY timestamp ASC LIMIT $3" } else { "SELECT * from audit_logs WHERE workspace_id = $1 AND change_set_id = ANY($2) ORDER BY timestamp DESC LIMIT $3" }; let mut logs = Vec::new(); let rows = client .query(query, &[&workspace_id, &change_set_ids, &size]) .await?; for row in rows { logs.push(Self::try_from(row)?); } Ok((logs, can_load_more)) } /// Lists rows of the audit logs table filtered by component ID in the audit database. #[instrument( name = "audit_log.database.list_for_component", level = "debug", skip_all, fields( si.workspace.id = %workspace_id, si.component.id = %component_id ), )] pub async fn list_for_component( context: &AuditDatabaseContext, workspace_id: WorkspacePk, change_set_ids: Vec<ChangeSetId>, component_id: si_events::ComponentId, size: usize, sort_ascending: bool, ) -> Result<(Vec<Self>, bool)> { let size = size as i64; let change_set_ids: Vec<String> = change_set_ids.iter().map(|id| id.to_string()).collect(); let component_id_str = component_id.to_string(); let client = context.pg_pool().get().await?; // Determine if we can load more. let row = client .query_one( "SELECT COUNT(*) from audit_logs WHERE workspace_id = $1 AND change_set_id = ANY($2) AND (metadata->>'componentId' = $3 OR entity_name = $3)", &[&workspace_id, &change_set_ids, &component_id_str], ) .await?; let count: i64 = row.try_get("count")?; let can_load_more = count > size; // Perform the main query. let query = if sort_ascending { "SELECT * from audit_logs WHERE workspace_id = $1 AND change_set_id = ANY($2) AND (metadata->>'componentId' = $3 OR entity_name = $3) ORDER BY timestamp ASC LIMIT $4" } else { "SELECT * from audit_logs WHERE workspace_id = $1 AND change_set_id = ANY($2) AND (metadata->>'componentId' = $3 OR entity_name = $3) ORDER BY timestamp DESC LIMIT $4" }; let mut logs_for_component = Vec::new(); let rows = client .query( query, &[&workspace_id, &change_set_ids, &component_id_str, &size], ) .await?; for row in rows { logs_for_component.push(Self::try_from(row)?); } Ok((logs_for_component, can_load_more)) } } impl TryFrom<PgRow> for AuditLogRow { type Error = AuditDatabaseError; fn try_from(value: PgRow) -> std::result::Result<Self, Self::Error> { let workspace_id = { let inner: String = value.try_get("workspace_id")?; WorkspacePk::from_str(&inner)? }; let change_set_id = { let maybe_inner: Option<String> = value.try_get("change_set_id")?; match maybe_inner { Some(inner) => Some(ChangeSetId::from_str(&inner)?), None => None, } }; let user_id = { let maybe_inner: Option<String> = value.try_get("user_id")?; match maybe_inner { Some(inner) => Some(UserPk::from_str(&inner)?), None => None, } }; let authentication_method = { let inner: serde_json::Value = value.try_get("authentication_method")?; serde_json::from_value::<AuthenticationMethod>(inner)? }; Ok(Self { workspace_id, kind: value.try_get("kind")?, timestamp: value.try_get("timestamp")?, title: value.try_get("title")?, change_set_id, user_id, entity_name: value.try_get("entity_name")?, entity_type: value.try_get("entity_type")?, metadata: value.try_get("metadata")?, authentication_method, }) } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server