// lib.rs (47.3 kB)
#![feature(try_blocks)]
#![feature(coroutines)]
use std::{
cmp,
env,
fmt::{
Debug,
Display,
},
fs::{
self,
File,
OpenOptions,
},
io::{
self,
Read,
Seek,
SeekFrom,
Write,
},
mem,
ops::Range,
path::{
Path,
PathBuf,
},
pin::Pin,
sync::Arc,
task::{
Context,
Poll,
},
time::Duration,
};
use anyhow::Context as _;
use async_trait::async_trait;
use bytes::Bytes;
use common::{
errors::report_error,
runtime::Runtime,
types::{
FullyQualifiedObjectKey,
ObjectKey,
},
};
use futures::{
future::BoxFuture,
io::{
Error as IoError,
ErrorKind as IoErrorKind,
},
pin_mut,
ready,
select_biased,
stream::{
self,
BoxStream,
IntoAsyncRead,
},
FutureExt,
Stream,
StreamExt,
TryStreamExt,
};
use futures_async_stream::try_stream;
use serde_json::{
json,
Value as JsonValue,
};
use tempfile::TempDir;
use tokio::{
io::AsyncWrite,
sync::mpsc,
};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::{
io::StreamReader,
sync::PollSender,
};
use url::Url;
use value::sha256::{
Sha256,
Sha256Digest,
};
/// Smallest intermediate part size (5 MiB) used when buffering uploads to
/// local-directory storage.
pub const LOCAL_DIR_MIN_PART_SIZE: usize = 5 * (1 << 20);
/// Largest part size (8 GiB) accepted by a single local-directory write.
pub const LOCAL_DIR_MAX_PART_SIZE: usize = 8 * (1 << 30);
/// Upper bound on the number of parts written in a single upload.
pub const MAX_NUM_PARTS: usize = 10000;
/// Parallelism budget for uploads; `BufferedUpload` sizes its hand-off channel
/// at half of this value.
pub const MAXIMUM_PARALLEL_UPLOADS: usize = 8;
/// Identifier for an upload, stored as a plain `String` newtype.
#[derive(Clone, Eq, PartialEq, PartialOrd, Ord, derive_more::Display)]
pub struct UploadId(String);
impl From<String> for UploadId {
/// Wraps `s` as-is; no validation is performed.
fn from(s: String) -> Self {
Self(s)
}
}
/// The result of fetching an object (or a byte range of it) from storage.
pub struct StorageGetStream {
/// Number of bytes the stream is expected to yield in total.
pub content_length: i64,
/// The object's bytes, delivered as a sequence of chunks.
pub stream: BoxStream<'static, io::Result<Bytes>>,
}
impl StorageGetStream {
    /// Drains the stream into a single contiguous `Bytes` buffer.
    ///
    /// Test-only helper. Fails if reading the stream fails or if the number
    /// of bytes read differs from `content_length`.
    #[cfg(any(test, feature = "testing"))]
    pub async fn collect_as_bytes(self) -> anyhow::Result<Bytes> {
        use http_body_util::BodyExt;

        let expected_len = self.content_length as usize;
        let body = axum::body::Body::from_stream(self.stream);
        let collected = BodyExt::collect(body).await?;
        let content = collected.to_bytes();
        anyhow::ensure!(expected_len == content.len(), "ContentLength mismatch");
        Ok(content)
    }

