Skip to main content
Glama

Convex MCP server

Official
by get-convex
mod.rs13.6 kB
use std::{ collections::BTreeMap, str::FromStr, sync::{ Arc, LazyLock, }, }; use anyhow::Context; use common::{ document::{ ParseDocument, ParsedDocument, }, query::{ IndexRange, IndexRangeExpression, Order, Query, }, runtime::Runtime, types::{ GenericIndexName, IndexName, StorageUuid, }, virtual_system_mapping::VirtualSystemDocMapper, }; use database::{ query::{ resolved_query_batch_next, TableFilter, }, unauthorized_error, DatabaseSnapshot, IndexModel, ResolvedQuery, SearchNotEnabled, SystemMetadataModel, TableModel, Transaction, }; use errors::ErrorMetadata; use futures::TryStreamExt; use keybroker::Identity; use maplit::btreemap; use pb::storage::{ file_storage_id::StorageIdType as FileStorageIdTypeProto, FileStorageId as FileStorageIdProto, }; use usage_tracking::FunctionUsageTracker; use value::{ id_v6::DeveloperDocumentId, ConvexValue, FieldPath, ResolvedDocumentId, TableName, TableNamespace, }; use self::virtual_table::FileStorageDocMapper; use crate::{ file_storage::types::FileStorageEntry, virtual_system_mapping, SystemIndex, SystemTable, }; pub mod types; pub mod virtual_table; pub type BatchKey = usize; pub static FILE_STORAGE_TABLE: LazyLock<TableName> = LazyLock::new(|| { "_file_storage" .parse() .expect("invalid built-in file storage table") }); pub static FILE_STORAGE_VIRTUAL_TABLE: LazyLock<TableName> = LazyLock::new(|| { "_storage" .parse() .expect("_storage is not a valid virtual table name") }); pub static FILE_STORAGE_INDEX_BY_ID: LazyLock<IndexName> = LazyLock::new(|| GenericIndexName::by_id(FILE_STORAGE_TABLE.clone())); pub static FILE_STORAGE_INDEX_BY_CREATION_TIME: LazyLock<IndexName> = LazyLock::new(|| GenericIndexName::by_creation_time(FILE_STORAGE_TABLE.clone())); pub static FILE_STORAGE_VIRTUAL_INDEX_BY_ID: LazyLock<IndexName> = LazyLock::new(|| GenericIndexName::by_id(FILE_STORAGE_VIRTUAL_TABLE.clone())); static FILE_STORAGE_VIRTUAL_INDEX_BY_CREATION_TIME: LazyLock<IndexName> = LazyLock::new(|| GenericIndexName::by_creation_time(FILE_STORAGE_VIRTUAL_TABLE.clone())); static FILE_STORAGE_ID_FIELD: LazyLock<FieldPath> = LazyLock::new(|| "storageId".parse().expect("invalid storageId field")); pub static FILE_STORAGE_ID_INDEX: LazyLock<SystemIndex<FileStorageTable>> = LazyLock::new(|| SystemIndex::new("by_storage_id", [&FILE_STORAGE_ID_FIELD]).unwrap()); pub struct FileStorageTable; impl SystemTable for FileStorageTable { type Metadata = FileStorageEntry; fn table_name() -> &'static TableName { &FILE_STORAGE_TABLE } fn indexes() -> Vec<SystemIndex<Self>> { vec![FILE_STORAGE_ID_INDEX.clone()] } fn virtual_table() -> Option<( &'static TableName, BTreeMap<IndexName, IndexName>, Arc<dyn VirtualSystemDocMapper>, )> { Some(( &FILE_STORAGE_VIRTUAL_TABLE, btreemap! { FILE_STORAGE_VIRTUAL_INDEX_BY_CREATION_TIME.clone() => FILE_STORAGE_INDEX_BY_CREATION_TIME.clone(), FILE_STORAGE_VIRTUAL_INDEX_BY_ID.clone() => FILE_STORAGE_INDEX_BY_ID.clone(), }, Arc::new(FileStorageDocMapper), )) } } #[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, derive_more::Display)] #[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))] pub enum FileStorageId { LegacyStorageId(StorageUuid), DocumentId(DeveloperDocumentId), } impl FromStr for FileStorageId { type Err = anyhow::Error; fn from_str(storage_id: &str) -> Result<Self, Self::Err> { let decoded_id = DeveloperDocumentId::decode(storage_id); match decoded_id { Ok(decoded_id) => Ok(FileStorageId::DocumentId(decoded_id)), Err(_) => Ok(FileStorageId::LegacyStorageId(storage_id.parse().context( ErrorMetadata::bad_request( "InvalidStorageId", format!( "Invalid storage ID: \"{storage_id}\". Storage ID should be an Id of '_storage' table, or a UUID string.", ), ), )?)), } } } impl TryFrom<FileStorageIdProto> for FileStorageId { type Error = anyhow::Error; fn try_from(storage_id: FileStorageIdProto) -> Result<Self, Self::Error> { let storage_id = match storage_id.storage_id_type { Some(FileStorageIdTypeProto::LegacyStorageId(storage_id)) => { FileStorageId::LegacyStorageId(storage_id.parse()?) }, Some(FileStorageIdTypeProto::DocumentId(storage_id)) => { FileStorageId::DocumentId(DeveloperDocumentId::decode(storage_id.as_str())?) }, None => anyhow::bail!("Missing `storage_id_type` field"), }; Ok(storage_id) } } impl From<FileStorageId> for FileStorageIdProto { fn from(storage_id: FileStorageId) -> Self { let storage_id_type = match storage_id { FileStorageId::LegacyStorageId(storage_id) => { FileStorageIdTypeProto::LegacyStorageId(storage_id.to_string()) }, FileStorageId::DocumentId(storage_id) => { FileStorageIdTypeProto::DocumentId(storage_id.encode()) }, }; FileStorageIdProto { storage_id_type: Some(storage_id_type), } } } pub struct FileStorageModel<'a, RT: Runtime> { tx: &'a mut Transaction<RT>, namespace: TableNamespace, } impl<'a, RT: Runtime> FileStorageModel<'a, RT> { pub fn new(tx: &'a mut Transaction<RT>, namespace: TableNamespace) -> Self { Self { tx, namespace } } pub async fn store_file( &mut self, entry: FileStorageEntry, ) -> anyhow::Result<ResolvedDocumentId> { // Call insert_metadata rather than insert because we already // did access check on `identity` rather than `self.identity` SystemMetadataModel::new(self.tx, self.namespace) .insert_metadata(&FILE_STORAGE_TABLE, entry.try_into()?) .await } pub async fn get_file( &mut self, storage_id: FileStorageId, ) -> anyhow::Result<Option<ParsedDocument<FileStorageEntry>>> { self.get_file_batch(btreemap! {0 => storage_id}) .await .remove(&0) .context("batch_key missing")? } pub async fn get_file_batch( &mut self, storage_ids: BTreeMap<BatchKey, FileStorageId>, ) -> BTreeMap<BatchKey, anyhow::Result<Option<ParsedDocument<FileStorageEntry>>>> { let batch_size = storage_ids.len(); let mut results = BTreeMap::new(); let mut queries = BTreeMap::new(); for (batch_key, storage_id) in storage_ids { match self.query_for_storage_id(storage_id) { Ok(query) => { queries.insert(batch_key, query); }, Err(e) => { results.insert(batch_key, Err(e)); }, } } let queries_to_fetch = queries .iter_mut() .map(|(batch_key, query)| (*batch_key, (query, Some(1)))) .collect(); for (batch_key, fetch_result) in resolved_query_batch_next(queries_to_fetch, self.tx).await { let parsed_result = match fetch_result { Err(e) => Err(e), Ok(None) => Ok(None), Ok(Some((doc, _))) => ParseDocument::parse(doc).map(Some), }; results.insert(batch_key, parsed_result); } assert_eq!(results.len(), batch_size); results } fn query_for_storage_id( &mut self, storage_id: FileStorageId, ) -> anyhow::Result<ResolvedQuery<RT>> { let index_query = match storage_id { FileStorageId::LegacyStorageId(storage_id) => Query::index_range(IndexRange { index_name: FILE_STORAGE_ID_INDEX.name(), range: vec![IndexRangeExpression::Eq( FILE_STORAGE_ID_FIELD.clone(), ConvexValue::try_from(storage_id)?.into(), )], order: Order::Asc, }), FileStorageId::DocumentId(document_id) => { let table_name = self .tx .resolve_idv6( document_id, self.namespace, TableFilter::ExcludePrivateSystemTables, ) .context(ErrorMetadata::bad_request( "InvalidArgument", format!( "Invalid storage ID. Storage ID cannot be an ID on any table other \ than '_storage'.", ), ))?; anyhow::ensure!( table_name == *FILE_STORAGE_VIRTUAL_TABLE, ErrorMetadata::bad_request( "InvalidArgument", format!( "Invalid storage ID. Storage ID cannot be an ID on any table other \ than '_storage'.", ), ) ); let table_mapping = self.tx.table_mapping().clone(); let document_id = self .tx .virtual_system_mapping() .virtual_id_v6_to_system_resolved_doc_id( self.namespace, &document_id, &table_mapping, )?; Query::get(FILE_STORAGE_TABLE.clone(), document_id.into()) }, }; ResolvedQuery::new(self.tx, self.namespace, index_query) } pub async fn delete_file( &mut self, storage_id: FileStorageId, identity: Identity, ) -> anyhow::Result<Option<FileStorageEntry>> { // We only expect this function to be called by the framework as part // of a storage syscall. We require passing in a system identity to confirm // that the caller isn't letting a user call this directly if !identity.is_system() { anyhow::bail!(unauthorized_error("delete_file")) } let Some(entry) = self.get_file(storage_id).await? else { return Ok(None); }; let document_id = entry.id(); SystemMetadataModel::new(self.tx, self.namespace) .delete(document_id) .await?; Ok(Some(entry.into_value())) } pub async fn get_total_storage_count(&mut self) -> anyhow::Result<u64> { TableModel::new(self.tx) .must_count(self.namespace, &FILE_STORAGE_TABLE.clone()) .await } } #[fastrace::trace] pub async fn get_total_file_storage_size<RT: Runtime>( identity: &Identity, db: &DatabaseSnapshot<RT>, ) -> anyhow::Result<u64> { let tablet_id_to_by_id_index = { let mut tx = db.begin_tx( identity.clone(), Arc::new(SearchNotEnabled), FunctionUsageTracker::new(), virtual_system_mapping().clone(), )?; let by_id_indexes = IndexModel::new(&mut tx).by_id_indexes().await?; let table_mapping = tx.table_mapping(); let tablet_id_to_by_id_index: BTreeMap<_, _> = table_mapping .iter() .filter(|(tablet_id, _, _, table_name)| { **table_name == *FILE_STORAGE_TABLE && table_mapping.is_active(*tablet_id) }) .map(|(tablet_id, ..)| { anyhow::Ok(( tablet_id, *by_id_indexes .get(&tablet_id) .context("_file_storage by_id index not found")?, )) }) .try_collect()?; tablet_id_to_by_id_index }; let mut table_iterator = db .table_iterator() .multi(tablet_id_to_by_id_index.keys().copied().collect()); let mut total_size = 0; for (tablet_id, by_id_index) in tablet_id_to_by_id_index { let mut table_stream = Box::pin(table_iterator.stream_documents_in_table(tablet_id, by_id_index, None)); while let Some(storage_document) = table_stream.try_next().await? { let storage_entry: ParsedDocument<FileStorageEntry> = storage_document.value.parse()?; total_size += storage_entry.size as u64; } drop(table_stream); table_iterator.unregister_table(tablet_id)?; } Ok(total_size) } #[cfg(test)] mod tests { use cmd_util::env::env_config; use pb::storage::FileStorageId as FileStorageIdProto; use proptest::prelude::*; use sync_types::testing::assert_roundtrips; use super::FileStorageId; proptest! { #![proptest_config( ProptestConfig { cases: 256 * env_config("CONVEX_PROPTEST_MULTIPLIER", 1), failure_persistence: None, ..ProptestConfig::default() } )] #[test] fn test_function_result_proto_roundtrips(left in any::<FileStorageId>()) { assert_roundtrips::<FileStorageId, FileStorageIdProto>(left); } } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server