Skip to main content
Glama

Convex MCP server

Official
by get-convex
fetch.rs13.2 kB
use std::{ net::{ Ipv4Addr, SocketAddrV4, }, time::Duration, }; use axum::{ body::Body, response::Response, routing::{ get, post, }, Router, }; use common::{ assert_obj, http::{ ConvexHttpService, NoopRouteMapper, }, pause::PauseController, runtime::Runtime, testing::{ assert_contains, TestPersistence, }, }; use database::UserFacingModel; use http::{ Request, StatusCode, }; use http_body_util::BodyExt; use itertools::Itertools; use keybroker::Identity; use must_let::must_let; use runtime::{ prod::ProdRuntime, testing::TestRuntime, }; use semver::Version; use serde_json::json; use tokio::select; use value::{ val, ConvexValue, TableNamespace, }; use crate::{ test_helpers::{ UdfTest, UdfTestConfig, }, tests::http_action::{ http_post_request, http_request, }, IsolateConfig, }; #[convex_macro::test_runtime] async fn test_fetch_not_allowed_in_queries(rt: TestRuntime) -> anyhow::Result<()> { let t = UdfTest::default(rt).await?; assert_contains( &t.query_js_error("fetch:fromQuery", assert_obj!()).await?, "Can't use fetch() in queries and mutations. Please consider using an action.", ); Ok(()) } async fn serve(router: Router, port: u16) { let (_shutdown_tx, mut shutdown_rx) = async_broadcast::broadcast::<()>(1); _ = ConvexHttpService::new( router, "http_test", "0.0.1".to_owned(), 1, Duration::from_secs(125), NoopRouteMapper, ) .serve( SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), port).into(), async move { let _ = shutdown_rx.recv().await; }, ) .await } #[convex_macro::prod_rt_test] async fn test_fetch_basic(rt: ProdRuntime) -> anyhow::Result<()> { let redirect_handler = |req: Request<Body>| async move { let target = req .headers() .get("x-location") .cloned() .unwrap_or("/assets/fixture.json".parse().unwrap()); Response::builder() .status(StatusCode::MOVED_PERMANENTLY) .header(hyper::header::LOCATION, target) .body(Body::empty()) .unwrap() }; // Start http server to serve static routes. let router = Router::new() .route( "/assets/fixture.json", get(|| async { Response::builder() .header("content-type", "application/json") .body(Body::from( serde_json::to_string(&json!({ "name": "convex", })) .expect("invalid json"), )) .expect("invalid response") }), ) .route( "/echo_server", post(|req: Request<Body>| async { let (parts, body) = req.into_parts(); let mut response = Response::new(body); response.headers_mut().extend(parts.headers); response }), ) .route("/assets/hello.txt", get(redirect_handler)) .route("/post_redirect_to_get", post(redirect_handler)) .route("/a/b/c", get(redirect_handler)) .route( "/redirect_body", post(|| async { Response::builder() .status(StatusCode::PERMANENT_REDIRECT) .header(hyper::header::LOCATION, "/echo_server") .body(Body::empty()) .unwrap() }), ) .route( "/proxy_reject", get(|| async { Response::builder() .status(StatusCode::PROXY_AUTHENTICATION_REQUIRED) .body(Body::from("Sorry can't do that")) .expect("invalid response") }), ) .route( "/subdir/form_urlencoded.txt", get(|| async { Response::builder() .header("Content-Type", "application/x-www-form-urlencoded") .body(Body::from("field_1=Hi&field_2=%3CConvex%3E")) .expect("invalid response") }), ) .route( "/multipart_form_data.txt", get(|| async { let b = "Preamble\r\n--boundary\t \r\nContent-Disposition: form-data; \ name=\"field_1\"\r\n\r\nvalue_1 \ \r\n\r\n--boundary\r\nContent-Disposition: form-data; \ name=\"field_2\";filename=\"file.js\"\r\nContent-Type: \ text/javascript\r\n\r\nconsole.log(\"Hi\")\r\n--boundary--\r\nEpilogue"; Response::builder() .header("Content-Type", "multipart/form-data;boundary=boundary") .body(Body::from(b)) .expect("invalid response") }), ) .route( "/multipart_form_bad_content_type", get(|| async { let b = "Preamble\r\n--boundary\t \r\nContent-Disposition: form-data; \ name=\"field_1\"\r\n\r\nvalue_1 \ \r\n\r\n--boundary\r\nContent-Disposition: form-data; \ name=\"field_2\";filename=\"file.js\"\r\nContent-Type: \ text/javascript\r\n\r\nconsole.log(\"Hi\")\r\n--boundary--\r\nEpilogue"; Response::builder() .header( "Content-Type", "multipart/form-dataststst;boundary=boundary", ) .body(Body::from(b)) .expect("invalid response") }), ) .route( "/echo_multipart_file", post(|req: Request<Body>| async { let body = req.into_body(); let bytes = body.collect().await.unwrap().to_bytes(); let start = b"--boundary\t \r\n\ Content-Disposition: form-data; name=\"field_1\"\r\n\ \r\n\ value_1 \r\n\ \r\n--boundary\r\n\ Content-Disposition: form-data; name=\"file\"; \ filename=\"file.bin\"\r\n\ Content-Type: application/octet-stream\r\n\ \r\n"; let end = b"\r\n--boundary--\r\n"; let b = [start as &[u8], &bytes[..], end].concat(); Response::builder() .header("content-type", "multipart/form-data;boundary=boundary") .body(Body::from(b)) .expect("invalid response") }), ); let _server = rt.spawn("test_server", serve(router, 4545)); let redirected_router = Router::new().route( "/print_auth", get(|req: Request<Body>| async move { Response::builder() .body(Body::from( serde_json::to_string(&json!({ "auth": match req.headers().get("Authorization") { Some(header) => header.to_str().unwrap(), None => "None", }, })) .expect("invalid json"), )) .expect("invalid response") }), ); let _router = rt.spawn("test_router", serve(redirected_router, 4547)); let t = UdfTest::default(rt).await?; must_let!(let (ConvexValue::String(r), _outcome, log_lines) = t.action_outcome_and_log_lines( "fetch", assert_obj!(), Identity::system(), ).await?); assert_eq!(String::from(r), "success".to_string()); assert!(log_lines.is_empty()); // Interaction between fetch and Request/Response blobs. let response = t .http_action( "http_action", http_request("proxy_response"), Identity::system(), ) .await?; must_let!(let Some(body) = response.body().clone()); assert_eq!(std::str::from_utf8(&body)?, "Hello World"); let round_trip_test = |endpoint: &'static str| async { let response = t .http_action( "http_action", http_post_request(endpoint, "[0,\"Hello\"]".as_bytes().to_vec()), Identity::system(), ) .await?; must_let!(let Some(body) = response.body().clone()); assert_eq!(std::str::from_utf8(&body)?, "[0,\"Hello\"]"); anyhow::Ok(()) }; round_trip_test("round_trip_fetch_blob").await?; round_trip_test("round_trip_fetch_text").await?; round_trip_test("round_trip_fetch_array_buffer").await?; round_trip_test("round_trip_fetch_json").await?; Ok(()) } // TODO(ENG-7281) fix flakes #[ignore] #[convex_macro::prod_rt_test] async fn test_fetch_timing(rt: ProdRuntime) -> anyhow::Result<()> { let rt_ = rt.clone(); // Start http server to serve static routes. let router = Router::new() .route( "/echo_server", post(|req: Request<Body>| async { let (parts, body) = req.into_parts(); let mut response = Response::new(body); response.headers_mut().extend(parts.headers); response }), ) .route( "/timeout", get(|| async move { // To test parallel fetches, we race /timeout against /echo_server. // To make sure /echo_server finishes first, /timeout takes a while. rt_.wait(Duration::from_secs(3)).await; Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body(Body::from("timeout")) .expect("invalid response") }), ); let _router = rt.spawn("test_router", serve(router, 4546)); let t = UdfTest::default(rt.clone()).await?; t.action("fetch:fetchInParallel", assert_obj!()).await?; let log_lines = t .action_log_lines("fetch:danglingFetch", assert_obj!()) .await?; assert_eq!( log_lines.into_iter().map(|l| l.to_pretty_string_test_only()).collect_vec(), vec![ "[WARN] 1 unawaited operation: [fetch]. Async operations should be awaited or they might not run. See https://docs.convex.dev/functions/actions#dangling-promises for more information." .to_string() ] ); let t = UdfTest::with_timeout(rt, Some(Duration::from_secs(1))).await?; let e = t .action_js_error("fetch:fetchTimeout", assert_obj!()) .await?; assert_contains(&e, "Function execution timed out"); let e = t .action_js_error("fetch:fetchUnendingRequest", assert_obj!()) .await?; assert_contains(&e, "Function execution timed out"); t.action("fetch:fetchBlockedOnTimeouts", assert_obj!()) .await?; Ok(()) } #[convex_macro::prod_rt_test] async fn test_fetch_abort(rt: ProdRuntime) -> anyhow::Result<()> { let t = UdfTest::default_with_config( UdfTestConfig { isolate_config: IsolateConfig::default(), udf_server_version: Version::parse("1000.0.0").unwrap(), }, 3, rt.clone(), ) .await?; let (pause_controller, pause_client) = PauseController::new(); let hold = pause_controller.hold("pause_fetch"); let router = Router::new().route( "/pause", get(|| async move { pause_client.wait("pause_fetch").await; Response::builder().body(Body::from("ok")).unwrap() }), ); let _router = rt.spawn("test_router", serve(router, 4548)); // fetchAbort is an action that fetches from /pause, and in parallel it // waits for triggerAbort. When triggerAbort is called, it aborts the fetch // and waits for the fetch to throw an error, then returns success. let mut result_fut = Box::pin(t.action("fetch:fetchAbort", assert_obj!())); let paused = select! { _ = &mut result_fut => { anyhow::bail!("fetchAbort should pause"); }, paused = hold.wait_for_blocked() => { let paused = paused.unwrap(); trigger_abort(&t).await?; paused }, }; // The action is able to finish before the fetch returns. let result = result_fut.await?; assert_eq!(result, val!("success")); paused.unpause(); Ok(()) } async fn trigger_abort(t: &UdfTest<ProdRuntime, TestPersistence>) -> anyhow::Result<()> { let mut tx = t.database.begin(Identity::system()).await?; // NOTE: you can't run a mutation in prod_rt_test, because time is paused but // raw_mutation advances time. UserFacingModel::new(&mut tx, TableNamespace::Global) .insert("triggerAbort".parse()?, assert_obj!()) .await?; t.database.commit(tx).await?; Ok(()) }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server