    /// Converts the chunk stream into a `futures`-style async reader,
    /// discarding `content_length`.
    pub fn into_reader(self) -> IntoAsyncRead<BoxStream<'static, io::Result<Bytes>>> {
        let Self { stream, .. } = self;
        stream.into_async_read()
    }

    /// Converts the chunk stream into a tokio-style async reader, discarding
    /// `content_length`.
    pub fn into_tokio_reader(self) -> StreamReader<BoxStream<'static, io::Result<Bytes>>, Bytes> {
        let Self { stream, .. } = self;
        StreamReader::new(stream)
    }
}
/// String key identifying an object in a cache; produced by
/// `Storage::cache_key`.
#[derive(Clone, Hash, PartialEq, Eq, Debug, derive_more::Constructor, derive_more::Into)]
pub struct StorageCacheKey(String);
/// Opaque token identifying an in-progress client-driven upload.
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct ClientDrivenUploadToken(pub String);
/// Opaque token returned for each part uploaded in a client-driven upload.
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct ClientDrivenUploadPartToken(pub String);
/// Size (8 MiB) of each chunk requested when downloading a byte range.
pub const DOWNLOAD_CHUNK_SIZE: u64 = 8 * (1 << 20);
/// Maximum number of chunk downloads kept in flight at once.
pub const MAX_CONCURRENT_CHUNK_DOWNLOADS: usize = 16;
/// Abstraction over an object store: uploads (server- and client-driven),
/// ranged reads, attribute lookup, URL signing and deletion.
#[async_trait]
pub trait Storage: Send + Sync + Debug {
/// Start a new upload for generated data where no authoritative hash is
/// present to be verified and no hash is desired.
///
/// Storage may choose to implement some checksumming strategy for uploads,
/// but it's opaque to callers.
async fn start_upload(&self) -> anyhow::Result<Box<BufferedUpload>>;
/// A multi-part upload where the client uploads parts one at a time.
async fn start_client_driven_upload(&self) -> anyhow::Result<ClientDrivenUploadToken>;
/// Uploads one part of a client-driven upload identified by `token` and
/// returns a token for the uploaded part.
async fn upload_part(
&self,
token: ClientDrivenUploadToken,
part_number: u16,
part: Bytes,
) -> anyhow::Result<ClientDrivenUploadPartToken>;
/// Finishes a client-driven upload, given the tokens of its parts, and
/// returns the key of the resulting object.
async fn finish_client_driven_upload(
&self,
token: ClientDrivenUploadToken,
part_tokens: Vec<ClientDrivenUploadPartToken>,
) -> anyhow::Result<ObjectKey>;
/// Gets a URL that can be used to fetch an object.
async fn signed_url(&self, key: ObjectKey, expires_in: Duration) -> anyhow::Result<String>;
/// Creates a presigned url for uploading an object.
async fn presigned_upload_url(
&self,
expires_in: Duration,
) -> anyhow::Result<(ObjectKey, String)>;
/// Looks up the attributes of `key`, returning `None` when the lookup on
/// the fully qualified key returns `None`.
///
/// Default implementation: qualifies the key and delegates to
/// `get_fq_object_attributes`.
async fn get_object_attributes(
&self,
key: &ObjectKey,
) -> anyhow::Result<Option<ObjectAttributes>> {
self.get_fq_object_attributes(&self.fully_qualified_key(key))
.await
}
// TODO: FullyQualifiedObjectKey is an abstraction violation as it possibly
// reads from another `Storage` instance; get rid of it
/// Looks up the attributes of the object at a fully qualified key.
async fn get_fq_object_attributes(
&self,
key: &FullyQualifiedObjectKey,
) -> anyhow::Result<Option<ObjectAttributes>>;
/// Not intended to be called directly.
/// Use get_range() or get() instead.
fn get_small_range(
&self,
key: &FullyQualifiedObjectKey,
bytes_range: Range<u64>,
) -> BoxFuture<'static, anyhow::Result<StorageGetStream>>;
/// Describes this storage backend as a `pb::searchlight::StorageType`
/// message.
fn storage_type_proto(&self) -> pb::searchlight::StorageType;
/// Return a cache key suitable for the given ObjectKey, even in
/// a multi-tenant cache.
fn cache_key(&self, key: &ObjectKey) -> StorageCacheKey;
/// Return a fully qualified key, including info on bucket name
/// and suitable for access in multi-tenant scenario
fn fully_qualified_key(&self, key: &ObjectKey) -> FullyQualifiedObjectKey;
/// Inverse of `fully_qualified_key`; as the name says, meant for tests only.
fn test_only_decompose_fully_qualified_key(
&self,
key: FullyQualifiedObjectKey,
) -> anyhow::Result<ObjectKey>;
/// Delete the given object.
async fn delete_object(&self, key: &ObjectKey) -> anyhow::Result<()>;
}
/// Metadata about a stored object.
pub struct ObjectAttributes {
/// Size of the object in bytes.
pub size: u64,
}
pub struct SizeAndHash {
sha256: Sha256,
size: usize,
err: Option<anyhow::Error>,
}
impl SizeAndHash {
fn new() -> Self {
Self {
sha256: Sha256::new(),
size: 0,
err: None,
}
}
fn update(&mut self, result: &anyhow::Result<Bytes>) {
if self.err.is_some() {
return;
}
match result {
Ok(bytes) => {
self.sha256.update(bytes);
self.size += bytes.len();
},
Err(e) => {
self.err = Some(anyhow::anyhow!(
"At least one byte value was a failure {e:?}"
))
},
}
}
fn finish(self) -> anyhow::Result<(usize, Sha256Digest)> {
if let Some(err) = self.err {
anyhow::bail!(err);
}
Ok((self.size, self.sha256.finalize()))
}
}
/// A multipart upload in progress. Data is supplied with `write` or
/// `try_write_parallel`, and the upload is ended with exactly one of `abort`
/// or `complete`.
#[async_trait]
pub trait Upload: Send + Sync {
/// Writes data to specified object. The `data` argument must be at most
/// 5GB, and if it isn't the final chunk of the file, it must be at
/// least 5MB. There may be at most 10000 writes in a single upload.
/// Must be followed by `complete` to complete the object.
async fn write(&mut self, data: Bytes) -> anyhow::Result<()>;
/// Writes data to the specified object in parts where multiple parts may be
/// uploaded in parallel. The stream must contain the bytes in the same
/// order as they are present in the file.
/// Must be followed by `complete` to complete the object.
///
/// See UploadExt for some easier to use variants or if you need to verify
/// a checksum.
async fn try_write_parallel<'a>(
&'a mut self,
stream: &mut Pin<Box<dyn Stream<Item = anyhow::Result<Bytes>> + Send + 'a>>,
) -> anyhow::Result<()>;
/// Abort the multipart upload. Must call either abort or complete to avoid
/// being charged for incomplete objects.
async fn abort(self: Box<Self>) -> anyhow::Result<()>;
/// Completes the multipart object.
async fn complete(self: Box<Self>) -> anyhow::Result<ObjectKey>;
}
/// Helper functions for working with uploads for functions that have generic
/// types that would otherwise prevent Upload from being "object safe":
/// https://doc.rust-lang.org/reference/items/traits.html#object-safety.
///
/// Implemented for every `Upload` through a blanket impl.
#[async_trait]
pub trait UploadExt {
/// Similar to try_write_parallel, but without support for per item Results
async fn write_parallel(
&mut self,
stream: impl Stream<Item = Bytes> + Send + 'static,
) -> anyhow::Result<()>;
/// Calculates a linear sha256 digest from the stream, then uploads the
/// parts in parallel.
///
/// Returns the total number of bytes seen and their digest.
async fn try_write_parallel_and_hash(
&mut self,
stream: impl Stream<Item = anyhow::Result<Bytes>> + Send,
) -> anyhow::Result<(usize, Sha256Digest)>;
}
/// Blanket implementation: every `Upload` gets the `UploadExt` helpers.
#[async_trait]
impl<T: Upload> UploadExt for T {
/// Wraps every item in `Ok` and delegates to `try_write_parallel`.
async fn write_parallel(
&mut self,
stream: impl Stream<Item = Bytes> + Send,
) -> anyhow::Result<()> {
let mut boxed = stream.map(Ok).boxed();
self.try_write_parallel(&mut boxed).await
}
/// Passes the stream through to `try_write_parallel` while feeding every
/// item, in stream order, into a `SizeAndHash` accumulator.
async fn try_write_parallel_and_hash(
&mut self,
stream: impl Stream<Item = anyhow::Result<Bytes>> + Send,
) -> anyhow::Result<(usize, Sha256Digest)> {
let mut size_and_hash = SizeAndHash::new();
// The closure borrows `size_and_hash` mutably for as long as `boxed` lives.
let mut boxed = stream
.map(|value| {
size_and_hash.update(&value);
value
})
.boxed();
self.try_write_parallel(&mut boxed).await?;
// Release the closure's borrow so the accumulator can be consumed.
drop(boxed);
size_and_hash.finish()
}
}
/// An `Upload` wrapper that coalesces incoming chunks into larger parts
/// before forwarding them to the wrapped upload.
///
/// The size at which a part is emitted starts at the configured minimum and
/// doubles after every emitted part, capped at the configured maximum.
#[must_use]
pub struct BufferedUpload {
// The wrapped upload that receives the assembled parts.
upload: Box<dyn Upload>,
// Bytes accumulated so far that have not yet been emitted as a part.
buffer: Vec<u8>,
// Hard cap on the size of a part cut from `buffer`.
max_intermediate_part_size: usize,
// Buffer size at which the next part is emitted; grows as parts are emitted.
target_intermediate_part_size: usize,
}
impl BufferedUpload {
/// Wraps `upload`, emitting the first part once at least
/// `min_intermediate_part_size` bytes are available and never cutting a
/// part larger than `max_intermediate_part_size` from the buffer.
pub fn new(
upload: impl Upload + 'static,
min_intermediate_part_size: usize,
max_intermediate_part_size: usize,
) -> Self {
let buffer = Vec::with_capacity(min_intermediate_part_size);
Self {
upload: Box::new(upload),
buffer,
max_intermediate_part_size,
target_intermediate_part_size: min_intermediate_part_size,
}
}
/// Adds `data` to the pending bytes and returns the next part to upload,
/// if one is ready.
fn update_buffer_and_get_next(&mut self, data: Bytes) -> Option<Bytes> {
Self::_update_buffer_and_get_next(
&mut self.buffer,
&mut self.target_intermediate_part_size,
self.max_intermediate_part_size,
data,
)
}
// Hack around wanting to use this when `upload` is borrowed. Rust can't
// otherwise tell that we're using different and not previously borrowed
// parts of self.
//
// Returns `Some(part)` when a part is ready to be uploaded and `None` when
// `data` was only buffered. Every time a part is returned, the target size
// is doubled (capped at the maximum).
fn _update_buffer_and_get_next(
buffer: &mut Vec<u8>,
target_intermediate_part_size: &mut usize,
max_intermediate_part_size: usize,
data: Bytes,
) -> Option<Bytes> {
// Fast path, ship the buffer without copying.
if buffer.is_empty()
&& data.len() >= *target_intermediate_part_size
&& data.len() <= max_intermediate_part_size
{
*target_intermediate_part_size = cmp::min(
*target_intermediate_part_size * 2,
max_intermediate_part_size,
);
return Some(data);
}
// Slow path: copy into the buffer and emit once the target is reached.
buffer.extend_from_slice(&data);
if buffer.len() >= *target_intermediate_part_size {
*target_intermediate_part_size = cmp::min(
*target_intermediate_part_size * 2,
max_intermediate_part_size,
);
let ready = if buffer.len() > max_intermediate_part_size {
// Emit exactly the maximum; the overflow stays buffered for a later part.
// NOTE(review): only one part is cut per call, so the remainder may itself
// be at or above the target (or the maximum) until a later call or
// `complete` — confirm callers never pass chunks large enough for that to
// matter.
let remainder = buffer.split_off(max_intermediate_part_size);
mem::replace(buffer, remainder)
} else {
mem::replace(buffer, Vec::with_capacity(*target_intermediate_part_size))
};
Some(ready.into())
} else {
None
}
}
}
#[async_trait]
impl Upload for BufferedUpload {
/// Buffers `data` and forwards a part to the wrapped upload only when one
/// is ready.
async fn write(&mut self, data: Bytes) -> anyhow::Result<()> {
if let Some(buf) = self.update_buffer_and_get_next(data) {
self.upload.write(buf).await?;
}
Ok(())
}
/// Re-chunks `stream` through the buffer and feeds the resulting parts to
/// the wrapped upload's `try_write_parallel` over a bounded channel.
///
/// Bytes still buffered when the stream ends stay in `self.buffer`; they
/// are written by `complete`.
async fn try_write_parallel<'a>(
&'a mut self,
stream: &mut Pin<Box<dyn Stream<Item = anyhow::Result<Bytes>> + Send + 'a>>,
) -> anyhow::Result<()> {
// Try to keep some buffered data ready in the channel, but not too much.
let (tx, rx) = mpsc::channel(MAXIMUM_PARALLEL_UPLOADS / 2);
let mut boxed_rx = ReceiverStream::new(rx).boxed();
// Consumer side: the wrapped upload reads parts from the channel.
let mut upload = self.upload.try_write_parallel(&mut boxed_rx).fuse();
// Producer side: read the input, re-chunk it, and push parts into the channel.
let buffer_bytes = async {
let result: anyhow::Result<()> = try {
while let Some(result) = stream.next().await {
match result {
// Errors are forwarded downstream rather than stopping the producer here.
Err(e) => tx.send(Err(e)).await?,
Ok(buf) => {
if let Some(buf) = Self::_update_buffer_and_get_next(
&mut self.buffer,
&mut self.target_intermediate_part_size,
self.max_intermediate_part_size,
buf,
) {
tx.send(Ok(buf)).await?;
}
},
}
}
};
// Dropping the sender closes the channel, which ends the consumer's stream.
drop(tx);
result
}
.fuse();
pin_mut!(buffer_bytes);
// We do loop, clippy is confused by select_biased!
#[allow(clippy::never_loop)]
loop {
select_biased! {
// The consumer finishing (successfully or not) ends the whole operation.
upload_result = upload => {
return upload_result;
}
// A producer error ends the operation; on success keep looping until the
// consumer finishes.
bytes_result = buffer_bytes => {
bytes_result?;
}
}
}
}
/// Aborts the wrapped upload; buffered bytes are discarded.
async fn abort(mut self: Box<Self>) -> anyhow::Result<()> {
self.upload.abort().await
}
/// Writes whatever is left in the buffer as the final part (even if it is
/// empty), then completes the wrapped upload.
async fn complete(mut self: Box<Self>) -> anyhow::Result<ObjectKey> {
let Self {
buffer: ready,
mut upload,
..
} = *self;
upload.write(ready.into()).await?;
upload.complete().await
}
}
/// An `AsyncWrite` adapter that groups written bytes into parts of up to
/// `part_size` bytes and sends each part over an mpsc channel.
pub struct ChannelWriter {
// Channel the finished parts are sent on.
parts: PollSender<Bytes>,
// Bytes of the part currently being filled.
current_part: Vec<u8>,
// Size at which `current_part` is considered full.
part_size: usize,
}
impl ChannelWriter {
/// Creates a writer that sends parts of up to `part_size` bytes to
/// `sender`.
pub fn new(sender: mpsc::Sender<Bytes>, part_size: usize) -> Self {
Self {
parts: PollSender::new(sender),
current_part: Vec::with_capacity(part_size),
part_size,
}
}
}
impl AsyncWrite for ChannelWriter {
/// Copies as much of `buf` as fits into the current part. When the part is
/// already full it is sent first, which may return `Pending` until the
/// channel has capacity. A closed channel is reported as `BrokenPipe`.
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, IoError>> {
let self_ = self.get_mut();
loop {
// Room left in the current part: accept what fits and return without
// touching the channel.
if self_.current_part.len() < self_.part_size {
let n = cmp::min(buf.len(), self_.part_size - self_.current_part.len());
self_.current_part.extend_from_slice(&buf[..n]);
return Poll::Ready(Ok(n));
}
// The part is full: a channel slot is reserved only now, then the part is
// sent and the loop retries the write into a fresh part.
ready!(self_
.parts
.poll_reserve(cx)
.map_err(|_| IoError::new(IoErrorKind::BrokenPipe, "Channel closed")))?;
let next_buf = Vec::with_capacity(self_.part_size);
let buf = mem::replace(&mut self_.current_part, next_buf);
self_
.parts
.send_item(buf.into())
.map_err(|_| IoError::new(IoErrorKind::BrokenPipe, "Channel closed"))?;
}
}
/// No-op: parts are only sent when full or on shutdown.
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), IoError>> {
// We want to control the part size, so don't do anything on flush.
Poll::Ready(Ok(()))
}
/// Sends the final, possibly short, part if any bytes are pending.
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), IoError>> {
let self_ = self.get_mut();
if !self_.current_part.is_empty() {
ready!(self_
.parts
.poll_reserve(cx)
.map_err(|_| IoError::new(IoErrorKind::BrokenPipe, "Channel closed")))?;
// Nothing more will be written, so the replacement buffer needs no capacity.
let next_buf = vec![];
let buf = mem::replace(&mut self_.current_part, next_buf);
self_
.parts
.send_item(buf.into())
.map_err(|_| IoError::new(IoErrorKind::BrokenPipe, "Channel closed"))?;
}
Poll::Ready(Ok(()))
}
}
/// Higher-level read helpers layered on top of `Storage`.
#[async_trait]
pub trait StorageExt {
/// Gets a previously stored object.
async fn get(&self, key: &ObjectKey) -> anyhow::Result<Option<StorageGetStream>>;
/// Gets a stream for a range of a previously stored object.
async fn get_range(
&self,
key: &ObjectKey,
bytes_range: (std::ops::Bound<u64>, std::ops::Bound<u64>),
) -> anyhow::Result<Option<StorageGetStream>>;
// Equivalents for the above taking fully qualified objects
// TODO: FullyQualifiedObjectKey is an abstraction violation as it possibly
// reads from another `Storage` instance; get rid of it
/// Like `get`, but takes a fully qualified key.
async fn get_fq_object(
&self,
object_key: &FullyQualifiedObjectKey,
) -> anyhow::Result<Option<StorageGetStream>>;
/// Like `get_range`, but takes a fully qualified key.
async fn get_fq_object_range(
&self,
key: &FullyQualifiedObjectKey,
bytes_range: (std::ops::Bound<u64>, std::ops::Bound<u64>),
) -> anyhow::Result<Option<StorageGetStream>>;
/// Requires that `byte_range` be in-bounds for the object
fn get_fq_object_exact_range(
&self,
key: &FullyQualifiedObjectKey,
byte_range: Range<u64>,
) -> StorageGetStream;
// Implementation detail
/// Fetches one small range, with read failures on the returned stream
/// retried.
async fn get_small_range_with_retries(
&self,
key: &FullyQualifiedObjectKey,
small_byte_range: Range<u64>,
) -> anyhow::Result<StorageGetStream>;
}
#[async_trait]
impl StorageExt for Arc<dyn Storage> {
    /// Fetches the whole object for `key`, or `None` if it has no attributes
    /// (i.e. does not exist).
    async fn get(&self, key: &ObjectKey) -> anyhow::Result<Option<StorageGetStream>> {
        self.get_range(
            key,
            (std::ops::Bound::Unbounded, std::ops::Bound::Unbounded),
        )
        .await
    }

