
Convex MCP server

Official
by get-convex
export_storage.rs (7.4 kB)
use std::collections::BTreeMap;

use anyhow::Context;
use bytes::Bytes;
use common::{
    components::ComponentPath,
    document::ParseDocument,
    knobs::{
        EXPORT_MAX_INFLIGHT_PREFETCH_BYTES,
        EXPORT_STORAGE_GET_CONCURRENCY,
    },
    persistence::LatestDocument,
    runtime::Runtime,
    types::{
        IndexId,
        TableName,
    },
};
use database::MultiTableIterator;
use fastrace::{
    future::FutureExt,
    Span,
};
use futures::{
    pin_mut,
    stream,
    StreamExt,
    TryStreamExt,
};
use mime2ext::mime2ext;
use model::{
    exports::types::ExportRequestor,
    file_storage::{
        types::FileStorageEntry,
        FILE_STORAGE_TABLE,
        FILE_STORAGE_VIRTUAL_TABLE,
    },
};
use serde::{
    Deserialize,
    Serialize,
};
use serde_json::json;
use storage::StorageExt;
use tokio_util::io::StreamReader;
use usage_tracking::{
    FunctionUsageTracker,
    StorageCallTracker,
    StorageUsageTracker,
};
use value::{
    TableNamespace,
    TabletId,
};

use crate::{
    zip_uploader::ZipSnapshotUpload,
    ExportComponents,
};

pub(crate) async fn write_storage_table<'a, 'b: 'a, RT: Runtime>(
    components: &ExportComponents<RT>,
    path_prefix: &str,
    zip_snapshot_upload: &'a mut ZipSnapshotUpload<'b>,
    namespace: TableNamespace,
    component_path: &ComponentPath,
    table_iterator: &mut MultiTableIterator<RT>,
    by_id_indexes: &BTreeMap<TabletId, IndexId>,
    system_tables: &BTreeMap<(TableNamespace, TableName), TabletId>,
    usage: &FunctionUsageTracker,
    requestor: ExportRequestor,
) -> anyhow::Result<()> {
    // _storage
    let tablet_id = system_tables
        .get(&(namespace, FILE_STORAGE_TABLE.clone()))
        .context("_file_storage does not exist")?;
    let by_id = by_id_indexes
        .get(tablet_id)
        .context("_file_storage.by_id does not exist")?;

    // First write metadata to _storage/documents.jsonl
    let mut table_upload = zip_snapshot_upload
        .start_system_table(path_prefix, FILE_STORAGE_VIRTUAL_TABLE.clone())
        .await?;
    {
        let stream = table_iterator.stream_documents_in_table(*tablet_id, *by_id, None);
        pin_mut!(stream);
        while let Some(LatestDocument { value: doc, .. }) = stream.try_next().await? {
            let file_storage_entry = ParseDocument::<FileStorageEntry>::parse(doc)?;
            let virtual_storage_id = file_storage_entry.id().developer_id;
            let creation_time = f64::from(file_storage_entry.creation_time());
            table_upload
                .write_json_line(json!(FileStorageZipMetadata {
                    id: virtual_storage_id.encode(),
                    creation_time: Some(creation_time),
                    sha256: Some(file_storage_entry.sha256.as_base64()),
                    size: Some(file_storage_entry.size),
                    content_type: file_storage_entry.content_type.clone(),
                    internal_id: Some(file_storage_entry.storage_id.to_string()),
                }))
                .await?;
        }
    }
    table_upload.complete().await?;

    let max_prefetch_bytes = *EXPORT_MAX_INFLIGHT_PREFETCH_BYTES;
    let inflight_bytes_semaphore = tokio::sync::Semaphore::new(max_prefetch_bytes);
    let files_stream = table_iterator
        .stream_documents_in_table(*tablet_id, *by_id, None)
        .map_ok(|LatestDocument { value: doc, .. }| async {
            let file_storage_entry = ParseDocument::<FileStorageEntry>::parse(doc)?;
            let virtual_storage_id = file_storage_entry.id().developer_id;
            // Add an extension, which isn't necessary for anything and might be incorrect,
            // but allows the file to be viewed at a glance in most cases.
            let extension_guess = file_storage_entry
                .content_type
                .as_ref()
                .and_then(mime2ext)
                .map(|extension| format!(".{extension}"))
                .unwrap_or_default();
            let path = format!(
                "{path_prefix}{}/{}{extension_guess}",
                *FILE_STORAGE_VIRTUAL_TABLE,
                virtual_storage_id.encode()
            );
            let file_stream = components
                .file_storage
                .get(&file_storage_entry.storage_key)
                .await?
                .with_context(|| {
                    format!(
                        "file missing from storage: {} with key {:?}",
                        file_storage_entry.developer_id().encode(),
                        file_storage_entry.storage_key,
                    )
                })?;
            let content_type = file_storage_entry
                .content_type
                .as_ref()
                .map(|ct| ct.parse())
                .transpose()?;
            usage
                .track_storage_call(
                    component_path.clone(),
                    requestor.usage_tag(),
                    file_storage_entry.storage_id.clone(),
                    content_type,
                    file_storage_entry.sha256.clone(),
                )
                .await;
            usage
                .track_storage_egress_size(
                    component_path.clone(),
                    requestor.usage_tag().to_string(),
                    file_stream.content_length as u64,
                )
                .await;
            if (file_stream.content_length as usize) < max_prefetch_bytes {
                let permit = inflight_bytes_semaphore
                    .acquire_many(file_stream.content_length as u32)
                    .await?;
                // Prefetch the file before passing it to the zip writer.
                // This can happen in parallel with other files.
                let bytes: Vec<Bytes> = file_stream
                    .stream
                    .try_collect()
                    .in_span(Span::enter_with_local_parent("prefetch_storage_file"))
                    .await?;
                let stream = StreamReader::new(stream::iter(bytes.into_iter().map(Ok)).boxed());
                Ok((path, stream, permit))
            } else {
                // Wait until all other ongoing prefetches are finished, then stream this file
                // serially.
                let permit = inflight_bytes_semaphore
                    .acquire_many(max_prefetch_bytes as u32)
                    .await?;
                // Note that fetching won't start until the reader is first polled (which won't
                // happen until it's passed to `stream_full_file`).
                Ok((path, file_stream.into_tokio_reader(), permit))
            }
        })
        .try_buffer_unordered(*EXPORT_STORAGE_GET_CONCURRENCY);
    // Note that this will return entries in an arbitrary order
    pin_mut!(files_stream);
    while let Some((path, file_stream, permit)) = files_stream.try_next().await? {
        zip_snapshot_upload
            .stream_full_file(path, file_stream)
            .await?;
        drop(permit);
    }
    Ok(())
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FileStorageZipMetadata {
    #[serde(rename = "_id")]
    pub id: String,
    #[serde(rename = "_creationTime")]
    pub creation_time: Option<f64>,
    pub sha256: Option<String>,
    pub size: Option<i64>,
    pub content_type: Option<String>,
    pub internal_id: Option<String>,
}
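
For reference, here is a standalone sketch of the per-file metadata line that write_storage_table writes into _storage/documents.jsonl. The struct mirrors FileStorageZipMetadata above; every field value in this example is made up, chosen only to show the serialized shape produced by the serde rename attributes.

use serde::Serialize;

#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct FileStorageZipMetadata {
    #[serde(rename = "_id")]
    id: String,
    #[serde(rename = "_creationTime")]
    creation_time: Option<f64>,
    sha256: Option<String>,
    size: Option<i64>,
    content_type: Option<String>,
    internal_id: Option<String>,
}

fn main() -> serde_json::Result<()> {
    // All values below are hypothetical, for illustration only.
    let line = FileStorageZipMetadata {
        id: "kg21x8p4e5f6g7h8j9k0m1n2".to_string(), // hypothetical virtual storage ID
        creation_time: Some(1_700_000_000_000.0),
        sha256: Some("R7DEQpj8HBSa+/TImW+5JCeuQeR...".to_string()), // base64 digest, truncated
        size: Some(1024),
        content_type: Some("image/png".to_string()),
        internal_id: Some("internal-storage-id".to_string()),
    };
    // Prints one JSON line, e.g.:
    // {"_id":"kg21x8...","_creationTime":1700000000000.0,"sha256":"R7DE...","size":1024,"contentType":"image/png","internalId":"internal-storage-id"}
    println!("{}", serde_json::to_string(&line)?);
    Ok(())
}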

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'
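
The endpoint can also be queried from code. Below is a minimal sketch in Rust, assuming the reqwest (with its json feature), tokio, and serde_json crates and that the endpoint returns a JSON body; it fetches and pretty-prints the same server record as the curl command above.

// Fetch the Convex MCP server record from the Glama MCP directory API.
// The crate choices here (reqwest, tokio, serde_json) are assumptions for this sketch.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let url = "https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend";
    let server: serde_json::Value = reqwest::get(url).await?.json().await?;
    println!("{}", serde_json::to_string_pretty(&server)?);
    Ok(())
}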

If you have feedback or need assistance with the MCP directory API, please join our Discord server.