http_routing.rs•16.9 kB
use anyhow::Context;
use common::{
components::{
CanonicalizedComponentFunctionPath,
ComponentPath,
Reference,
},
errors::JsError,
execution_context::ExecutionContext,
http::RoutedHttpPath,
log_lines::{
run_function_and_collect_log_lines,
LogLevel,
LogLine,
SystemLogMetadata,
},
runtime::{
tokio_spawn,
Runtime,
},
types::{
FunctionCaller,
ModuleEnvironment,
RoutableMethod,
},
RequestId,
};
use database::{
BootstrapComponentsModel,
Transaction,
};
use errors::ErrorMetadataAnyhowExt;
use function_runner::server::HttpActionMetadata;
use futures::{
FutureExt,
StreamExt,
};
use http::StatusCode;
use keybroker::Identity;
use model::modules::{
ModuleModel,
HTTP_MODULE_PATH,
};
use sync_types::{
CanonicalizedUdfPath,
FunctionName,
};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use udf::{
validation::ValidatedHttpPath,
HttpActionOutcome,
HttpActionRequest,
HttpActionRequestHead,
HttpActionResponsePart,
HttpActionResponseStreamer,
HttpActionResult,
};
use usage_tracking::FunctionUsageTracker;
use value::sha256::Sha256Digest;
use super::ApplicationFunctionRunner;
use crate::function_log::HttpActionStatusCode;
impl<RT: Runtime> ApplicationFunctionRunner<RT> {
#[fastrace::trace]
pub async fn run_http_action(
&self,
request_id: RequestId,
http_request: HttpActionRequest,
mut response_streamer: HttpActionResponseStreamer,
identity: Identity,
caller: FunctionCaller,
) -> anyhow::Result<udf::HttpActionResult> {
let start = self.runtime.monotonic_now();
let usage_tracker = FunctionUsageTracker::new();
let mut tx = self
.database
.begin_with_usage(identity.clone(), usage_tracker.clone())
.await?;
let (component_path, routed_path) =
match self.route_http_action(&mut tx, &http_request.head).await? {
Some(r) => r,
None => {
drop(tx);
let response_parts = udf::HttpActionResponsePart::from_text(
StatusCode::NOT_FOUND,
"This Convex deployment does not have HTTP actions enabled.".to_string(),
);
for part in response_parts {
response_streamer.send_part(part)??;
}
return Ok(udf::HttpActionResult::Streamed);
},
};
let path = CanonicalizedComponentFunctionPath {
component: component_path,
udf_path: CanonicalizedUdfPath::new(
HTTP_MODULE_PATH.clone(),
FunctionName::default_export(),
),
};
let validated_path = match ValidatedHttpPath::new(&mut tx, path).await? {
Ok(validated_path) => validated_path,
Err(e) => return Ok(udf::HttpActionResult::Error(e)),
};
let unix_timestamp = self.runtime.unix_timestamp();
let context = ExecutionContext::new(request_id, &caller);
let request_head = http_request.head.clone();
let route = http_request.head.route_for_failure();
let (log_line_sender, log_line_receiver) = mpsc::unbounded_channel();
// We want to intercept the response head so we can log it on function
// completion, but still stream the response as it comes in, so we
// create another channel here.
let (isolate_response_sender, isolate_response_receiver) = mpsc::unbounded_channel();
let http_response_streamer = HttpActionResponseStreamer::new(isolate_response_sender);
let outcome_future = self
.isolate_functions
.execute_http_action(
tx,
log_line_sender,
HttpActionMetadata {
http_response_streamer,
http_module_path: validated_path,
routed_path,
http_request,
},
context.clone(),
)
.boxed();
let context_ = context.clone();
// Stream `response_stream` from the isolate, to `response_streamer`
// in the application.
let stream_result_fut = tokio_spawn(
"http_action_response_streamer",
Self::forward_http_action_stream(
UnboundedReceiverStream::new(isolate_response_receiver),
response_streamer,
),
);
// NOTE: this will run in parallel with `stream_result_fut`, which is
// running on a spawned coroutine.
let send_log_line = |log_line| {
self.function_log.log_http_action_progress(
route.clone(),
unix_timestamp,
context_.clone(),
vec![log_line].into(),
// http actions are always run in Isolate
ModuleEnvironment::Isolate,
)
};
let (outcome_result, mut log_lines) =
run_function_and_collect_log_lines(outcome_future, log_line_receiver, &send_log_line)
.await;
let (result_for_logging, response_sha256) = stream_result_fut.await??;
match outcome_result {
Ok(outcome) => {
let result = outcome.result.clone();
let result_for_logging = match &result {
HttpActionResult::Error(e) => Err(e.clone()),
HttpActionResult::Streamed => Ok(result_for_logging.ok_or_else(|| {
anyhow::anyhow!(
"Result should be populated for successfully completed HTTP action"
)
})?),
};
self.function_log
.log_http_action(
outcome,
result_for_logging,
log_lines,
start.elapsed(),
caller,
usage_tracker,
context,
response_sha256,
)
.await;
Ok(result)
},
Err(e) if e.is_deterministic_user_error() || e.is_client_disconnect() => {
let is_client_disconnect = e.is_client_disconnect();
let js_err = JsError::from_error(e);
match result_for_logging {
Some(r) => {
let outcome = HttpActionOutcome::new(
None,
request_head,
identity.into(),
unix_timestamp,
HttpActionResult::Streamed,
None,
None,
);
let new_log_line = LogLine::new_system_log_line(
if is_client_disconnect {
// Not developer's fault, but we should let them know
// since it indicates there will be no more logs or
// response parts.
LogLevel::Info
} else {
LogLevel::Warn
},
vec![js_err.to_string()],
outcome.unix_timestamp,
SystemLogMetadata {
code: if is_client_disconnect {
"info:httpActionClientDisconnect".to_string()
} else {
"error:httpAction".to_string()
},
},
);
send_log_line(new_log_line.clone());
log_lines.push(new_log_line);
self.function_log
.log_http_action(
outcome.clone(),
Ok(r),
log_lines,
start.elapsed(),
caller,
usage_tracker,
context,
response_sha256,
)
.await;
Ok(HttpActionResult::Streamed)
},
None => {
let result = udf::HttpActionResult::Error(js_err.clone());
let outcome = HttpActionOutcome::new(
None,
request_head,
identity.into(),
unix_timestamp,
result.clone(),
None,
None,
);
self.function_log
.log_http_action(
outcome.clone(),
Err(js_err),
log_lines,
start.elapsed(),
caller,
usage_tracker,
context,
response_sha256,
)
.await;
Ok(result)
},
}
},
Err(e) => {
self.function_log
.log_http_action_system_error(
&e,
request_head,
identity.into(),
start,
caller,
log_lines,
context,
response_sha256,
)
.await;
Err(e)
},
}
}
// Forwards from `response_stream` to `response_streamer`.
async fn forward_http_action_stream(
mut response_stream: UnboundedReceiverStream<HttpActionResponsePart>,
mut response_streamer: HttpActionResponseStreamer,
) -> anyhow::Result<(Option<HttpActionStatusCode>, Sha256Digest)> {
let mut result_for_logging = None;
loop {
// If the `response_stream` is still open, detect when `response_streamer`
// closes, which makes us close `response_stream`.
// This signals to the isolate that the client has disconnected.
let streamer_close = if response_stream.as_ref().is_closed() {
// We need the conditional to avoid a busy-loop. If `response_streamer`
// is closed, `response_streamer.sender.closed()` will resolve immediately,
// so we make sure that only happens once.
futures::future::Either::Left(futures::future::pending())
} else {
futures::future::Either::Right(response_streamer.sender.closed())
};
tokio::select! {
_ = streamer_close => {
response_stream.close();
},
part = response_stream.next() => {
let Some(part) = part else {
break;
};
if let HttpActionResponsePart::Head(h) = &part {
result_for_logging = Some(HttpActionStatusCode(h.status));
}
// If the `response_streamer` is closed, the inner Result
// will have an error. That's fine; we want to keep letting
// the isolate send data and `response_streamer` will keep
// accumulating data into its hash.
let _ = response_streamer.send_part(part)?;
}
}
}
let response_sha256 = response_streamer.complete();
Ok((result_for_logging, response_sha256))
}
async fn route_http_action(
&self,
tx: &mut Transaction<RT>,
head: &HttpActionRequestHead,
) -> anyhow::Result<Option<(ComponentPath, RoutedHttpPath)>> {
let mut model = BootstrapComponentsModel::new(tx);
let mut current_component_path = ComponentPath::root();
let mut routed_path = RoutedHttpPath(head.url.path().to_string());
let method = RoutableMethod::try_from(head.method.clone())?;
loop {
let (definition_id, current_id) =
model.must_component_path_to_ids(¤t_component_path)?;
let definition = model.load_definition_metadata(definition_id).await?;
let http_routes = ModuleModel::new(model.tx)
.get_http(current_id)
.await?
.map(|m| {
m.into_value()
.analyze_result
.context("Missing analyze result for http module")?
.http_routes
.context("Missing http routes")
})
.transpose()?;
if http_routes.is_none() && definition.http_mounts.is_empty() {
return Ok(None);
}
// First, try matching an exact path from `http.js`, which will always
// be the most specific match.
if let Some(ref http_routes) = http_routes {
if http_routes.route_exact(&routed_path[..], method) {
return Ok(Some((current_component_path, routed_path)));
}
}
// Next, try finding the most specific prefix match from both `http.js`
// and the component-level mounts.
enum CurrentMatch<'a> {
CurrentHttpJs,
MountedComponent(&'a Reference),
}
let mut longest_match = None;
if let Some(ref http_routes) = http_routes {
if let Some(match_suffix) = http_routes.route_prefix(&routed_path, method) {
longest_match = Some((match_suffix, CurrentMatch::CurrentHttpJs));
}
}
for (mount_path, reference) in &definition.http_mounts {
let Some(match_suffix) = routed_path.strip_prefix(&mount_path[..]) else {
continue;
};
let new_match = RoutedHttpPath(format!("/{match_suffix}"));
if let Some((ref existing_suffix, _)) = longest_match {
// If the existing longest match has a shorter suffix, then it
// matches a longer prefix.
if existing_suffix.len() < match_suffix.len() {
continue;
}
}
longest_match = Some((new_match, CurrentMatch::MountedComponent(reference)));
}
match longest_match {
None => {
// If we couldn't match the route, forward the request to the current
// component's `http.js` if present. This lets the JS layer uniformly handle
// 404s when defined.
if http_routes.is_some() {
return Ok(Some((
current_component_path,
RoutedHttpPath(routed_path.to_string()),
)));
} else {
return Ok(None);
}
},
Some((_, CurrentMatch::CurrentHttpJs)) => {
return Ok(Some((
current_component_path,
RoutedHttpPath(routed_path.to_string()),
)));
},
Some((match_suffix, CurrentMatch::MountedComponent(reference))) => {
let Reference::ChildComponent {
component: name,
attributes,
} = reference
else {
anyhow::bail!("Invalid reference in component definition: {reference:?}");
};
anyhow::ensure!(attributes.is_empty());
current_component_path = current_component_path.join(name.clone());
routed_path = match_suffix;
continue;
},
}
}
}
}