    /// Fetches `bytes_range` of the object for `key`. The range is clamped to
    /// the object's size.
    async fn get_range(
        &self,
        key: &ObjectKey,
        bytes_range: (std::ops::Bound<u64>, std::ops::Bound<u64>),
    ) -> anyhow::Result<Option<StorageGetStream>> {
        self.get_fq_object_range(&self.fully_qualified_key(key), bytes_range)
            .await
    }

    /// Fetches the whole object at a fully qualified key.
    async fn get_fq_object(
        &self,
        key: &FullyQualifiedObjectKey,
    ) -> anyhow::Result<Option<StorageGetStream>> {
        self.get_fq_object_range(
            key,
            (std::ops::Bound::Unbounded, std::ops::Bound::Unbounded),
        )
        .await
    }

    /// Resolves `bytes_range` against the object's actual size and streams
    /// the resulting half-open range.
    ///
    /// Returns `Ok(None)` when the object has no attributes. Both bounds are
    /// clamped to the object size. A range whose start lies past its end
    /// yields an empty stream rather than underflowing.
    async fn get_fq_object_range(
        &self,
        key: &FullyQualifiedObjectKey,
        bytes_range: (std::ops::Bound<u64>, std::ops::Bound<u64>),
    ) -> anyhow::Result<Option<StorageGetStream>> {
        let Some(attributes) = self.get_fq_object_attributes(key).await? else {
            return Ok(None);
        };
        // `saturating_add` keeps `Included(u64::MAX)` from overflowing; the
        // result is clamped to the object size right after anyway.
        let end_byte_bound = cmp::min(
            match bytes_range.1 {
                std::ops::Bound::Included(bound) => bound.saturating_add(1),
                std::ops::Bound::Excluded(bound) => bound,
                std::ops::Bound::Unbounded => attributes.size,
            },
            attributes.size,
        );
        // Clamping the start to the end (which is already <= size) guarantees
        // `start <= end`, so the length computation downstream cannot
        // underflow.
        let start_byte = cmp::min(
            match bytes_range.0 {
                std::ops::Bound::Included(bound) => bound,
                std::ops::Bound::Excluded(bound) => bound.saturating_add(1),
                std::ops::Bound::Unbounded => 0,
            },
            end_byte_bound,
        );
        Ok(Some(self.get_fq_object_exact_range(
            key,
            start_byte..end_byte_bound,
        )))
    }

