parse.rs
use std::{
    collections::BTreeMap,
    io,
    str::FromStr,
    sync::{
        Arc,
        LazyLock,
    },
};

use anyhow::Context;
use bytes::Bytes;
use common::{
    bootstrap_model::tables::TABLES_TABLE,
    components::{
        ComponentName,
        ComponentPath,
    },
    knobs::TRANSACTION_MAX_USER_WRITE_SIZE_BYTES,
    types::{
        FieldName,
        FullyQualifiedObjectKey,
    },
};
use errors::ErrorMetadata;
use futures::{
    stream::{
        self,
        BoxStream,
    },
    AsyncBufReadExt,
    AsyncReadExt,
    StreamExt,
    TryStreamExt,
};
use futures_async_stream::{
    try_stream,
    try_stream_block,
};
use model::{
    file_storage::FILE_STORAGE_VIRTUAL_TABLE,
    snapshot_imports::types::ImportFormat,
};
use regex::Regex;
use serde_json::{
    json,
    Value as JsonValue,
};
use shape_inference::{
    export_context::{
        ExportContext,
        GeneratedSchema,
    },
    ProdConfig,
    Shape,
    ShapeConfig,
};
use storage::{
    Storage,
    StorageExt,
};
use storage_zip_reader::StorageZipArchive;
use tokio::io::{
    AsyncBufReadExt as _,
    AsyncRead,
    BufReader,
};
use tokio_util::io::ReaderStream;
use value::{
    id_v6::DeveloperDocumentId,
    TableName,
};

use crate::snapshot_import::import_error::ImportError;

pub type ImportDocumentStream = BoxStream<'static, anyhow::Result<JsonValue>>;
pub type ImportStorageFileStream = BoxStream<'static, anyhow::Result<Bytes>>;

pub struct ParsedImport {
    pub generated_schemas: Vec<(ComponentPath, TableName, GeneratedSchema<ProdConfig>)>,
    pub documents: Vec<(ComponentPath, TableName, ImportDocumentStream)>,
    pub storage_files: Vec<(ComponentPath, DeveloperDocumentId, ImportStorageFileStream)>,
}

impl ParsedImport {
    fn single_table(
        component_path: ComponentPath,
        table_name: TableName,
        documents: ImportDocumentStream,
    ) -> Self {
        Self {
            generated_schemas: vec![],
            documents: vec![(component_path, table_name, documents)],
            storage_files: vec![],
        }
    }
}

static COMPONENT_NAME_PATTERN: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"^(.*/)?_components/([^/]+)/$").unwrap());
static GENERATED_SCHEMA_PATTERN: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"^(.*/)?([^/]+)/generated_schema\.jsonl$").unwrap());
static DOCUMENTS_PATTERN: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"^(.*/)?([^/]+)/documents\.jsonl$").unwrap());
// _storage/(ID) with optional ignored prefix and extension like
// snapshot/_storage/(ID).png
static STORAGE_FILE_PATTERN: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"(.*/)?_storage/([^/.]+)(?:\.[^/]+)?$").unwrap());

fn map_zip_io_error(e: io::Error) -> anyhow::Error {
    if e.kind() == io::ErrorKind::InvalidData {
        // Content errors become InvalidData errors
        anyhow::Error::from(e)
            .context(ErrorMetadata::bad_request("InvalidZip", "invalid zip file"))
    } else {
        // S3 errors get mapped into ErrorKind::Other
        e.into()
    }
}

fn map_csv_error(e: csv_async::Error) -> anyhow::Error {
    let pos_line = |pos: &Option<csv_async::Position>| pos.as_ref().map_or(0, |pos| pos.line());
    match e.kind() {
        csv_async::ErrorKind::Utf8 { pos, .. } => {
            ImportError::CsvInvalidRow(pos_line(pos), e).into()
        },
        csv_async::ErrorKind::UnequalLengths { pos, .. } => {
            ImportError::CsvRowMissingFields(pos_line(pos)).into()
        },
        // IO and Seek are errors from the underlying stream.
        csv_async::ErrorKind::Io(_)
        | csv_async::ErrorKind::Seek
        // We're not using serde for CSV parsing, so these errors are unexpected
        | csv_async::ErrorKind::Serialize(_)
        | csv_async::ErrorKind::Deserialize { .. } => e.into(),
        _ => e.into(),
    }
}

/// Parse the imported file, returning separate streams for each table or
/// storage file.
pub async fn parse_import_file(
    format: ImportFormat,
    component_path: ComponentPath,
    storage: Arc<dyn Storage>,
    fq_object_key: FullyQualifiedObjectKey,
) -> anyhow::Result<ParsedImport> {
    let stream_body = || async {
        storage
            .get_fq_object(&fq_object_key)
            .await?
            .with_context(|| format!("Missing import object {fq_object_key:?}"))
    };
    match format {
        ImportFormat::Csv(table_name) => Ok(ParsedImport::single_table(
            component_path,
            table_name,
            parse_csv_import(stream_body().await?).boxed(),
        )),
        ImportFormat::JsonLines(table_name) => {
            let mut reader = stream_body().await?.into_reader();
            Ok(ParsedImport::single_table(
                component_path,
                table_name,
                try_stream_block!({
                    let mut line = String::new();
                    let mut lineno = 1;
                    while reader
                        .read_line(&mut line)
                        .await
                        .map_err(ImportError::NotUtf8)?
                        > 0
                    {
                        // Check for UTF-8 BOM at the start of the first line
                        if lineno == 1 && line.as_bytes().starts_with(&[0xEF, 0xBB, 0xBF]) {
                            anyhow::bail!(ImportError::Utf8BomNotSupported);
                        }
                        let v: serde_json::Value = serde_json::from_str(&line)
                            .map_err(|e| ImportError::JsonInvalidRow(lineno, e))?;
                        yield v;
                        line.clear();
                        lineno += 1;
                    }
                })
                .boxed(),
            ))
        },
        ImportFormat::JsonArray(table_name) => {
            let reader = stream_body().await?;
            let mut buf = Vec::new();
            let mut truncated_reader = reader
                .into_reader()
                .take((*TRANSACTION_MAX_USER_WRITE_SIZE_BYTES as u64) + 1);
            truncated_reader.read_to_end(&mut buf).await?;
            if buf.len() > *TRANSACTION_MAX_USER_WRITE_SIZE_BYTES {
                anyhow::bail!(ImportError::JsonArrayTooLarge(buf.len()));
            }

            let v: serde_json::Value = {
                // Check for UTF-8 BOM and reject it
                if buf.starts_with(&[0xEF, 0xBB, 0xBF]) {
                    anyhow::bail!(ImportError::Utf8BomNotSupported);
                }
                serde_json::from_slice(&buf).map_err(ImportError::NotJson)?
            };
            let JsonValue::Array(array) = v else {
                anyhow::bail!(ImportError::NotJsonArray)
            };
            Ok(ParsedImport::single_table(
                component_path,
                table_name,
                stream::iter(array.into_iter().map(Ok)).boxed(),
            ))
        },
        ImportFormat::Zip => {
            let base_component_path = component_path;
            let zip_reader = StorageZipArchive::open_fq(storage, fq_object_key).await?;
            let mut import = ParsedImport {
                generated_schemas: vec![],
                documents: vec![],
                storage_files: vec![],
            };
            for entry in zip_reader.entries() {
                if let Some((component_path, table_name)) =
                    parse_documents_jsonl_table_name(&entry.name, &base_component_path)?
                {
                    if table_name.is_system()
                        && table_name != *TABLES_TABLE
                        && table_name != *FILE_STORAGE_VIRTUAL_TABLE
                    {
                        tracing::info!("Skipping system table entry {}", entry.name);
                        continue;
                    }
                    let entry_reader = zip_reader.read_entry(entry.clone());
                    tracing::info!(
                        "importing zip file containing table {component_path}:{table_name}"
                    );
                    import.documents.push((
                        component_path,
                        table_name,
                        parse_documents_jsonl(entry_reader).boxed(),
                    ));
                } else if let Some((component_path, table_name)) = parse_table_filename(
                    &entry.name,
                    &base_component_path,
                    &GENERATED_SCHEMA_PATTERN,
                )? {
                    let entry_reader = zip_reader.read_entry(entry.clone());
                    tracing::info!("importing zip file containing generated_schema {table_name}");
                    let generated_schema =
                        parse_generated_schema(&entry.name, entry_reader).await?;
                    import
                        .generated_schemas
                        .push((component_path, table_name, generated_schema));
                } else if let Some((component_path, storage_id)) =
                    parse_storage_filename(&entry.name, &base_component_path)?
                {
                    let entry_reader = zip_reader.read_entry(entry.clone());
                    import.storage_files.push((
                        component_path,
                        storage_id,
                        ReaderStream::new(entry_reader)
                            .map_err(anyhow::Error::from)
                            .boxed(),
                    ));
                }
            }
            Ok(import)
        },
    }
}

#[try_stream(ok = JsonValue, error = anyhow::Error)]
async fn parse_csv_import(reader: storage::StorageGetStream) {
    let mut reader = csv_async::AsyncReader::from_reader(reader.into_reader());
    if !reader.has_headers() {
        // TODO: this will never happen.
        anyhow::bail!(ImportError::CsvMissingHeaders);
    }
    let field_names = {
        let headers = reader.headers().await.map_err(map_csv_error)?;
        headers
            .iter()
            .map(|s| {
                let trimmed = s.trim_matches(' ');
                let field_name = FieldName::from_str(trimmed)
                    .map_err(|e| ImportError::CsvInvalidHeader(trimmed.to_string(), e))?;
                Ok(field_name)
            })
            .collect::<anyhow::Result<Vec<_>>>()?
    };

    let mut lineno = 0;
    let mut rows = reader.records();
    while let Some(row_r) = rows.next().await {
        lineno += 1;
        let parsed_row = row_r
            .map_err(map_csv_error)?
            .iter()
            .map(parse_csv_cell)
            .collect::<Vec<JsonValue>>();
        let mut obj = BTreeMap::new();
        if field_names.len() != parsed_row.len() {
            anyhow::bail!(ImportError::CsvRowMissingFields(lineno));
        }
        for (field_name, value) in field_names.iter().zip(parsed_row.into_iter()) {
            obj.insert(field_name.to_string(), value);
        }
        yield serde_json::to_value(obj)?;
    }
}

pub fn parse_component_path(
    mut filename: &str,
    base_component_path: &ComponentPath,
) -> anyhow::Result<ComponentPath> {
    let mut component_names = Vec::new();
    while let Some(captures) = COMPONENT_NAME_PATTERN.captures(filename) {
        filename = captures.get(1).map_or("", |c| c.as_str());
        let component_name_str = captures
            .get(2)
            .expect("regex has two capture groups")
            .as_str();
        let component_name: ComponentName = component_name_str.parse().map_err(|e| {
            ErrorMetadata::bad_request(
                "InvalidComponentName",
                format!("component name '{component_name_str}' invalid: {e}"),
            )
        })?;
        component_names.push(component_name);
    }
    component_names.reverse();
    let mut component_path = base_component_path.clone();
    for component_name in component_names {
        component_path = component_path.push(component_name);
    }
    Ok(component_path)
}

fn parse_table_filename(
    filename: &str,
    base_component_path: &ComponentPath,
    regex: &Regex,
) -> anyhow::Result<Option<(ComponentPath, TableName)>> {
    match regex.captures(filename) {
        None => Ok(None),
        Some(captures) => {
            let table_name_str = captures
                .get(2)
                .expect("regex has two capture groups")
                .as_str();
            let table_name = table_name_str.parse().map_err(|e| {
                ErrorMetadata::bad_request(
                    "InvalidTableName",
                    format!("table name '{table_name_str}' invalid: {e}"),
                )
            })?;
            let prefix = captures.get(1).map_or("", |c| c.as_str());
            let component_path = parse_component_path(prefix, base_component_path)?;
            Ok(Some((component_path, table_name)))
        },
    }
}

fn parse_storage_filename(
    filename: &str,
    base_component_path: &ComponentPath,
) -> anyhow::Result<Option<(ComponentPath, DeveloperDocumentId)>> {
    match STORAGE_FILE_PATTERN.captures(filename) {
        None => Ok(None),
        Some(captures) => {
            let storage_id_str = captures
                .get(2)
                .expect("regex has two capture groups")
                .as_str();
            if storage_id_str == "documents" {
                return Ok(None);
            }
            let storage_id = DeveloperDocumentId::decode(storage_id_str).map_err(|e| {
                ErrorMetadata::bad_request(
                    "InvalidStorageId",
                    format!("_storage id '{storage_id_str}' invalid: {e}"),
                )
            })?;
            let prefix = captures.get(1).map_or("", |c| c.as_str());
            let component_path = parse_component_path(prefix, base_component_path)?;
            Ok(Some((component_path, storage_id)))
        },
    }
}

fn parse_documents_jsonl_table_name(
    filename: &str,
    base_component_path: &ComponentPath,
) -> anyhow::Result<Option<(ComponentPath, TableName)>> {
    parse_table_filename(filename, base_component_path, &DOCUMENTS_PATTERN)
}

#[try_stream(ok = JsonValue, error = anyhow::Error)]
async fn parse_documents_jsonl(reader: impl AsyncRead + Unpin) {
    let mut line = String::new();
    let mut lineno = 1;
    let mut reader = BufReader::new(reader);
    while reader
        .read_line(&mut line)
        .await
        .map_err(map_zip_io_error)?
        > 0
    {
        let v: serde_json::Value =
            serde_json::from_str(&line).map_err(|e| ImportError::JsonInvalidRow(lineno, e))?;
        yield v;
        line.clear();
        lineno += 1;
    }
}

async fn parse_generated_schema<T: ShapeConfig>(
    filename: &str,
    entry_reader: impl tokio::io::AsyncRead + Unpin,
) -> anyhow::Result<GeneratedSchema<T>> {
    let mut line = String::new();
    let mut lineno = 1;
    let mut entry_reader = BufReader::new(entry_reader);
    entry_reader
        .read_line(&mut line)
        .await
        .map_err(ImportError::NotUtf8)?;
    let inferred_type_json: serde_json::Value =
        serde_json::from_str(&line).map_err(|e| ImportError::JsonInvalidRow(lineno, e))?;
    let inferred_type = Shape::from_str(inferred_type_json.as_str().with_context(|| {
        ImportError::InvalidConvexValue(
            lineno,
            anyhow::anyhow!("first line of generated_schema must be a string"),
        )
    })?)
    .map_err(|e| {
        ErrorMetadata::bad_request(
            "InvalidGeneratedSchema",
            format!("cannot parse {filename}: {e:#}"),
        )
    })?;
    line.clear();
    lineno += 1;

    let mut overrides = BTreeMap::new();
    while entry_reader
        .read_line(&mut line)
        .await
        .map_err(ImportError::NotUtf8)?
        > 0
    {
        let mut v: serde_json::Value =
            serde_json::from_str(&line).map_err(|e| ImportError::JsonInvalidRow(lineno, e))?;
        let o = v.as_object_mut().with_context(|| {
            ImportError::InvalidConvexValue(lineno, anyhow::anyhow!("overrides should be object"))
        })?;
        if o.len() != 1 {
            anyhow::bail!(ImportError::InvalidConvexValue(
                lineno,
                anyhow::anyhow!("override object should have one item")
            ));
        }
        let (key, value) = o.into_iter().next().context("must have one item")?;
        let export_context = ExportContext::try_from(value.clone())
            .map_err(|e| ImportError::InvalidConvexValue(lineno, e))?;
        overrides.insert(
            DeveloperDocumentId::decode(key)
                .map_err(|e| ImportError::InvalidConvexValue(lineno, e.into()))?,
            export_context,
        );
        line.clear();
        lineno += 1;
    }
    let generated_schema = GeneratedSchema {
        inferred_shape: inferred_type,
        overrides,
    };
    Ok(generated_schema)
}

// For now, we only parse out floats and strings in CSV files.
pub fn parse_csv_cell(s: &str) -> JsonValue {
    if let Ok(r) = s.parse::<f64>() {
        return json!(r);
    }
    json!(s)
}

#[cfg(test)]
mod tests {
    use common::components::ComponentPath;

    use crate::snapshot_import::parse::{
        parse_documents_jsonl_table_name,
        parse_storage_filename,
        parse_table_filename,
        GENERATED_SCHEMA_PATTERN,
    };

    #[test]
    fn test_filename_regex() -> anyhow::Result<()> {
        let (_, table_name) =
            parse_documents_jsonl_table_name("users/documents.jsonl", &ComponentPath::root())?
                .unwrap();
        assert_eq!(table_name, "users".parse()?);
        // Regression test, checking that the '.' is escaped.
        assert!(
            parse_documents_jsonl_table_name("users/documentsxjsonl", &ComponentPath::root())?
                .is_none()
        );
        // When an export is unzipped and re-zipped, sometimes there's a prefix.
        let (_, table_name) = parse_documents_jsonl_table_name(
            "snapshot/users/documents.jsonl",
            &ComponentPath::root(),
        )?
        .unwrap();
        assert_eq!(table_name, "users".parse()?);

        let (_, table_name) = parse_table_filename(
            "users/generated_schema.jsonl",
            &ComponentPath::root(),
            &GENERATED_SCHEMA_PATTERN,
        )?
        .unwrap();
        assert_eq!(table_name, "users".parse()?);

        let (_, storage_id) = parse_storage_filename(
            "_storage/kg2ah8mk1xtg35g7zyexyc96e96yr74f.gif",
            &ComponentPath::root(),
        )?
        .unwrap();
        assert_eq!(&storage_id.to_string(), "kg2ah8mk1xtg35g7zyexyc96e96yr74f");

        let (_, storage_id) = parse_storage_filename(
            "snapshot/_storage/kg2ah8mk1xtg35g7zyexyc96e96yr74f.gif",
            &ComponentPath::root(),
        )?
        .unwrap();
        assert_eq!(&storage_id.to_string(), "kg2ah8mk1xtg35g7zyexyc96e96yr74f");

        // No file extension.
        let (_, storage_id) = parse_storage_filename(
            "_storage/kg2ah8mk1xtg35g7zyexyc96e96yr74f",
            &ComponentPath::root(),
        )?
        .unwrap();
        assert_eq!(&storage_id.to_string(), "kg2ah8mk1xtg35g7zyexyc96e96yr74f");

        Ok(())
    }

    #[test]
    fn test_component_path_regex() -> anyhow::Result<()> {
        let (component_path, table_name) = parse_documents_jsonl_table_name(
            "_components/waitlist/tbl/documents.jsonl",
            &ComponentPath::root(),
        )?
        .unwrap();
        assert_eq!(&String::from(component_path), "waitlist");
        assert_eq!(&table_name.to_string(), "tbl");

        let (component_path, table_name) = parse_documents_jsonl_table_name(
            "some/parentdir/_components/waitlist/tbl/documents.jsonl",
            &ComponentPath::root(),
        )?
        .unwrap();
        assert_eq!(&String::from(component_path), "waitlist");
        assert_eq!(&table_name.to_string(), "tbl");

        let (component_path, table_name) = parse_documents_jsonl_table_name(
            "_components/waitlist/_components/ratelimit/tbl/documents.jsonl",
            &ComponentPath::root(),
        )?
        .unwrap();
        assert_eq!(&String::from(component_path), "waitlist/ratelimit");
        assert_eq!(&table_name.to_string(), "tbl");

        let (component_path, table_name) = parse_documents_jsonl_table_name(
            "_components/waitlist/_components/ratelimit/tbl/documents.jsonl",
            &"friendship".parse()?,
        )?
        .unwrap();
        assert_eq!(
            &String::from(component_path),
            "friendship/waitlist/ratelimit"
        );
        assert_eq!(&table_name.to_string(), "tbl");

        let (component_path, table_name) = parse_documents_jsonl_table_name(
            "tbl/documents.jsonl",
            &"waitlist/ratelimit".parse()?,
        )?
        .unwrap();
        assert_eq!(&String::from(component_path), "waitlist/ratelimit");
        assert_eq!(&table_name.to_string(), "tbl");

        Ok(())
    }
}
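
// Illustrative sketch (not part of the original file): the CSV import path
// feeds every cell through `parse_csv_cell`, which only distinguishes floats
// from strings. Assuming the same crate layout as the `tests` module above,
// a minimal check of that behavior could look like this.
#[cfg(test)]
mod parse_csv_cell_sketch {
    use serde_json::json;

    use crate::snapshot_import::parse::parse_csv_cell;

    #[test]
    fn numeric_cells_become_json_numbers() {
        // "1.25" parses as an f64, so it is emitted as a JSON number.
        assert_eq!(parse_csv_cell("1.25"), json!(1.25));
        // Anything that does not parse as a float falls through as a JSON string.
        assert_eq!(parse_csv_cell("hello"), json!("hello"));
    }
}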
