Skip to main content
Glama

Convex MCP server

Official
by get-convex
isolate_worker.rs14.1 kB
use async_trait::async_trait; use common::{ runtime::Runtime, types::UdfType, }; use deno_core::v8; use futures::FutureExt; use sync_types::CanonicalizedUdfPath; use tracing::Instrument; use udf::HttpActionResult; use crate::{ client::{ IsolateWorker, Request, RequestType, SharedIsolateHeapStats, PAUSE_REQUEST, }, environment::{ action::ActionEnvironment, analyze::AnalyzeEnvironment, auth_config::AuthConfigEnvironment, component_definitions::{ AppDefinitionEvaluator, ComponentInitializerEvaluator, }, schema::SchemaEnvironment, udf::DatabaseUdfEnvironment, }, isolate::Isolate, metrics::{ finish_service_request_timer, is_developer_ok, record_component_function_path, service_request_timer, RequestStatus, }, IsolateConfig, }; #[derive(Clone)] pub(crate) struct FunctionRunnerIsolateWorker<RT: Runtime> { rt: RT, isolate_config: IsolateConfig, } impl<RT: Runtime> FunctionRunnerIsolateWorker<RT> { pub(crate) fn new(rt: RT, isolate_config: IsolateConfig) -> Self { Self { rt, isolate_config } } async fn handle_request_inner( &self, isolate: &mut Isolate<RT>, v8_context: v8::Global<v8::Context>, isolate_clean: &mut bool, Request { client_id, inner, parent_trace: _, }: Request<RT>, heap_stats: SharedIsolateHeapStats, ) -> String { match inner { RequestType::Udf { request, environment_data, mut response, queue_timer, reactor_depth, udf_callback, function_started_sender, } => { drop(queue_timer); // TODO: Add metrics with funrun tagging let timer = service_request_timer(&request.udf_type); record_component_function_path(request.path_and_args.path()); let udf_path = request.path_and_args.path().udf_path.to_owned(); let environment = DatabaseUdfEnvironment::new( self.rt.clone(), environment_data, heap_stats.clone(), request, reactor_depth, udf_callback, client_id.clone(), ); let r = environment .run( client_id, isolate, v8_context, isolate_clean, response.closed().boxed(), function_started_sender, ) .await; let status = match &r { Ok((_tx, outcome)) => { if is_developer_ok(outcome) { RequestStatus::Success } else { RequestStatus::DeveloperError } }, Err(_) => RequestStatus::SystemError, }; finish_service_request_timer(timer, status); let _ = response.send(r); format!("UDF: {udf_path:?}") }, RequestType::Action { request, environment_data, mut response, queue_timer, action_callbacks, fetch_client, log_line_sender, function_started_sender, } => { drop(queue_timer); let timer = service_request_timer(&UdfType::Action); let path = request.params.path_and_args.path(); record_component_function_path(path); let udf_path = path.udf_path.to_owned(); let component = path.component.to_owned(); let environment = ActionEnvironment::new( self.rt.clone(), component, environment_data, request.identity, request.transaction, action_callbacks, fetch_client, log_line_sender, None, heap_stats.clone(), request.context, ); let r = environment .run_action( client_id, isolate, v8_context, isolate_clean, request.params.clone(), response.closed().boxed(), function_started_sender, ) .await; let status = match &r { Ok(outcome) => { if outcome.result.is_ok() { RequestStatus::Success } else { RequestStatus::DeveloperError } }, Err(_) => RequestStatus::SystemError, }; finish_service_request_timer(timer, status); let _ = response.send(r); format!("Action: {udf_path:?}") }, RequestType::Analyze { udf_config, modules, environment_variables, response, } => { let r = AnalyzeEnvironment::analyze::<RT>( client_id, isolate, v8_context, isolate_clean, udf_config, modules, environment_variables, ) .await; let _ = response.send(r); "Analyze".to_string() }, RequestType::HttpAction { request, environment_data, response, queue_timer, action_callbacks, fetch_client, log_line_sender, http_response_streamer, function_started_sender, } => { drop(queue_timer); let timer = service_request_timer(&UdfType::HttpAction); let udf_path: CanonicalizedUdfPath = request.http_module_path.path().udf_path.clone(); record_component_function_path(request.http_module_path.path()); let environment = ActionEnvironment::new( self.rt.clone(), request.http_module_path.path().component, environment_data, request.identity, request.transaction, action_callbacks, fetch_client, log_line_sender, Some(http_response_streamer), heap_stats.clone(), request.context, ); let r = environment .run_http_action( client_id, isolate, v8_context, isolate_clean, request.http_module_path, request.routed_path, request.http_request, function_started_sender, ) .await; let status = match &r { Ok(outcome) => match outcome.result { // Note that the stream could potentially encounter errors later HttpActionResult::Streamed => RequestStatus::Success, HttpActionResult::Error(_) => RequestStatus::DeveloperError, }, Err(_) => RequestStatus::SystemError, }; finish_service_request_timer(timer, status); let _ = response.send(r); format!("Http: {udf_path:?}") }, RequestType::EvaluateSchema { schema_bundle, source_map, rng_seed, unix_timestamp, response, } => { let r = SchemaEnvironment::evaluate_schema( client_id, isolate, v8_context, schema_bundle, source_map, rng_seed, unix_timestamp, ) .await; let _ = response.send(r); "EvaluateSchema".to_string() }, RequestType::EvaluateAuthConfig { auth_config_bundle, source_map, environment_variables, response, } => { let r = AuthConfigEnvironment::evaluate_auth_config( client_id, isolate, v8_context, auth_config_bundle, source_map, environment_variables, ) .await; let _ = response.send(r); "EvaluateAuthConfig".to_string() }, RequestType::EvaluateAppDefinitions { app_definition, component_definitions, dependency_graph, user_environment_variables, system_env_vars, response, } => { // AppDefinitionEvaluator doesn't use the prewarmed V8 context // because it uses an arbitrary number of contexts. This call is // rare and not particularly latency-sensitive though drop(v8_context); let env = AppDefinitionEvaluator::new( app_definition, component_definitions, dependency_graph, user_environment_variables, system_env_vars, ); let r = env.evaluate(client_id, isolate).await; let _ = response.send(r); "EvaluateAppDefinitions".to_string() }, RequestType::EvaluateComponentInitializer { evaluated_definitions, path, definition, args, name, response, } => { let env = ComponentInitializerEvaluator::new( evaluated_definitions, path, definition, args, name, ); let r = env.evaluate(client_id, isolate).await; let _ = response.send(r); "EvaluateComponentInitializer".to_string() }, } } } #[async_trait(?Send)] impl<RT: Runtime> IsolateWorker<RT> for FunctionRunnerIsolateWorker<RT> { #[fastrace::trace] async fn handle_request( &self, isolate: &mut Isolate<RT>, v8_context: v8::Global<v8::Context>, isolate_clean: &mut bool, request: Request<RT>, heap_stats: SharedIsolateHeapStats, ) -> String { let pause_client = self.rt.pause_client(); pause_client.wait(PAUSE_REQUEST).await; let client_id = &request.client_id; // Set the scope to be tagged with the client_id just for the duration of // handling the request. It would be nice to get sentry::with_scope to work, but // it uses a synchronous callback and we need `report_error` in the future to // have the client_id tag. sentry::configure_scope(|scope| scope.set_tag("client_id", client_id)); // Also add the tag to tracing so it shows up in DataDog logs. let span = tracing::info_span!("isolate_worker_handle_request", instance_name = client_id); let result = self .handle_request_inner(isolate, v8_context, isolate_clean, request, heap_stats) .instrument(span) .await; sentry::configure_scope(|scope| scope.remove_tag("client_id")); result } fn config(&self) -> &IsolateConfig { &self.isolate_config } fn rt(&self) -> RT { self.rt.clone() } } #[cfg(test)] mod tests { use common::pause::PauseController; use runtime::testing::TestRuntime; use super::FunctionRunnerIsolateWorker; use crate::{ client::PAUSE_RECREATE_CLIENT, test_helpers::{ test_isolate_not_recreated_with_same_client, test_isolate_recreated_with_client_change, }, IsolateConfig, }; #[convex_macro::test_runtime] async fn test_isolate_recreated_with_client_change_function_runner( rt: TestRuntime, pause: PauseController, ) -> anyhow::Result<()> { let isolate_config = IsolateConfig::default(); let hold_guard = pause.hold(PAUSE_RECREATE_CLIENT); let worker = FunctionRunnerIsolateWorker::new(rt.clone(), isolate_config); test_isolate_recreated_with_client_change(rt, worker, hold_guard).await } #[convex_macro::test_runtime] async fn test_isolate_not_recreated_with_same_client_function_runner( rt: TestRuntime, pause: PauseController, ) -> anyhow::Result<()> { let isolate_config = IsolateConfig::default(); let hold_guard = pause.hold(PAUSE_RECREATE_CLIENT); let worker = FunctionRunnerIsolateWorker::new(rt.clone(), isolate_config); test_isolate_not_recreated_with_same_client(rt, worker, hold_guard).await } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server