use std::collections::BTreeMap;

use anyhow::Context;
use bytes::Bytes;
use common::{
    components::ComponentPath,
    document::ParseDocument,
    knobs::{
        EXPORT_MAX_INFLIGHT_PREFETCH_BYTES,
        EXPORT_STORAGE_GET_CONCURRENCY,
    },
    persistence::LatestDocument,
    runtime::Runtime,
    types::{
        IndexId,
        TableName,
    },
};
use database::MultiTableIterator;
use fastrace::{
    future::FutureExt,
    Span,
};
use futures::{
    pin_mut,
    stream,
    StreamExt,
    TryStreamExt,
};
use mime2ext::mime2ext;
use model::{
    exports::types::ExportRequestor,
    file_storage::{
        types::FileStorageEntry,
        FILE_STORAGE_TABLE,
        FILE_STORAGE_VIRTUAL_TABLE,
    },
};
use serde::{
    Deserialize,
    Serialize,
};
use serde_json::json;
use storage::StorageExt;
use tokio_util::io::StreamReader;
use usage_tracking::{
    FunctionUsageTracker,
    StorageCallTracker,
    StorageUsageTracker,
};
use value::{
    TableNamespace,
    TabletId,
};

use crate::{
    zip_uploader::ZipSnapshotUpload,
    ExportComponents,
};
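
/// Writes one component's `_storage` table into the snapshot zip: the file
/// metadata as a JSONL system table, followed by the raw file contents
/// themselves.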
pub(crate) async fn write_storage_table<'a, 'b: 'a, RT: Runtime>(
    components: &ExportComponents<RT>,
    path_prefix: &str,
    zip_snapshot_upload: &'a mut ZipSnapshotUpload<'b>,
    namespace: TableNamespace,
    component_path: &ComponentPath,
    table_iterator: &mut MultiTableIterator<RT>,
    by_id_indexes: &BTreeMap<TabletId, IndexId>,
    system_tables: &BTreeMap<(TableNamespace, TableName), TabletId>,
    usage: &FunctionUsageTracker,
    requestor: ExportRequestor,
) -> anyhow::Result<()> {
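    // The table is iterated twice: once to write the metadata lines and once
    // to copy out the file contents.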
    // Resolve the tablet backing this namespace's `_file_storage` table and
    // its `by_id` index.
    let tablet_id = system_tables
        .get(&(namespace, FILE_STORAGE_TABLE.clone()))
        .context("_file_storage does not exist")?;
    let by_id = by_id_indexes
        .get(tablet_id)
        .context("_file_storage.by_id does not exist")?;
    // First write metadata to _storage/documents.jsonl
    let mut table_upload = zip_snapshot_upload
        .start_system_table(path_prefix, FILE_STORAGE_VIRTUAL_TABLE.clone())
        .await?;
    {
        let stream = table_iterator.stream_documents_in_table(*tablet_id, *by_id, None);
        pin_mut!(stream);
        while let Some(LatestDocument { value: doc, .. }) = stream.try_next().await? {
            let file_storage_entry = ParseDocument::<FileStorageEntry>::parse(doc)?;
            let virtual_storage_id = file_storage_entry.id().developer_id;
            let creation_time = f64::from(file_storage_entry.creation_time());
            table_upload
                .write_json_line(json!(FileStorageZipMetadata {
                    id: virtual_storage_id.encode(),
                    creation_time: Some(creation_time),
                    sha256: Some(file_storage_entry.sha256.as_base64()),
                    size: Some(file_storage_entry.size),
                    content_type: file_storage_entry.content_type.clone(),
                    internal_id: Some(file_storage_entry.storage_id.to_string()),
                }))
                .await?;
        }
    }
    table_upload.complete().await?;
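
    // Then stream the file contents themselves. The semaphore caps the total
    // bytes that concurrent prefetches may hold in memory at once.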
    let max_prefetch_bytes = *EXPORT_MAX_INFLIGHT_PREFETCH_BYTES;
    let inflight_bytes_semaphore = tokio::sync::Semaphore::new(max_prefetch_bytes);
    let files_stream = table_iterator
        .stream_documents_in_table(*tablet_id, *by_id, None)
        .map_ok(|LatestDocument { value: doc, .. }| async {
            let file_storage_entry = ParseDocument::<FileStorageEntry>::parse(doc)?;
            let virtual_storage_id = file_storage_entry.id().developer_id;
            // Add an extension, which isn't necessary for anything and might be
            // incorrect, but allows the file to be viewed at a glance in most cases.
            let extension_guess = file_storage_entry
                .content_type
                .as_ref()
                .and_then(mime2ext)
                .map(|extension| format!(".{extension}"))
                .unwrap_or_default();
            let path = format!(
                "{path_prefix}{}/{}{extension_guess}",
                *FILE_STORAGE_VIRTUAL_TABLE,
                virtual_storage_id.encode()
            );
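            // Fetch the underlying object from file storage; a missing object
            // for an existing metadata row is an error.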
            let file_stream = components
                .file_storage
                .get(&file_storage_entry.storage_key)
                .await?
                .with_context(|| {
                    format!(
                        "file missing from storage: {} with key {:?}",
                        file_storage_entry.developer_id().encode(),
                        file_storage_entry.storage_key,
                    )
                })?;
            let content_type = file_storage_entry
                .content_type
                .as_ref()
                .map(|ct| ct.parse())
                .transpose()?;
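            // Attribute the storage call and egress bytes to this component
            // for usage accounting.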
            usage
                .track_storage_call(
                    component_path.clone(),
                    requestor.usage_tag(),
                    file_storage_entry.storage_id.clone(),
                    content_type,
                    file_storage_entry.sha256.clone(),
                )
                .await;
            usage
                .track_storage_egress_size(
                    component_path.clone(),
                    requestor.usage_tag().to_string(),
                    file_stream.content_length as u64,
                )
                .await;
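            // Files smaller than the whole prefetch budget are buffered in
            // memory under the semaphore; files at least that large take every
            // permit and are streamed serially instead.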
            if (file_stream.content_length as usize) < max_prefetch_bytes {
                let permit = inflight_bytes_semaphore
                    .acquire_many(file_stream.content_length as u32)
                    .await?;
                // Prefetch the file before passing it to the zip writer.
                // This can happen in parallel with other files.
                let bytes: Vec<Bytes> = file_stream
                    .stream
                    .try_collect()
                    .in_span(Span::enter_with_local_parent("prefetch_storage_file"))
                    .await?;
                let stream = StreamReader::new(stream::iter(bytes.into_iter().map(Ok)).boxed());
                Ok((path, stream, permit))
            } else {
                // Wait until all other ongoing prefetches are finished, then stream
                // this file serially.
                let permit = inflight_bytes_semaphore
                    .acquire_many(max_prefetch_bytes as u32)
                    .await?;
                // Note that fetching won't start until the reader is first polled
                // (which won't happen until it's passed to `stream_full_file`).
                Ok((path, file_stream.into_tokio_reader(), permit))
            }
        })
        // Note that this will return entries in an arbitrary order.
        .try_buffer_unordered(*EXPORT_STORAGE_GET_CONCURRENCY);
    pin_mut!(files_stream);
    while let Some((path, file_stream, permit)) = files_stream.try_next().await? {
        zip_snapshot_upload
            .stream_full_file(path, file_stream)
            .await?;
        drop(permit);
    }
    Ok(())
}
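
/// One metadata line in `_storage/documents.jsonl`. A serialized line looks
/// roughly like this (illustrative values):
///
/// ```json
/// {"_id":"kg2...","_creationTime":1700000000000.0,"sha256":"n4bQ...","size":4,"contentType":"text/plain","internalId":"..."}
/// ```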
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FileStorageZipMetadata {
    #[serde(rename = "_id")]
    pub id: String,
    #[serde(rename = "_creationTime")]
    pub creation_time: Option<f64>,
    pub sha256: Option<String>,
    pub size: Option<i64>,
    pub content_type: Option<String>,
    pub internal_id: Option<String>,
}