Skip to main content
Glama

Convex MCP server

Official
by get-convex
lib.rs (37.5 kB)
#![feature(iterator_try_collect)] #![feature(let_chains)] use std::{ collections::BTreeMap, fmt::Debug, sync::Arc, time::Duration, }; use anyhow::Context; use async_trait::async_trait; use common::{ components::ComponentPath, execution_context::ExecutionId, knobs::{ FUNCTION_LIMIT_WARNING_RATIO, TRANSACTION_MAX_READ_SIZE_BYTES, TRANSACTION_MAX_READ_SIZE_ROWS, }, types::{ ModuleEnvironment, StorageUuid, UdfIdentifier, }, RequestId, }; use events::usage::{ FunctionCallUsageFields, InsightReadLimitCall, UsageEvent, UsageEventLogger, }; use headers::ContentType; use parking_lot::Mutex; use pb::usage::{ CounterWithComponent as CounterWithComponentProto, CounterWithTag as CounterWithTagProto, FunctionUsageStats as FunctionUsageStatsProto, }; use value::{ heap_size::WithHeapSize, sha256::Sha256Digest, }; mod metrics; /// The core usage stats aggregator that is cheaply cloneable #[derive(Clone, Debug)] pub struct UsageCounter { usage_logger: Arc<dyn UsageEventLogger>, } impl UsageCounter { pub fn new(usage_logger: Arc<dyn UsageEventLogger>) -> Self { Self { usage_logger } } // Used for tracking storage ingress outside of a user function (e.g. snapshot // import/export). pub async fn track_independent_storage_ingress_size( &self, component_path: ComponentPath, tag: String, ingress_size: u64, ) { let independent_tracker = IndependentStorageCallTracker::new(ExecutionId::new(), self.usage_logger.clone()); independent_tracker .track_storage_ingress_size(component_path, tag, ingress_size) .await; } // Used for tracking storage egress outside of a user function (e.g. snapshot // import/export). 
pub async fn track_independent_storage_egress_size( &self, component_path: ComponentPath, tag: String, egress_size: u64, ) { let independent_tracker = IndependentStorageCallTracker::new(ExecutionId::new(), self.usage_logger.clone()); independent_tracker .track_storage_egress_size(component_path, tag, egress_size) .await; } } #[derive(Debug, Clone)] pub struct OccInfo { pub table_name: Option<String>, pub document_id: Option<String>, pub write_source: Option<String>, pub retry_count: u64, } pub enum CallType { Action { env: ModuleEnvironment, duration: Duration, memory_in_mb: u64, }, HttpAction { duration: Duration, memory_in_mb: u64, /// Sha256 of the response body response_sha256: Sha256Digest, }, Export, CachedQuery, UncachedQuery, Mutation { occ_info: Option<OccInfo>, }, Import, CloudBackup, CloudRestore, } impl CallType { fn tag(&self) -> &'static str { match self { Self::Action { .. } => "action", Self::Export => "export", Self::CachedQuery => "cached_query", Self::UncachedQuery => "uncached_query", Self::Mutation { .. } => "mutation", Self::HttpAction { .. } => "http_action", Self::Import => "import", Self::CloudBackup => "cloud_backup", Self::CloudRestore => "cloud_restore", } } fn is_occ(&self) -> bool { match self { Self::Mutation { occ_info, .. } => occ_info.is_some(), _ => false, } } fn occ_document_id(&self) -> Option<String> { match self { Self::Mutation { occ_info } => { occ_info.as_ref().and_then(|info| info.document_id.clone()) }, _ => None, } } fn occ_table_name(&self) -> Option<String> { match self { Self::Mutation { occ_info, .. } => { occ_info.as_ref().and_then(|info| info.table_name.clone()) }, _ => None, } } fn occ_write_source(&self) -> Option<String> { match self { Self::Mutation { occ_info, .. } => { occ_info.as_ref().and_then(|info| info.write_source.clone()) }, _ => None, } } fn occ_retry_count(&self) -> Option<u64> { match self { Self::Mutation { occ_info, .. 
} => occ_info.as_ref().map(|info| info.retry_count), _ => None, } } fn memory_megabytes(&self) -> u64 { match self { CallType::Action { memory_in_mb, .. } | CallType::HttpAction { memory_in_mb, .. } => { *memory_in_mb }, _ => 0, } } fn duration_millis(&self) -> u64 { match self { CallType::Action { duration, .. } | CallType::HttpAction { duration, .. } => { u64::try_from(duration.as_millis()) .expect("Action was running for over 584 billion years??") }, _ => 0, } } fn environment(&self) -> String { match self { CallType::Action { env, .. } => env, // All other UDF types, including HTTP actions run on the isolate // only. _ => &ModuleEnvironment::Isolate, } .to_string() } fn response_sha256(&self) -> Option<String> { match self { CallType::HttpAction { response_sha256, .. } => Some(response_sha256.as_hex()), _ => None, } } } impl UsageCounter { pub async fn track_call( &self, udf_path: UdfIdentifier, execution_id: ExecutionId, request_id: RequestId, call_type: CallType, success: bool, stats: FunctionUsageStats, ) { let mut usage_metrics = Vec::new(); // Because system udfs might cause usage before any data is added by the user, // we do not count their calls. We do count their bandwidth. 
let (should_track_calls, udf_id_type) = match &udf_path { UdfIdentifier::Function(path) => (!path.udf_path.is_system(), "function"), UdfIdentifier::Http(_) => (true, "http"), UdfIdentifier::SystemJob(_) => (false, "_system_job"), }; let (component_path, udf_id) = udf_path.clone().into_component_and_udf_path(); usage_metrics.push(UsageEvent::FunctionCall { fields: FunctionCallUsageFields { id: execution_id.to_string(), request_id: request_id.to_string(), status: if success { "success" } else { "failure" }.to_string(), component_path, udf_id, udf_id_type: udf_id_type.to_string(), tag: call_type.tag().to_string(), memory_megabytes: call_type.memory_megabytes(), duration_millis: call_type.duration_millis(), environment: call_type.environment(), is_tracked: should_track_calls, response_sha256: call_type.response_sha256(), is_occ: call_type.is_occ(), occ_table_name: call_type.occ_table_name(), occ_document_id: call_type.occ_document_id(), occ_write_source: call_type.occ_write_source(), occ_retry_count: call_type.occ_retry_count(), }, }); // We always track bandwidth, even for system udfs. self._track_function_usage( udf_path, stats, execution_id, request_id, success, &mut usage_metrics, ); self.usage_logger.record_async(usage_metrics).await; } // TODO: The existence of this function is a hack due to shortcuts we have // done in Node.js usage tracking. It should only be used by Node.js action // callbacks. We should only be using track_call() and never calling this // this directly. Otherwise, we will have the usage reflected in the usage // stats for billing but not in the UDF execution log counters. 
pub async fn track_function_usage( &self, udf_path: UdfIdentifier, execution_id: ExecutionId, request_id: RequestId, stats: FunctionUsageStats, ) { let mut usage_metrics = Vec::new(); self._track_function_usage( udf_path, stats, execution_id, request_id, true, &mut usage_metrics, ); self.usage_logger.record_async(usage_metrics).await; } pub fn _track_function_usage( &self, udf_path: UdfIdentifier, stats: FunctionUsageStats, execution_id: ExecutionId, request_id: RequestId, success: bool, usage_metrics: &mut Vec<UsageEvent>, ) { // Merge the storage stats. let (_, udf_id) = udf_path.into_component_and_udf_path(); for ((component_path, storage_api), function_count) in stats.storage_calls { usage_metrics.push(UsageEvent::FunctionStorageCalls { id: execution_id.to_string(), component_path: component_path.serialize(), udf_id: udf_id.clone(), call: storage_api, count: function_count, }); } for (component_path, ingress_size) in stats.storage_ingress_size { usage_metrics.push(UsageEvent::FunctionStorageBandwidth { id: execution_id.to_string(), component_path: component_path.serialize(), udf_id: udf_id.clone(), ingress: ingress_size, egress: 0, }); } for (component_path, egress_size) in stats.storage_egress_size { usage_metrics.push(UsageEvent::FunctionStorageBandwidth { id: execution_id.to_string(), component_path: component_path.serialize(), udf_id: udf_id.clone(), ingress: 0, egress: egress_size, }); } // Merge "by table" bandwidth stats. 
for ((component_path, table_name), ingress_size) in stats.database_ingress_size { usage_metrics.push(UsageEvent::DatabaseBandwidth { id: execution_id.to_string(), request_id: request_id.to_string(), component_path: component_path.serialize(), udf_id: udf_id.clone(), table_name, ingress: ingress_size, egress: 0, egress_rows: 0, }); } for ((component_path, table_name), egress_size) in stats.database_egress_size.clone() { let rows = stats .database_egress_rows .get(&(component_path.clone(), table_name.clone())) .unwrap_or(&0); usage_metrics.push(UsageEvent::DatabaseBandwidth { id: execution_id.to_string(), request_id: request_id.to_string(), component_path: component_path.serialize(), udf_id: udf_id.clone(), table_name, ingress: 0, egress: egress_size, egress_rows: *rows, }); } // Check read limits and add InsightReadLimit event if thresholds are exceeded let total_rows: u64 = stats.database_egress_rows.values().sum(); let total_bytes: u64 = stats.database_egress_size.values().sum(); let row_threshold = (*TRANSACTION_MAX_READ_SIZE_ROWS as f64 * *FUNCTION_LIMIT_WARNING_RATIO) as u64; let byte_threshold = (*TRANSACTION_MAX_READ_SIZE_BYTES as f64 * *FUNCTION_LIMIT_WARNING_RATIO) as u64; let did_exceed_document_threshold = total_rows >= row_threshold; let did_exceed_byte_threshold = total_bytes >= byte_threshold; if did_exceed_document_threshold || did_exceed_byte_threshold { let mut calls = Vec::new(); let component_path: Option<ComponentPath> = match stats.database_egress_rows.first_key_value() { Some(((component_path, _), _)) => Some(component_path.clone()), None => { tracing::error!( "Failed to find component path despite thresholds being exceeded" ); None }, }; if let Some(component_path) = component_path { for ((cp, table_name), egress_rows) in stats.database_egress_rows.into_iter() { let egress = stats .database_egress_size .get(&(cp, table_name.clone())) .copied() .unwrap_or(0); calls.push(InsightReadLimitCall { table_name, bytes_read: egress, documents_read: 
egress_rows, }); } usage_metrics.push(UsageEvent::InsightReadLimit { id: execution_id.to_string(), request_id: request_id.to_string(), udf_id: udf_id.clone(), component_path: component_path.serialize(), calls, success, }); } } for ((component_path, table_name), ingress_size) in stats.vector_ingress_size { usage_metrics.push(UsageEvent::VectorBandwidth { id: execution_id.to_string(), component_path: component_path.serialize(), udf_id: udf_id.clone(), table_name, ingress: ingress_size, egress: 0, }); } for ((component_path, table_name), egress_size) in stats.vector_egress_size { usage_metrics.push(UsageEvent::VectorBandwidth { id: execution_id.to_string(), component_path: component_path.serialize(), udf_id: udf_id.clone(), table_name, ingress: 0, egress: egress_size, }); } } } // We can track storage attributed by UDF or not. This is why unlike database // and vector search egress/ingress those methods are both on // FunctionUsageTracker and UsageCounters directly. #[async_trait] pub trait StorageUsageTracker: Send + Sync { async fn track_storage_call( &self, component_path: ComponentPath, storage_api: &'static str, storage_id: StorageUuid, content_type: Option<ContentType>, sha256: Sha256Digest, ) -> Box<dyn StorageCallTracker>; } #[async_trait] pub trait StorageCallTracker: Send + Sync { async fn track_storage_ingress_size( &self, component_path: ComponentPath, tag: String, ingress_size: u64, ); async fn track_storage_egress_size( &self, component_path: ComponentPath, tag: String, egress_size: u64, ); } struct IndependentStorageCallTracker { execution_id: ExecutionId, usage_logger: Arc<dyn UsageEventLogger>, } impl IndependentStorageCallTracker { fn new(execution_id: ExecutionId, usage_logger: Arc<dyn UsageEventLogger>) -> Self { Self { execution_id, usage_logger, } } } #[async_trait] impl StorageCallTracker for IndependentStorageCallTracker { async fn track_storage_ingress_size( &self, component_path: ComponentPath, tag: String, ingress_size: u64, ) { 
metrics::storage::log_storage_ingress_size(ingress_size); self.usage_logger .record_async(vec![UsageEvent::StorageBandwidth { id: self.execution_id.to_string(), component_path: component_path.serialize(), tag, ingress: ingress_size, egress: 0, }]) .await; } async fn track_storage_egress_size( &self, component_path: ComponentPath, tag: String, egress_size: u64, ) { metrics::storage::log_storage_egress_size(egress_size); self.usage_logger .record_async(vec![UsageEvent::StorageBandwidth { id: self.execution_id.to_string(), component_path: component_path.serialize(), tag, ingress: 0, egress: egress_size, }]) .await; } } #[async_trait] impl StorageUsageTracker for UsageCounter { async fn track_storage_call( &self, component_path: ComponentPath, storage_api: &'static str, storage_id: StorageUuid, content_type: Option<ContentType>, sha256: Sha256Digest, ) -> Box<dyn StorageCallTracker> { let execution_id = ExecutionId::new(); metrics::storage::log_storage_call(); self.usage_logger .record_async(vec![UsageEvent::StorageCall { id: execution_id.to_string(), component_path: component_path.serialize(), // Ideally we would track the Id<_storage> instead of the StorageUuid // but it's a bit annoying for now, so just going with this. storage_id: storage_id.to_string(), call: storage_api.to_string(), content_type: content_type.map(|c| c.to_string()), sha256: sha256.as_hex(), }]) .await; Box::new(IndependentStorageCallTracker::new( execution_id, self.usage_logger.clone(), )) } } /// Usage tracker used within a Transaction. Note that this structure does not /// directly report to the backend global counters and instead only buffers the /// counters locally. The counters get rolled into the global ones via /// UsageCounters::track_call() at the end of each UDF. This provides a /// consistent way to account for usage, where we only bill people for usage /// that makes it to the UdfExecution log. 
#[derive(Debug, Clone)] pub struct FunctionUsageTracker { // TODO: We should ideally not use an Arc<Mutex> here. The best way to achieve // this is to move the logic for accounting ingress out of the Committer into // the Transaction. Then Transaction can solely own the counters and we can // remove clone(). The alternative is for the Committer to take ownership of // the usage tracker and then return it, but this will make it complicated if // we later decide to charge people for OCC bandwidth. state: Arc<Mutex<FunctionUsageStats>>, } impl FunctionUsageTracker { pub fn new() -> Self { Self { state: Arc::new(Mutex::new(FunctionUsageStats::default())), } } /// Calculate FunctionUsageStats here pub fn gather_user_stats(self) -> FunctionUsageStats { self.state.lock().clone() } /// Adds the given usage stats to the current tracker. pub fn add(&self, stats: FunctionUsageStats) { self.state.lock().merge(stats); } // Tracks database usage from write operations (insert/update/delete) for // documents that are not in vector indexes. If the document has one or more // vectors in a vector index, call `track_vector_ingress_size` instead of // this method. // // You must always check to see if a document is a vector index before // calling this method. 
pub fn track_database_ingress_size( &self, component_path: ComponentPath, table_name: String, ingress_size: u64, skip_logging: bool, ) { if skip_logging { return; } let mut state = self.state.lock(); state .database_ingress_size .mutate_entry_or_default((component_path, table_name), |count| *count += ingress_size); } pub fn track_database_egress_size( &self, component_path: ComponentPath, table_name: String, egress_size: u64, skip_logging: bool, ) { if skip_logging { return; } let mut state = self.state.lock(); state .database_egress_size .mutate_entry_or_default((component_path, table_name), |count| *count += egress_size); } pub fn track_database_egress_rows( &self, component_path: ComponentPath, table_name: String, egress_rows: u64, skip_logging: bool, ) { if skip_logging { return; } let mut state = self.state.lock(); state .database_egress_rows .mutate_entry_or_default((component_path, table_name), |count| *count += egress_rows); } // Tracks the vector ingress surcharge and database usage for documents // that have one or more vectors in a vector index. // // If the document does not have a vector in a vector index, call // `track_database_ingress_size` instead of this method. // // Vector bandwidth is a surcharge on vector related bandwidth usage. As a // result it counts against both bandwidth ingress and vector ingress. // Ingress is a bit trickier than egress because vector ingress needs to be // updated whenever the mutated document is in a vector index. To be in a // vector index the document must both be in a table with a vector index and // have at least one vector that's actually used in the index. pub fn track_vector_ingress_size( &self, component_path: ComponentPath, table_name: String, ingress_size: u64, skip_logging: bool, ) { if skip_logging { return; } // Note that vector search counts as both database and vector bandwidth // per the comment above. 
let mut state = self.state.lock(); let key = (component_path, table_name); state .database_ingress_size .mutate_entry_or_default(key.clone(), |count| { *count += ingress_size; }); state .vector_ingress_size .mutate_entry_or_default(key, |count| { *count += ingress_size; }); } // Tracks bandwidth usage from vector searches // // Vector bandwidth is a surcharge on vector related bandwidth usage. As a // result it counts against both bandwidth egress and vector egress. It's an // error to increment vector egress without also incrementing database // egress. The reverse is not true however, it's totally fine to increment // general database egress without incrementing vector egress if the operation // is not a vector search. // // Unlike track_database_ingress_size, this method is explicitly vector related // because we should always know that the relevant operation is a vector // search. In contrast, for ingress any insert/update/delete could happen to // impact a vector index. pub fn track_vector_egress_size( &self, component_path: ComponentPath, table_name: String, egress_size: u64, skip_logging: bool, ) { if skip_logging { return; } // Note that vector search counts as both database and vector bandwidth // per the comment above. let mut state = self.state.lock(); let key = (component_path, table_name); state .database_egress_size .mutate_entry_or_default(key.clone(), |count| *count += egress_size); state .vector_egress_size .mutate_entry_or_default(key, |count| *count += egress_size); } } // For UDFs, we track storage at the per UDF level, no finer. So we can just // aggregate over the entire UDF and not worry about sending usage events or // creating unique execution ids. // Note: If we want finer-grained breakdown of file bandwidth, we can thread the // tag through FunctionUsageStats. For now we're just interested in the // breakdown of file bandwidth from functions vs external sources like snapshot // export/cloud backups. 
#[async_trait] impl StorageCallTracker for FunctionUsageTracker { async fn track_storage_ingress_size( &self, component_path: ComponentPath, _tag: String, ingress_size: u64, ) { let mut state = self.state.lock(); metrics::storage::log_storage_ingress_size(ingress_size); state .storage_ingress_size .mutate_entry_or_default(component_path, |count| *count += ingress_size); } async fn track_storage_egress_size( &self, component_path: ComponentPath, _tag: String, egress_size: u64, ) { let mut state = self.state.lock(); metrics::storage::log_storage_egress_size(egress_size); state .storage_egress_size .mutate_entry_or_default(component_path, |count| *count += egress_size); } } #[async_trait] impl StorageUsageTracker for FunctionUsageTracker { async fn track_storage_call( &self, component_path: ComponentPath, storage_api: &'static str, _storage_id: StorageUuid, _content_type: Option<ContentType>, _sha256: Sha256Digest, ) -> Box<dyn StorageCallTracker> { let mut state = self.state.lock(); metrics::storage::log_storage_call(); state .storage_calls .mutate_entry_or_default((component_path, storage_api.to_string()), |count| { *count += 1 }); Box::new(self.clone()) } } type TableName = String; type StorageAPI = String; /// User-facing UDF stats, built #[derive(Debug, Clone, PartialEq, Eq, Default)] pub struct FunctionUsageStats { pub storage_calls: WithHeapSize<BTreeMap<(ComponentPath, StorageAPI), u64>>, pub storage_ingress_size: WithHeapSize<BTreeMap<ComponentPath, u64>>, pub storage_egress_size: WithHeapSize<BTreeMap<ComponentPath, u64>>, pub database_ingress_size: WithHeapSize<BTreeMap<(ComponentPath, TableName), u64>>, pub database_egress_size: WithHeapSize<BTreeMap<(ComponentPath, TableName), u64>>, pub database_egress_rows: WithHeapSize<BTreeMap<(ComponentPath, TableName), u64>>, pub vector_ingress_size: WithHeapSize<BTreeMap<(ComponentPath, TableName), u64>>, pub vector_egress_size: WithHeapSize<BTreeMap<(ComponentPath, TableName), u64>>, } impl FunctionUsageStats { 
pub fn aggregate(&self) -> AggregatedFunctionUsageStats { AggregatedFunctionUsageStats { database_read_bytes: self.database_egress_size.values().sum(), database_write_bytes: self.database_ingress_size.values().sum(), database_read_documents: self.database_egress_rows.values().sum(), storage_read_bytes: self.storage_egress_size.values().sum(), storage_write_bytes: self.storage_ingress_size.values().sum(), vector_index_read_bytes: self.vector_egress_size.values().sum(), vector_index_write_bytes: self.vector_ingress_size.values().sum(), } } fn merge(&mut self, other: Self) { // Merge the storage stats. for (key, function_count) in other.storage_calls { self.storage_calls .mutate_entry_or_default(key, |count| *count += function_count); } for (key, ingress_size) in other.storage_ingress_size { self.storage_ingress_size .mutate_entry_or_default(key, |count| *count += ingress_size); } for (key, egress_size) in other.storage_egress_size { self.storage_egress_size .mutate_entry_or_default(key, |count| *count += egress_size); } // Merge "by table" bandwidth other. 
for (key, ingress_size) in other.database_ingress_size { self.database_ingress_size .mutate_entry_or_default(key.clone(), |count| *count += ingress_size); } for (key, egress_size) in other.database_egress_size { self.database_egress_size .mutate_entry_or_default(key.clone(), |count| *count += egress_size); } for (key, egress_rows) in other.database_egress_rows { self.database_egress_rows .mutate_entry_or_default(key.clone(), |count| *count += egress_rows); } for (key, ingress_size) in other.vector_ingress_size { self.vector_ingress_size .mutate_entry_or_default(key.clone(), |count| *count += ingress_size); } for (key, egress_size) in other.vector_egress_size { self.vector_egress_size .mutate_entry_or_default(key.clone(), |count| *count += egress_size); } } } #[cfg(any(test, feature = "testing"))] mod usage_arbitrary { use proptest::prelude::*; use crate::{ ComponentPath, FunctionUsageStats, StorageAPI, TableName, WithHeapSize, }; impl Arbitrary for FunctionUsageStats { type Parameters = (); type Strategy = BoxedStrategy<Self>; fn arbitrary_with((): Self::Parameters) -> Self::Strategy { let strategies = ( proptest::collection::btree_map( any::<(ComponentPath, StorageAPI)>(), 0..=1024u64, 0..=4, ) .prop_map(WithHeapSize::from), proptest::collection::btree_map(any::<ComponentPath>(), 0..=1024u64, 0..=4) .prop_map(WithHeapSize::from), proptest::collection::btree_map(any::<ComponentPath>(), 0..=1024u64, 0..=4) .prop_map(WithHeapSize::from), proptest::collection::btree_map( any::<(ComponentPath, TableName)>(), 0..=1024u64, 0..=4, ) .prop_map(WithHeapSize::from), proptest::collection::btree_map( any::<(ComponentPath, TableName)>(), 0..=1024u64, 0..=4, ) .prop_map(WithHeapSize::from), proptest::collection::btree_map( any::<(ComponentPath, TableName)>(), 0..=1024u64, 0..=4, ) .prop_map(WithHeapSize::from), proptest::collection::btree_map( any::<(ComponentPath, TableName)>(), 0..=1024u64, 0..=4, ) .prop_map(WithHeapSize::from), proptest::collection::btree_map( 
any::<(ComponentPath, TableName)>(), 0..=1024u64, 0..=4, ) .prop_map(WithHeapSize::from), ); strategies .prop_map( |( storage_calls, storage_ingress_size, storage_egress_size, database_ingress_size, database_egress_size, database_egress_rows, vector_ingress_size, vector_egress_size, )| FunctionUsageStats { storage_calls, storage_ingress_size, storage_egress_size, database_ingress_size, database_egress_size, database_egress_rows, vector_ingress_size, vector_egress_size, }, ) .boxed() } } } fn to_by_tag_count( counts: impl Iterator<Item = ((ComponentPath, String), u64)>, ) -> Vec<CounterWithTagProto> { counts .map( |((component_path, table_name), count)| CounterWithTagProto { component_path: component_path.serialize(), table_name: Some(table_name), count: Some(count), }, ) .collect() } fn to_by_component_count( counts: impl Iterator<Item = (ComponentPath, u64)>, ) -> Vec<CounterWithComponentProto> { counts .map(|(component_path, count)| CounterWithComponentProto { component_path: component_path.serialize(), count: Some(count), }) .collect() } fn from_by_tag_count( counts: Vec<CounterWithTagProto>, ) -> anyhow::Result<impl Iterator<Item = ((ComponentPath, String), u64)>> { let counts: Vec<_> = counts .into_iter() .map(|c| -> anyhow::Result<_> { let component_path = ComponentPath::deserialize(c.component_path.as_deref())?; let name = c.table_name.context("Missing `tag` field")?; let count = c.count.context("Missing `count` field")?; Ok(((component_path, name), count)) }) .try_collect()?; Ok(counts.into_iter()) } fn from_by_component_tag_count( counts: Vec<CounterWithComponentProto>, ) -> anyhow::Result<impl Iterator<Item = (ComponentPath, u64)>> { let counts: Vec<_> = counts .into_iter() .map(|c| -> anyhow::Result<_> { let component_path = ComponentPath::deserialize(c.component_path.as_deref())?; let count = c.count.context("Missing `count` field")?; Ok((component_path, count)) }) .try_collect()?; Ok(counts.into_iter()) } impl From<FunctionUsageStats> for 
FunctionUsageStatsProto { fn from(stats: FunctionUsageStats) -> Self { FunctionUsageStatsProto { storage_calls: to_by_tag_count(stats.storage_calls.into_iter()), storage_ingress_size_by_component: to_by_component_count( stats.storage_ingress_size.into_iter(), ), storage_egress_size_by_component: to_by_component_count( stats.storage_egress_size.into_iter(), ), database_ingress_size: to_by_tag_count(stats.database_ingress_size.into_iter()), database_egress_size: to_by_tag_count(stats.database_egress_size.into_iter()), database_egress_rows: to_by_tag_count(stats.database_egress_rows.into_iter()), vector_ingress_size: to_by_tag_count(stats.vector_ingress_size.into_iter()), vector_egress_size: to_by_tag_count(stats.vector_egress_size.into_iter()), } } } impl TryFrom<FunctionUsageStatsProto> for FunctionUsageStats { type Error = anyhow::Error; fn try_from(stats: FunctionUsageStatsProto) -> anyhow::Result<Self> { let storage_calls = from_by_tag_count(stats.storage_calls)?.collect(); let storage_ingress_size = from_by_component_tag_count(stats.storage_ingress_size_by_component)?.collect(); let storage_egress_size = from_by_component_tag_count(stats.storage_egress_size_by_component)?.collect(); let database_ingress_size = from_by_tag_count(stats.database_ingress_size)?.collect(); let database_egress_size = from_by_tag_count(stats.database_egress_size)?.collect(); let database_egress_rows = from_by_tag_count(stats.database_egress_rows)?.collect(); let vector_ingress_size = from_by_tag_count(stats.vector_ingress_size)?.collect(); let vector_egress_size = from_by_tag_count(stats.vector_egress_size)?.collect(); Ok(FunctionUsageStats { storage_calls, storage_ingress_size, storage_egress_size, database_ingress_size, database_egress_rows, database_egress_size, vector_ingress_size, vector_egress_size, }) } } /// User-facing UDF stats, that is logged in the UDF execution log /// and might be used for debugging purposes. 
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct AggregatedFunctionUsageStats {
    // Each field is the sum over all keys of one `FunctionUsageStats` map, as
    // computed by `FunctionUsageStats::aggregate`.

    // Sum of `database_egress_size`.
    pub database_read_bytes: u64,
    // Sum of `database_ingress_size`.
    pub database_write_bytes: u64,
    // Sum of `database_egress_rows`.
    pub database_read_documents: u64,
    // Sum of `storage_egress_size`.
    pub storage_read_bytes: u64,
    // Sum of `storage_ingress_size`.
    pub storage_write_bytes: u64,
    // Sum of `vector_egress_size`.
    pub vector_index_read_bytes: u64,
    // Sum of `vector_ingress_size`.
    pub vector_index_write_bytes: u64,
}

#[cfg(test)]
mod tests {
    use cmd_util::env::env_config;
    use proptest::prelude::*;
    use value::testing::assert_roundtrips;

    use super::{
        FunctionUsageStats,
        FunctionUsageStatsProto,
    };

    proptest! {
        // 256 cases by default, scaled by `CONVEX_PROPTEST_MULTIPLIER`.
        // NOTE(review): presumably `env_config` reads that environment variable
        // and falls back to 1 — confirm in `cmd_util`.
        #![proptest_config(
            ProptestConfig {
                cases: 256 * env_config("CONVEX_PROPTEST_MULTIPLIER", 1),
                failure_persistence: None,
                ..ProptestConfig::default()
            }
        )]

        // Property: arbitrary stats survive the conversion to the proto type
        // and back (presumably what `assert_roundtrips` checks — confirm).
        #[test]
        fn test_usage_stats_roundtrips(stats in any::<FunctionUsageStats>()) {
            assert_roundtrips::<FunctionUsageStats, FunctionUsageStatsProto>(stats);
        }
    }
}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.