Skip to main content
Glama

Convex MCP server

Official
by get-convex
query_cache.rs21.7 kB
use std::time::Duration; use common::{ bootstrap_model::index::{ database_index::IndexedFields, IndexMetadata, }, components::{ CanonicalizedComponentFunctionPath, ComponentPath, PublicFunctionPath, }, pause::PauseController, types::FunctionCaller, RequestId, }; use keybroker::{ testing::TestUserIdentity, Identity, UserIdentity, }; use must_let::must_let; use runtime::testing::TestRuntime; use serde_json::{ json, Value as JsonValue, }; use value::{ val, ConvexValue, }; use crate::{ test_helpers::ApplicationTestExt, Application, }; fn udf_path(path: &str) -> PublicFunctionPath { PublicFunctionPath::Component(CanonicalizedComponentFunctionPath { component: ComponentPath::test_user(), udf_path: path.parse().unwrap(), }) } async fn run_query_with_journal( application: &Application<TestRuntime>, path: &str, arg: JsonValue, identity: Identity, journal: Option<Option<String>>, expect_cached: bool, ) -> anyhow::Result<(ConvexValue, Option<String>)> { let ts = application.now_ts_for_reads(); let result = application .read_only_udf_at_ts( RequestId::new(), udf_path(path), vec![arg], identity, *ts, journal, FunctionCaller::Action { parent_scheduled_job: None, parent_execution_id: None, }, ) .await?; let (function_log, _) = application.function_log().stream(0.0).await; let last_log_entry = function_log.last().unwrap(); assert_eq!(last_log_entry.cached_result, expect_cached); Ok((result.result?.unpack(), result.journal)) } async fn run_query( application: &Application<TestRuntime>, path: &str, arg: JsonValue, identity: Identity, expect_cached: bool, ) -> anyhow::Result<ConvexValue> { Ok( run_query_with_journal(application, path, arg, identity, None, expect_cached) .await? .0, ) } async fn insert_object(application: &Application<TestRuntime>) -> anyhow::Result<ConvexValue> { let result = application .mutation_udf( RequestId::new(), PublicFunctionPath::Component(CanonicalizedComponentFunctionPath { component: ComponentPath::test_user(), udf_path: "basic:insertObject".parse()?, }), vec![json!({})], Identity::system(), None, FunctionCaller::Action { parent_scheduled_job: None, parent_execution_id: None, }, None, ) .await??; Ok(result.value.unpack()) } #[convex_macro::test_runtime] async fn test_query_cache(rt: TestRuntime) -> anyhow::Result<()> { let application = Application::new_for_tests(&rt).await?; application.load_udf_tests_modules().await?; let result1 = run_query( &application, "basic:readTimeMs", json!({}), Identity::system(), false, ) .await?; rt.advance_time(Duration::from_secs(1)).await; // Same query, so it's cached. let result2 = run_query( &application, "basic:readTimeMs", json!({}), Identity::system(), true, ) .await?; // The query gets the current time, but the result is cached so the results // should match. // It's a bit weird to be using Date.now() to test this, since we just want // to know that the query was cached. The purpose is to assert that the // function is not re-executing. If it were re-executing, the Date.now() // would be different. assert_eq!(result1, result2); Ok(()) } #[convex_macro::test_runtime] async fn test_query_cache_data_invalidation(rt: TestRuntime) -> anyhow::Result<()> { let application = Application::new_for_tests(&rt).await?; application.load_udf_tests_modules().await?; let result1 = run_query( &application, "basic:listAllObjects", json!({}), Identity::system(), false, ) .await?; insert_object(&application).await?; let result2 = run_query( &application, "basic:listAllObjects", json!({}), Identity::system(), false, ) .await?; assert_ne!(result1, result2); Ok(()) } #[convex_macro::test_runtime] async fn test_query_cache_time_invalidation(rt: TestRuntime) -> anyhow::Result<()> { let application = Application::new_for_tests(&rt).await?; application.load_udf_tests_modules().await?; let time_result1 = run_query( &application, "basic:readTimeMs", json!({}), Identity::system(), false, ) .await?; let do_nothing_result1 = run_query( &application, "basic:doNothing", json!({}), Identity::system(), false, ) .await?; rt.advance_time(Duration::from_mins(20)).await; // Write a new object to bump timestamps. // It doesn't have to succeed; it'll still bump the timestamp. let _ = insert_object(&application).await; // After 20 minutes, the time query is not cached anymore. let time_result2 = run_query( &application, "basic:readTimeMs", json!({}), Identity::system(), false, ) .await?; assert_ne!(time_result1, time_result2); // The doNothing query is still cached. let do_nothing_result2 = run_query( &application, "basic:doNothing", json!({}), Identity::system(), true, ) .await?; assert_eq!(do_nothing_result1, do_nothing_result2); Ok(()) } #[convex_macro::test_runtime] async fn test_query_cache_precise_data_invalidation(rt: TestRuntime) -> anyhow::Result<()> { let application = Application::new_for_tests(&rt).await?; application.load_udf_tests_modules().await?; let value1 = insert_object(&application).await?; must_let!(let ConvexValue::Object(obj1) = value1); must_let!(let ConvexValue::String(id1) = obj1.get("_id").unwrap()); let result1 = run_query( &application, "basic:getObject", json!({"id": **id1}), Identity::system(), false, ) .await?; // Inserting a new object doesn't invalidate the cache for the old object. insert_object(&application).await?; let result2 = run_query( &application, "basic:getObject", json!({"id": **id1}), Identity::system(), true, ) .await?; assert_eq!(result1, result2); Ok(()) } #[convex_macro::test_runtime] async fn test_query_cache_arg_invalidation(rt: TestRuntime) -> anyhow::Result<()> { let application = Application::new_for_tests(&rt).await?; application.load_udf_tests_modules().await?; let result1 = run_query( &application, "basic:readTimeMs", json!({}), Identity::system(), false, ) .await?; rt.advance_time(Duration::from_secs(1)).await; let result2 = run_query( &application, "basic:readTimeMs", json!({"cacheBuster": 1}), Identity::system(), false, ) .await?; assert_ne!(result1, result2); Ok(()) } #[convex_macro::test_runtime] async fn test_query_cache_auth_invalidation(rt: TestRuntime) -> anyhow::Result<()> { let application = Application::new_for_tests(&rt).await?; application.load_udf_tests_modules().await?; // Run query as first user let result1 = run_query( &application, "auth:getIdentifier", json!({}), Identity::user(UserIdentity::test()), false, ) .await?; // Run query as system let result2 = run_query( &application, "auth:getIdentifier", json!({}), Identity::system(), false, ) .await?; // Results should be different since they're from different auth assert_ne!(result1, result2); // Run query as first user again let result3 = run_query( &application, "auth:getIdentifier", json!({}), Identity::user(UserIdentity::test()), true, ) .await?; // Results should be the same since it's the same user assert_eq!(result1, result3); Ok(()) } #[convex_macro::test_runtime] async fn test_query_cache_error_not_cached(rt: TestRuntime) -> anyhow::Result<()> { let application = Application::new_for_tests(&rt).await?; application.load_udf_tests_modules().await?; // Run query that throws an error first time let error1 = run_query( &application, "custom_errors:queryThrows", json!({}), Identity::system(), false, ) .await; assert!(error1.is_err(), "Expected first query to throw an error"); // Run same query again - should throw error again and not be cached let error2 = run_query( &application, "custom_errors:queryThrows", json!({}), Identity::system(), false, ) .await; assert!(error2.is_err(), "Expected second query to throw an error"); Ok(()) } #[convex_macro::test_runtime] async fn test_query_cache_without_checking_auth(rt: TestRuntime) -> anyhow::Result<()> { let application = Application::new_for_tests(&rt).await?; application.load_udf_tests_modules().await?; let result1 = run_query( &application, "basic:readTimeMs", json!({}), Identity::user(UserIdentity::test()), false, ) .await?; rt.advance_time(Duration::from_secs(1)).await; // The query doesn't read ctx.auth, so it's cached across identities. let result2 = run_query( &application, "basic:readTimeMs", json!({}), Identity::system(), true, ) .await?; // Result is the same because the query doesn't re-execute and get a new // Date.now(). assert_eq!(result1, result2); Ok(()) } #[convex_macro::test_runtime] async fn test_query_cache_unauthed_race( rt: TestRuntime, pause_controller: PauseController, ) -> anyhow::Result<()> { let application = Application::new_for_tests(&rt).await?; application.load_udf_tests_modules().await?; // Run the same query as different users, in parallel. // In this case we don't know that the query doesn't check auth, so // neither request waits for the other. // To make sure they run in parallel, run each query up until they try // to run a function, which is after they have checked the cache and decided // that it's a cache miss. let mut first_query = Box::pin(run_query( &application, "basic:listAllObjects", json!({}), Identity::user(UserIdentity::test()), false, )); let first_hold_guard = pause_controller.hold("run_function"); let first_pause_guard = tokio::select! { _ = &mut first_query => { panic!("First query completed before pause"); } pause_guard = first_hold_guard.wait_for_blocked() => { pause_guard.unwrap() } }; let second_hold_guard = pause_controller.hold("run_function"); let mut second_query = Box::pin(run_query( &application, "basic:listAllObjects", json!({}), Identity::system(), false, )); let second_pause_guard = tokio::select! { _ = &mut second_query => { panic!("Second query completed before pause"); } pause_guard = second_hold_guard.wait_for_blocked() => { pause_guard.unwrap() } }; first_pause_guard.unpause(); first_query.await?; second_pause_guard.unpause(); second_query.await?; // Insert an object to invalidate the cache. // Then run both queries again in parallel. // In this case we can guess that the query doesn't check auth, so // the second request should wait for the first and use the cached value. insert_object(&application).await?; // Rerun queries in parallel let mut first_query = Box::pin(run_query( &application, "basic:listAllObjects", json!({}), Identity::user(UserIdentity::test()), false, )); let first_hold_guard = pause_controller.hold("run_function"); let first_pause_guard = tokio::select! { _ = &mut first_query => { panic!("First query completed before pause"); } pause_guard = first_hold_guard.wait_for_blocked() => { pause_guard.unwrap() } }; let mut second_query = Box::pin(run_query( &application, "basic:listAllObjects", json!({}), Identity::system(), true, // cache hit )); // The second one never gets to run_function, so use perform_cache_op instead // to pause it when it's waiting for the first query to finish. let second_hold_guard = pause_controller.hold("perform_cache_op"); let second_pause_guard = tokio::select! { _ = &mut second_query => { panic!("Second query completed before pause"); } pause_guard = second_hold_guard.wait_for_blocked() => { pause_guard.unwrap() } }; first_pause_guard.unpause(); first_query.await?; second_pause_guard.unpause(); second_query.await?; Ok(()) } async fn test_query_cache_with_conditional_auth_check_inner( rt: TestRuntime, subquery: bool, ) -> anyhow::Result<()> { let application = Application::new_for_tests(&rt).await?; application.load_udf_tests_modules().await?; let query = if subquery { "auth:conditionallyCheckAuthInSubquery" } else { "auth:conditionallyCheckAuth" }; // First run query that doesn't check auth let result1 = run_query( &application, query, json!({}), Identity::user(UserIdentity::test()), false, ) .await?; rt.advance_time(Duration::from_secs(1)).await; // The query is cached across identities. let result2 = run_query(&application, query, json!({}), Identity::system(), true).await?; assert_eq!(result1, result2); insert_object(&application).await?; // Now that there's an object, the query checks auth. let result3 = run_query( &application, query, json!({}), Identity::user(UserIdentity::test()), false, ) .await?; let result4 = run_query(&application, query, json!({}), Identity::system(), false).await?; assert_ne!(result1, result3); assert_ne!(result1, result4); assert_ne!(result3, result4); // different auth Ok(()) } #[convex_macro::test_runtime] async fn test_query_cache_with_conditional_auth_check(rt: TestRuntime) -> anyhow::Result<()> { test_query_cache_with_conditional_auth_check_inner(rt, false).await } #[convex_macro::test_runtime] async fn test_query_cache_with_conditional_auth_check_in_subquery( rt: TestRuntime, ) -> anyhow::Result<()> { test_query_cache_with_conditional_auth_check_inner(rt, true).await } #[convex_macro::test_runtime] async fn test_query_cache_conditional_auth_check_race( rt: TestRuntime, pause_controller: PauseController, ) -> anyhow::Result<()> { let application = Application::new_for_tests(&rt).await?; application.load_udf_tests_modules().await?; // Run query as one user. run_query( &application, "auth:conditionallyCheckAuth", json!({}), Identity::user(UserIdentity::test()), false, ) .await?; // Insert an object to make the query check auth. // Then run two queries in parallel, for different users. // We guess that the queries don't check auth, so one waits for the other. // But the queries actually do check auth, so the second one re-executes // after waiting. insert_object(&application).await?; // Run queries in parallel let mut first_query = Box::pin(run_query( &application, "auth:conditionallyCheckAuth", json!({}), Identity::user(UserIdentity::test()), false, )); let first_hold_guard = pause_controller.hold("run_function"); let first_pause_guard = tokio::select! { _ = &mut first_query => { panic!("First query completed before pause"); } pause_guard = first_hold_guard.wait_for_blocked() => { pause_guard.unwrap() } }; let mut second_query = Box::pin(run_query( &application, "auth:conditionallyCheckAuth", json!({}), Identity::system(), false, // cache miss )); // Pause the second query when it's waiting for the first query to finish. let second_hold_guard = pause_controller.hold("perform_cache_op"); let second_pause_guard = tokio::select! { _ = &mut second_query => { panic!("Second query completed before pause"); } pause_guard = second_hold_guard.wait_for_blocked() => { pause_guard.unwrap() } }; first_pause_guard.unpause(); let result1 = first_query.await?; assert_eq!(result1, val!("https://testauth.fake.domain|testauth|123")); second_pause_guard.unpause(); let result2 = second_query.await?; assert_eq!(result2, val!("No user")); Ok(()) } #[convex_macro::test_runtime] async fn test_cache_paginated_query(rt: TestRuntime) -> anyhow::Result<()> { let application = Application::new_for_tests(&rt).await?; application.load_udf_tests_modules().await?; application .add_index(IndexMetadata::new_enabled( "test.by_hello".parse()?, IndexedFields::try_from(vec!["hello".parse()?])?, )) .await?; for i in 1..=5 { assert!(application .mutation_udf( RequestId::new(), udf_path("query:insert"), vec![json!({"number": i * 10})], Identity::system(), None, FunctionCaller::Test, None ) .await? .is_ok()); } let (page1_1, journal1_1) = run_query_with_journal( &application, "query:paginateIndex", json!({ "paginationOpts": { "numItems": 2, "cursor": null } }), Identity::system(), None, false, ) .await?; assert!(journal1_1.is_some()); assert_eq!(page1_1["page"][0]["hello"], ConvexValue::Float64(10.0)); assert_eq!(page1_1["page"][1]["hello"], ConvexValue::Float64(20.0)); // Rerunning the query as-is should yield a cache hit let (page1_2, journal1_2) = run_query_with_journal( &application, "query:paginateIndex", json!({ "paginationOpts": { "numItems": 2, "cursor": null } }), Identity::system(), None, true, /* expect_cached */ ) .await?; assert_eq!(page1_1, page1_2); // TODO: consider making journal encoding deterministic, // since these journals should be the same. assert_ne!(journal1_1, journal1_2); // Rerunning the query, but passing in the journal from the first run, // should _also_ result in a cache hit let (page1_3, journal1_3) = run_query_with_journal( &application, "query:paginateIndex", json!({ "paginationOpts": { "numItems": 2, "cursor": null } }), Identity::system(), Some(journal1_1.clone()), true, /* expect_cached */ ) .await?; assert_eq!(page1_1, page1_3); // TODO: consider making journal encoding deterministic, // since these journals should be the same. assert_ne!(journal1_1, journal1_3); // Load a second page. let (page2_1, journal2_1) = run_query_with_journal( &application, "query:paginateIndex", json!({ "paginationOpts": { "numItems": 2, "cursor": page1_1["continueCursor"].to_internal_json() } }), Identity::system(), None, false, ) .await?; assert!(journal2_1.is_some()); assert_eq!(page2_1["page"][0]["hello"], ConvexValue::Float64(30.0)); assert_eq!(page2_1["page"][1]["hello"], ConvexValue::Float64(40.0)); // Insert an item into the first page. assert!(application .mutation_udf( RequestId::new(), udf_path("query:insert"), vec![json!({"number": 15})], Identity::system(), None, FunctionCaller::Test, None ) .await? .is_ok()); // Re-query the first page. let (page1_4, journal1_4) = run_query_with_journal( &application, "query:paginateIndex", json!({ "paginationOpts": { "numItems": 2, "cursor": null } }), Identity::system(), Some(journal1_1.clone()), false, ) .await?; assert!(journal1_4.is_some()); assert_eq!(page1_4["page"][0]["hello"], ConvexValue::Float64(10.0)); assert_eq!(page1_4["page"][1]["hello"], ConvexValue::Float64(15.0)); // new! assert_eq!(page1_4["page"][2]["hello"], ConvexValue::Float64(20.0)); // The second page should still be cached. let (page2_2, journal2_2) = run_query_with_journal( &application, "query:paginateIndex", json!({ "paginationOpts": { "numItems": 2, "cursor": page1_1["continueCursor"].to_internal_json() } }), Identity::system(), None, true /* expect_cached */, ) .await?; assert!(journal2_2.is_some()); assert_eq!(page2_1, page2_2); Ok(()) }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server