Skip to main content
Glama

Convex MCP server

Official
by get-convex
zip_uploader.rs5.78 kB
use async_zip::{ tokio::write::{ EntryStreamWriter, ZipFileWriter, }, Compression, ZipEntryBuilder, }; use bytes::Bytes; use common::{ self, async_compat::FuturesAsyncWriteCompatExt, document::ResolvedDocument, types::TableName, }; use futures::{ pin_mut, AsyncWriteExt, }; use serde_json::{ json, Value as JsonValue, }; use shape_inference::{ export_context::GeneratedSchema, ShapeConfig, }; use storage::ChannelWriter; use tokio::io::{ AsyncBufRead, AsyncWriteExt as _, }; use value::export::ValueFormat; static AFTER_DOCUMENTS_CLEAN: Bytes = Bytes::from_static("\n".as_bytes()); // 0o644 => read-write for owner, read for everyone else. const ZIP_ENTRY_PERMISSIONS: u16 = 0o644; pub static README_MD_CONTENTS: &str = r#"# Welcome to your Convex snapshot export! This ZIP file contains a snapshot of the tables in your Convex deployment. Documents for each table are listed as lines of JSON in <table_name>/documents.jsonl files. For details on the format and how to use this snapshot with npx convex import, check out [the docs](https://docs.convex.dev/database/import-export/export) or ask us in [Discord](http://convex.dev/community). "#; // 'a is lifetime of entire zip file writer. // 'b is lifetime of entry writer for a single table. pub struct ZipSnapshotTableUpload<'a, 'b> { entry_writer: EntryStreamWriter<'b, &'a mut ChannelWriter>, } impl<'a, 'b> ZipSnapshotTableUpload<'a, 'b> { async fn new( zip_writer: &'b mut ZipFileWriter<&'a mut ChannelWriter>, path_prefix: &str, table_name: TableName, ) -> anyhow::Result<Self> { let source_path = format!("{path_prefix}{table_name}/documents.jsonl"); let builder = ZipEntryBuilder::new(source_path.into(), Compression::Deflate) .unix_permissions(ZIP_ENTRY_PERMISSIONS); let entry_writer = zip_writer.write_entry_stream(builder.build()).await?; Ok(Self { entry_writer }) } pub async fn write(&mut self, doc: ResolvedDocument) -> anyhow::Result<()> { let json = doc.export(ValueFormat::ConvexCleanJSON); self.write_json_line(json).await } pub async fn write_json_line(&mut self, json: JsonValue) -> anyhow::Result<()> { let buf = serde_json::to_vec(&json)?; self.entry_writer.write_all(&buf).await?; self.entry_writer.write_all(&AFTER_DOCUMENTS_CLEAN).await?; Ok(()) } pub async fn complete(self) -> anyhow::Result<()> { self.entry_writer.close().await?; Ok(()) } } pub struct ZipSnapshotUpload<'a> { writer: ZipFileWriter<&'a mut ChannelWriter>, } impl<'a> ZipSnapshotUpload<'a> { pub async fn new(out: &'a mut ChannelWriter) -> anyhow::Result<Self> { let writer = ZipFileWriter::with_tokio(out); let mut zip_snapshot_upload = Self { writer }; zip_snapshot_upload .stream_full_file("README.md".to_owned(), README_MD_CONTENTS.as_bytes()) .await?; Ok(zip_snapshot_upload) } #[fastrace::trace] pub async fn stream_full_file( &mut self, path: String, contents: impl AsyncBufRead, ) -> anyhow::Result<()> { let builder = ZipEntryBuilder::new(path.into(), Compression::Deflate) .unix_permissions(ZIP_ENTRY_PERMISSIONS); let mut entry_writer = self .writer .write_entry_stream(builder.build()) .await? .compat_write(); pin_mut!(contents); tokio::io::copy_buf(&mut contents, &mut entry_writer).await?; entry_writer.into_inner().close().await?; Ok(()) } pub async fn start_table( &mut self, path_prefix: &str, table_name: TableName, ) -> anyhow::Result<ZipSnapshotTableUpload<'a, '_>> { ZipSnapshotTableUpload::new(&mut self.writer, path_prefix, table_name).await } /// System tables have known shape, so we don't need to serialize it. pub async fn start_system_table( &mut self, path_prefix: &str, table_name: TableName, ) -> anyhow::Result<ZipSnapshotTableUpload<'a, '_>> { anyhow::ensure!(table_name.is_system()); ZipSnapshotTableUpload::new(&mut self.writer, path_prefix, table_name).await } pub async fn write_generated_schema<T: ShapeConfig>( &mut self, path_prefix: &str, table_name: &TableName, generated_schema: GeneratedSchema<T>, ) -> anyhow::Result<()> { let generated_schema_path = format!("{path_prefix}{table_name}/generated_schema.jsonl"); let builder = ZipEntryBuilder::new(generated_schema_path.into(), Compression::Deflate) .unix_permissions(ZIP_ENTRY_PERMISSIONS); let mut entry_writer = self .writer .write_entry_stream(builder.build()) .await? .compat_write(); let generated_schema_str = generated_schema.inferred_shape.to_string(); entry_writer .write_all(serde_json::to_string(&generated_schema_str)?.as_bytes()) .await?; entry_writer.write_all(b"\n").await?; for (override_id, override_export_context) in generated_schema.overrides.into_iter() { let override_json = json!({override_id.encode(): JsonValue::from(override_export_context)}); entry_writer .write_all(serde_json::to_string(&override_json)?.as_bytes()) .await?; entry_writer.write_all(b"\n").await?; } entry_writer.into_inner().close().await?; Ok(()) } pub async fn complete(self) -> anyhow::Result<()> { self.writer.close().await?; Ok(()) } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server