Skip to main content
Glama
func_run.rs14.1 kB
use std::{ collections::HashSet, sync::Arc, time::Duration, }; use chrono::Utc; use si_events::{ Actor, ChangeSetId, ContentHash, FuncBackendKind, FuncBackendResponseType, FuncKind, FuncRun, FuncRunBuilder, FuncRunId, FuncRunState, Tenancy, UserPk, WorkspacePk, }; use si_layer_cache::{ LayerDb, db::serialize, }; use tokio::time::Instant; use tokio_util::sync::CancellationToken; use crate::integration_test::{ make_test_layerdb_config, setup_compute_executor, setup_nats_client, setup_pg_db, }; type TestLayerDb = LayerDb<String, String, String, String, String, String, String>; #[tokio::test] async fn write_to_db() { let token = CancellationToken::new(); let (ldb, _): (TestLayerDb, _) = LayerDb::from_services( make_test_layerdb_config(), setup_pg_db("func_run_write_to_db").await, setup_nats_client(Some("func_run_write_to_db".to_string())).await, setup_compute_executor(), token, ) .await .expect("cannot create layerdb"); ldb.pg_migrate().await.expect("migrate layer db"); let (tenancy, actor) = ( Tenancy::new(WorkspacePk::new(), ChangeSetId::new()), Actor::User(UserPk::new()), ); let func_run = create_func_run(actor, tenancy, "joshua graham"); let key_str: Arc<str> = func_run.id().to_string().into(); let value: Arc<FuncRun> = Arc::new(func_run); ldb.func_run() .write(value.clone(), None, tenancy, actor) .await .expect("failed to write to layerdb"); // Are we in memory? let in_memory = ldb.func_run().cache.cache().get(key_str.clone()).await; assert_eq!(value.id(), in_memory.expect("func run not in memory").id()); // Are we in pg? let in_pg_postcard = ldb .func_run() .cache .pg() .get(&key_str) .await .expect("error getting data from pg") .expect("no cas object in pg"); let in_pg: FuncRun = serialize::from_bytes(&in_pg_postcard[..]).expect("cannot deserialize data"); assert_eq!(value.id(), in_pg.id()); } #[tokio::test] async fn update() { let token = CancellationToken::new(); let db = setup_pg_db("func_run_update_to_db").await; let (ldb, _): (TestLayerDb, _) = LayerDb::from_services( make_test_layerdb_config(), db.clone(), setup_nats_client(Some("func_run_update_to_db".to_string())).await, setup_compute_executor(), token.clone(), ) .await .expect("cannot create layerdb"); ldb.pg_migrate().await.expect("migrate layer db"); let (ldb_remote, _): (TestLayerDb, _) = LayerDb::from_services( make_test_layerdb_config(), db, setup_nats_client(Some("func_run_update_to_db".to_string())).await, setup_compute_executor(), token, ) .await .expect("cannot create layerdb"); let (tenancy, actor) = ( Tenancy::new(WorkspacePk::new(), ChangeSetId::new()), Actor::User(UserPk::new()), ); let func_run = create_func_run(actor, tenancy, "lindsey buckingham"); let key_str: Arc<str> = func_run.id().to_string().into(); let value: Arc<FuncRun> = Arc::new(func_run); ldb.func_run() .write(value.clone(), None, tenancy, actor) .await .expect("failed to write to layerdb"); // Are we in memory? let in_memory = ldb.func_run().cache.cache().get(key_str.clone()).await; assert_eq!(value.id(), in_memory.expect("func run not in memory").id()); // Are we in pg? let in_pg_postcard = ldb .func_run() .cache .pg() .get(&key_str) .await .expect("error getting data from pg") .expect("no cas object in pg"); let in_pg: FuncRun = serialize::from_bytes(&in_pg_postcard[..]).expect("cannot deserialize data"); assert_eq!(value.id(), in_pg.id()); assert_eq!(value.state(), in_pg.state()); // Update the state to success let mut update_func_run_inner = Arc::unwrap_or_clone(value); update_func_run_inner.set_state(FuncRunState::Success); let update_func_run = Arc::new(update_func_run_inner); ldb.func_run() .write(update_func_run.clone(), None, tenancy, actor) .await .expect("failed to write to layerdb"); // Are we in memory? let in_memory = ldb.func_run().cache.cache().get(key_str.clone()).await; assert_eq!( update_func_run.state(), in_memory.expect("func run not in memory").state(), "updated in memory state" ); // Are we in pg? let in_pg_postcard = ldb .func_run() .cache .pg() .get(&key_str) .await .expect("error getting data from pg") .expect("no cas object in pg"); let in_pg: FuncRun = serialize::from_bytes(&in_pg_postcard[..]).expect("cannot deserialize data"); assert_eq!(update_func_run.state(), in_pg.state()); let max_check_count = 10; let mut memory_check_count = 0; while memory_check_count <= max_check_count { let in_memory = ldb_remote .func_run() .cache .cache() .get(key_str.clone()) .await; match in_memory { Some(value) => { assert_eq!(update_func_run.state(), value.state()); break; } None => { memory_check_count += 1; tokio::time::sleep_until(Instant::now() + Duration::from_millis(1)).await; } } } assert_ne!( max_check_count, memory_check_count, "value did not arrive in the remote memory cache within 10ms" ); } #[tokio::test] async fn write_and_read_many_for_workspace_id() { let token = CancellationToken::new(); let (ldb, _): (TestLayerDb, _) = LayerDb::from_services( make_test_layerdb_config(), setup_pg_db("func_run_write_and_read_many_for_workspace_id").await, setup_nats_client(Some( "fun_run_write_and_read_many_for_workspace_id".to_string(), )) .await, setup_compute_executor(), token, ) .await .expect("cannot create layerdb"); ldb.pg_migrate().await.expect("migrate ldb"); let (tenancy, actor) = ( Tenancy::new(WorkspacePk::new(), ChangeSetId::new()), Actor::User(UserPk::new()), ); let values = vec![ Arc::new(create_func_run(actor, tenancy, "dead money")), Arc::new(create_func_run(actor, tenancy, "honest hearts")), Arc::new(create_func_run(actor, tenancy, "old world blues")), Arc::new(create_func_run(actor, tenancy, "lonesome road")), ]; let expected: HashSet<FuncRunId> = HashSet::from_iter(values.iter().map(|v| v.id())); for value in values { ldb.func_run() .write(value.clone(), None, tenancy, actor) .await .expect("failed to write to layerdb"); } let read_many_in_workspace_values = ldb .func_run() .read_many_for_workspace(tenancy.workspace_pk) .await .expect("error getting data from pg") .expect("should be able to read"); assert_eq!( expected, HashSet::from_iter(read_many_in_workspace_values.iter().map(|v| v.id())) ); } #[tokio::test] async fn read_many_for_workspace_paginated() { let token = CancellationToken::new(); let (ldb, _): (TestLayerDb, _) = LayerDb::from_services( make_test_layerdb_config(), setup_pg_db("fun_run_write_and_read_many_for_workspace_paginated").await, setup_nats_client(Some( "fun_run_write_and_read_many_for_workspace_paginated".to_string(), )) .await, setup_compute_executor(), token, ) .await .expect("cannot create layerdb"); ldb.pg_migrate().await.expect("migrate ldb"); let (tenancy, actor) = ( Tenancy::new(WorkspacePk::new(), ChangeSetId::new()), Actor::User(UserPk::new()), ); let values = vec![ Arc::new(create_func_run(actor, tenancy, "dead money")), Arc::new(create_func_run(actor, tenancy, "honest hearts")), Arc::new(create_func_run(actor, tenancy, "old world blues")), Arc::new(create_func_run(actor, tenancy, "lonesome road")), ]; // create func runs for value in values { ldb.func_run() .write(value.clone(), None, tenancy, actor) .await .expect("failed to write to layerdb"); tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; } let read_many_in_workspace_values = ldb .func_run() .read_many_for_workspace(tenancy.workspace_pk) .await .expect("error getting data from pg") .expect("should be able to read"); dbg!(&read_many_in_workspace_values); // let's get the very first value (no cursor, limit 1) let first_value = ldb .func_run() .read_many_for_workspace_paginated(tenancy.workspace_pk, tenancy.change_set_id, 1, None) .await .expect("error getting data from pg") .expect("should be able to read") .pop() .expect("has one entry"); // the first returned should be the last func let cursor = first_value.id(); assert_eq!("lonesome road", first_value.function_name()); // now let's get the next 2 values (cursor is the first result's RunId and a limit of 2) let next_two_values = ldb .func_run() .read_many_for_workspace_paginated( tenancy.workspace_pk, tenancy.change_set_id, 2, Some(cursor), ) .await .expect("error getting data from pg") .expect("should be able to read"); dbg!(&next_two_values); assert_eq!(2, next_two_values.len()); let next_two_expected: HashSet<FuncRunId> = HashSet::from_iter(next_two_values.iter().map(|v| v.id())); // now let's start at the same cursor (the first result returned) and fetch a limit of 10 (which should be all of the remaining funcs) let all_remaining = ldb .func_run() .read_many_for_workspace_paginated( tenancy.workspace_pk, tenancy.change_set_id, 10, Some(cursor), ) .await .expect("error getting data from pg") .expect("should be able to read"); dbg!(&all_remaining); assert_eq!(3, all_remaining.len()); // Create some more runs - simulating a user who has a cursor ID but hasn't fetched all latest yet let values = vec![ Arc::new(create_func_run(actor, tenancy, "dead money part 2")), Arc::new(create_func_run(actor, tenancy, "honest hearts part 2")), Arc::new(create_func_run(actor, tenancy, "old world blues part 2")), Arc::new(create_func_run(actor, tenancy, "lonesome road part 2")), ]; // create func runs for value in values { ldb.func_run() .write(value.clone(), None, tenancy, actor) .await .expect("failed to write to layerdb"); tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; } // repeat the same, get the next 2 from the previous cursor let next_two_values = ldb .func_run() .read_many_for_workspace_paginated( tenancy.workspace_pk, tenancy.change_set_id, 2, Some(cursor), ) .await .expect("error getting data from pg") .expect("should be able to read"); dbg!(&next_two_values); assert_eq!(2, next_two_values.len()); assert_eq!( next_two_expected, HashSet::from_iter(next_two_values.iter().map(|v| v.id())) ); // ensure the results are the same when getting a page of funcs older than the last cursor let all_remaining = ldb .func_run() .read_many_for_workspace_paginated( tenancy.workspace_pk, tenancy.change_set_id, 10, Some(cursor), ) .await .expect("error getting data from pg") .expect("should be able to read"); dbg!(&all_remaining); assert_eq!(3, all_remaining.len()); // But if I fetch without a cursor, I get the new funcs now! let first_value = ldb .func_run() .read_many_for_workspace_paginated(tenancy.workspace_pk, tenancy.change_set_id, 1, None) .await .expect("error getting data from pg") .expect("should be able to read") .pop() .expect("has one entry"); // the first returned should be the last func let cursor = first_value.id(); assert_eq!("lonesome road part 2", first_value.function_name()); // now fetch all remaining from this new cursor, which should yield all func runs let all_remaining = ldb .func_run() .read_many_for_workspace_paginated( tenancy.workspace_pk, tenancy.change_set_id, 10, Some(cursor), ) .await .expect("error getting data from pg") .expect("should be able to read"); dbg!(&all_remaining); assert_eq!(7, all_remaining.len()); } fn create_func_run(actor: Actor, tenancy: Tenancy, function_name: impl Into<String>) -> FuncRun { let func_run_create_time = Utc::now(); FuncRunBuilder::default() .actor(actor) .tenancy(tenancy) .component_id(None) .attribute_value_id(None) .backend_kind(FuncBackendKind::JsAction) .backend_response_type(FuncBackendResponseType::Action) .function_name(function_name.into()) .function_kind(FuncKind::Action) .function_args_cas_address(ContentHash::default()) .function_code_cas_address(ContentHash::default()) .created_at(func_run_create_time) .updated_at(func_run_create_time) .build() .expect("could not build func run") }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server