Skip to main content
Glama
usage_tracking.rs18.3 kB
use common::{ assert_obj, bootstrap_model::index::{ database_index::IndexedFields, IndexMetadata, }, components::{ ComponentId, ComponentPath, }, document::PackedDocument, maybe_val, query::{ IndexRange, IndexRangeExpression, Order, Query, }, types::{ IndexDescriptor, IndexName, PersistenceVersion, TableName, }, }; use indexing::index_registry::IndexedDocument; use keybroker::Identity; use maplit::btreeset; use pretty_assertions::assert_eq; use runtime::testing::TestRuntime; use usage_tracking::FunctionUsageTracker; use value::{ Size, TableNamespace, }; use vector::VectorSearch; use crate::{ test_helpers::DbFixtures, tests::{ text_test_utils::{ add_document, TextFixtures, TextIndexData, }, vector_test_utils::{ add_document_vec_array, VectorFixtures, VectorIndexData, }, }, IndexModel, ResolvedQuery, TestFacingModel, UserFacingModel, }; #[convex_macro::test_runtime] async fn vector_insert_with_no_index_does_not_count_usage(rt: TestRuntime) -> anyhow::Result<()> { let fixtures = VectorFixtures::new(rt).await?; let table_name: TableName = "my_table".parse()?; // Use a user transaction, not a system transaction let tx_usage = FunctionUsageTracker::new(); let mut tx = fixtures .db .begin_with_usage(Identity::Unknown(None), tx_usage.clone()) .await?; add_document_vec_array(&mut tx, &table_name, [3f64, 4f64]).await?; fixtures.db.commit(tx).await?; fixtures .db .usage_counter() .track_call_test(tx_usage.gather_user_stats()) .await; let stats = fixtures.test_usage_logger.collect(); assert!(stats.recent_vector_ingress_size.is_empty()); assert!(stats.recent_vector_ingress_size_v2.is_empty()); Ok(()) } #[convex_macro::test_runtime] async fn vector_insert_counts_usage_for_backfilling_indexes(rt: TestRuntime) -> anyhow::Result<()> { let fixtures = VectorFixtures::new(rt).await?; let VectorIndexData { index_name, qdrant_schema, .. } = fixtures.backfilling_vector_index().await?; // Use a user transaction, not a system transaction let tx_usage = FunctionUsageTracker::new(); let mut tx = fixtures .db .begin_with_usage(Identity::Unknown(None), tx_usage.clone()) .await?; let doc_id = add_document_vec_array(&mut tx, index_name.table(), [3f64, 4f64]).await?; let document = tx.get(doc_id).await?.unwrap(); fixtures.db.commit(tx).await?; fixtures .db .usage_counter() .track_call_test(tx_usage.gather_user_stats()) .await; let stats = fixtures.test_usage_logger.collect(); let value = stats .recent_vector_ingress_size .get(&(*index_name.table()).to_string()) .cloned(); let value_v2 = stats .recent_vector_ingress_size_v2 .get(&(*index_name.table()).to_string()) .cloned(); assert_eq!(value, Some((document.size()) as u64)); assert_eq!( value_v2, Some((qdrant_schema.estimate_vector_size() + doc_id.size()) as u64) ); Ok(()) } #[convex_macro::test_runtime] async fn vector_insert_counts_usage_for_enabled_indexes(rt: TestRuntime) -> anyhow::Result<()> { let fixtures = VectorFixtures::new(rt).await?; let VectorIndexData { index_name, qdrant_schema, .. } = fixtures.enabled_vector_index().await?; // Use a user transaction, not a system transaction let tx_usage = FunctionUsageTracker::new(); let mut tx = fixtures .db .begin_with_usage(Identity::Unknown(None), tx_usage.clone()) .await?; let doc_id = add_document_vec_array(&mut tx, index_name.table(), [3f64, 4f64]).await?; let document = tx.get(doc_id).await?.unwrap(); fixtures.db.commit(tx).await?; fixtures .db .usage_counter() .track_call_test(tx_usage.gather_user_stats()) .await; let mut stats = fixtures.test_usage_logger.collect(); let value = stats .recent_vector_ingress_size .remove(&(*index_name.table()).to_string()); let value_v2 = stats .recent_vector_ingress_size_v2 .remove(&(*index_name.table()).to_string()); assert_eq!(value, Some((document.size()) as u64)); assert_eq!( value_v2, Some((qdrant_schema.estimate_vector_size() + doc_id.size()) as u64) ); Ok(()) } #[convex_macro::test_runtime] async fn vectors_in_segment_count_as_usage(rt: TestRuntime) -> anyhow::Result<()> { let fixtures = VectorFixtures::new(rt).await?; let VectorIndexData { index_name, .. } = fixtures.enabled_vector_index().await?; // Use a user transaction, not a system transaction let tx_usage = FunctionUsageTracker::new(); let mut tx = fixtures .db .begin_with_usage(Identity::Unknown(None), tx_usage.clone()) .await?; add_document_vec_array(&mut tx, index_name.table(), [3f64, 4f64]).await?; fixtures.db.commit(tx).await?; fixtures .db .usage_counter() .track_call_test(tx_usage.gather_user_stats()) .await; fixtures.new_live_index_flusher()?.step().await?; let storage = fixtures .db .latest_database_snapshot()? .get_vector_index_storage(&Identity::system())?; let key = (ComponentPath::root(), index_name.table().clone()); let value = storage.get(&key).cloned(); assert_eq!(value, Some(8_u64)); Ok(()) } #[convex_macro::test_runtime] async fn vector_query_counts_bandwidth(rt: TestRuntime) -> anyhow::Result<()> { let fixtures = VectorFixtures::new(rt).await?; let VectorIndexData { index_name, .. } = fixtures.enabled_vector_index().await?; // Use a user transaction, not a system transaction let tx_usage = FunctionUsageTracker::new(); let mut tx = fixtures .db .begin_with_usage(Identity::Unknown(None), tx_usage.clone()) .await?; add_document_vec_array(&mut tx, index_name.table(), [3f64, 4f64]).await?; fixtures.db.commit(tx).await?; fixtures.new_backfill_index_flusher()?.step().await?; let (results, usage_stats) = fixtures .db .vector_search( Identity::Unknown(None), VectorSearch { index_name: index_name.clone(), component_id: ComponentId::Root, limit: Some(10), vector: vec![0.; 2], expressions: btreeset![], }, ) .await?; tx_usage.add(usage_stats); fixtures .db .usage_counter() .track_call_test(tx_usage.gather_user_stats()) .await; let total_size = results.into_iter().map(|row| row.size() as u64).sum(); let mut stats = fixtures.test_usage_logger.collect(); assert_eq!( stats .recent_vector_egress_size .remove(&index_name.table().to_string()), Some(total_size) ); assert_eq!( stats .recent_database_egress_size .remove(&index_name.table().to_string()), Some(total_size) ); Ok(()) } #[convex_macro::test_runtime] async fn text_fields_in_segment_count_as_usage(rt: TestRuntime) -> anyhow::Result<()> { let fixtures = TextFixtures::new(rt).await?; let TextIndexData { index_name, .. } = fixtures.enabled_text_index().await?; // Use a user transaction, not a system transaction let tx_usage = FunctionUsageTracker::new(); let mut tx = fixtures .db .begin_with_usage(Identity::Unknown(None), tx_usage.clone()) .await?; add_document(&mut tx, index_name.table(), "test").await?; fixtures.db.commit(tx).await?; fixtures .db .usage_counter() .track_call_test(tx_usage.gather_user_stats()) .await; fixtures.new_live_text_flusher().step().await?; let storage = fixtures .db .latest_database_snapshot()? .get_text_index_storage(&Identity::system())?; let key = (ComponentPath::root(), index_name.table().clone()); let value = storage.get(&key).cloned(); assert_eq!(value, Some(2658_u64)); Ok(()) } #[convex_macro::test_runtime] async fn text_insert_with_no_index_does_not_count_usage(rt: TestRuntime) -> anyhow::Result<()> { let fixtures = TextFixtures::new(rt).await?; let table_name: TableName = "my_table".parse()?; // Use a user transaction, not a system transaction let tx_usage = FunctionUsageTracker::new(); let mut tx = fixtures .db .begin_with_usage(Identity::Unknown(None), tx_usage.clone()) .await?; add_document(&mut tx, &table_name, "hello").await?; fixtures.db.commit(tx).await?; fixtures .db .usage_counter() .track_call_test(tx_usage.gather_user_stats()) .await; let stats = fixtures.test_usage_logger.collect(); assert!(stats.recent_text_ingress_size.is_empty()); Ok(()) } #[convex_macro::test_runtime] async fn text_insert_counts_usage_for_backfilling_indexes(rt: TestRuntime) -> anyhow::Result<()> { let fixtures = TextFixtures::new(rt).await?; let TextIndexData { index_name, tantivy_schema, .. } = fixtures.insert_backfilling_text_index().await?; // Use a user transaction, not a system transaction let tx_usage = FunctionUsageTracker::new(); let mut tx = fixtures .db .begin_with_usage(Identity::Unknown(None), tx_usage.clone()) .await?; let document_id = add_document(&mut tx, index_name.table(), "hello").await?; let document = tx.get(document_id).await?.unwrap(); fixtures.db.commit(tx).await?; fixtures .db .usage_counter() .track_call_test(tx_usage.gather_user_stats()) .await; let stats = fixtures.test_usage_logger.collect(); let value = stats .recent_text_ingress_size .get(&(*index_name.table()).to_string()) .cloned(); let expected_size = tantivy_schema.estimate_size(&document); assert_eq!(value, Some(expected_size)); Ok(()) } #[convex_macro::test_runtime] async fn text_insert_counts_usage_for_enabled_indexes(rt: TestRuntime) -> anyhow::Result<()> { let fixtures = TextFixtures::new(rt).await?; let TextIndexData { index_name, tantivy_schema, .. } = fixtures.enabled_text_index().await?; // Use a user transaction, not a system transaction let tx_usage = FunctionUsageTracker::new(); let mut tx = fixtures .db .begin_with_usage(Identity::Unknown(None), tx_usage.clone()) .await?; let document_id = add_document(&mut tx, index_name.table(), "hello").await?; let document = tx.get(document_id).await?.unwrap(); fixtures.db.commit(tx).await?; fixtures .db .usage_counter() .track_call_test(tx_usage.gather_user_stats()) .await; let stats = fixtures.test_usage_logger.collect(); let value = stats .recent_text_ingress_size .get(&(*index_name.table()).to_string()) .cloned(); let expected_size = tantivy_schema.estimate_size(&document); assert_eq!(value, Some(expected_size)); Ok(()) } #[convex_macro::test_runtime] async fn test_usage_tracking_basic_insert_and_get(rt: TestRuntime) -> anyhow::Result<()> { let DbFixtures { db, test_usage_logger, .. } = DbFixtures::new(&rt).await?; let tx_usage = FunctionUsageTracker::new(); let mut tx = db .begin_with_usage(Identity::Unknown(None), tx_usage.clone()) .await?; let obj = assert_obj!("key" => vec![0; 100]); let table_name: TableName = "my_table".parse()?; let doc_id = TestFacingModel::new(&mut tx) .insert(&table_name, obj.clone()) .await?; db.commit(tx).await?; db.usage_counter() .track_call_test(tx_usage.gather_user_stats()) .await; let stats = test_usage_logger.collect(); // Database ingress counted for write to user table, rounded up let mut database_ingress = stats.recent_database_ingress_size; assert_eq!(database_ingress.len(), 1); assert!(database_ingress.contains_key("my_table")); let document = db.begin_system().await?.get(doc_id).await?.unwrap(); assert_eq!( database_ingress.remove("my_table"), Some(document.size() as u64) ); let database_egress = stats.recent_database_egress_size; assert_eq!(database_egress.values().sum::<u64>(), 0); // Database egress counted for read to user table, rounded up let tx_usage = FunctionUsageTracker::new(); let mut tx = db .begin_with_usage(Identity::Unknown(None), tx_usage.clone()) .await?; UserFacingModel::new_root_for_test(&mut tx) .get_with_ts(doc_id.developer_id, None) .await?; db.commit(tx).await?; db.usage_counter() .track_call_test(tx_usage.gather_user_stats()) .await; let stats = test_usage_logger.collect(); let database_ingress = stats.recent_database_ingress_size; assert_eq!(database_ingress.values().sum::<u64>(), 0); let mut database_egress = stats.recent_database_egress_size; assert_eq!(database_egress.len(), 1); assert!(database_egress.contains_key("my_table")); assert_eq!( database_egress.remove("my_table"), Some(document.size() as u64) ); Ok(()) } #[convex_macro::test_runtime] async fn test_usage_tracking_insert_with_index(rt: TestRuntime) -> anyhow::Result<()> { let DbFixtures { db, test_usage_logger, .. } = DbFixtures::new(&rt).await?; // Add a user index let table_name: TableName = "my_table".parse()?; let namespace = TableNamespace::test_user(); let tx_usage = FunctionUsageTracker::new(); let mut tx = db .begin_with_usage(Identity::system(), tx_usage.clone()) .await?; let index_name = IndexName::new(table_name.clone(), IndexDescriptor::new("by_key")?)?; let fields: IndexedFields = vec!["key".parse()?].try_into()?; IndexModel::new(&mut tx) .add_application_index( namespace, IndexMetadata::new_enabled(index_name.clone(), fields.clone()), ) .await .unwrap_or_else(|e| panic!("Failed to add index for {} {:?}", "by_key", e)); db.commit(tx).await?; db.usage_counter() .track_call_test(tx_usage.gather_user_stats()) .await; let tx_usage = FunctionUsageTracker::new(); let mut tx = db .begin_with_usage(Identity::Unknown(None), tx_usage.clone()) .await?; let obj = assert_obj!("key" => 1); let obj2 = assert_obj!("key" => 3); let obj3 = assert_obj!("key" => 1); let doc_id1 = TestFacingModel::new(&mut tx) .insert(&table_name, obj.clone()) .await?; let doc_id2 = TestFacingModel::new(&mut tx) .insert(&table_name, obj2.clone()) .await?; let doc_id3 = TestFacingModel::new(&mut tx) .insert(&table_name, obj3.clone()) .await?; db.commit(tx).await?; db.usage_counter() .track_call_test(tx_usage.gather_user_stats()) .await; let mut tx = db.begin_system().await?; let doc1 = tx.get(doc_id1).await?.unwrap(); let doc2 = tx.get(doc_id2).await?.unwrap(); let doc3 = tx.get(doc_id3).await?.unwrap(); let stats = test_usage_logger.collect(); let mut database_ingress = stats.recent_database_ingress_size; assert_eq!(database_ingress.len(), 1); assert!(database_ingress.contains_key("my_table")); assert_eq!( database_ingress.remove("my_table"), // double it for the index Some((doc1.size() + doc2.size() + doc3.size()) as u64 * 2) ); let database_egress = stats.recent_database_egress_size; assert_eq!(database_egress.values().sum::<u64>(), 0); let tx_usage = FunctionUsageTracker::new(); let mut tx = db .begin_with_usage(Identity::Unknown(None), tx_usage.clone()) .await?; let index_query = Query::index_range(IndexRange { index_name, range: vec![IndexRangeExpression::Eq("key".parse()?, maybe_val!(1))], order: Order::Asc, }); let mut query_stream = ResolvedQuery::new(&mut tx, namespace, index_query)?; while query_stream.next(&mut tx, None).await?.is_some() {} db.commit(tx).await?; db.usage_counter() .track_call_test(tx_usage.gather_user_stats()) .await; let stats = test_usage_logger.collect(); let database_ingress = stats.recent_database_ingress_size; assert_eq!(database_ingress.values().sum::<u64>(), 0); let mut database_egress = stats.recent_database_egress_size; assert_eq!(database_egress.len(), 1); assert!(database_egress.contains_key("my_table")); assert_eq!( database_egress.remove("my_table"), Some( (doc1.size() + doc3.size() + PackedDocument::pack(&doc1) .index_key_bytes(&fields, PersistenceVersion::V5) .len() + PackedDocument::pack(&doc3) .index_key_bytes(&fields, PersistenceVersion::V5) .len()) as u64 ) ); Ok(()) } #[convex_macro::test_runtime] async fn test_action_counts_compute(rt: TestRuntime) -> anyhow::Result<()> { let DbFixtures { db, test_usage_logger, .. } = DbFixtures::new(&rt).await?; let tx_usage = FunctionUsageTracker::new(); db.usage_counter() .track_call_test(tx_usage.gather_user_stats()) .await; let stats = test_usage_logger.collect(); assert_eq!( stats.recent_node_action_compute_time.values().sum::<u64>(), 0 ); assert_eq!( stats.recent_v8_action_compute_time.values().sum::<u64>(), 100000 ); assert_eq!( *stats .recent_v8_action_compute_time .get("test.js:default") .unwrap(), 100000 ); Ok(()) }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server