Skip to main content
Glama
activities.rs4.51 kB
use std::sync::Arc; use futures::StreamExt; use si_events::{ Actor, ChangeSetId, Tenancy, WorkspacePk, }; use si_layer_cache::{ LayerDb, activities::ActivityPayloadDiscriminants, event::LayeredEventMetadata, }; use tokio_util::sync::CancellationToken; use crate::integration_test::{ make_test_layerdb_config, setup_compute_executor, setup_nats_client, setup_pg_db, }; type TestLayerDb = LayerDb<Arc<String>, Arc<String>, String, String, String, String, String>; #[tokio::test] async fn activities() { let token = CancellationToken::new(); let db = setup_pg_db("activities").await; let compute_executor = setup_compute_executor(); // First, we need a layerdb for slash let (ldb_slash, _): (TestLayerDb, _) = LayerDb::from_services( make_test_layerdb_config(), db.clone(), setup_nats_client(Some("activities".to_string())).await, compute_executor.clone(), token.clone(), ) .await .expect("cannot create layerdb"); ldb_slash.pg_migrate().await.expect("migrate layerdb"); // Then, we need a layerdb for axl let (ldb_axl, _): (TestLayerDb, _) = LayerDb::from_services( make_test_layerdb_config(), db, setup_nats_client(Some("activities".to_string())).await, compute_executor, token.clone(), ) .await .expect("cannot create layerdb"); ldb_axl.pg_migrate().await.expect("migrate layerdb"); // Create our metadata let tenancy = Tenancy::new(WorkspacePk::new(), ChangeSetId::new()); let actor = Actor::System; let metadata = LayeredEventMetadata::new(tenancy, actor); // Subscribe to all activities let mut activities = ldb_axl .activity() .subscribe_all() .await .expect("cannot subscribe to all activities"); // Publish an activity let activity = ldb_slash .activity() .test() .integration_test("drop me the bomb", metadata.clone(), None) .await .expect("cannot publish activity"); let restored_activity = activities .next() .await .expect("no message waiting when one was expected") .expect("error receiving message"); assert_eq!(activity, restored_activity); } #[tokio::test] async fn activities_subscribe_partial() { let token = CancellationToken::new(); let db = setup_pg_db("activities_subscribe_partial").await; let compute_executor = setup_compute_executor(); // First, we need a layerdb for slash let (ldb_slash, _): (TestLayerDb, _) = LayerDb::from_services( make_test_layerdb_config(), db.clone(), setup_nats_client(Some("activities_subscribe_partial".to_string())).await, compute_executor.clone(), token.clone(), ) .await .expect("cannot create layerdb"); ldb_slash.pg_migrate().await.expect("migrate layerdb"); // Then, we need a layerdb for axl let (ldb_axl, _): (TestLayerDb, _) = LayerDb::from_services( make_test_layerdb_config(), db, setup_nats_client(Some("activities_subscribe_partial".to_string())).await, compute_executor, token.clone(), ) .await .expect("cannot create layerdb"); // Subscribe to only rebase finished activities let mut activities = ldb_axl .activity() .subscribe(vec![ActivityPayloadDiscriminants::IntegrationTestAlt]) .await .expect("cannot subscribe to all activities"); let tenancy = Tenancy::new(WorkspacePk::new(), ChangeSetId::new()); let actor = Actor::System; let metadata = LayeredEventMetadata::new(tenancy, actor); // Publish an activity ldb_slash .activity() .test() .integration_test("skid row", metadata.clone(), None) .await .expect("cannot publish activity"); // Publish an activity let activity = ldb_slash .activity() .test() .integration_test_alt("kix", metadata, None) .await .expect("cannot publish activity"); // The nats publishing rules would require that the first activity (the rebase request) be // recieved before the second (the rebase finished event). So we can confirm we have subject // filtering working. let restored_activity = activities .next() .await .expect("no message waiting") .expect("error receiving message"); assert_eq!(activity, restored_activity); }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server