    /// Streams exactly `start_byte..end_byte_bound` by splitting it into
    /// chunks of `DOWNLOAD_CHUNK_SIZE`. Chunks are fetched concurrently and
    /// yielded in order.
    ///
    /// The range must be in-bounds for the object and must satisfy
    /// `start <= end`.
    fn get_fq_object_exact_range(
        &self,
        key: &FullyQualifiedObjectKey,
        Range {
            start: start_byte,
            end: end_byte_bound,
        }: Range<u64>,
    ) -> StorageGetStream {
        let total_len = end_byte_bound - start_byte;
        // Round up so a length that is an exact multiple of the chunk size
        // does not schedule a trailing zero-length request. An empty range
        // still issues a single (empty) request.
        let num_chunks = cmp::max(1, total_len.div_ceil(DOWNLOAD_CHUNK_SIZE));
        // A list of futures, each of which resolves to a stream.
        let mut chunk_futures = vec![];
        for idx in 0..num_chunks {
            let chunk_start = start_byte + DOWNLOAD_CHUNK_SIZE * idx;
            let chunk_end = if idx == num_chunks - 1 {
                end_byte_bound
            } else {
                start_byte + DOWNLOAD_CHUNK_SIZE * (idx + 1)
            };
            let self_ = self.clone();
            let key_ = key.clone();
            let stream_fut = async move {
                self_
                    .get_small_range_with_retries(&key_, chunk_start..chunk_end)
                    .await
                    .map_err(
                        // Mapping everything to `io::ErrorKind::Other` feels bad, but it's what
                        // the AWS library does internally.
                        io::Error::other,
                    )
                    .map(|storage_get_stream| storage_get_stream.stream)
            };
            chunk_futures.push(stream_fut);
        }
        // Convert the list of futures into a stream, where each item is the resolved
        // output of the future (i.e. a `ByteStream`)
        let byte_stream = futures::stream::iter(chunk_futures)
            // Limit the concurrency of the chunk downloads
            .buffered(MAX_CONCURRENT_CHUNK_DOWNLOADS)
            // Flatten the `Stream<Item = io::Result<Stream<Item = io::Result<Bytes>>>>` into a
            // single `Stream<Item = io::Result<Bytes>>`
            .try_flatten();
        StorageGetStream {
            content_length: total_len as i64,
            stream: Box::pin(byte_stream),
        }
    }

