
Convex MCP server

Official
by get-convex
disk_index.rs (18.9 kB)
use std::{
    path::Path,
    sync::{
        Arc,
        LazyLock,
    },
    time::SystemTime,
};

use anyhow::Context;
use async_zip_0_0_9::{
    read::stream::ZipFileReader,
    write::ZipFileWriter,
    Compression,
    ZipEntryBuilder,
    ZipEntryBuilderExt,
};
use bytes::Bytes;
use cmd_util::env::env_config;
use common::{
    bootstrap_model::index::{
        text_index::FragmentedTextSegment,
        vector_index::FragmentedVectorSegment,
    },
    runtime::{
        tokio_spawn_blocking,
        Runtime,
    },
    types::ObjectKey,
};
use storage::{
    ChannelWriter,
    Storage,
    StorageExt,
    Upload,
    UploadExt,
};
use tantivy::{
    Index,
    IndexReader,
    IndexWriter,
};
use tokio::{
    fs,
    io::{
        AsyncBufRead,
        AsyncWrite,
        AsyncWriteExt,
        BufReader,
        BufWriter,
    },
    sync::mpsc,
};
use tokio_stream::wrappers::ReceiverStream;
use vector::qdrant_segments::{
    VectorDiskSegmentPaths,
    VectorDiskSegmentValues,
};
use walkdir::WalkDir;

use crate::{
    constants::CONVEX_EN_TOKENIZER,
    convex_en,
    metrics::{
        self,
    },
    NewTextSegment,
    SearchFileType,
    TantivySearchIndexSchema,
    TextSegmentPaths,
};

static SEARCH_INDEXING_MEMORY_ARENA_BYTES: LazyLock<usize> =
    LazyLock::new(|| env_config("SEARCH_INDEXING_MEMORY_ARENA_BYTES", 50_000_000));

#[fastrace::trace]
pub async fn index_reader_for_directory<P: AsRef<Path>>(
    directory: P,
) -> anyhow::Result<IndexReader> {
    let timer = metrics::index_reader_for_directory_timer();
    let directory = directory.as_ref().to_path_buf();
    let index =
        tokio_spawn_blocking("disk_index_open", move || Index::open_in_dir(directory)).await??;
    index
        .tokenizers()
        .register(CONVEX_EN_TOKENIZER, convex_en());
    let reader = index.reader()?;
    timer.finish();
    Ok(reader)
}

pub async fn index_writer_for_directory<P: AsRef<Path>>(
    directory: P,
    tantivy_schema: &TantivySearchIndexSchema,
) -> anyhow::Result<IndexWriter> {
    let directory = directory.as_ref().to_path_buf();
    let schema = tantivy_schema.schema.clone();
    let index = tokio_spawn_blocking("disk_index_create", move || {
        Index::create_in_dir(&directory, schema)
    })
    .await??;
    index
        .tokenizers()
        .register(CONVEX_EN_TOKENIZER, convex_en());
    Ok(index.writer(*SEARCH_INDEXING_MEMORY_ARENA_BYTES)?)
}

#[cfg(any(test, feature = "testing"))]
pub async fn download_single_file_original<P: AsRef<Path>>(
    key: &ObjectKey,
    path: P,
    storage: Arc<dyn Storage>,
) -> anyhow::Result<()> {
    let stream = storage.get(key).await?.unwrap().into_tokio_reader();
    let mut file = fs::File::create(path).await?;
    let mut reader = BufReader::with_capacity(2 << 16, stream);
    tokio::io::copy_buf(&mut reader, &mut file).await?;
    file.flush().await?;
    Ok(())
}

pub async fn download_single_file_zip<P: AsRef<Path>>(
    key: &ObjectKey,
    path: P,
    storage: Arc<dyn Storage>,
) -> anyhow::Result<()> {
    // Start the file download
    let stream = storage
        .get(key)
        .await?
        .context(format!("Failed to find stored file! {key:?}"))?
        .into_tokio_reader();

    // Open the target file
    let file = fs::File::create(path).await?;
    let mut file = BufWriter::new(file);

    // Require the stream to be a zip containing a single file, extract the data for
    // that single file and write it to our target path.
    let mut reader = ZipFileReader::new(stream);
    let mut is_written = false;
    while !reader.finished() {
        // Some entries may just be blank, so we skip them.
        if let Some(entry) = reader.entry_reader().await? {
            // Some entries may be directories, which we don't care about.
            if entry.entry().filename().ends_with('/') {
                continue;
            }
            // If it is a file, make sure we haven't already read one (we're expecting
            // exactly one!)
            if is_written {
                anyhow::bail!(
                    "ZIP contained multiple files! \nlatest: {:?}",
                    entry.entry().filename()
                );
            }
            is_written = true;
            entry.copy_to_end_crc(&mut file, 2 << 15).await?;
            // Keep reading to make sure we don't get something unexpected (like
            // multiple files)
        }
    }
    file.flush().await?;
    Ok(())
}

pub async fn upload_text_segment<RT: Runtime>(
    rt: &RT,
    storage: Arc<dyn Storage>,
    new_segment: NewTextSegment,
) -> anyhow::Result<FragmentedTextSegment> {
    let TextSegmentPaths {
        index_path,
        id_tracker_path,
        alive_bit_set_path,
        deleted_terms_path,
    } = new_segment.paths;
    let upload_index =
        upload_index_archive_from_path(index_path, storage.clone(), SearchFileType::Text);
    let upload_id_tracker = upload_single_file_from_path(
        id_tracker_path,
        storage.clone(),
        SearchFileType::TextIdTracker,
    );
    let upload_bitset = upload_single_file_from_path(
        alive_bit_set_path,
        storage.clone(),
        SearchFileType::TextAliveBitset,
    );
    let upload_deleted_terms = upload_single_file_from_path(
        deleted_terms_path,
        storage.clone(),
        SearchFileType::TextDeletedTerms,
    );
    let result = futures::try_join!(
        upload_index,
        upload_id_tracker,
        upload_bitset,
        upload_deleted_terms
    )?;
    let (segment_key, id_tracker_key, alive_bitset_key, deleted_terms_table_key) = result;
    Ok(FragmentedTextSegment {
        segment_key,
        id_tracker_key,
        deleted_terms_table_key,
        alive_bitset_key,
        num_indexed_documents: new_segment.num_indexed_documents,
        // Brand-new text segments will never have any deleted documents. Deleted documents will
        // instead have just been excluded from the segment.
        num_deleted_documents: 0,
        size_bytes_total: new_segment.size_bytes_total,
        id: rt.new_uuid_v4().to_string(),
    })
}

pub async fn upload_vector_segment<RT: Runtime>(
    rt: &RT,
    storage: Arc<dyn Storage>,
    new_segment: VectorDiskSegmentValues,
) -> anyhow::Result<FragmentedVectorSegment> {
    let VectorDiskSegmentPaths {
        segment,
        uuids,
        deleted_bitset,
    } = new_segment.paths;
    let upload_segment = upload_single_file_from_path(
        segment,
        storage.clone(),
        SearchFileType::FragmentedVectorSegment,
    );
    let upload_id_tracker =
        upload_single_file_from_path(uuids, storage.clone(), SearchFileType::VectorIdTracker);
    let upload_bitset = upload_single_file_from_path(
        deleted_bitset,
        storage.clone(),
        SearchFileType::VectorDeletedBitset,
    );
    let (segment_key, id_tracker_key, deleted_bitset_key) =
        futures::try_join!(upload_segment, upload_id_tracker, upload_bitset)?;
    Ok(FragmentedVectorSegment {
        segment_key,
        id_tracker_key,
        deleted_bitset_key,
        num_vectors: new_segment.num_vectors,
        num_deleted: new_segment.num_deleted,
        id: rt.new_uuid_v4().to_string(),
    })
}

pub async fn upload_single_file_from_path<P: AsRef<Path>>(
    path: P,
    storage: Arc<dyn Storage>,
    upload_type: SearchFileType,
) -> anyhow::Result<ObjectKey> {
    let filename = path
        .as_ref()
        .file_name()
        .and_then(|name| name.to_str())
        .with_context(|| format!("invalid path: {:?}", path.as_ref()))?
        .to_string();
    let file = fs::File::open(path).await?;
    let mut file = BufReader::new(file);
    upload_single_file(&mut file, filename, storage, upload_type).await
}

pub async fn upload_single_file<R: AsyncBufRead + Unpin>(
    reader: &mut R,
    filename: String,
    storage: Arc<dyn Storage>,
    upload_type: SearchFileType,
) -> anyhow::Result<ObjectKey> {
    let timer = metrics::upload_archive_timer(upload_type);
    let (sender, receiver) = mpsc::channel::<Bytes>(1);
    let mut upload = storage.start_upload().await?;
    let uploader = upload.write_parallel(ReceiverStream::new(receiver));
    let writer = ChannelWriter::new(sender, 5 * (1 << 20));
    // FragmentedVectorSegment files are tarballs already. Compression provides a
    // relatively small improvement in file size. Extracting a zip and then a
    // second tarball is expensive. TODO(CX-5191): We should think about
    // compressing the tar files as they're created.
    let file_type = if upload_type == SearchFileType::FragmentedVectorSegment {
        SingleFileFormat::ORIGINAL
    } else {
        SingleFileFormat::ZIP
    };
    let archiver = write_single_file(reader, filename, writer, file_type);
    tokio::try_join!(archiver, uploader)?;
    let key = upload.complete().await?;
    timer.finish();
    Ok(key)
}

#[derive(PartialEq)]
enum SingleFileFormat {
    /// Zip the file during upload with compression
    ZIP,
    /// Just upload the original file without any additional processing or
    /// compression.
    ORIGINAL,
}

async fn write_single_file<R: AsyncBufRead + Unpin, W: AsyncWrite + Unpin>(
    reader: &mut R,
    filename: String,
    mut out: W,
    format: SingleFileFormat,
) -> anyhow::Result<()> {
    if format == SingleFileFormat::ZIP {
        let mut writer = ZipFileWriter::new(&mut out);
        zip_single_file(reader, filename, &mut writer).await?;
        writer.close().await?;
    } else {
        raw_single_file(reader, &mut out).await?;
    }
    out.shutdown().await?;
    Ok(())
}

pub async fn upload_index_archive_from_path<P: AsRef<Path>>(
    directory: P,
    storage: Arc<dyn Storage>,
    upload_type: SearchFileType,
) -> anyhow::Result<ObjectKey> {
    let timer = metrics::upload_archive_timer(upload_type);
    let (sender, receiver) = mpsc::channel::<Bytes>(1);
    let mut upload = storage.start_upload().await?;
    let uploader = upload.write_parallel(ReceiverStream::new(receiver));
    let writer = ChannelWriter::new(sender, 5 * (1 << 20));
    let archiver = write_index_archive(directory, writer);
    let ((), ()) = futures::try_join!(archiver, uploader)?;
    let key = upload.complete().await?;
    timer.finish();
    Ok(key)
}

async fn write_index_archive<P: AsRef<Path>>(
    directory: P,
    mut out: impl AsyncWrite + Send + Unpin,
) -> anyhow::Result<()> {
    let mut writer = ZipFileWriter::new(&mut out);
    for entry in WalkDir::new(&directory).sort_by_file_name() {
        let entry = entry?;
        if !entry.file_type().is_file() {
            continue;
        }
        let filename = entry
            .path()
            .strip_prefix(&directory)?
            .to_str()
            .map(|s| s.to_owned())
            .context("Invalid path inside directory")?;
        let file = fs::File::open(entry.path()).await?;
        let mut file = BufReader::new(file);
        zip_single_file(&mut file, filename, &mut writer).await?;
    }
    writer.close().await?;
    out.shutdown().await?;
    Ok(())
}

async fn raw_single_file<R: AsyncBufRead + Unpin, W: AsyncWrite + Unpin>(
    reader: &mut R,
    writer: &mut W,
) -> anyhow::Result<()> {
    let bytes_written = tokio::io::copy_buf(reader, writer).await?;
    tracing::trace!("Copied {bytes_written} bytes");
    Ok(())
}

async fn zip_single_file<R: AsyncBufRead + Unpin, W: AsyncWrite + Unpin>(
    reader: &mut R,
    filename: String,
    writer: &mut ZipFileWriter<W>,
) -> anyhow::Result<()> {
    let entry = ZipEntryBuilder::new(filename, Compression::Zstd)
        .unix_permissions(0o644)
        // Pin the mtime to prevent flakes in CI, where we've observed the mtime incrementing by
        // one when traversing the test directory multiple times.
        .last_modification_date(SystemTime::UNIX_EPOCH.into())
        .build();
    let mut stream = writer.write_entry_stream(entry).await?;
    tokio::io::copy_buf(reader, &mut stream).await?;
    stream.close().await?;
    Ok(())
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use anyhow::{
        Context,
        Ok,
    };
    use async_zip_0_0_9::read::mem::ZipFileReader;
    use common::runtime::testing::TestDriver;
    use runtime::prod::ProdRuntime;
    use storage::{
        LocalDirStorage,
        Storage,
        StorageExt,
    };
    use tokio::{
        fs,
        io::{
            AsyncReadExt,
            AsyncWriteExt,
            BufReader,
        },
    };

    use super::{
        upload_index_archive_from_path,
        write_index_archive,
    };
    use crate::{
        disk_index::{
            download_single_file_original,
            download_single_file_zip,
            upload_single_file_from_path,
            write_single_file,
            SingleFileFormat,
        },
        SearchFileType,
    };

    #[convex_macro::prod_rt_test]
    async fn test_upload_download_single_file_zip(rt: ProdRuntime) -> anyhow::Result<()> {
        let storage = Arc::new(LocalDirStorage::new(rt)?);
        let dir = tempfile::tempdir()?;
        let file_path = dir.path().join("test");
        let mut file = fs::File::create(file_path.clone()).await?;
        file.write_all(b"file content").await?;
        file.sync_all().await?;

        let key = upload_single_file_from_path(
            file_path,
            storage.clone(),
            SearchFileType::VectorSegment,
        )
        .await?;

        let target_file_path = dir.path().join("output");
        download_single_file_zip(&key, target_file_path.clone(), storage.clone()).await?;

        let mut target_file = fs::File::open(target_file_path).await?;
        let mut contents = vec![];
        target_file.read_to_end(&mut contents).await?;
        Ok(())
    }

    #[convex_macro::prod_rt_test]
    async fn test_upload_download_single_file_original(rt: ProdRuntime) -> anyhow::Result<()> {
        let storage = Arc::new(LocalDirStorage::new(rt)?);
        let dir = tempfile::tempdir()?;
        let file_path = dir.path().join("test");
        let mut file = fs::File::create(file_path.clone()).await?;
        file.write_all(b"file content").await?;
        file.sync_all().await?;

        let key = upload_single_file_from_path(
            file_path,
            storage.clone(),
            SearchFileType::FragmentedVectorSegment,
        )
        .await?;

        let target_file_path = dir.path().join("output");
        // Download the file again.
        download_single_file_original(&key, &target_file_path, storage).await?;

        // Ensure it matches the original.
        let mut target_file = fs::File::open(target_file_path).await?;
        let mut contents = vec![];
        target_file.read_to_end(&mut contents).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_write_single_file() -> anyhow::Result<()> {
        let dir = tempfile::tempdir()?;
        let file_path = dir.path().join("test");
        let mut file = fs::File::create(file_path.clone()).await?;
        file.write_all(b"file content").await?;
        file.sync_all().await?;

        let mut buffer: Vec<u8> = Vec::new();
        let file = fs::File::open(file_path).await?;
        let mut file = BufReader::new(file);
        write_single_file(
            &mut file,
            "test".to_string(),
            &mut buffer,
            SingleFileFormat::ZIP,
        )
        .await?;

        let mut reader = ZipFileReader::new(&buffer).await?;
        let reader = reader.entry_reader(0).await?;
        let value = reader.read_to_string_crc().await?;
        assert_eq!(value, "file content");
        Ok(())
    }

    #[tokio::test]
    async fn test_write_index_archive() -> anyhow::Result<()> {
        // Set up a directory with a top-level file and a file in a subdirectory.
        let dir = tempfile::tempdir()?;
        {
            let mut h = fs::File::create(dir.path().join("top")).await?;
            h.write_all(b"file content").await?;
            h.sync_all().await?;
        }
        fs::create_dir(dir.path().join("mid")).await?;
        {
            let mut h = fs::File::create(dir.path().join("mid").join("bottom")).await?;
            h.write_all(b"more file content").await?;
            h.sync_all().await?;
        }

        // Output the zip file into a buffer (no need for a real file).
        let mut buffer: Vec<u8> = Vec::new();
        write_index_archive(dir.path(), &mut buffer).await?;

        // Now read it back and verify it's what we expect.
        let mut reader = ZipFileReader::new(&buffer).await?;
        assert_eq!(reader.entries().len(), 2);

        // Entry order is alphabetical
        let mut entries = vec![];
        for i in 0..reader.entries().len() {
            let entry_reader = reader.entry_reader(i).await?;
            entries.push((
                entry_reader.entry().filename().to_owned(),
                entry_reader.read_to_string_crc().await?,
            ));
        }
        entries.sort_by_key(|(filename, _)| filename.clone());
        assert_eq!(
            entries[0],
            ("mid/bottom".to_owned(), "more file content".to_owned())
        );
        assert_eq!(entries[1], ("top".to_owned(), "file content".to_owned()));
        Ok(())
    }

    #[tokio::test]
    async fn test_upload_index_archive() -> anyhow::Result<()> {
        let upper_dir = tempfile::tempdir()?;
        // Set up a directory with a top-level file and a file in a subdirectory.
        // We specifically choose the name of this directory so we can verify the output
        // filename.
        let dir = upper_dir.path().join("directory_name");
        tokio::fs::create_dir(&dir).await?;
        {
            let mut h = fs::File::create(dir.as_path().join("top")).await?;
            h.write_all(b"file content").await?;
            h.sync_all().await?;
        }
        fs::create_dir(dir.join("mid")).await?;
        {
            let mut h = fs::File::create(dir.as_path().join("mid").join("bottom")).await?;
            h.write_all(b"more file content").await?;
            h.sync_all().await?;
        }

        let td = TestDriver::new();
        let rt = td.rt();
        let storage: Arc<dyn Storage> = Arc::new(LocalDirStorage::new(rt)?);
        let key =
            upload_index_archive_from_path(&dir, storage.clone(), SearchFileType::Text).await?;

        // Read it back out from storage, into a buffer.
        let uploaded_file_bytes = storage
            .get(&key)
            .await?
            .context("Not found")?
            .collect_as_bytes()
            .await?;

        // Ensure it's the same zip file we'd get from calling `write_index_archive`.
        let mut in_memory_bytes: Vec<u8> = Vec::new();
        write_index_archive(&dir, &mut in_memory_bytes).await?;
        assert_eq!(uploaded_file_bytes, in_memory_bytes);
        Ok(())
    }
}
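For orientation, here is a minimal sketch of the single-file round trip these helpers implement, mirroring test_upload_download_single_file_zip above. The rt, file_path, and output_path bindings are illustrative assumptions, and LocalDirStorage is simply the backend the tests happen to use, not a requirement.

// Sketch only: assumes a Convex runtime `rt`, an existing `file_path`, and an
// `output_path` to write into (names are hypothetical, not defined in this module).
let storage: Arc<dyn Storage> = Arc::new(LocalDirStorage::new(rt)?);
// Non-tarball file types are uploaded as a single-entry, zstd-compressed zip.
let key = upload_single_file_from_path(file_path, storage.clone(), SearchFileType::VectorSegment).await?;
// Downloading extracts that single zip entry back out to a local path.
download_single_file_zip(&key, output_path, storage).await?;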

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.