Skip to main content
Glama

Convex MCP server

Official
by get-convex
cron_jobs.rs8.15 kB
use std::{ collections::BTreeMap, str::FromStr, time::Duration, }; use common::{ components::{ CanonicalizedComponentFunctionPath, ComponentId, ComponentPath, }, query::{ IndexRange, IndexRangeExpression, Order, Query, }, runtime::Runtime, }; use database::{ query::TableFilter, DeveloperQuery, TableModel, Transaction, }; use keybroker::Identity; use model::{ backend_state::{ types::BackendState, BackendStateModel, }, cron_jobs::{ types::{ CronIdentifier, CronJob, CronSchedule, CronSpec, }, CronModel, CRON_JOB_LOGS_INDEX_BY_NAME_TS, CRON_JOB_LOGS_NAME_FIELD, }, }; use runtime::testing::TestRuntime; use serde_json::Value as JsonValue; use udf::helpers::parse_udf_args; use crate::{ test_helpers::{ ApplicationTestExt, OBJECTS_TABLE, OBJECTS_TABLE_COMPONENT, }, Application, }; fn test_cron_identifier() -> CronIdentifier { CronIdentifier::from_str("test").unwrap() } async fn create_cron_job( tx: &mut Transaction<TestRuntime>, ) -> anyhow::Result<( BTreeMap<CronIdentifier, CronJob>, CronModel<'_, TestRuntime>, )> { let mut cron_model = CronModel::new(tx, ComponentId::test_user()); let mut map = serde_json::Map::new(); map.insert( "key".to_string(), serde_json::Value::String("value".to_string()), ); let path = CanonicalizedComponentFunctionPath { component: ComponentPath::test_user(), udf_path: "basic:insertObject".parse()?, }; let cron_spec = CronSpec { udf_path: path.udf_path.clone(), udf_args: parse_udf_args(&path.udf_path, vec![JsonValue::Object(map)])?, cron_schedule: CronSchedule::Interval { seconds: 60 }, }; let original_jobs = cron_model.list().await?; let name = test_cron_identifier(); cron_model.create(name, cron_spec).await?; Ok((original_jobs, cron_model)) } fn cron_log_query<RT: Runtime>( tx: &mut Transaction<RT>, component: ComponentId, ) -> anyhow::Result<DeveloperQuery<RT>> { DeveloperQuery::new( tx, component.into(), Query::index_range(IndexRange { index_name: CRON_JOB_LOGS_INDEX_BY_NAME_TS.name(), range: vec![IndexRangeExpression::Eq( CRON_JOB_LOGS_NAME_FIELD.clone(), common::types::MaybeValue(Some(test_cron_identifier().to_string().try_into()?)), )], order: Order::Asc, }), TableFilter::IncludePrivateSystemTables, ) } #[convex_macro::test_runtime] pub(crate) async fn test_cron_jobs_success(rt: TestRuntime) -> anyhow::Result<()> { let application = Application::new_for_tests(&rt).await?; application.load_udf_tests_modules().await?; // udf-tests include crons, so we let them execute so that we can then add // a new cron without hitting an OCC. rt.wait(Duration::from_secs(100)).await; let mut tx = application.begin(Identity::system()).await?; let (original_jobs, mut cron_model) = create_cron_job(&mut tx).await?; let jobs = cron_model.list().await?; assert_eq!(jobs.len(), original_jobs.len() + 1); let mut table_model = TableModel::new(&mut tx); assert!( table_model .table_is_empty(OBJECTS_TABLE_COMPONENT.into(), &OBJECTS_TABLE) .await? ); application.commit_test(tx).await?; // Cron jobs executor within application will pick up the job and // execute it. Add some wait time to make this less racy. rt.wait(Duration::from_secs(100)).await; let mut tx = application.begin(Identity::system()).await?; let mut table_model = TableModel::new(&mut tx); assert!( !table_model .table_is_empty(OBJECTS_TABLE_COMPONENT.into(), &OBJECTS_TABLE) .await? ); let mut logs_query = cron_log_query(&mut tx, OBJECTS_TABLE_COMPONENT)?; assert!(logs_query.next(&mut tx, None).await?.is_some()); Ok(()) } #[convex_macro::test_runtime] pub(crate) async fn test_cron_jobs_race_condition(rt: TestRuntime) -> anyhow::Result<()> { let application = Application::new_for_tests(&rt).await?; application.load_udf_tests_modules().await?; // udf-tests include crons, so we let them execute so that we can then add // a new cron without hitting an OCC. rt.wait(Duration::from_secs(100)).await; let mut tx = application.begin(Identity::system()).await?; let (original_jobs, mut model) = create_cron_job(&mut tx).await?; let jobs = model.list().await?; assert_eq!(jobs.len(), original_jobs.len() + 1); let job = jobs.get(&test_cron_identifier()).unwrap(); // Delete the cron job let job_metadata = model .list_metadata() .await? .remove(&test_cron_identifier()) .unwrap(); model.delete(job_metadata).await?; let jobs = model.list().await?; assert_eq!(jobs.len(), original_jobs.len()); application.commit_test(tx).await?; // This simulates the race condition where the job executor picks up a cron // to execute after the cron was created but before it was deleted. We should // handle the race condition gracefully instead of trying to run the stale cron. application .test_one_off_cron_job_executor_run(job.clone()) .await?; Ok(()) } #[convex_macro::test_runtime] async fn test_paused_cron_jobs(rt: TestRuntime) -> anyhow::Result<()> { test_cron_jobs_helper(rt, BackendState::Paused).await?; Ok(()) } #[convex_macro::test_runtime] async fn test_disable_cron_jobs(rt: TestRuntime) -> anyhow::Result<()> { test_cron_jobs_helper(rt, BackendState::Disabled).await?; Ok(()) } async fn test_cron_jobs_helper(rt: TestRuntime, backend_state: BackendState) -> anyhow::Result<()> { // Helper for testing behavior for pausing or disabling backends let application = Application::new_for_tests(&rt).await?; application.load_udf_tests_modules().await?; // Change backend state let mut tx = application.begin(Identity::system()).await?; let mut backend_state_model = BackendStateModel::new(&mut tx); backend_state_model .toggle_backend_state(backend_state) .await?; application.commit_test(tx).await?; let mut tx = application.begin(Identity::system()).await?; let (original_jobs, mut cron_model) = create_cron_job(&mut tx).await?; let jobs = cron_model.list().await?; assert_eq!(jobs.len(), original_jobs.len() + 1); let mut table_model = TableModel::new(&mut tx); assert!( table_model .table_is_empty(OBJECTS_TABLE_COMPONENT.into(), &OBJECTS_TABLE) .await? ); application.commit_test(tx).await?; // Cron jobs executor within application will pick up the job and // execute it. Add some wait time to make this less racy. Job should not execute // because the backend is paused. rt.wait(Duration::from_secs(100)).await; let mut tx = application.begin(Identity::system()).await?; let mut table_model = TableModel::new(&mut tx); assert!( table_model .table_is_empty(OBJECTS_TABLE_COMPONENT.into(), &OBJECTS_TABLE) .await? ); let mut logs_query = cron_log_query(&mut tx, ComponentId::test_user())?; assert!(logs_query.next(&mut tx, Some(1)).await?.is_none()); // Resuming the backend should make the jobs execute. let mut model = BackendStateModel::new(&mut tx); model.toggle_backend_state(BackendState::Running).await?; application.commit_test(tx).await?; rt.wait(Duration::from_secs(100)).await; let mut tx = application.begin(Identity::system()).await?; let mut table_model = TableModel::new(&mut tx); assert!( !table_model .table_is_empty(OBJECTS_TABLE_COMPONENT.into(), &OBJECTS_TABLE) .await? ); let mut logs_query = cron_log_query(&mut tx, ComponentId::Root)?; assert!(logs_query.next(&mut tx, None).await?.is_some()); Ok(()) }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server