Skip to main content
Glama

Convex MCP server

Official
by get-convex
parse.rs25.3 kB
use std::{ collections::BTreeMap, io, str::FromStr, sync::{ Arc, LazyLock, }, }; use anyhow::Context; use bytes::Bytes; use common::{ bootstrap_model::tables::TABLES_TABLE, components::{ ComponentName, ComponentPath, }, knobs::TRANSACTION_MAX_USER_WRITE_SIZE_BYTES, types::{ FieldName, FullyQualifiedObjectKey, }, }; use errors::ErrorMetadata; use futures::{ pin_mut, AsyncBufReadExt, AsyncReadExt, StreamExt, TryStreamExt, }; use futures_async_stream::try_stream; use model::{ file_storage::FILE_STORAGE_VIRTUAL_TABLE, snapshot_imports::types::ImportFormat, }; use regex::Regex; use serde_json::{ json, Value as JsonValue, }; use shape_inference::{ export_context::{ ExportContext, GeneratedSchema, }, ProdConfig, Shape, ShapeConfig, }; use storage::{ Storage, StorageExt, }; use storage_zip_reader::{ rc_zip::parse::Entry, StorageZipArchive, }; use tokio::io::{ AsyncBufReadExt as _, AsyncRead, BufReader, }; use tokio_util::io::ReaderStream; use value::{ id_v6::DeveloperDocumentId, TableName, }; use crate::snapshot_import::import_error::ImportError; #[derive(Debug)] pub enum ImportUnit { Object(JsonValue), NewTable(ComponentPath, TableName), GeneratedSchema(ComponentPath, TableName, GeneratedSchema<ProdConfig>), StorageFileChunk(DeveloperDocumentId, Bytes), } static COMPONENT_NAME_PATTERN: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^(.*/)?_components/([^/]+)/$").unwrap()); static GENERATED_SCHEMA_PATTERN: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^(.*/)?([^/]+)/generated_schema\.jsonl$").unwrap()); static DOCUMENTS_PATTERN: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^(.*/)?([^/]+)/documents\.jsonl$").unwrap()); // _storage/(ID) with optional ignored prefix and extension like // snapshot/_storage/(ID).png static STORAGE_FILE_PATTERN: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"(.*/)?_storage/([^/.]+)(?:\.[^/]+)?$").unwrap()); fn map_zip_io_error(e: io::Error) -> anyhow::Error { if e.kind() == io::ErrorKind::InvalidData { // Content errors become InvalidData errors anyhow::Error::from(e).context(ErrorMetadata::bad_request("InvalidZip", "invalid zip file")) } else { // S3 errors get mapped into ErrorKind::Other e.into() } } fn map_csv_error(e: csv_async::Error) -> anyhow::Error { let pos_line = |pos: &Option<csv_async::Position>| pos.as_ref().map_or(0, |pos| pos.line() as usize); match e.kind() { csv_async::ErrorKind::Utf8 { pos, .. } => { ImportError::CsvInvalidRow(pos_line(pos), e).into() }, csv_async::ErrorKind::UnequalLengths { pos, .. } => { ImportError::CsvRowMissingFields(pos_line(pos)).into() }, // IO and Seek are errors from the underlying stream. csv_async::ErrorKind::Io(_) | csv_async::ErrorKind::Seek // We're not using serde for CSV parsing, so these errors are unexpected | csv_async::ErrorKind::Serialize(_) | csv_async::ErrorKind::Deserialize { .. } => e.into(), _ => e.into(), } } /// Parse and stream units from the imported file, starting with a NewTable /// for each table and then Objects for each object to import into the table. /// stream_body returns the file as streamed bytes. stream_body() can be called /// multiple times to read the file multiple times, for cases where the file /// must be read out of order, e.g. because the _tables table must be imported /// first. /// Objects are yielded with the following guarantees: /// 1. When an Object is yielded, it is in the table corresponding to the most /// recently yielded NewTable. /// 2. When a StorageFileChunk is yielded, it is in the _storage table /// corresponding to the most recently yielded NewTable. /// 3. All StorageFileChunks for a single file are yielded contiguously, in /// order. /// 4. If a table has a GeneratedSchema, the GeneratedSchema will be yielded /// before any Objects in that table. #[try_stream(ok = ImportUnit, error = anyhow::Error)] pub async fn parse_objects( format: ImportFormat, component_path: ComponentPath, storage: Arc<dyn Storage>, fq_object_key: FullyQualifiedObjectKey, ) { let stream_body = || async { storage .get_fq_object(&fq_object_key) .await? .with_context(|| format!("Missing import object {fq_object_key:?}")) }; match format { ImportFormat::Csv(table_name) => { let reader = stream_body().await?; yield ImportUnit::NewTable(component_path, table_name); let mut reader = csv_async::AsyncReader::from_reader(reader.into_reader()); if !reader.has_headers() { anyhow::bail!(ImportError::CsvMissingHeaders); } let field_names = { let headers = reader.headers().await.map_err(map_csv_error)?; headers .iter() .map(|s| { let trimmed = s.trim_matches(' '); let field_name = FieldName::from_str(trimmed) .map_err(|e| ImportError::CsvInvalidHeader(trimmed.to_string(), e))?; Ok(field_name) }) .collect::<anyhow::Result<Vec<_>>>()? }; let mut enumerate_rows = reader.records().enumerate(); while let Some((i, row_r)) = enumerate_rows.next().await { let lineno = i + 1; let parsed_row = row_r .map_err(map_csv_error)? .iter() .map(parse_csv_cell) .collect::<Vec<JsonValue>>(); let mut obj = BTreeMap::new(); if field_names.len() != parsed_row.len() { anyhow::bail!(ImportError::CsvRowMissingFields(lineno)); } for (field_name, value) in field_names.iter().zip(parsed_row.into_iter()) { obj.insert(field_name.to_string(), value); } yield ImportUnit::Object(serde_json::to_value(obj)?); } }, ImportFormat::JsonLines(table_name) => { let mut reader = stream_body().await?.into_reader(); yield ImportUnit::NewTable(component_path, table_name); let mut line = String::new(); let mut lineno = 1; while reader .read_line(&mut line) .await .map_err(ImportError::NotUtf8)? > 0 { // Check for UTF-8 BOM at the start of the first line if lineno == 1 && line.as_bytes().starts_with(&[0xEF, 0xBB, 0xBF]) { anyhow::bail!(ImportError::Utf8BomNotSupported); } let v: serde_json::Value = serde_json::from_str(&line) .map_err(|e| ImportError::JsonInvalidRow(lineno, e))?; yield ImportUnit::Object(v); line.clear(); lineno += 1; } }, ImportFormat::JsonArray(table_name) => { let reader = stream_body().await?; yield ImportUnit::NewTable(component_path, table_name); let mut buf = Vec::new(); let mut truncated_reader = reader .into_reader() .take((*TRANSACTION_MAX_USER_WRITE_SIZE_BYTES as u64) + 1); truncated_reader.read_to_end(&mut buf).await?; if buf.len() > *TRANSACTION_MAX_USER_WRITE_SIZE_BYTES { anyhow::bail!(ImportError::JsonArrayTooLarge(buf.len())); } let v: serde_json::Value = { // Check for UTF-8 BOM and reject it if buf.starts_with(&[0xEF, 0xBB, 0xBF]) { anyhow::bail!(ImportError::Utf8BomNotSupported); } serde_json::from_slice(&buf).map_err(ImportError::NotJson)? }; let array = v.as_array().ok_or(ImportError::NotJsonArray)?; for value in array.iter() { yield ImportUnit::Object(value.clone()); } }, ImportFormat::Zip => { let base_component_path = component_path; let zip_reader = StorageZipArchive::open_fq(storage, fq_object_key).await?; let num_entries = zip_reader.entries().count(); { // First pass, all the things we can store in memory: // a. _tables/documents.jsonl // b. _storage/documents.jsonl // c. user_table/generated_schema.jsonl // _tables needs to be imported before user tables so we can // pick table numbers correctly for schema validation. // Each generated schema must be parsed before the corresponding // table/documents.jsonl file, so we correctly infer types from // export-formatted JsonValues. let mut table_metadata: BTreeMap<_, Vec<_>> = BTreeMap::new(); let mut storage_metadata: BTreeMap<_, Vec<_>> = BTreeMap::new(); let mut generated_schemas: BTreeMap<_, Vec<_>> = BTreeMap::new(); for entry in zip_reader.entries() { let documents_table_name = parse_documents_jsonl_table_name(&entry.name, &base_component_path)?; if let Some((component_path, table_name)) = documents_table_name.clone() && table_name == *TABLES_TABLE { let entry_reader = zip_reader.read_entry(entry.clone()); table_metadata.insert( component_path, parse_documents_jsonl(entry, entry_reader, &base_component_path) .try_collect() .await?, ); } else if let Some((component_path, table_name)) = documents_table_name && table_name == *FILE_STORAGE_VIRTUAL_TABLE { let entry_reader = zip_reader.read_entry(entry.clone()); storage_metadata.insert( component_path, parse_documents_jsonl(entry, entry_reader, &base_component_path) .try_collect() .await?, ); } else if let Some((component_path, table_name)) = parse_table_filename( &entry.name, &base_component_path, &GENERATED_SCHEMA_PATTERN, )? { let entry_reader = zip_reader.read_entry(entry.clone()); tracing::info!( "importing zip file containing generated_schema {table_name}" ); let generated_schema = parse_generated_schema(&entry.name, entry_reader).await?; generated_schemas .entry(component_path.clone()) .or_default() .push(ImportUnit::GeneratedSchema( component_path, table_name, generated_schema, )); } } for table_unit in table_metadata.into_values().flatten() { yield table_unit; } for generated_schema_unit in generated_schemas.into_values().flatten() { yield generated_schema_unit; } for (component_path, storage_metadata) in storage_metadata { if !storage_metadata.is_empty() { // Yield NewTable for _storage and Object for each storage file's metadata. for storage_unit in storage_metadata { yield storage_unit; } // Yield StorageFileChunk for each file in this component. for (i, entry) in zip_reader.entries().enumerate() { if let Some((file_component_path, storage_id)) = parse_storage_filename(&entry.name, &base_component_path)? && file_component_path == component_path { let entry_reader = zip_reader.read_entry(entry.clone()); tracing::info!( "importing zip file containing storage file {} [{i}/{}]", storage_id.encode(), num_entries ); let mut byte_stream = ReaderStream::new(entry_reader); let mut empty = true; while let Some(chunk) = byte_stream.try_next().await.map_err(map_zip_io_error)? { empty = false; yield ImportUnit::StorageFileChunk(storage_id, chunk); } // In case it's an empty file, make sure we send at // least one chunk. if empty { yield ImportUnit::StorageFileChunk(storage_id, Bytes::new()); } } } } } } // Second pass: user tables. for entry in zip_reader.entries() { if let Some((_, table_name)) = parse_documents_jsonl_table_name(&entry.name, &base_component_path)? && !table_name.is_system() { let entry_reader = zip_reader.read_entry(entry.clone()); let stream = parse_documents_jsonl(entry, entry_reader, &base_component_path); pin_mut!(stream); while let Some(unit) = stream.try_next().await? { yield unit; } } } }, } } pub fn parse_component_path( mut filename: &str, base_component_path: &ComponentPath, ) -> anyhow::Result<ComponentPath> { let mut component_names = Vec::new(); while let Some(captures) = COMPONENT_NAME_PATTERN.captures(filename) { filename = captures.get(1).map_or("", |c| c.as_str()); let component_name_str = captures .get(2) .expect("regex has two capture groups") .as_str(); let component_name: ComponentName = component_name_str.parse().map_err(|e| { ErrorMetadata::bad_request( "InvalidComponentName", format!("component name '{component_name_str}' invalid: {e}"), ) })?; component_names.push(component_name); } component_names.reverse(); let mut component_path = base_component_path.clone(); for component_name in component_names { component_path = component_path.push(component_name); } Ok(component_path) } fn parse_table_filename( filename: &str, base_component_path: &ComponentPath, regex: &Regex, ) -> anyhow::Result<Option<(ComponentPath, TableName)>> { match regex.captures(filename) { None => Ok(None), Some(captures) => { let table_name_str = captures .get(2) .expect("regex has two capture groups") .as_str(); let table_name = table_name_str.parse().map_err(|e| { ErrorMetadata::bad_request( "InvalidTableName", format!("table name '{table_name_str}' invalid: {e}"), ) })?; let prefix = captures.get(1).map_or("", |c| c.as_str()); let component_path = parse_component_path(prefix, base_component_path)?; Ok(Some((component_path, table_name))) }, } } fn parse_storage_filename( filename: &str, base_component_path: &ComponentPath, ) -> anyhow::Result<Option<(ComponentPath, DeveloperDocumentId)>> { match STORAGE_FILE_PATTERN.captures(filename) { None => Ok(None), Some(captures) => { let storage_id_str = captures .get(2) .expect("regex has two capture groups") .as_str(); if storage_id_str == "documents" { return Ok(None); } let storage_id = DeveloperDocumentId::decode(storage_id_str).map_err(|e| { ErrorMetadata::bad_request( "InvalidStorageId", format!("_storage id '{storage_id_str}' invalid: {e}"), ) })?; let prefix = captures.get(1).map_or("", |c| c.as_str()); let component_path = parse_component_path(prefix, base_component_path)?; Ok(Some((component_path, storage_id))) }, } } fn parse_documents_jsonl_table_name( filename: &str, base_component_path: &ComponentPath, ) -> anyhow::Result<Option<(ComponentPath, TableName)>> { parse_table_filename(filename, base_component_path, &DOCUMENTS_PATTERN) } #[try_stream(ok = ImportUnit, error = anyhow::Error)] async fn parse_documents_jsonl<'a>( entry: &'a Entry, reader: impl AsyncRead + Unpin + 'a, base_component_path: &'a ComponentPath, ) { let (component_path, table_name) = parse_documents_jsonl_table_name(&entry.name, base_component_path)? .context("expected documents.jsonl file")?; tracing::info!("importing zip file containing table {table_name}"); yield ImportUnit::NewTable(component_path, table_name); let mut line = String::new(); let mut lineno = 1; let mut reader = BufReader::new(reader); while reader .read_line(&mut line) .await .map_err(map_zip_io_error)? > 0 { let v: serde_json::Value = serde_json::from_str(&line).map_err(|e| ImportError::JsonInvalidRow(lineno, e))?; yield ImportUnit::Object(v); line.clear(); lineno += 1; } } async fn parse_generated_schema<T: ShapeConfig>( filename: &str, entry_reader: impl tokio::io::AsyncRead + Unpin, ) -> anyhow::Result<GeneratedSchema<T>> { let mut line = String::new(); let mut lineno = 1; let mut entry_reader = BufReader::new(entry_reader); entry_reader .read_line(&mut line) .await .map_err(ImportError::NotUtf8)?; let inferred_type_json: serde_json::Value = serde_json::from_str(&line).map_err(|e| ImportError::JsonInvalidRow(lineno, e))?; let inferred_type = Shape::from_str(inferred_type_json.as_str().with_context(|| { ImportError::InvalidConvexValue( lineno, anyhow::anyhow!("first line of generated_schema must be a string"), ) })?) .map_err(|e| { ErrorMetadata::bad_request( "InvalidGeneratedSchema", format!("cannot parse {filename}: {e:#}"), ) })?; line.clear(); lineno += 1; let mut overrides = BTreeMap::new(); while entry_reader .read_line(&mut line) .await .map_err(ImportError::NotUtf8)? > 0 { let mut v: serde_json::Value = serde_json::from_str(&line).map_err(|e| ImportError::JsonInvalidRow(lineno, e))?; let o = v.as_object_mut().with_context(|| { ImportError::InvalidConvexValue(lineno, anyhow::anyhow!("overrides should be object")) })?; if o.len() != 1 { anyhow::bail!(ImportError::InvalidConvexValue( lineno, anyhow::anyhow!("override object should have one item") )); } let (key, value) = o.into_iter().next().context("must have one item")?; let export_context = ExportContext::try_from(value.clone()) .map_err(|e| ImportError::InvalidConvexValue(lineno, e))?; overrides.insert( DeveloperDocumentId::decode(key) .map_err(|e| ImportError::InvalidConvexValue(lineno, e.into()))?, export_context, ); line.clear(); lineno += 1; } let generated_schema = GeneratedSchema { inferred_shape: inferred_type, overrides, }; Ok(generated_schema) } // For now, we only parse out floats and strings in CSV files. pub fn parse_csv_cell(s: &str) -> JsonValue { if let Ok(r) = s.parse::<f64>() { return json!(r); } json!(s) } #[cfg(test)] mod tests { use common::components::ComponentPath; use crate::snapshot_import::parse::{ parse_documents_jsonl_table_name, parse_storage_filename, parse_table_filename, GENERATED_SCHEMA_PATTERN, }; #[test] fn test_filename_regex() -> anyhow::Result<()> { let (_, table_name) = parse_documents_jsonl_table_name("users/documents.jsonl", &ComponentPath::root())? .unwrap(); assert_eq!(table_name, "users".parse()?); // Regression test, checking that the '.' is escaped. assert!( parse_documents_jsonl_table_name("users/documentsxjsonl", &ComponentPath::root())? .is_none() ); // When an export is unzipped and re-zipped, sometimes there's a prefix. let (_, table_name) = parse_documents_jsonl_table_name( "snapshot/users/documents.jsonl", &ComponentPath::root(), )? .unwrap(); assert_eq!(table_name, "users".parse()?); let (_, table_name) = parse_table_filename( "users/generated_schema.jsonl", &ComponentPath::root(), &GENERATED_SCHEMA_PATTERN, )? .unwrap(); assert_eq!(table_name, "users".parse()?); let (_, storage_id) = parse_storage_filename( "_storage/kg2ah8mk1xtg35g7zyexyc96e96yr74f.gif", &ComponentPath::root(), )? .unwrap(); assert_eq!(&storage_id.to_string(), "kg2ah8mk1xtg35g7zyexyc96e96yr74f"); let (_, storage_id) = parse_storage_filename( "snapshot/_storage/kg2ah8mk1xtg35g7zyexyc96e96yr74f.gif", &ComponentPath::root(), )? .unwrap(); assert_eq!(&storage_id.to_string(), "kg2ah8mk1xtg35g7zyexyc96e96yr74f"); // No file extension. let (_, storage_id) = parse_storage_filename( "_storage/kg2ah8mk1xtg35g7zyexyc96e96yr74f", &ComponentPath::root(), )? .unwrap(); assert_eq!(&storage_id.to_string(), "kg2ah8mk1xtg35g7zyexyc96e96yr74f"); Ok(()) } #[test] fn test_component_path_regex() -> anyhow::Result<()> { let (component_path, table_name) = parse_documents_jsonl_table_name( "_components/waitlist/tbl/documents.jsonl", &ComponentPath::root(), )? .unwrap(); assert_eq!(&String::from(component_path), "waitlist"); assert_eq!(&table_name.to_string(), "tbl"); let (component_path, table_name) = parse_documents_jsonl_table_name( "some/parentdir/_components/waitlist/tbl/documents.jsonl", &ComponentPath::root(), )? .unwrap(); assert_eq!(&String::from(component_path), "waitlist"); assert_eq!(&table_name.to_string(), "tbl"); let (component_path, table_name) = parse_documents_jsonl_table_name( "_components/waitlist/_components/ratelimit/tbl/documents.jsonl", &ComponentPath::root(), )? .unwrap(); assert_eq!(&String::from(component_path), "waitlist/ratelimit"); assert_eq!(&table_name.to_string(), "tbl"); let (component_path, table_name) = parse_documents_jsonl_table_name( "_components/waitlist/_components/ratelimit/tbl/documents.jsonl", &"friendship".parse()?, )? .unwrap(); assert_eq!( &String::from(component_path), "friendship/waitlist/ratelimit" ); assert_eq!(&table_name.to_string(), "tbl"); let (component_path, table_name) = parse_documents_jsonl_table_name( "tbl/documents.jsonl", &"waitlist/ratelimit".parse()?, )? .unwrap(); assert_eq!(&String::from(component_path), "waitlist/ratelimit"); assert_eq!(&table_name.to_string(), "tbl"); Ok(()) } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server