Skip to main content
Glama

Convex MCP server

Official
by get-convex
usage_tracking.rs15.4 kB
use std::time::Duration; use common::{ assert_obj, bootstrap_model::index::IndexMetadata, components::{ ComponentFunctionPath, ComponentId, ComponentPath, }, execution_context::ExecutionId, maybe_val, query::{ IndexRange, IndexRangeExpression, Order, Query, }, types::{ IndexDescriptor, IndexName, ModuleEnvironment, TableName, UdfIdentifier, }, RequestId, }; use keybroker::Identity; use maplit::btreeset; use pretty_assertions::assert_eq; use runtime::testing::TestRuntime; use sync_types::CanonicalizedUdfPath; use usage_tracking::{ CallType, FunctionUsageTracker, }; use value::{ sha256::Sha256Digest, TableNamespace, }; use vector::VectorSearch; use crate::{ test_helpers::DbFixtures, tests::vector_test_utils::{ add_document_vec_array, IndexData, VectorFixtures, }, IndexModel, ResolvedQuery, UserFacingModel, }; fn test_udf_identifier() -> UdfIdentifier { let udf_path: CanonicalizedUdfPath = "test.js:default".parse().unwrap(); let component = ComponentPath::root(); let path = ComponentFunctionPath { component, udf_path: udf_path.strip(), }; UdfIdentifier::Function(path.canonicalize()) } #[convex_macro::test_runtime] async fn vector_insert_with_no_index_does_not_count_usage(rt: TestRuntime) -> anyhow::Result<()> { let fixtures = VectorFixtures::new(rt).await?; let table_name: TableName = "my_table".parse()?; // Use a user transaction, not a system transaction let tx_usage = FunctionUsageTracker::new(); let mut tx = fixtures .db .begin_with_usage(Identity::Unknown(None), tx_usage.clone()) .await?; add_document_vec_array(&mut tx, &table_name, [3f64, 4f64]).await?; fixtures.db.commit(tx).await?; fixtures .db .usage_counter() .track_call( test_udf_identifier(), ExecutionId::new(), RequestId::new(), CallType::Action { env: ModuleEnvironment::Isolate, duration: Duration::from_secs(10), memory_in_mb: 10, }, true, tx_usage.gather_user_stats(), ) .await; let stats = fixtures.test_usage_logger.collect(); assert!(stats.recent_vector_ingress_size.is_empty()); Ok(()) } #[convex_macro::test_runtime] async fn vector_insert_counts_usage_for_backfilling_indexes(rt: TestRuntime) -> anyhow::Result<()> { let fixtures = VectorFixtures::new(rt).await?; let IndexData { index_name, .. } = fixtures.backfilling_vector_index().await?; // Use a user transaction, not a system transaction let tx_usage = FunctionUsageTracker::new(); let mut tx = fixtures .db .begin_with_usage(Identity::Unknown(None), tx_usage.clone()) .await?; add_document_vec_array(&mut tx, index_name.table(), [3f64, 4f64]).await?; fixtures.db.commit(tx).await?; fixtures .db .usage_counter() .track_call( test_udf_identifier(), ExecutionId::new(), RequestId::new(), CallType::Mutation { occ_info: None }, true, tx_usage.gather_user_stats(), ) .await; fixtures .add_document_vec_array(index_name.table(), [3f64, 4f64]) .await?; let stats = fixtures.test_usage_logger.collect(); let value = stats .recent_vector_ingress_size .get(&(*index_name.table()).to_string()) .cloned(); assert!(value.unwrap() > 0); Ok(()) } #[convex_macro::test_runtime] async fn vector_insert_counts_usage_for_enabled_indexes(rt: TestRuntime) -> anyhow::Result<()> { let fixtures = VectorFixtures::new(rt).await?; let IndexData { index_name, .. } = fixtures.enabled_vector_index().await?; // Use a user transaction, not a system transaction let tx_usage = FunctionUsageTracker::new(); let mut tx = fixtures .db .begin_with_usage(Identity::Unknown(None), tx_usage.clone()) .await?; add_document_vec_array(&mut tx, index_name.table(), [3f64, 4f64]).await?; fixtures.db.commit(tx).await?; fixtures .db .usage_counter() .track_call( test_udf_identifier(), ExecutionId::new(), RequestId::new(), CallType::Action { env: ModuleEnvironment::Isolate, duration: Duration::from_secs(10), memory_in_mb: 10, }, true, tx_usage.gather_user_stats(), ) .await; let stats = fixtures.test_usage_logger.collect(); let value = stats .recent_vector_ingress_size .get(&(*index_name.table()).to_string()) .cloned(); assert!(value.unwrap() > 0); Ok(()) } #[convex_macro::test_runtime] async fn vectors_in_segment_count_as_usage(rt: TestRuntime) -> anyhow::Result<()> { let fixtures = VectorFixtures::new(rt).await?; let IndexData { index_name, .. } = fixtures.enabled_vector_index().await?; // Use a user transaction, not a system transaction let tx_usage = FunctionUsageTracker::new(); let mut tx = fixtures .db .begin_with_usage(Identity::Unknown(None), tx_usage.clone()) .await?; add_document_vec_array(&mut tx, index_name.table(), [3f64, 4f64]).await?; fixtures.db.commit(tx).await?; fixtures .db .usage_counter() .track_call( test_udf_identifier(), ExecutionId::new(), RequestId::new(), CallType::Action { env: ModuleEnvironment::Isolate, duration: Duration::from_secs(10), memory_in_mb: 10, }, true, tx_usage.gather_user_stats(), ) .await; fixtures.new_live_index_flusher()?.step().await?; let storage = fixtures .db .latest_database_snapshot()? .get_vector_index_storage(&Identity::system())?; let key = (ComponentPath::root(), index_name.table().clone()); let value = storage.get(&key).cloned(); assert_eq!(value, Some(8_u64)); Ok(()) } #[convex_macro::test_runtime] async fn vector_query_counts_bandwidth(rt: TestRuntime) -> anyhow::Result<()> { let fixtures = VectorFixtures::new(rt).await?; let IndexData { index_name, .. } = fixtures.enabled_vector_index().await?; // Use a user transaction, not a system transaction let tx_usage = FunctionUsageTracker::new(); let mut tx = fixtures .db .begin_with_usage(Identity::Unknown(None), tx_usage.clone()) .await?; add_document_vec_array(&mut tx, index_name.table(), [3f64, 4f64]).await?; fixtures.db.commit(tx).await?; fixtures.new_backfill_index_flusher()?.step().await?; let (_, usage_stats) = fixtures .db .vector_search( Identity::Unknown(None), VectorSearch { index_name: index_name.clone(), component_id: ComponentId::Root, limit: Some(10), vector: vec![0.; 2], expressions: btreeset![], }, ) .await?; tx_usage.add(usage_stats); fixtures .db .usage_counter() .track_call( test_udf_identifier(), ExecutionId::new(), RequestId::new(), CallType::Action { env: ModuleEnvironment::Isolate, duration: Duration::from_secs(10), memory_in_mb: 10, }, true, tx_usage.gather_user_stats(), ) .await; let stats = fixtures.test_usage_logger.collect(); let vector_egress = stats.recent_vector_egress_size; let bandwidth_egress = stats.recent_database_egress_size; assert!(*vector_egress.get(&index_name.table().to_string()).unwrap() > 0); assert!( *bandwidth_egress .get(&index_name.table().to_string()) .unwrap() > 0 ); Ok(()) } #[convex_macro::test_runtime] async fn test_usage_tracking_basic_insert_and_get(rt: TestRuntime) -> anyhow::Result<()> { let DbFixtures { db, test_usage_logger, .. } = DbFixtures::new(&rt).await?; let tx_usage = FunctionUsageTracker::new(); let mut tx = db .begin_with_usage(Identity::Unknown(None), tx_usage.clone()) .await?; let obj = assert_obj!("key" => vec![0; 100]); let table_name: TableName = "my_table".parse()?; let doc_id = UserFacingModel::new_root_for_test(&mut tx) .insert(table_name.clone(), obj.clone()) .await?; db.commit(tx).await?; db.usage_counter() .track_call( test_udf_identifier(), ExecutionId::new(), RequestId::new(), CallType::Mutation { occ_info: None }, true, tx_usage.gather_user_stats(), ) .await; let stats = test_usage_logger.collect(); // Database ingress counted for write to user table, rounded up let database_ingress = stats.recent_database_ingress_size; assert_eq!(database_ingress.len(), 1); assert!(database_ingress.contains_key("my_table")); assert!(*database_ingress.get("my_table").unwrap() > 0); let database_egress = stats.recent_database_egress_size; assert_eq!(database_egress.values().sum::<u64>(), 0); // Database egress counted for read to user table, rounded up let tx_usage = FunctionUsageTracker::new(); let mut tx = db .begin_with_usage(Identity::Unknown(None), tx_usage.clone()) .await?; UserFacingModel::new_root_for_test(&mut tx) .get_with_ts(doc_id, None) .await?; db.commit(tx).await?; db.usage_counter() .track_call( test_udf_identifier(), ExecutionId::new(), RequestId::new(), CallType::Mutation { occ_info: None }, true, tx_usage.gather_user_stats(), ) .await; let stats = test_usage_logger.collect(); let database_ingress = stats.recent_database_ingress_size; assert_eq!(database_ingress.values().sum::<u64>(), 0); let database_egress = stats.recent_database_egress_size; assert_eq!(database_egress.len(), 1); assert!(database_egress.contains_key("my_table")); assert!(*database_egress.get("my_table").unwrap() > 0); Ok(()) } #[convex_macro::test_runtime] async fn test_usage_tracking_insert_with_index(rt: TestRuntime) -> anyhow::Result<()> { let DbFixtures { db, test_usage_logger, .. } = DbFixtures::new(&rt).await?; // Add a user index let table_name: TableName = "my_table".parse()?; let namespace = TableNamespace::test_user(); let tx_usage = FunctionUsageTracker::new(); let mut tx = db .begin_with_usage(Identity::system(), tx_usage.clone()) .await?; let index_name = IndexName::new(table_name.clone(), IndexDescriptor::new("by_key")?)?; IndexModel::new(&mut tx) .add_application_index( namespace, IndexMetadata::new_enabled(index_name.clone(), vec!["key".parse()?].try_into()?), ) .await .unwrap_or_else(|e| panic!("Failed to add index for {} {:?}", "by_key", e)); db.commit(tx).await?; db.usage_counter() .track_call( test_udf_identifier(), ExecutionId::new(), RequestId::new(), CallType::Mutation { occ_info: None }, true, tx_usage.gather_user_stats(), ) .await; let tx_usage = FunctionUsageTracker::new(); let mut tx = db .begin_with_usage(Identity::Unknown(None), tx_usage.clone()) .await?; let obj = assert_obj!("key" => 1); let obj2 = assert_obj!("key" => 3); let obj3 = assert_obj!("key" => 1); UserFacingModel::new_root_for_test(&mut tx) .insert(table_name.clone(), obj.clone()) .await?; UserFacingModel::new_root_for_test(&mut tx) .insert(table_name.clone(), obj2.clone()) .await?; UserFacingModel::new_root_for_test(&mut tx) .insert(table_name.clone(), obj3.clone()) .await?; db.commit(tx).await?; db.usage_counter() .track_call( test_udf_identifier(), ExecutionId::new(), RequestId::new(), CallType::Mutation { occ_info: None }, true, tx_usage.gather_user_stats(), ) .await; let stats = test_usage_logger.collect(); let database_ingress = stats.recent_database_ingress_size; assert_eq!(database_ingress.len(), 1); assert!(database_ingress.contains_key("my_table")); assert!(*database_ingress.get("my_table").unwrap() > 0); let database_egress = stats.recent_database_egress_size; assert_eq!(database_egress.values().sum::<u64>(), 0); let tx_usage = FunctionUsageTracker::new(); let mut tx = db .begin_with_usage(Identity::Unknown(None), tx_usage.clone()) .await?; let index_query = Query::index_range(IndexRange { index_name, range: vec![IndexRangeExpression::Eq("key".parse()?, maybe_val!(1))], order: Order::Asc, }); let mut query_stream = ResolvedQuery::new(&mut tx, namespace, index_query)?; while query_stream.next(&mut tx, None).await?.is_some() {} db.commit(tx).await?; db.usage_counter() .track_call( test_udf_identifier(), ExecutionId::new(), RequestId::new(), CallType::Mutation { occ_info: None }, true, tx_usage.gather_user_stats(), ) .await; let stats = test_usage_logger.collect(); let database_ingress = stats.recent_database_ingress_size; assert_eq!(database_ingress.values().sum::<u64>(), 0); let database_egress = stats.recent_database_egress_size; assert_eq!(database_egress.len(), 1); assert!(database_egress.contains_key("my_table")); assert!(*database_egress.get("my_table").unwrap() > 0); Ok(()) } #[convex_macro::test_runtime] async fn http_action_counts_compute(rt: TestRuntime) -> anyhow::Result<()> { let DbFixtures { db, test_usage_logger, .. } = DbFixtures::new(&rt).await?; let tx_usage = FunctionUsageTracker::new(); db.usage_counter() .track_call( test_udf_identifier(), ExecutionId::new(), RequestId::new(), CallType::HttpAction { duration: Duration::from_secs(5), memory_in_mb: 100, response_sha256: Sha256Digest::from([0; 32]), }, true, tx_usage.gather_user_stats(), ) .await; let stats = test_usage_logger.collect(); assert_eq!( stats.recent_node_action_compute_time.values().sum::<u64>(), 0 ); assert_eq!( stats.recent_v8_action_compute_time.values().sum::<u64>(), 500000 ); assert_eq!( *stats .recent_v8_action_compute_time .get("test.js:default") .unwrap(), 500000 ); Ok(()) }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server