Skip to main content
Glama
audit_logging.rs (9.4 kB)
use audit_database::AuditDatabaseContext; use audit_logs_stream::AuditLogsStream; use dal::{ AttributeValue, DalContext, Prop, Schema, SchemaVariant, audit_logging, prop::PropPath, }; use dal_test::{ helpers::{ ChangeSetTestHelpers, confirm_jetstream_stream_has_no_messages, create_named_component_for_schema_variant_on_default_view, list_audit_logs_until_expected_number_of_rows, }, test, }; use pending_events::PendingEventsStream; use pretty_assertions_sorted::assert_eq; use si_events::audit_log::AuditLogKind; const DATABASE_RETRY_TIMEOUT_SECONDS: u64 = 2; const DATABASE_RETRY_INTERVAL_MILLISECONDS: u64 = 100; const STREAM_RETRY_TIMEOUT_SECONDS: u64 = 5; const STREAM_RETRY_INTERVAL_MILLISECONDS: u64 = 100; const SIZE: usize = 200; #[test] async fn round_trip(ctx: &mut DalContext, audit_database_context: AuditDatabaseContext) { let context = audit_database_context; // Collect schema information. let schema = Schema::get_by_name(ctx, "swifty") .await .expect("schema not found by name"); let schema_variant_id = Schema::default_variant_id(ctx, schema.id()) .await .expect("could not get default schema variant id"); let schema_variant = SchemaVariant::get_by_id(ctx, schema_variant_id) .await .expect("could not get schema variant"); // Create a component and commit. Mimic sdf by audit logging here. 
let component_name = "nyj despair_club"; let component = create_named_component_for_schema_variant_on_default_view( ctx, component_name, schema_variant_id, ) .await .expect("could not create component"); let component_id = component.id(); ctx.write_audit_log( AuditLogKind::CreateComponent { name: component_name.to_string(), component_id, schema_variant_id, schema_variant_name: schema_variant.display_name().to_string(), }, component_name.to_string(), ) .await .expect("could not write audit log"); ChangeSetTestHelpers::commit_and_update_snapshot_to_visibility(ctx) .await .expect("could not commit and update snapshot to visibility"); // Collect the streams needed throughout the test. let (source_stream, destination_stream) = { let source_stream_wrapper = PendingEventsStream::get_or_create(ctx.jetstream_context()) .await .expect("could not get or create pending events stream"); let destination_stream_wrapper = AuditLogsStream::get_or_create(ctx.jetstream_context()) .await .expect("could not get or create audit logs stream"); let source_stream = source_stream_wrapper .stream() .await .expect("could not get inner stream"); let destination_stream = destination_stream_wrapper .stream() .await .expect("could not get inner destination stream"); (source_stream, destination_stream) }; // Check that everything looks as we expect. 
{ let expected_total = 4; confirm_jetstream_stream_has_no_messages( &source_stream, STREAM_RETRY_TIMEOUT_SECONDS, STREAM_RETRY_INTERVAL_MILLISECONDS, ) .await .expect("stream message count is greater than zero"); let destination_stream_message_count = destination_stream .get_info() .await .expect("could not get destination stream info") .state .messages; assert_eq!( expected_total, // expected destination_stream_message_count // actual ); list_audit_logs_until_expected_number_of_rows( ctx, &context, SIZE, expected_total as usize, DATABASE_RETRY_TIMEOUT_SECONDS, DATABASE_RETRY_INTERVAL_MILLISECONDS, ) .await .expect("could not list audit logs"); // No timeouts or sleeps needed since the previous query will have passed. audit_logging::list_for_component(ctx, &context, component_id, SIZE, true) .await .expect("could not list component-specific audit logs"); } // Update a property editor value and commit. Mimic sdf by audit logging here. let prop_path_raw = ["root", "domain", "name"]; let prop = Prop::find_prop_by_path(ctx, schema_variant_id, &PropPath::new(prop_path_raw)) .await .expect("could not find prop by path"); let mut attribute_value_ids = component .attribute_values_for_prop(ctx, &prop_path_raw) .await .expect("could not get attribute values for prop"); let attribute_value_id = attribute_value_ids .pop() .expect("no attribute values found"); assert!(attribute_value_ids.is_empty()); let before_value = AttributeValue::get_by_id(ctx, attribute_value_id) .await .expect("could not get attribute value by id") .value(ctx) .await .expect("could not get value for attribute value"); let after_value = Some(serde_json::json!("pain.")); AttributeValue::update(ctx, attribute_value_id, after_value.to_owned()) .await .expect("could not update attribute value"); ctx.write_audit_log( AuditLogKind::UpdatePropertyEditorValue { component_id, component_name: component_name.to_string(), schema_variant_id, schema_variant_display_name: schema_variant.display_name().to_string(), 
prop_id: prop.id, prop_name: prop.name.to_owned(), attribute_value_id, before_value, after_value, }, component_name.to_string(), ) .await .expect("could not write audit log"); ChangeSetTestHelpers::commit_and_update_snapshot_to_visibility(ctx) .await .expect("could not commit and update snapshot to visibility"); // Check that everything looks as we expect. { let expected_total = 6; confirm_jetstream_stream_has_no_messages( &source_stream, STREAM_RETRY_TIMEOUT_SECONDS, STREAM_RETRY_INTERVAL_MILLISECONDS, ) .await .expect("stream message count is greater than zero"); let destination_stream_message_count = destination_stream .get_info() .await .expect("could not get destination stream info") .state .messages; assert_eq!( expected_total, // expected destination_stream_message_count // actual ); list_audit_logs_until_expected_number_of_rows( ctx, &context, SIZE, expected_total as usize, DATABASE_RETRY_TIMEOUT_SECONDS, DATABASE_RETRY_INTERVAL_MILLISECONDS, ) .await .expect("could not list audit logs"); // No timeouts or sleeps needed since the previous query will have passed. audit_logging::list_for_component(ctx, &context, component_id, SIZE, true) .await .expect("could not list component-specific audit logs"); } // Delete a component and commit. Mimic sdf by audit logging here. ctx.write_audit_log( AuditLogKind::DeleteComponent { name: component_name.to_string(), component_id, schema_variant_id, schema_variant_name: schema_variant.display_name().to_string(), }, component_name.to_string(), ) .await .expect("could not write audit log"); assert!( component .delete(ctx) .await .expect("unable to delete component") .is_none() ); ChangeSetTestHelpers::commit_and_update_snapshot_to_visibility(ctx) .await .expect("could not commit and update snapshot to visibility"); // Check that everything looks as we expect. 
{ let expected_total = 7; confirm_jetstream_stream_has_no_messages( &source_stream, STREAM_RETRY_TIMEOUT_SECONDS, STREAM_RETRY_INTERVAL_MILLISECONDS, ) .await .expect("stream message count is greater than zero"); let destination_stream_message_count = destination_stream .get_info() .await .expect("could not get destination stream info") .state .messages; assert_eq!( expected_total, // expected destination_stream_message_count // actual ); list_audit_logs_until_expected_number_of_rows( ctx, &context, SIZE, expected_total as usize, DATABASE_RETRY_TIMEOUT_SECONDS, DATABASE_RETRY_INTERVAL_MILLISECONDS, ) .await .expect("could not list audit logs"); // No timeouts or sleeps needed since the previous query will have passed. audit_logging::list_for_component(ctx, &context, component_id, SIZE, true) .await .expect("could not list component-specific audit logs"); } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.