    /// Fetches one small range and wraps its stream so that read failures
    /// re-request the remaining bytes, up to `STORAGE_GET_RETRIES` times.
    ///
    /// The initial request itself is not retried.
    #[fastrace::trace]
    async fn get_small_range_with_retries(
        &self,
        key: &FullyQualifiedObjectKey,
        small_byte_range: Range<u64>,
    ) -> anyhow::Result<StorageGetStream> {
        let output = self.get_small_range(key, small_byte_range.clone()).await?;
        let content_length = output.content_length;
        let initial_stream = output.stream;
        Ok(StorageGetStream {
            content_length,
            stream: Box::pin(stream_object_with_retries(
                initial_stream,
                self.clone(),
                key.clone(),
                small_byte_range,
                STORAGE_GET_RETRIES,
            )),
        })
    }
}
/// Number of times a failed read of a small range is re-requested.
const STORAGE_GET_RETRIES: usize = 5;
/// Yields the bytes of `stream`, re-requesting the not-yet-yielded tail of
/// `small_byte_range` from `storage` when a read fails.
///
/// `stream` is expected to cover `small_byte_range` of `key`. Each failed
/// read is reported via `report_error` and consumes one of
/// `retries_remaining`; with none left, the read error is returned. A
/// failure of the re-request itself is returned immediately, without further
/// retries.
#[allow(clippy::blocks_in_conditions)]
#[try_stream(ok = Bytes, error = io::Error)]
async fn stream_object_with_retries(
mut stream: BoxStream<'static, io::Result<Bytes>>,
storage: Arc<dyn Storage>,
key: FullyQualifiedObjectKey,
small_byte_range: Range<u64>,
mut retries_remaining: usize,
) {
// Bytes already handed to the consumer; a retry resumes right after them.
let mut bytes_yielded = 0;
loop {
match stream.try_next().await {
Ok(Some(chunk)) => {
bytes_yielded += chunk.len();
yield chunk;
if small_byte_range.start + bytes_yielded as u64 >= small_byte_range.end {
// In case there's a later error, we don't want to retry and fetch zero bytes,
// so just end here.
return Ok(());
}
},
Ok(None) => return Ok(()),
Err(e) if retries_remaining == 0 => {
return Err(e);
},
Err(e) => {
let mut toreport = anyhow::anyhow!(e).context(format!(
"failed while reading stream for {key:?}. {retries_remaining} attempts \
remaining"
));
report_error(&mut toreport).await;
// Resume from the first byte that has not been yielded yet.
let new_range =
(small_byte_range.start + bytes_yielded as u64)..small_byte_range.end;
let output = storage
.get_small_range(&key, new_range)
.await
.map_err(io::Error::other)?;
stream = output.stream;
retries_remaining -= 1;
},
}
}
}
#[cfg(test)]
mod tests {
use crate::ObjectKey;
// Checks that `ObjectKey` round-trips a key containing slashes, dashes,
// underscores and a dot, and rejects a key containing `>`.
#[tokio::test]
async fn test_object_key() -> anyhow::Result<()> {
assert_eq!(
&String::from(ObjectKey::try_from(
"folder/name-to_test/9.json".to_owned()
)?),
"folder/name-to_test/9.json",
);
assert!(ObjectKey::try_from("folder>name".to_owned()).is_err());
Ok(())
}
}
/// A `Storage` implementation backed by a directory on the local file system.
#[derive(Clone)]
pub struct LocalDirStorage<RT: Runtime> {
// Runtime handle; used to generate UUIDs for new object keys.
rt: RT,
// Directory that holds the `.blob` files.
dir: PathBuf,
// Keeps a temporary directory alive for as long as any clone of this storage
// exists; `None` when the directory was supplied by the caller.
_temp_dir: Option<Arc<TempDir>>,
}
// Hand-written `Debug` that prints only the directory.
impl<RT: Runtime> std::fmt::Debug for LocalDirStorage<RT> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LocalDirStorage")
.field("dir", &self.dir)
.finish()
}
}
impl<RT: Runtime> LocalDirStorage<RT> {
    /// Creates local storage backed by a fresh temporary directory. The
    /// directory is removed once the last clone of the storage is dropped.
    pub fn new(rt: RT) -> anyhow::Result<Self> {
        let temp_dir = TempDir::new()?;
        let dir = temp_dir.path().to_owned();
        Ok(Self {
            rt,
            dir,
            _temp_dir: Some(Arc::new(temp_dir)),
        })
    }

    /// Creates storage rooted at `dir`, creating the directory (and any
    /// missing parents) if needed. A relative `dir` is resolved against the
    /// current working directory.
    pub fn new_at_path(rt: RT, dir: PathBuf) -> anyhow::Result<Self> {
        let mut dir = dir;
        if !dir.is_absolute() {
            dir = env::current_dir()?.join(&dir);
        }
        fs::create_dir_all(&dir)?;
        Ok(Self {
            rt,
            dir,
            _temp_dir: None,
        })
    }

    /// Returns the path to the storage directory.
    pub fn path(&self) -> &PathBuf {
        &self.dir
    }

    /// Maps an object key to its file name (relative to the storage
    /// directory) by appending the `.blob` extension.
    fn filename_for_key(&self, key: ObjectKey) -> String {
        format!("{}.blob", String::from(key))
    }

