Skip to main content
Glama

Convex MCP server

Official
by get-convex
lib.rs7.89 kB
#![feature(let_chains)] #[cfg(test)] mod tests; use std::{ io, ops::Deref, pin::Pin, sync::Arc, task::{ ready, Context, Poll, }, }; use anyhow::Context as _; use bytes::Bytes; use common::types::{ FullyQualifiedObjectKey, ObjectKey, }; use errors::ErrorMetadata; use futures::Stream; pub use rc_zip; use rc_zip::{ fsm::{ ArchiveFsm, EntryFsm, FsmResult, }, parse::{ Archive, Entry, }, }; use storage::{ Storage, StorageExt as _, }; use tokio::io::{ AsyncRead, AsyncReadExt, ReadBuf, }; use tokio_util::io::StreamReader; pub struct StorageZipArchive { storage: Arc<dyn Storage>, object_key: FullyQualifiedObjectKey, archive: Archive, } impl Deref for StorageZipArchive { type Target = Archive; fn deref(&self) -> &Self::Target { &self.archive } } impl StorageZipArchive { /// Reads the central directory of the given zip file in object storage. pub async fn open(storage: Arc<dyn Storage>, object_key: &ObjectKey) -> anyhow::Result<Self> { let fq_key = storage.fully_qualified_key(object_key); Self::open_fq(storage, fq_key).await } pub async fn open_fq( storage: Arc<dyn Storage>, object_key: FullyQualifiedObjectKey, ) -> anyhow::Result<Self> { let attributes = storage .get_fq_object_attributes(&object_key) .await? .with_context(|| format!("Could not find object with key {object_key:?}"))?; let mut fsm = ArchiveFsm::new(attributes.size); let mut read_position = u64::MAX; // arbitrary value that would never be used let mut read_stream: Option<StreamReader<_, _>> = None; loop { if let Some(offset) = fsm.wants_read() { if offset == read_position && let Some(reader) = &mut read_stream { // Continue reading anyhow::ensure!(!fsm.space().is_empty(), "wants read but no buffer?"); let read_bytes = reader.read(fsm.space()).await?; fsm.fill(read_bytes); read_position += read_bytes as u64; eprintln!("read {:?}", offset..read_position); if read_bytes == 0 { read_stream = None; continue; } } else { let stream = if read_position == offset { // If we are continuing a sequential read, then assume // that we're reading the central directory; read more // data at once storage .get_fq_object_exact_range(&object_key, offset..attributes.size) .stream } else { let read_len = fsm.space().len() as u64; let end = attributes.size.min(offset + read_len); storage .get_small_range_with_retries(&object_key, offset..end) .await? .stream }; read_position = offset; read_stream = Some(StreamReader::new(stream)); } } match fsm .process() .context(ErrorMetadata::bad_request("InvalidZip", "invalid zip file"))? { FsmResult::Continue(next) => fsm = next, FsmResult::Done(archive) => { return Ok(Self { storage, object_key, archive, }) }, } } } /// Creates a reader for an entry in the archive. /// To get an `Entry`, use [`Archive::entries`] via `StorageZipArchive`'s /// `Deref` impl. pub fn read_entry(&self, entry: Entry) -> StorageZipEntryReader { let start = entry.header_offset; // The absolute max amount of data that could be read includes the local // file header, compressed data, and data descriptor. The local file // header is variable-size but could contain up to 2 64KiB fields (file // name & extra fields), and then we add 1KiB for the remaining // fixed-size stuff. const MAX_HEADER_SIZE: u64 = (1 << 16) * 2 + 1024; let end = self .archive .size() .min(start + entry.compressed_size + MAX_HEADER_SIZE); let read_stream = StreamReader::new( self.storage .get_fq_object_exact_range(&self.object_key, start..end) .stream, ); StorageZipEntryReader { read_stream, entry_fsm: Some(EntryFsm::new(Some(entry), None)), } } } /// Reads the content of a single file in a zip archive in storage. pub struct StorageZipEntryReader { read_stream: StreamReader<Pin<Box<dyn Stream<Item = Result<Bytes, io::Error>> + Send + 'static>>, Bytes>, entry_fsm: Option<EntryFsm>, } impl AsyncRead for StorageZipEntryReader { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>> { let this = &mut *self; loop { let Some(fsm) = &mut this.entry_fsm else { // we previously hit EOF or an error return Poll::Ready(Ok(())); }; let mut read_stream_eof = false; if fsm.wants_read() { let mut read_buf = ReadBuf::new(fsm.space()); ready!(Pin::new(&mut this.read_stream).poll_read(cx, &mut read_buf))?; let read_bytes = read_buf.filled().len(); fsm.fill(read_bytes); if read_bytes == 0 { read_stream_eof = true; } } if buf.remaining() == 0 { // Defensive check; this is mostly invalid but we should not // infinite loop here return Poll::Ready(Ok(())); } let fsm = this.entry_fsm.take().unwrap(); // N.B.: use block_in_place because decompression is happening here match common::runtime::block_in_place(|| fsm.process(buf.initialize_unfilled())) { Ok(FsmResult::Continue((fsm, outcome))) => { let fsm = this.entry_fsm.insert(fsm); buf.advance(outcome.bytes_written); if outcome.bytes_written == 0 && buf.remaining() > 0 { if read_stream_eof { return Poll::Ready(Err(io::Error::new( io::ErrorKind::UnexpectedEof, "Hit EOF while reading zip entry", ))); } // This would otherwise signal EOF; try reading again instead. if !fsm.wants_read() { // guard against an infinite loop return Poll::Ready(Err(io::Error::other( "bug: EntryFsm wrote nothing but doesn't want read?", ))); } continue; } return Poll::Ready(Ok(())); }, Ok(FsmResult::Done(_buffer)) => return Poll::Ready(Ok(())), // zip parse or decompression error Err(e) => return Poll::Ready(Err(io::Error::new(io::ErrorKind::InvalidData, e))), } } } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server