Skip to main content
Glama

Convex MCP server

Official
by get-convex
http_routing.rs16.9 kB
use anyhow::Context; use common::{ components::{ CanonicalizedComponentFunctionPath, ComponentPath, Reference, }, errors::JsError, execution_context::ExecutionContext, http::RoutedHttpPath, log_lines::{ run_function_and_collect_log_lines, LogLevel, LogLine, SystemLogMetadata, }, runtime::{ tokio_spawn, Runtime, }, types::{ FunctionCaller, ModuleEnvironment, RoutableMethod, }, RequestId, }; use database::{ BootstrapComponentsModel, Transaction, }; use errors::ErrorMetadataAnyhowExt; use function_runner::server::HttpActionMetadata; use futures::{ FutureExt, StreamExt, }; use http::StatusCode; use keybroker::Identity; use model::modules::{ ModuleModel, HTTP_MODULE_PATH, }; use sync_types::{ CanonicalizedUdfPath, FunctionName, }; use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; use udf::{ validation::ValidatedHttpPath, HttpActionOutcome, HttpActionRequest, HttpActionRequestHead, HttpActionResponsePart, HttpActionResponseStreamer, HttpActionResult, }; use usage_tracking::FunctionUsageTracker; use value::sha256::Sha256Digest; use super::ApplicationFunctionRunner; use crate::function_log::HttpActionStatusCode; impl<RT: Runtime> ApplicationFunctionRunner<RT> { #[fastrace::trace] pub async fn run_http_action( &self, request_id: RequestId, http_request: HttpActionRequest, mut response_streamer: HttpActionResponseStreamer, identity: Identity, caller: FunctionCaller, ) -> anyhow::Result<udf::HttpActionResult> { let start = self.runtime.monotonic_now(); let usage_tracker = FunctionUsageTracker::new(); let mut tx = self .database .begin_with_usage(identity.clone(), usage_tracker.clone()) .await?; let (component_path, routed_path) = match self.route_http_action(&mut tx, &http_request.head).await? { Some(r) => r, None => { drop(tx); let response_parts = udf::HttpActionResponsePart::from_text( StatusCode::NOT_FOUND, "This Convex deployment does not have HTTP actions enabled.".to_string(), ); for part in response_parts { response_streamer.send_part(part)??; } return Ok(udf::HttpActionResult::Streamed); }, }; let path = CanonicalizedComponentFunctionPath { component: component_path, udf_path: CanonicalizedUdfPath::new( HTTP_MODULE_PATH.clone(), FunctionName::default_export(), ), }; let validated_path = match ValidatedHttpPath::new(&mut tx, path).await? { Ok(validated_path) => validated_path, Err(e) => return Ok(udf::HttpActionResult::Error(e)), }; let unix_timestamp = self.runtime.unix_timestamp(); let context = ExecutionContext::new(request_id, &caller); let request_head = http_request.head.clone(); let route = http_request.head.route_for_failure(); let (log_line_sender, log_line_receiver) = mpsc::unbounded_channel(); // We want to intercept the response head so we can log it on function // completion, but still stream the response as it comes in, so we // create another channel here. let (isolate_response_sender, isolate_response_receiver) = mpsc::unbounded_channel(); let http_response_streamer = HttpActionResponseStreamer::new(isolate_response_sender); let outcome_future = self .isolate_functions .execute_http_action( tx, log_line_sender, HttpActionMetadata { http_response_streamer, http_module_path: validated_path, routed_path, http_request, }, context.clone(), ) .boxed(); let context_ = context.clone(); // Stream `response_stream` from the isolate, to `response_streamer` // in the application. let stream_result_fut = tokio_spawn( "http_action_response_streamer", Self::forward_http_action_stream( UnboundedReceiverStream::new(isolate_response_receiver), response_streamer, ), ); // NOTE: this will run in parallel with `stream_result_fut`, which is // running on a spawned coroutine. let send_log_line = |log_line| { self.function_log.log_http_action_progress( route.clone(), unix_timestamp, context_.clone(), vec![log_line].into(), // http actions are always run in Isolate ModuleEnvironment::Isolate, ) }; let (outcome_result, mut log_lines) = run_function_and_collect_log_lines(outcome_future, log_line_receiver, &send_log_line) .await; let (result_for_logging, response_sha256) = stream_result_fut.await??; match outcome_result { Ok(outcome) => { let result = outcome.result.clone(); let result_for_logging = match &result { HttpActionResult::Error(e) => Err(e.clone()), HttpActionResult::Streamed => Ok(result_for_logging.ok_or_else(|| { anyhow::anyhow!( "Result should be populated for successfully completed HTTP action" ) })?), }; self.function_log .log_http_action( outcome, result_for_logging, log_lines, start.elapsed(), caller, usage_tracker, context, response_sha256, ) .await; Ok(result) }, Err(e) if e.is_deterministic_user_error() || e.is_client_disconnect() => { let is_client_disconnect = e.is_client_disconnect(); let js_err = JsError::from_error(e); match result_for_logging { Some(r) => { let outcome = HttpActionOutcome::new( None, request_head, identity.into(), unix_timestamp, HttpActionResult::Streamed, None, None, ); let new_log_line = LogLine::new_system_log_line( if is_client_disconnect { // Not developer's fault, but we should let them know // since it indicates there will be no more logs or // response parts. LogLevel::Info } else { LogLevel::Warn }, vec![js_err.to_string()], outcome.unix_timestamp, SystemLogMetadata { code: if is_client_disconnect { "info:httpActionClientDisconnect".to_string() } else { "error:httpAction".to_string() }, }, ); send_log_line(new_log_line.clone()); log_lines.push(new_log_line); self.function_log .log_http_action( outcome.clone(), Ok(r), log_lines, start.elapsed(), caller, usage_tracker, context, response_sha256, ) .await; Ok(HttpActionResult::Streamed) }, None => { let result = udf::HttpActionResult::Error(js_err.clone()); let outcome = HttpActionOutcome::new( None, request_head, identity.into(), unix_timestamp, result.clone(), None, None, ); self.function_log .log_http_action( outcome.clone(), Err(js_err), log_lines, start.elapsed(), caller, usage_tracker, context, response_sha256, ) .await; Ok(result) }, } }, Err(e) => { self.function_log .log_http_action_system_error( &e, request_head, identity.into(), start, caller, log_lines, context, response_sha256, ) .await; Err(e) }, } } // Forwards from `response_stream` to `response_streamer`. async fn forward_http_action_stream( mut response_stream: UnboundedReceiverStream<HttpActionResponsePart>, mut response_streamer: HttpActionResponseStreamer, ) -> anyhow::Result<(Option<HttpActionStatusCode>, Sha256Digest)> { let mut result_for_logging = None; loop { // If the `response_stream` is still open, detect when `response_streamer` // closes, which makes us close `response_stream`. // This signals to the isolate that the client has disconnected. let streamer_close = if response_stream.as_ref().is_closed() { // We need the conditional to avoid a busy-loop. If `response_streamer` // is closed, `response_streamer.sender.closed()` will resolve immediately, // so we make sure that only happens once. futures::future::Either::Left(futures::future::pending()) } else { futures::future::Either::Right(response_streamer.sender.closed()) }; tokio::select! { _ = streamer_close => { response_stream.close(); }, part = response_stream.next() => { let Some(part) = part else { break; }; if let HttpActionResponsePart::Head(h) = &part { result_for_logging = Some(HttpActionStatusCode(h.status)); } // If the `response_streamer` is closed, the inner Result // will have an error. That's fine; we want to keep letting // the isolate send data and `response_streamer` will keep // accumulating data into its hash. let _ = response_streamer.send_part(part)?; } } } let response_sha256 = response_streamer.complete(); Ok((result_for_logging, response_sha256)) } async fn route_http_action( &self, tx: &mut Transaction<RT>, head: &HttpActionRequestHead, ) -> anyhow::Result<Option<(ComponentPath, RoutedHttpPath)>> { let mut model = BootstrapComponentsModel::new(tx); let mut current_component_path = ComponentPath::root(); let mut routed_path = RoutedHttpPath(head.url.path().to_string()); let method = RoutableMethod::try_from(head.method.clone())?; loop { let (definition_id, current_id) = model.must_component_path_to_ids(&current_component_path)?; let definition = model.load_definition_metadata(definition_id).await?; let http_routes = ModuleModel::new(model.tx) .get_http(current_id) .await? .map(|m| { m.into_value() .analyze_result .context("Missing analyze result for http module")? .http_routes .context("Missing http routes") }) .transpose()?; if http_routes.is_none() && definition.http_mounts.is_empty() { return Ok(None); } // First, try matching an exact path from `http.js`, which will always // be the most specific match. if let Some(ref http_routes) = http_routes { if http_routes.route_exact(&routed_path[..], method) { return Ok(Some((current_component_path, routed_path))); } } // Next, try finding the most specific prefix match from both `http.js` // and the component-level mounts. enum CurrentMatch<'a> { CurrentHttpJs, MountedComponent(&'a Reference), } let mut longest_match = None; if let Some(ref http_routes) = http_routes { if let Some(match_suffix) = http_routes.route_prefix(&routed_path, method) { longest_match = Some((match_suffix, CurrentMatch::CurrentHttpJs)); } } for (mount_path, reference) in &definition.http_mounts { let Some(match_suffix) = routed_path.strip_prefix(&mount_path[..]) else { continue; }; let new_match = RoutedHttpPath(format!("/{match_suffix}")); if let Some((ref existing_suffix, _)) = longest_match { // If the existing longest match has a shorter suffix, then it // matches a longer prefix. if existing_suffix.len() < match_suffix.len() { continue; } } longest_match = Some((new_match, CurrentMatch::MountedComponent(reference))); } match longest_match { None => { // If we couldn't match the route, forward the request to the current // component's `http.js` if present. This lets the JS layer uniformly handle // 404s when defined. if http_routes.is_some() { return Ok(Some(( current_component_path, RoutedHttpPath(routed_path.to_string()), ))); } else { return Ok(None); } }, Some((_, CurrentMatch::CurrentHttpJs)) => { return Ok(Some(( current_component_path, RoutedHttpPath(routed_path.to_string()), ))); }, Some((match_suffix, CurrentMatch::MountedComponent(reference))) => { let Reference::ChildComponent { component: name, attributes, } = reference else { anyhow::bail!("Invalid reference in component definition: {reference:?}"); }; anyhow::ensure!(attributes.is_empty()); current_component_path = current_component_path.join(name.clone()); routed_path = match_suffix; continue; }, } } } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server