    /// Creates storage in the `use_case` subdirectory of `dir`. Fails if
    /// `dir` is empty.
    pub fn for_use_case(rt: RT, dir: &str, use_case: StorageUseCase) -> anyhow::Result<Self> {
        anyhow::ensure!(!dir.is_empty());
        let use_case_dir = PathBuf::from(dir).join(use_case.to_string());
        Self::new_at_path(rt, use_case_dir)
    }
}
struct ClientDrivenUpload {
object_key: ObjectKey,
filepath: PathBuf,
}
impl TryFrom<ClientDrivenUpload> for ClientDrivenUploadToken {
type Error = anyhow::Error;
fn try_from(value: ClientDrivenUpload) -> Result<Self, Self::Error> {
let v = json!({
"objectKey": value.object_key.to_string(),
"filepath": value.filepath.to_str(),
});
Ok(ClientDrivenUploadToken(serde_json::to_string(&v)?))
}
}
impl TryFrom<ClientDrivenUploadToken> for ClientDrivenUpload {
type Error = anyhow::Error;
fn try_from(value: ClientDrivenUploadToken) -> Result<Self, Self::Error> {
let v: JsonValue = serde_json::from_str(&value.0)?;
let object_key = v
.get("objectKey")
.context("missing objectKey")?
.as_str()
.context("objectKey should be str")?
.try_into()?;
let filepath = v
.get("filepath")
.context("missing filepath")?
.as_str()
.context("filepath should be str")?
.parse()?;
Ok(Self {
object_key,
filepath,
})
}
}
#[async_trait]
impl<RT: Runtime> Storage for LocalDirStorage<RT> {
async fn start_upload(&self) -> anyhow::Result<Box<BufferedUpload>> {
let object_key: ObjectKey = self.rt.new_uuid_v4().to_string().try_into()?;
let key = self.filename_for_key(object_key.clone());
let filepath = self.dir.join(key);
// The filename constraints on the local file system are a bit stricter than S3,
// so these might fail. If it fails, let's think about what kinds of paths we're
// passing in and figure out if we want to expand LocalDirStorage, or constrain
// the keys.
//
// Note that "/" (forward slash) is allowed in the key and is reinterpreted onto
// the local file system as a directory.
fs::create_dir_all(filepath.parent().expect("Must have parent")).context(
"LocalDirStorage file creation failed. Perhaps the storage object key isn't valid?",
)?;
let file = File::create(filepath).context(
"LocalDirStorage file creation failed. Perhaps the storage object key isn't valid?",
)?;
let upload = LocalDirUpload {
object_key,
file: Some(file),
num_parts: 0,
};
let upload = BufferedUpload::new(upload, LOCAL_DIR_MIN_PART_SIZE, LOCAL_DIR_MAX_PART_SIZE);
Ok(Box::new(upload))
}
async fn start_client_driven_upload(&self) -> anyhow::Result<ClientDrivenUploadToken> {
let object_key: ObjectKey = self.rt.new_uuid_v4().to_string().try_into()?;
let key = self.filename_for_key(object_key.clone());
let filepath = self.dir.join(key);
// The filename constraints on the local file system are a bit stricter than S3,
// so these might fail. If it fails, let's think about what kinds of paths we're
// passing in and figure out if we want to expand LocalDirStorage, or constrain
// the keys.
//
// Note that "/" (forward slash) is allowed in the key and is reinterpreted onto
// the local file system as a directory.
fs::create_dir_all(filepath.parent().expect("Must have parent")).context(
"LocalDirStorage file creation failed. Perhaps the storage object key isn't valid?",
)?;
let _file = File::create(filepath.clone()).context(
"LocalDirStorage file creation failed. Perhaps the storage object key isn't valid?",
)?;
ClientDrivenUpload {
object_key,
filepath,
}
.try_into()
}
async fn upload_part(
&self,
token: ClientDrivenUploadToken,
_part_number: u16,
part: Bytes,
) -> anyhow::Result<ClientDrivenUploadPartToken> {
let ClientDrivenUpload {
object_key,
filepath,
} = token.try_into()?;
let file = OpenOptions::new().append(true).open(filepath)?;
let mut upload = LocalDirUpload {
object_key,
file: Some(file),
num_parts: 0, // unused
};
upload.write(part).await?;
Ok(ClientDrivenUploadPartToken(String::new()))
}
async fn finish_client_driven_upload(
&self,
token: ClientDrivenUploadToken,
_part_tokens: Vec<ClientDrivenUploadPartToken>,
) -> anyhow::Result<ObjectKey> {
let ClientDrivenUpload {
object_key,
filepath: _,
} = token.try_into()?;
Ok(object_key)
}
async fn signed_url(&self, key: ObjectKey, _expires_in: Duration) -> anyhow::Result<String> {
let key = self.filename_for_key(key);
let path = self.dir.join(key);
let url = Url::from_file_path(&path)
.map_err(|()| anyhow::anyhow!("Dir isn't valid UTF8: {:?}", self.dir))?;
Ok(url.into())
}
async fn presigned_upload_url(
&self,
expires_in: Duration,
) -> anyhow::Result<(ObjectKey, String)> {
let object_key: ObjectKey = self.rt.new_uuid_v4().to_string().try_into()?;
Ok((
object_key.clone(),
self.signed_url(object_key, expires_in).await?,
))
}
fn cache_key(&self, key: &ObjectKey) -> StorageCacheKey {
let key = self.filename_for_key(key.clone());
let path = self.dir.join(key);
StorageCacheKey(path.to_string_lossy().to_string())
}
fn fully_qualified_key(&self, key: &ObjectKey) -> FullyQualifiedObjectKey {
let key = self.filename_for_key(key.clone());
let path = self.dir.join(key);
path.to_string_lossy().to_string().into()
}
fn test_only_decompose_fully_qualified_key(
&self,
key: FullyQualifiedObjectKey,
) -> anyhow::Result<ObjectKey> {
let key: String = key.into();
let path = Path::new(&key);
let remaining = path.strip_prefix(&self.dir)?.to_string_lossy();
remaining
.strip_suffix(".blob")
.context("Doesn't end with .blob")?
.try_into()
}
fn get_small_range(
&self,
key: &FullyQualifiedObjectKey,
bytes_range: Range<u64>,
) -> BoxFuture<'static, anyhow::Result<StorageGetStream>> {
let path = Path::new(key.as_str()).to_owned();
async move {
let mut buf = vec![0; (bytes_range.end - bytes_range.start) as usize];
let mut file = File::open(path.clone()).context(format!(
"Local dir storage couldn't open {}",
path.display()
))?;
file.seek(SeekFrom::Start(bytes_range.start))?;
file.read_exact(&mut buf)?;
Ok(StorageGetStream {
content_length: (bytes_range.end - bytes_range.start) as i64,
stream: stream::once(async move { Ok(buf.into()) }).boxed(),
})
}
.boxed()
}
async fn get_fq_object_attributes(
&self,
key: &FullyQualifiedObjectKey,
) -> anyhow::Result<Option<ObjectAttributes>> {
let path = Path::new(key.as_str());
let mut buf = vec![];
let result = File::open(path);
if result.is_err() {
return Ok(None);
}
let mut file = result.unwrap();
file.read_to_end(&mut buf)?;
Ok(Some(ObjectAttributes {
size: buf.len() as u64,
}))
}
fn storage_type_proto(&self) -> pb::searchlight::StorageType {
pb::searchlight::StorageType {
storage_type: Some(pb::searchlight::storage_type::StorageType::Local(
pb::searchlight::LocalStorage {
path: self.dir.to_string_lossy().into_owned(),
},
)),
}
}
async fn delete_object(&self, key: &ObjectKey) -> anyhow::Result<()> {
let key = self.filename_for_key(key.clone());
let path = self.dir.join(key);
fs::remove_file(path)?;
Ok(())
}
}
/// An upload that writes its parts sequentially to a local file.
pub struct LocalDirUpload {
    /// Key returned to the caller on completion.
    object_key: ObjectKey,
    /// Destination file; `None` once the upload has been aborted.
    file: Option<File>,
    /// Number of parts written so far.
    num_parts: usize,
}

#[async_trait]
impl Upload for LocalDirUpload {
    /// Writes `data` to the file as the next part.
    ///
    /// Fails if the part limit has been reached, if `data` exceeds the
    /// maximum part size, or if the upload is no longer active.
    async fn write(&mut self, data: Bytes) -> anyhow::Result<()> {
        anyhow::ensure!(self.num_parts < MAX_NUM_PARTS);
        anyhow::ensure!(data.len() <= LOCAL_DIR_MAX_PART_SIZE);
        let Some(file) = self.file.as_mut() else {
            anyhow::bail!("Upload not active");
        };
        file.write_all(&data)?;
        self.num_parts += 1;
        Ok(())
    }

    /// Writes the stream's chunks one after another. Nothing actually runs
    /// in parallel for a local file. Stops at the first error from either the
    /// stream or the write.
    async fn try_write_parallel<'a>(
        &'a mut self,
        stream: &mut Pin<Box<dyn Stream<Item = anyhow::Result<Bytes>> + Send + 'a>>,
    ) -> anyhow::Result<()> {
        while let Some(chunk) = stream.try_next().await? {
            self.write(chunk).await?;
        }
        Ok(())
    }

    /// Closes the file handle. Data already written is left on disk.
    async fn abort(mut self: Box<Self>) -> anyhow::Result<()> {
        anyhow::ensure!(self.file.is_some());
        drop(self.file.take());
        Ok(())
    }

    /// Syncs the file to disk and returns the object's key.
    async fn complete(self: Box<Self>) -> anyhow::Result<ObjectKey> {
        let Self {
            object_key, file, ..
        } = *self;
        let file = file.context("Completing inactive file")?;
        file.sync_all()?;
        Ok(object_key)
    }
}
/// The purpose a storage instance serves. Its `Display` form is used as a
/// directory name by `LocalDirStorage::for_use_case`.
#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd)]
pub enum StorageUseCase {
    /// Snapshot Exports
    Exports,
    /// Snapshot Imports, stored temporarily so we can process multiple times
    /// without holding in memory.
    SnapshotImports,
    /// Our module cache
    Modules,
    /// User/customer facing storage
    Files,
    /// Search index snapshots
    SearchIndexes,
}

