Skip to main content
Glama

Convex MCP server

Official
by get-convex
stream.rs5.33 kB
use anyhow::Context; use common::runtime::Runtime; use errors::ErrorMetadata; use futures::{ stream::BoxStream, StreamExt, }; use super::task_executor::TaskExecutor; use crate::environment::action::task::{ FormPart, FormPartFile, TaskResponse, }; // The maximum size of a multipart form body is 20 MiB. // Matches Response body size limit (HTTP_ACTION_BODY_LIMIT) for simplicity. // Multipart forms are parsed in memory (because FormData allows accessing // entries in arbitrary order), so this limit protects the server from // running out of memory. pub const MULTIPART_BODY_LIMIT: u64 = 20 << 20; impl<RT: Runtime> TaskExecutor<RT> { // Sends a stream to javascript by sending TaskResponse::StreamExtend // repeatedly. Any errors are sent with StreamExtend, and the number of bytes // sent are returned on success. pub async fn send_stream( &self, stream_id: uuid::Uuid, stream: Option<BoxStream<'static, anyhow::Result<bytes::Bytes>>>, ) -> Result<usize, ()> { let mut size = 0; if let Some(mut stream) = stream { while let Some(chunk) = stream.next().await { match chunk { Err(e) => { _ = self.task_retval_sender.send(TaskResponse::StreamExtend { stream_id, chunk: Err( ErrorMetadata::bad_request("StreamFailed", e.to_string()).into() ), }); return Err(()); }, Ok(chunk) => { size += chunk.len(); _ = self.task_retval_sender.send(TaskResponse::StreamExtend { stream_id, chunk: Ok(Some(chunk)), }); }, } } } // Successfully sent all chunks. _ = self.task_retval_sender.send(TaskResponse::StreamExtend { stream_id, chunk: Ok(None), }); Ok(size) } pub async fn run_parse_multi_part( &self, content_type: String, request_stream: BoxStream<'static, anyhow::Result<bytes::Bytes>>, ) -> anyhow::Result<Vec<FormPart>> { // NOTE we use multer instead of axum::extract::Multipart because // of https://github.com/tokio-rs/axum/issues/3131 let boundary = multer::parse_boundary(&content_type).with_context(|| { ErrorMetadata::bad_request( "InvalidMultiPartForm", format!("multi-part form invalid boundary: '{content_type}'"), ) })?; let mut multipart = multer::Multipart::with_constraints( request_stream, boundary, multer::Constraints::new() .size_limit(multer::SizeLimit::new().whole_stream(MULTIPART_BODY_LIMIT)), ); let mut results = vec![]; while let Some(field) = multipart.next_field().await.map_err(map_multer_error)? { let name = field .name() .with_context(|| { ErrorMetadata::bad_request( "InvalidMultiPartForm", "multi-part form entry missing 'name'", ) })? .to_string(); let (file, text) = match field.file_name() { None => (None, Some(field.text().await.map_err(map_multer_error)?)), Some(file_name) => { let file_name = Some(file_name.to_string()); let content_type = field.content_type().map(|c| c.to_string()); let data = field .bytes() .await .map_err(map_multer_error)? .to_vec() .into(); ( Some(FormPartFile { content_type, file_name, data, }), None, ) }, }; results.push(FormPart { name, text, file }); } Ok(results) } } fn map_multer_error(e: multer::Error) -> anyhow::Error { match &e { // Internal errors. multer::Error::StreamReadFailed(_) | multer::Error::LockFailure | multer::Error::UnknownField { .. } => e.into(), // User errors. multer::Error::StreamSizeExceeded { .. } | multer::Error::FieldSizeExceeded { .. } | multer::Error::IncompleteFieldData { .. } | multer::Error::IncompleteHeaders | multer::Error::ReadHeaderFailed(_) | multer::Error::DecodeHeaderName { .. } | multer::Error::DecodeHeaderValue { .. } | multer::Error::IncompleteStream | multer::Error::NoMultipart | multer::Error::DecodeContentType(_) | multer::Error::NoBoundary => ErrorMetadata::bad_request( "InvalidMultiPartForm", format!("invalid multi-part form: '{e}'"), ) .into(), _ => e.into(), } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server