impl Display for StorageUseCase {
    /// Writes the lowercase, snake_case name of the use case.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::Exports => "exports",
            Self::SnapshotImports => "snapshot_imports",
            Self::Modules => "modules",
            Self::Files => "files",
            Self::SearchIndexes => "search",
        };
        f.write_str(name)
    }
}
// Tests for the interaction between `ChannelWriter` and `BufferedUpload`,
// using an in-memory upload that records every part it receives.
#[cfg(test)]
mod buffered_upload_tests {
use std::{
pin::Pin,
sync::Arc,
};
use async_trait::async_trait;
use bytes::Bytes;
use common::types::ObjectKey;
use futures::{
Stream,
StreamExt,
TryStreamExt,
};
use parking_lot::Mutex;
use runtime::{
prod::ProdRuntime,
testing::TestRuntime,
};
use tokio::{
io::AsyncWriteExt,
sync::mpsc,
};
use tokio_stream::wrappers::ReceiverStream;
use crate::{
BufferedUpload,
ChannelWriter,
Upload,
UploadExt,
};
// Upload that performs no I/O and records each written part in a shared
// vector so tests can inspect part boundaries.
struct NoopUpload {
parts: Arc<Mutex<Vec<Bytes>>>,
}
#[async_trait]
impl Upload for NoopUpload {
async fn write(&mut self, data: Bytes) -> anyhow::Result<()> {
self.parts.lock().push(data);
Ok(())
}
// Writes the stream's items sequentially, stopping at the first error.
async fn try_write_parallel<'a>(
&'a mut self,
stream: &mut Pin<Box<dyn Stream<Item = anyhow::Result<Bytes>> + Send + 'a>>,
) -> anyhow::Result<()> {
while let Some(value) = stream.try_next().await? {
self.write(value).await?;
}
Ok(())
}
async fn abort(self: Box<Self>) -> anyhow::Result<()> {
Ok(())
}
// Always returns the fixed key "asdf".
async fn complete(self: Box<Self>) -> anyhow::Result<common::types::ObjectKey> {
Ok(ObjectKey::try_from("asdf")?)
}
}
// 26 bytes written in 10-byte channel parts never reach the 100-byte minimum,
// so everything is flushed as a single part by `complete`.
#[convex_macro::prod_rt_test]
async fn test_buffered_upload(_rt: ProdRuntime) -> anyhow::Result<()> {
let (sender, receiver) = mpsc::channel::<Bytes>(1);
// NOTE: data flows from ChannelWriter -> sender -> receiver -> BufferedUpload
// -> NoopUpload
let parts = Arc::new(Mutex::new(vec![]));
let upload = NoopUpload {
parts: parts.clone(),
};
let mut upload: Box<BufferedUpload> = Box::new(BufferedUpload::new(upload, 100, 1000));
let uploader = upload.try_write_parallel_and_hash(ReceiverStream::new(receiver).map(Ok));
let mut writer = ChannelWriter::new(sender, 10);
let write_fut = async move {
writer.write_all(b"abcdefghijklmnopqrstuvwxyz").await?;
writer.shutdown().await?;
drop(writer); // drop closes sender, allowing uploader to complete
anyhow::Ok(())
};
// Regression test: if ChannelWriter::poll_write reserves a new permit in
// `sender` each time it's called, then the uploader will deadlock.
let _ = futures::try_join!(write_fut, uploader)?;
let _ = upload.complete().await?;
let parts = parts.lock();
assert_eq!(
*parts,
vec![Bytes::from_static(b"abcdefghijklmnopqrstuvwxyz")]
);
Ok(())
}
// Checks the part boundaries produced when the channel part size (7) divides
// neither the minimum (10) nor the maximum (30) part size.
#[convex_macro::test_runtime]
async fn test_buffered_upload_max_size(_rt: TestRuntime) -> anyhow::Result<()> {
let (sender, receiver) = mpsc::channel::<Bytes>(1);
let parts = Arc::new(Mutex::new(vec![]));
let upload = NoopUpload {
parts: parts.clone(),
};
let mut upload: Box<BufferedUpload> = Box::new(BufferedUpload::new(upload, 10, 30));
let uploader = upload.try_write_parallel_and_hash(ReceiverStream::new(receiver).map(Ok));
// Intentionally test case where we write sizes that don't divide evenly
// into the min or max size.
let mut writer = ChannelWriter::new(sender, 7);
let data =
b"abcdefghi_abcdefghi_abcdefghi_abcdefghi_abcdefghi_abcdefghi_abcdefghi_abcdefghi_";
let write_fut = async move {
// 80 bytes
writer.write_all(data).await?;
writer.shutdown().await?;
drop(writer); // drop closes sender, allowing uploader to complete
anyhow::Ok(())
};
let _ = futures::try_join!(write_fut, uploader)?;
let _ = upload.complete().await?;
let parts = parts.lock();
// The concatenated parts must reproduce the input exactly.
let joined_parts: Bytes = parts.iter().flat_map(|p| p.iter().copied()).collect();
assert_eq!(joined_parts, Bytes::from_static(data));
let lengths: Vec<_> = parts.iter().map(|p| p.len()).collect();
// 14 is the first multiple of 7 that's greater than 10
// 21 is the first multiple of 7 that's greater than 20
// 30 is the maximum size
// 15 is the remainder
assert_eq!(lengths, vec![14, 21, 30, 15]);
Ok(())
}
}
#[cfg(test)]
mod local_storage_tests {
use std::{
fs::File,
io::{
self,
Read,
},
sync::Arc,
time::Duration,
};
use anyhow::{
anyhow,
Context,
};
use bytes::Bytes;
use common::runtime::testing::TestRuntime;
use futures::{
stream,
StreamExt,
TryStreamExt,
};
use url::Url;
use super::{
stream_object_with_retries,
LocalDirStorage,
Storage,
StorageExt,
Upload,
DOWNLOAD_CHUNK_SIZE,
LOCAL_DIR_MIN_PART_SIZE,
};
// Writes one part of exactly `LOCAL_DIR_MIN_PART_SIZE` bytes followed by a
// three-byte part to a fresh `LocalDirStorage` upload, then checks that the
// upload completes without error.
#[convex_macro::test_runtime]
async fn test_upload(rt: TestRuntime) -> anyhow::Result<()> {
    let local_storage = LocalDirStorage::new(rt)?;
    let mut pending = local_storage.start_upload().await?;

    let large_part = vec![1; LOCAL_DIR_MIN_PART_SIZE];
    pending.write(large_part.into()).await?;

    let small_part = vec![2, 3, 4];
    pending.write(small_part.into()).await?;

    // Only successful completion is under test; the returned key is unused.
    let _object_key = pending.complete().await?;
    Ok(())
}
// Uploads a `LOCAL_DIR_MIN_PART_SIZE`-byte part and then a three-byte part,
// in that order, and checks that completing the upload succeeds.
//
// NOTE(review): this performs the same sequence of calls as `test_upload`;
// confirm whether it was meant to exercise a different upload path.
#[convex_macro::test_runtime]
async fn test_upload_auto(rt: TestRuntime) -> anyhow::Result<()> {
    let storage = LocalDirStorage::new(rt)?;
    let mut upload = storage.start_upload().await?;

    // Parts are written sequentially, large one first.
    for part in [vec![1; LOCAL_DIR_MIN_PART_SIZE], vec![2, 3, 4]] {
        upload.write(part.into()).await?;
    }

    let _object_key = upload.complete().await?;
    Ok(())
}
// Starts an upload, writes a single `LOCAL_DIR_MIN_PART_SIZE`-byte part, and
// checks that aborting the upload afterwards returns successfully.
#[convex_macro::test_runtime]
async fn test_abort(rt: TestRuntime) -> anyhow::Result<()> {
    let local_storage = LocalDirStorage::new(rt)?;
    let mut in_flight = local_storage.start_upload().await?;

    let part = vec![1; LOCAL_DIR_MIN_PART_SIZE];
    in_flight.write(part.into()).await?;

    // Abort instead of completing; any error from `abort` fails the test.
    in_flight.abort().await?;
    Ok(())
}
// Round-trips a small object through `LocalDirStorage` and reads it back two
// ways: through `storage.get`, and by opening the file that the signed URL
// points at.
#[convex_macro::test_runtime]
async fn test_local_storage(rt: TestRuntime) -> anyhow::Result<()> {
    // The object body used for both the upload and the two read-back checks.
    const PAYLOAD: &str = "pinna park";

    let storage: Arc<dyn Storage> = Arc::new(LocalDirStorage::new(rt)?);
    let mut upload = storage.start_upload().await?;
    upload.write(Bytes::from_static(PAYLOAD.as_bytes())).await?;
    let key = upload.complete().await?;

    // Get via .get(): a missing object is reported as a "Not found" error.
    let fetched = storage.get(&key).await?.context("Not found")?;
    let contents = fetched.collect_as_bytes().await?;
    assert_eq!(&contents, PAYLOAD);

    // Get via signed_url: the URL is parsed, converted to a local file path,
    // and the file at that path is read directly.
    let uri = storage.signed_url(key, Duration::from_secs(10)).await?;
    let path = uri
        .parse::<Url>()?
        .to_file_path()
        .map_err(|()| anyhow!("not a file path?"))?;
    let mut file = File::open(path)?;
    let mut text = String::new();
    file.read_to_string(&mut text)?;
    assert_eq!(&text, PAYLOAD);
    Ok(())
}
// Test that chunks are stitched together in the right order.
//
// The object is a run of 1s that is `DOWNLOAD_CHUNK_SIZE * 2` bytes long,
// followed by a run of 2s that is `DOWNLOAD_CHUNK_SIZE / 2` bytes long. It is
// read back once in full and once as a range covering only the run of 2s.
#[convex_macro::test_runtime]
async fn test_storage_get_paginated(rt: TestRuntime) -> anyhow::Result<()> {
    use std::ops::Bound;

    let storage: Arc<dyn Storage> = Arc::new(LocalDirStorage::new(rt)?);
    let mut upload = storage.start_upload().await?;

    let ones_len = (DOWNLOAD_CHUNK_SIZE * 2) as usize;
    let twos_len = (DOWNLOAD_CHUNK_SIZE / 2) as usize;
    let total_len = ones_len + twos_len;
    upload.write(vec![1; ones_len].into()).await?;
    upload.write(vec![2; twos_len].into()).await?;
    let object_key = upload.complete().await?;

    // Full read: reported length, actual length, and both runs must match.
    let whole = storage.get(&object_key).await?.unwrap();
    assert_eq!(whole.content_length, total_len as i64);
    let whole_bytes = whole.collect_as_bytes().await?;
    assert_eq!(whole_bytes.len(), total_len);
    let (head, tail) = (&whole_bytes[..ones_len], &whole_bytes[ones_len..]);
    assert_eq!(head, &vec![1; ones_len]);
    assert_eq!(tail, &vec![2; twos_len]);

    // Ranged read: request [ones_len, total_len), i.e. exactly the run of 2s.
    let suffix_range = (
        Bound::Included(ones_len as u64),
        Bound::Excluded(total_len as u64),
    );
    let suffix = storage
        .get_range(&object_key, suffix_range)
        .await?
        .unwrap();
    assert_eq!(suffix.content_length, twos_len as i64);
    let suffix_bytes = suffix.collect_as_bytes().await?;
    assert_eq!(suffix_bytes.len(), twos_len);
    assert_eq!(&suffix_bytes, &vec![2; twos_len]);
    Ok(())
}
// Test that if the first storage range request disconnects after one chunk,
// the rest is fetched successfully and everything is stitched together.
//
// The stored object holds the bytes 0 through 8. The hand-built initial
// stream yields [1, 2, 3] and then a `ConnectionAborted` error; the collected
// output is expected to be that chunk followed by [4, 5, 6, 7].
#[convex_macro::test_runtime]
async fn test_storage_get_with_retries(rt: TestRuntime) -> anyhow::Result<()> {
    let storage: Arc<dyn Storage> = Arc::new(LocalDirStorage::new(rt)?);
    let mut upload = storage.start_upload().await?;
    upload
        .write(vec![0, 1, 2, 3, 4, 5, 6, 7, 8].into())
        .await?;
    let stored_key = upload.complete().await?;

    // Simulated first attempt: one good chunk, then the connection drops.
    let flaky_stream = stream::iter(vec![
        Ok(vec![1, 2, 3].into()),
        Err(io::Error::new(
            io::ErrorKind::ConnectionAborted,
            anyhow::anyhow!("err"),
        )),
    ])
    .boxed();

    let qualified_key = storage.fully_qualified_key(&stored_key);
    let resumed = stream_object_with_retries(
        flaky_stream,
        Arc::clone(&storage),
        qualified_key.clone(),
        1..8,
        1,
    );

    let chunks: Vec<_> = resumed.try_collect().await?;
    let expected = vec![Bytes::from(vec![1, 2, 3]), Bytes::from(vec![4, 5, 6, 7])];
    assert_eq!(chunks, expected);
    Ok(())
}
// Completes an upload without writing any data, then checks that the object
// is returned by `get` before `delete_object` and is absent afterwards.
#[convex_macro::test_runtime]
async fn test_storage_delete(rt: TestRuntime) -> anyhow::Result<()> {
    let storage: Arc<dyn Storage> = Arc::new(LocalDirStorage::new(rt)?);
    let object_key = storage.start_upload().await?.complete().await?;

    // Reduce each lookup to a bool right away so the fetched object (if any)
    // is dropped before the next storage call.
    let exists_before = storage.get(&object_key).await?.is_some();
    assert!(exists_before);

    storage.delete_object(&object_key).await?;

    let gone_after = storage.get(&object_key).await?.is_none();
    assert!(gone_after);
    Ok(())
}
}