Convex MCP server

Official
by get-convex
client.rs (57 kB)
use std::{
    collections::{BTreeMap, BTreeSet, HashMap, VecDeque},
    env,
    sync::{Arc, Once},
    time::Duration,
};

use ::metrics::{IntoLabel, Timer};
use async_trait::async_trait;
use common::{
    auth::AuthConfig,
    bootstrap_model::components::{
        definition::ComponentDefinitionMetadata,
        handles::FunctionHandle,
    },
    codel_queue::{new_codel_queue_async, CoDelQueueReceiver, CoDelQueueSender, ExpiredInQueue},
    components::{
        CanonicalizedComponentFunctionPath, ComponentDefinitionPath, ComponentId, ComponentName,
        ComponentPath, Resource,
    },
    errors::{recapture_stacktrace, JsError},
    execution_context::ExecutionContext,
    fastrace_helpers::{initialize_root_from_parent, EncodedSpan},
    http::{fetch::FetchClient, RoutedHttpPath},
    knobs::{
        FUNRUN_ISOLATE_ACTIVE_THREADS, HEAP_WORKER_REPORT_INTERVAL_SECONDS, ISOLATE_IDLE_TIMEOUT,
        ISOLATE_MAX_LIFETIME, ISOLATE_QUEUE_SIZE, REUSE_ISOLATES, V8_THREADS,
    },
    log_lines::LogLine,
    query_journal::QueryJournal,
    runtime::{shutdown_and_join, Runtime, SpawnHandle, UnixTimestamp},
    schemas::DatabaseSchema,
    static_span,
    types::{ModuleEnvironment, UdfType},
    utils::ensure_utc,
};
use database::{shutdown_error, Transaction};
use deno_core::{v8, v8::V8};
use errors::{ErrorMetadata, ErrorMetadataAnyhowExt};
use fastrace::{func_path, future::FutureExt as _};
use file_storage::TransactionalFileStorage;
use futures::{
    select_biased,
    stream::{FuturesUnordered, StreamExt},
};
use keybroker::{Identity, KeyBroker};
use model::{
    config::types::ModuleConfig,
    environment_variables::types::{EnvVarName, EnvVarValue},
    file_storage::{types::FileStorageEntry, FileStorageId},
    modules::module_versions::{AnalyzedModule, ModuleSource, SourceMap},
    udf_config::types::UdfConfig,
};
use parking_lot::Mutex;
use prometheus::VMHistogram;
use serde_json::Value as JsonValue;
use sync_types::{types::SerializedArgs, CanonicalizedModulePath};
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream;
use udf::{
    validation::{ValidatedHttpPath, ValidatedPathAndArgs},
    ActionOutcome, EvaluateAppDefinitionsResult, FunctionOutcome, FunctionResult,
    HttpActionOutcome, HttpActionResponseStreamer,
};
use usage_tracking::FunctionUsageStats;
use value::{id_v6::DeveloperDocumentId, identifier::Identifier};
use vector::PublicVectorSearchQueryResult;

use crate::{
    concurrency_limiter::ConcurrencyLimiter,
    isolate::{Isolate, IsolateHeapStats},
    isolate_worker::FunctionRunnerIsolateWorker,
    metrics::{
        self, create_context_timer, log_aggregated_heap_stats, log_pool_max,
        log_pool_running_count, log_worker_stolen, queue_timer,
    },
    module_cache::ModuleCache,
};

// We gather prometheus stats every 30 seconds, so we should make sure we log
// active permits more frequently than that.
const ACTIVE_CONCURRENCY_PERMITS_LOG_FREQUENCY: Duration = Duration::from_secs(10);

pub const PAUSE_RECREATE_CLIENT: &str = "recreate_client";
pub const PAUSE_REQUEST: &str = "pause_request";

pub const NO_AVAILABLE_WORKERS: &str = "There are no available workers to process the request";

#[derive(Clone)]
pub struct IsolateConfig {
    // Name of isolate pool, used in metrics.
    pub name: &'static str,

    // Typically, the user timeout is configured based on environment. This
    // allows us to set an upper bound to it that we use for tests.
    max_user_timeout: Option<Duration>,
    limiter: ConcurrencyLimiter,
}

impl IsolateConfig {
    pub fn new(name: &'static str, limiter: ConcurrencyLimiter) -> Self {
        Self {
            name,
            max_user_timeout: None,
            limiter,
        }
    }

    #[cfg(any(test, feature = "testing"))]
    pub fn new_with_max_user_timeout(
        name: &'static str,
        max_user_timeout: Option<Duration>,
        limiter: ConcurrencyLimiter,
    ) -> Self {
        Self {
            name,
            max_user_timeout,
            limiter,
        }
    }
}

#[cfg(any(test, feature = "testing"))]
impl Default for IsolateConfig {
    fn default() -> Self {
        Self {
            name: "test",
            max_user_timeout: None,
            limiter: ConcurrencyLimiter::unlimited(),
        }
    }
}

#[async_trait]
pub trait ActionCallbacks: Send + Sync {
    // Executing UDFs
    async fn execute_query(
        &self,
        identity: Identity,
        path: CanonicalizedComponentFunctionPath,
        args: SerializedArgs,
        context: ExecutionContext,
    ) -> anyhow::Result<FunctionResult>;

    async fn execute_mutation(
        &self,
        identity: Identity,
        path: CanonicalizedComponentFunctionPath,
        args: SerializedArgs,
        context: ExecutionContext,
    ) -> anyhow::Result<FunctionResult>;

    async fn execute_action(
        &self,
        identity: Identity,
        path: CanonicalizedComponentFunctionPath,
        args: SerializedArgs,
        context: ExecutionContext,
    ) -> anyhow::Result<FunctionResult>;

    // Storage
    async fn storage_get_url(
        &self,
        identity: Identity,
        component: ComponentId,
        storage_id: FileStorageId,
    ) -> anyhow::Result<Option<String>>;

    async fn storage_delete(
        &self,
        identity: Identity,
        component: ComponentId,
        storage_id: FileStorageId,
    ) -> anyhow::Result<()>;

    // Used to get a file's content from an action running in v8.
    async fn storage_get_file_entry(
        &self,
        identity: Identity,
        component: ComponentId,
        storage_id: FileStorageId,
    ) -> anyhow::Result<Option<(ComponentPath, FileStorageEntry)>>;

    // Used to store an already uploaded file from an action running in v8.
    async fn storage_store_file_entry(
        &self,
        identity: Identity,
        component: ComponentId,
        entry: FileStorageEntry,
    ) -> anyhow::Result<(ComponentPath, DeveloperDocumentId)>;

    // Scheduler
    async fn schedule_job(
        &self,
        identity: Identity,
        scheduling_component: ComponentId,
        scheduled_path: CanonicalizedComponentFunctionPath,
        udf_args: SerializedArgs,
        scheduled_ts: UnixTimestamp,
        context: ExecutionContext,
    ) -> anyhow::Result<DeveloperDocumentId>;

    async fn cancel_job(
        &self,
        identity: Identity,
        virtual_id: DeveloperDocumentId,
    ) -> anyhow::Result<()>;

    // Vector Search
    async fn vector_search(
        &self,
        identity: Identity,
        query: JsonValue,
    ) -> anyhow::Result<(Vec<PublicVectorSearchQueryResult>, FunctionUsageStats)>;

    // Components
    async fn lookup_function_handle(
        &self,
        identity: Identity,
        handle: FunctionHandle,
    ) -> anyhow::Result<CanonicalizedComponentFunctionPath>;

    async fn create_function_handle(
        &self,
        identity: Identity,
        path: CanonicalizedComponentFunctionPath,
    ) -> anyhow::Result<FunctionHandle>;
}

pub struct UdfRequest<RT: Runtime> {
    pub path_and_args: ValidatedPathAndArgs,
    pub udf_type: UdfType,
    pub transaction: Transaction<RT>,
    pub journal: QueryJournal,
    pub context: ExecutionContext,
}

pub struct HttpActionRequest<RT: Runtime> {
    pub http_module_path: ValidatedHttpPath,
    pub routed_path: RoutedHttpPath,
    pub http_request: udf::HttpActionRequest,
    pub transaction: Transaction<RT>,
    pub identity: Identity,
    pub context: ExecutionContext,
}

pub struct ActionRequest<RT: Runtime> {
    pub params: ActionRequestParams,
    pub transaction: Transaction<RT>,
    pub identity: Identity,
    pub context: ExecutionContext,
}

#[derive(Clone, PartialEq, Eq)]
#[cfg_attr(any(test, feature = "testing"), derive(Debug))]
pub struct ActionRequestParams {
    pub path_and_args: ValidatedPathAndArgs,
}

#[derive(Clone)]
pub struct EnvironmentData<RT: Runtime> {
    pub key_broker: KeyBroker,
    pub default_system_env_vars: BTreeMap<EnvVarName, EnvVarValue>,
    pub file_storage: TransactionalFileStorage<RT>,
    pub module_loader: Arc<dyn ModuleCache<RT>>,
}

pub struct Request<RT: Runtime> {
    pub client_id: String,
    pub inner: RequestType<RT>,
    pub parent_trace: EncodedSpan,
}

impl<RT: Runtime> Request<RT> {
    pub fn new(client_id: String, inner: RequestType<RT>, parent_trace: EncodedSpan) -> Self {
        Self {
            client_id,
            inner,
            parent_trace,
        }
    }
}

pub enum RequestType<RT: Runtime> {
    Udf {
        request: UdfRequest<RT>,
        environment_data: EnvironmentData<RT>,
        response: oneshot::Sender<anyhow::Result<(Transaction<RT>, FunctionOutcome)>>,
        queue_timer: Timer<VMHistogram>,
        reactor_depth: usize,
        udf_callback: Box<dyn UdfCallback<RT>>,
        function_started_sender: Option<oneshot::Sender<()>>,
    },
    Action {
        request: ActionRequest<RT>,
        environment_data: EnvironmentData<RT>,
        response: oneshot::Sender<anyhow::Result<ActionOutcome>>,
        queue_timer: Timer<VMHistogram>,
        action_callbacks: Arc<dyn ActionCallbacks>,
        fetch_client: Arc<dyn FetchClient>,
        log_line_sender: mpsc::UnboundedSender<LogLine>,
        function_started_sender: Option<oneshot::Sender<()>>,
    },
    HttpAction {
        request: HttpActionRequest<RT>,
        environment_data: EnvironmentData<RT>,
        response: oneshot::Sender<anyhow::Result<HttpActionOutcome>>,
        queue_timer: Timer<VMHistogram>,
        action_callbacks: Arc<dyn ActionCallbacks>,
        fetch_client: Arc<dyn FetchClient>,
        log_line_sender: mpsc::UnboundedSender<LogLine>,
        http_response_streamer: HttpActionResponseStreamer,
        function_started_sender: Option<oneshot::Sender<()>>,
    },
    Analyze {
        udf_config: UdfConfig,
        modules: BTreeMap<CanonicalizedModulePath, ModuleConfig>,
        environment_variables: BTreeMap<EnvVarName, EnvVarValue>,
        response: oneshot::Sender<
            anyhow::Result<Result<BTreeMap<CanonicalizedModulePath, AnalyzedModule>, JsError>>,
        >,
    },
    EvaluateSchema {
        schema_bundle: ModuleSource,
        source_map: Option<SourceMap>,
        rng_seed: [u8; 32],
        unix_timestamp: UnixTimestamp,
        response: oneshot::Sender<anyhow::Result<DatabaseSchema>>,
    },
    EvaluateAuthConfig {
        auth_config_bundle: ModuleSource,
        source_map: Option<SourceMap>,
        environment_variables: BTreeMap<EnvVarName, EnvVarValue>,
        response: oneshot::Sender<anyhow::Result<AuthConfig>>,
    },
    EvaluateAppDefinitions {
        app_definition: ModuleConfig,
        component_definitions: BTreeMap<ComponentDefinitionPath, ModuleConfig>,
        dependency_graph: BTreeSet<(ComponentDefinitionPath, ComponentDefinitionPath)>,
        user_environment_variables: BTreeMap<EnvVarName, EnvVarValue>,
        system_env_vars: BTreeMap<EnvVarName, EnvVarValue>,
        response: oneshot::Sender<anyhow::Result<EvaluateAppDefinitionsResult>>,
    },
    EvaluateComponentInitializer {
        evaluated_definitions: BTreeMap<ComponentDefinitionPath, ComponentDefinitionMetadata>,
        path: ComponentDefinitionPath,
        definition: ModuleConfig,
        args: BTreeMap<Identifier, Resource>,
        name: ComponentName,
        response: oneshot::Sender<anyhow::Result<BTreeMap<Identifier, Resource>>>,
    },
}

#[async_trait]
pub trait UdfCallback<RT: Runtime>: Send + Sync {
    async fn execute_udf(
        &self,
        client_id: String,
        udf_type: UdfType,
        path_and_args: ValidatedPathAndArgs,
        environment_data: EnvironmentData<RT>,
        transaction: Transaction<RT>,
        journal: QueryJournal,
        context: ExecutionContext,
        reactor_depth: usize,
    ) -> anyhow::Result<(Transaction<RT>, FunctionOutcome)>;
}

impl<RT: Runtime> Request<RT> {
    fn expire(self, error: ExpiredInQueue) {
        let error = anyhow::anyhow!(error).context(ErrorMetadata::overloaded(
            "ExpiredInQueue",
            "Too many concurrent requests in a short period of time. Spread your requests out \
             over time or throttle them to avoid errors.",
        ));
        match self.inner {
            RequestType::Udf { response, .. } => {
                let _ = response.send(Err(error));
            },
            RequestType::Action { response, .. } => {
                let _ = response.send(Err(error));
            },
            RequestType::HttpAction { response, .. } => {
                let _ = response.send(Err(error));
            },
            RequestType::Analyze { response, .. } => {
                let _ = response.send(Err(error));
            },
            RequestType::EvaluateSchema { response, .. } => {
                let _ = response.send(Err(error));
            },
            RequestType::EvaluateAuthConfig { response, .. } => {
                let _ = response.send(Err(error));
            },
            RequestType::EvaluateAppDefinitions { response, .. } => {
                let _ = response.send(Err(error));
            },
            RequestType::EvaluateComponentInitializer { response, .. } => {
                let _ = response.send(Err(error));
            },
        }
    }

    fn reject(self) {
        let error =
            ErrorMetadata::rejected_before_execution("WorkerOverloaded", NO_AVAILABLE_WORKERS)
                .into();
        match self.inner {
            RequestType::Udf { response, .. } => {
                let _ = response.send(Err(error));
            },
            RequestType::Action { response, .. } => {
                let _ = response.send(Err(error));
            },
            RequestType::HttpAction { response, .. } => {
                let _ = response.send(Err(error));
            },
            RequestType::Analyze { response, .. } => {
                let _ = response.send(Err(error));
            },
            RequestType::EvaluateSchema { response, .. } => {
                let _ = response.send(Err(error));
            },
            RequestType::EvaluateAuthConfig { response, .. } => {
                let _ = response.send(Err(error));
            },
            RequestType::EvaluateAppDefinitions { response, .. } => {
                let _ = response.send(Err(error));
            },
            RequestType::EvaluateComponentInitializer { response, .. } => {
                let _ = response.send(Err(error));
            },
        }
    }
}
impl<RT: Runtime> Clone for IsolateClient<RT> {
    fn clone(&self) -> Self {
        Self {
            rt: self.rt.clone(),
            handles: self.handles.clone(),
            scheduler: self.scheduler.clone(),
            sender: self.sender.clone(),
            concurrency_logger: self.concurrency_logger.clone(),
        }
    }
}

pub fn initialize_v8() {
    ensure_utc().expect("Failed to setup timezone");
    static V8_INIT: Once = Once::new();
    V8_INIT.call_once(|| {
        let _s = static_span!("initialize_v8");

        // `deno_core_icudata` internally loads this with proper 16-byte alignment.
        assert!(v8::icu::set_common_data_74(deno_core_icudata::ICU_DATA).is_ok());

        // Calls into `v8::platform::v8__Platform__NewUnprotectedDefaultPlatform`
        // Can configure with...
        // - thread_pool_size (default: zero): number of worker threads for background
        //   jobs, picks a reasonable default based on number of cores if set to zero
        // - idle_task_support (default: false): platform will accept idle tasks and
        //   will rely on embedder calling `v8::platform::RunIdleTasks`. Idle tasks are
        //   low-priority tasks that are run with a deadline indicating how long the
        //   scheduler expects to be idle (e.g. unused remainder of a frame budget)
        // - in_process_stack_dumping (default: false)
        // - tracing_controller (default: null): if null, the platform creates a
        //   `v8::platform::TracingController` instance and uses it
        //
        // Why "unprotected"? The "protected" default platform utilizes Memory
        // Protection Keys (PKU), which requires that all threads utilizing V8 are
        // descendants of the thread that initialized V8. Unfortunately, this is
        // not compatible with how Rust tests run and additionally, the version of V8
        // used at the time of this comment has a bug with PKU on certain Intel CPUs.
        // See https://github.com/denoland/rusty_v8/issues/1381
        let platform = v8::new_unprotected_default_platform(*V8_THREADS, false).make_shared();

        // Calls into `v8::V8::InitializePlatform`, sets global platform.
        V8::initialize_platform(platform);

        // TODO: Figure out what V8 uses entropy for and set it here.
        // V8::set_entropy_source(...);

        // Set V8 command line flags.
        // https://github.com/v8/v8/blob/master/src/flags/flag-definitions.h
        let mut argv = vec![
            "".to_owned(), // first arg is ignored
            "--harmony-import-assertions".to_owned(),
            // See https://github.com/denoland/deno/issues/2544
            "--no-wasm-async-compilation".to_string(),
            // Disable `eval` or `new Function()`.
            "--disallow-code-generation-from-strings".to_string(),
            // We ensure 4MiB of stack space on all of our threads, so
            // tell V8 it can use up to 2MiB of stack space itself. The
            // default is 1MiB. Note that the flag is in KiB (https://github.com/v8/v8/blob/master/src/flags/flag-definitions.h#L1594).
            "--stack-size=2048".to_string(),
        ];
        if let Ok(flags) = env::var("ISOLATE_V8_FLAGS") {
            argv.extend(
                flags
                    .split(" ")
                    .filter(|s| !s.is_empty())
                    .map(|s| s.to_owned()),
            );
            tracing::info!("Final V8 flags: {:?}", argv);
        }
        // v8 returns the args that were misunderstood
        let misunderstood = V8::set_flags_from_command_line(argv);
        assert_eq!(misunderstood, vec![""]);

        // Calls into `v8::V8::Initialize`
        V8::initialize();

        crate::udf_runtime::initialize().expect("Failed to set up UDF runtime");
    });
}

/// The V8 code all expects to run on a single thread, which makes it
/// ineligible for Tokio's scheduler, which wants the ability to move work
/// across scheduler threads. Instead, we'll manage our V8 threads ourselves.
///
/// [`IsolateClient`] is the "client" entry point to our V8 threads.
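///
/// A minimal construction sketch (illustrative only: the runtime value and
/// the worker/percentage numbers below are hypothetical, not defaults of this
/// crate):
///
/// ```ignore
/// // Allow one client to use up to 100% of the pool, run at most 16 isolate
/// // worker threads, and use the default "funrun" IsolateConfig.
/// let client = IsolateClient::new(rt, 100, 16, None)?;
/// // ... submit work via execute_udf / execute_action / analyze ...
/// client.shutdown().await?;
/// ```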
pub struct IsolateClient<RT: Runtime> {
    rt: RT,
    handles: Arc<Mutex<Vec<IsolateWorkerHandle>>>,
    scheduler: Arc<Mutex<Option<Box<dyn SpawnHandle>>>>,
    sender: CoDelQueueSender<RT, Request<RT>>,
    concurrency_logger: Arc<Mutex<Option<Box<dyn SpawnHandle>>>>,
}

impl<RT: Runtime> IsolateClient<RT> {
    pub fn new(
        rt: RT,
        max_percent_per_client: usize,
        max_isolate_workers: usize,
        isolate_config: Option<IsolateConfig>,
    ) -> anyhow::Result<Self> {
        let concurrency_limit = if *FUNRUN_ISOLATE_ACTIVE_THREADS > 0 {
            ConcurrencyLimiter::new(*FUNRUN_ISOLATE_ACTIVE_THREADS)
        } else {
            ConcurrencyLimiter::unlimited()
        };
        let concurrency_logger = rt.spawn(
            "concurrency_logger",
            concurrency_limit.go_log(rt.clone(), ACTIVE_CONCURRENCY_PERMITS_LOG_FREQUENCY),
        );
        let isolate_config =
            isolate_config.unwrap_or(IsolateConfig::new("funrun", concurrency_limit));
        initialize_v8();
        // NB: We don't call V8::Dispose or V8::ShutdownPlatform since we just assume a
        // single V8 instance per process and don't need to clean up its resources.
        let (sender, receiver) =
            new_codel_queue_async::<_, Request<_>>(rt.clone(), *ISOLATE_QUEUE_SIZE);

        let handles = Arc::new(Mutex::new(Vec::new()));
        let handles_clone = handles.clone();
        let rt_clone = rt.clone();
        let scheduler = rt.spawn("shared_isolate_scheduler", async move {
            // The scheduler thread pops a worker from available_workers and
            // pops a request from the CoDelQueueReceiver. Then it sends the request
            // to the worker.
            let isolate_worker = FunctionRunnerIsolateWorker::new(rt_clone.clone(), isolate_config);
            let scheduler = SharedIsolateScheduler::new(
                rt_clone,
                isolate_worker,
                max_isolate_workers,
                handles_clone,
                max_percent_per_client,
            );
            scheduler.run(receiver).await
        });
        Ok(Self {
            rt,
            sender,
            scheduler: Arc::new(Mutex::new(Some(scheduler))),
            concurrency_logger: Arc::new(Mutex::new(Some(concurrency_logger))),
            handles,
        })
    }

    pub fn aggregate_heap_stats(&self) -> IsolateHeapStats {
        let mut total = IsolateHeapStats::default();
        for handle in self.handles.lock().iter() {
            total += handle.heap_stats.get();
        }
        total
    }

    #[fastrace::trace]
    pub async fn execute_udf(
        &self,
        udf_type: UdfType,
        path_and_args: ValidatedPathAndArgs,
        transaction: Transaction<RT>,
        journal: QueryJournal,
        context: ExecutionContext,
        environment_data: EnvironmentData<RT>,
        reactor_depth: usize,
        instance_name: String,
        function_started_sender: Option<oneshot::Sender<()>>,
    ) -> anyhow::Result<(Transaction<RT>, FunctionOutcome)> {
        let (tx, rx) = oneshot::channel();
        let request = RequestType::Udf {
            request: UdfRequest {
                path_and_args,
                udf_type,
                transaction,
                journal,
                context,
            },
            environment_data,
            response: tx,
            queue_timer: queue_timer(),
            reactor_depth,
            udf_callback: Box::new(self.clone()),
            function_started_sender,
        };
        self.send_request(Request::new(
            instance_name,
            request,
            EncodedSpan::from_parent(),
        ))?;
        let (tx, outcome) = Self::receive_response(rx).await??;
        Ok((tx, outcome))
    }
    #[fastrace::trace]
    pub async fn execute_action(
        &self,
        path_and_args: ValidatedPathAndArgs,
        transaction: Transaction<RT>,
        action_callbacks: Arc<dyn ActionCallbacks>,
        fetch_client: Arc<dyn FetchClient>,
        log_line_sender: mpsc::UnboundedSender<LogLine>,
        context: ExecutionContext,
        environment_data: EnvironmentData<RT>,
        instance_name: String,
        function_started_sender: Option<oneshot::Sender<()>>,
    ) -> anyhow::Result<ActionOutcome> {
        let (tx, rx) = oneshot::channel();
        let request = RequestType::Action {
            request: ActionRequest {
                params: ActionRequestParams { path_and_args },
                identity: transaction.identity().clone(),
                transaction,
                context,
            },
            response: tx,
            queue_timer: queue_timer(),
            action_callbacks,
            fetch_client,
            log_line_sender,
            environment_data,
            function_started_sender,
        };
        self.send_request(Request::new(
            instance_name,
            request,
            EncodedSpan::from_parent(),
        ))?;
        match Self::receive_response(rx).await? {
            Ok(outcome) => Ok(outcome),
            Err(e) => Err(recapture_stacktrace(e).await),
        }
    }

    /// Execute an HTTP action.
    /// HTTP actions can run other UDFs, so they take in an ActionCallbacks
    /// from the application layer. This creates a transient reference cycle.
    #[fastrace::trace]
    pub async fn execute_http_action(
        &self,
        http_module_path: ValidatedHttpPath,
        routed_path: RoutedHttpPath,
        http_request: udf::HttpActionRequest,
        identity: Identity,
        action_callbacks: Arc<dyn ActionCallbacks>,
        fetch_client: Arc<dyn FetchClient>,
        log_line_sender: mpsc::UnboundedSender<LogLine>,
        http_response_streamer: HttpActionResponseStreamer,
        transaction: Transaction<RT>,
        context: ExecutionContext,
        environment_data: EnvironmentData<RT>,
        instance_name: String,
        function_started_sender: Option<oneshot::Sender<()>>,
    ) -> anyhow::Result<HttpActionOutcome> {
        let (tx, rx) = oneshot::channel();
        let request = RequestType::HttpAction {
            request: HttpActionRequest {
                http_module_path,
                routed_path,
                http_request,
                identity,
                transaction,
                context,
            },
            environment_data,
            response: tx,
            queue_timer: queue_timer(),
            action_callbacks,
            fetch_client,
            log_line_sender,
            http_response_streamer,
            function_started_sender,
        };
        self.send_request(Request::new(
            instance_name,
            request,
            EncodedSpan::from_parent(),
        ))?;
        match Self::receive_response(rx).await? {
            Ok(outcome) => Ok(outcome),
            Err(e) => Err(recapture_stacktrace(e).await),
        }
    }

    /// Analyze a set of user-defined modules.
    #[fastrace::trace]
    pub async fn analyze(
        &self,
        udf_config: UdfConfig,
        modules: BTreeMap<CanonicalizedModulePath, ModuleConfig>,
        environment_variables: BTreeMap<EnvVarName, EnvVarValue>,
        instance_name: String,
    ) -> anyhow::Result<Result<BTreeMap<CanonicalizedModulePath, AnalyzedModule>, JsError>> {
        anyhow::ensure!(
            modules
                .values()
                .all(|m| m.environment == ModuleEnvironment::Isolate),
            "Can only analyze Isolate modules"
        );
        let (tx, rx) = oneshot::channel();
        let request = RequestType::Analyze {
            modules,
            response: tx,
            udf_config,
            environment_variables,
        };
        self.send_request(Request::new(
            instance_name,
            request,
            EncodedSpan::from_parent(),
        ))?;
        match IsolateClient::<RT>::receive_response(rx).await? {
            Ok(outcome) => Ok(outcome),
            Err(e) => Err(recapture_stacktrace(e).await),
        }
    }
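    // Call-site sketch for `analyze` above (hypothetical caller, not code from
    // this crate). Note the nested `Result`: the outer layer carries system
    // errors, while the inner `JsError` is a user-visible analysis failure:
    //
    //     match client.analyze(udf_config, modules, env_vars, instance_name).await? {
    //         Ok(analyzed) => { /* BTreeMap<CanonicalizedModulePath, AnalyzedModule> */ }
    //         Err(js_error) => { /* surface the JsError to the developer */ }
    //     }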
    #[fastrace::trace]
    pub async fn evaluate_app_definitions(
        &self,
        app_definition: ModuleConfig,
        component_definitions: BTreeMap<ComponentDefinitionPath, ModuleConfig>,
        dependency_graph: BTreeSet<(ComponentDefinitionPath, ComponentDefinitionPath)>,
        user_environment_variables: BTreeMap<EnvVarName, EnvVarValue>,
        system_env_vars: BTreeMap<EnvVarName, EnvVarValue>,
        instance_name: String,
    ) -> anyhow::Result<EvaluateAppDefinitionsResult> {
        anyhow::ensure!(
            app_definition.environment == ModuleEnvironment::Isolate,
            "Can only evaluate Isolate modules"
        );
        anyhow::ensure!(
            component_definitions
                .values()
                .all(|m| m.environment == ModuleEnvironment::Isolate),
            "Can only evaluate Isolate modules"
        );
        let (tx, rx) = oneshot::channel();
        let request = RequestType::EvaluateAppDefinitions {
            app_definition,
            component_definitions,
            dependency_graph,
            user_environment_variables,
            system_env_vars,
            response: tx,
        };
        self.send_request(Request::new(
            instance_name,
            request,
            EncodedSpan::from_parent(),
        ))?;
        match IsolateClient::<RT>::receive_response(rx).await? {
            Ok(outcome) => Ok(outcome),
            Err(e) => Err(recapture_stacktrace(e).await),
        }
    }

    #[fastrace::trace]
    pub async fn evaluate_component_initializer(
        &self,
        evaluated_definitions: BTreeMap<ComponentDefinitionPath, ComponentDefinitionMetadata>,
        path: ComponentDefinitionPath,
        definition: ModuleConfig,
        args: BTreeMap<Identifier, Resource>,
        name: ComponentName,
        instance_name: String,
    ) -> anyhow::Result<BTreeMap<Identifier, Resource>> {
        let (tx, rx) = oneshot::channel();
        let request = RequestType::EvaluateComponentInitializer {
            evaluated_definitions,
            path,
            definition,
            args,
            name,
            response: tx,
        };
        self.send_request(Request::new(
            instance_name,
            request,
            EncodedSpan::from_parent(),
        ))?;
        match IsolateClient::<RT>::receive_response(rx).await? {
            Ok(outcome) => Ok(outcome),
            Err(e) => Err(recapture_stacktrace(e).await),
        }
    }

    #[fastrace::trace]
    pub async fn evaluate_schema(
        &self,
        schema_bundle: ModuleSource,
        source_map: Option<SourceMap>,
        rng_seed: [u8; 32],
        unix_timestamp: UnixTimestamp,
        instance_name: String,
    ) -> anyhow::Result<DatabaseSchema> {
        let (tx, rx) = oneshot::channel();
        let request = RequestType::EvaluateSchema {
            schema_bundle,
            source_map,
            rng_seed,
            unix_timestamp,
            response: tx,
        };
        self.send_request(Request::new(
            instance_name,
            request,
            EncodedSpan::from_parent(),
        ))?;
        match IsolateClient::<RT>::receive_response(rx).await? {
            Ok(outcome) => Ok(outcome),
            Err(e) => Err(recapture_stacktrace(e).await),
        }
    }

    #[fastrace::trace]
    pub async fn evaluate_auth_config(
        &self,
        auth_config_bundle: ModuleSource,
        source_map: Option<SourceMap>,
        environment_variables: BTreeMap<EnvVarName, EnvVarValue>,
        explanation: &str,
        instance_name: String,
    ) -> anyhow::Result<AuthConfig> {
        let (tx, rx) = oneshot::channel();
        let request = RequestType::EvaluateAuthConfig {
            auth_config_bundle,
            source_map,
            environment_variables,
            response: tx,
        };
        self.send_request(Request::new(
            instance_name,
            request,
            EncodedSpan::from_parent(),
        ))?;
        let result = IsolateClient::<RT>::receive_response(rx).await?;
        match result {
            Ok(outcome) => Ok(outcome),
            Err(e) => {
                let is_env_var_error = e
                    .to_string()
                    .starts_with("Uncaught Error: Environment variable");
                let err = recapture_stacktrace(e).await;
                if err.is_rejected_before_execution() {
                    return Err(err);
                }
                let error = err.to_string();
                if is_env_var_error {
                    // Reformatting the underlying message to be nicer here. Since the
                    // underlying ErrorMetadata was lost when it was converted into the
                    // JsError, we do some string matching instead. CX-4531
                    Err(anyhow::anyhow!(ErrorMetadata::bad_request(
                        "AuthConfigMissingEnvironmentVariable",
                        error.trim_start_matches("Uncaught Error: ").to_string(),
                    )))
                } else {
                    Err(anyhow::anyhow!(ErrorMetadata::bad_request(
                        "InvalidAuthConfig",
                        format!("{explanation}: {error}"),
                    )))
                }
            },
        }
    }

    pub async fn shutdown(&self) -> anyhow::Result<()> {
        {
            let handles: Vec<_> = {
                let mut handles = self.handles.lock();
                for handle in &mut *handles {
                    handle.handle.shutdown();
                }
                handles.drain(..).collect()
            };
            for handle in handles.into_iter() {
                shutdown_and_join(handle.handle).await?;
            }
        }
        if let Some(mut scheduler) = self.scheduler.lock().take() {
            scheduler.shutdown();
        }
        if let Some(mut concurrency_logger) = self.concurrency_logger.lock().take() {
            concurrency_logger.shutdown();
        }
        Ok(())
    }

    fn send_request(&self, request: Request<RT>) -> anyhow::Result<()> {
        self.sender
            .try_send(request)
            .map_err(|_| metrics::execute_full_error())?;
        Ok(())
    }

    async fn receive_response<T>(rx: oneshot::Receiver<T>) -> anyhow::Result<T> {
        // The only reason a oneshot response channel will be dropped prematurely
        // is if the isolate worker is shutting down.
        rx.await.map_err(|_| shutdown_error())
    }
}

#[async_trait]
impl<RT: Runtime> UdfCallback<RT> for IsolateClient<RT> {
    async fn execute_udf(
        &self,
        client_id: String,
        udf_type: UdfType,
        path_and_args: ValidatedPathAndArgs,
        environment_data: EnvironmentData<RT>,
        transaction: Transaction<RT>,
        journal: QueryJournal,
        context: ExecutionContext,
        reactor_depth: usize,
    ) -> anyhow::Result<(Transaction<RT>, FunctionOutcome)> {
        self.execute_udf(
            udf_type,
            path_and_args,
            transaction,
            journal,
            context,
            environment_data,
            reactor_depth,
            client_id,
            None, /* function_started_sender */
        )
        .await
    }
}

pub struct SharedIsolateScheduler<RT: Runtime, W: IsolateWorker<RT>> {
    rt: RT,
    worker: W,
    /// Vec of channels for sending work to individual workers.
    worker_senders: Vec<
        mpsc::Sender<(
            Request<RT>,
            oneshot::Sender<ActiveWorkerState>,
            ActiveWorkerState,
        )>,
    >,
    /// Map from client_id to stack of workers (implemented with a deque). The
    /// most recently used worker for a given client is at the front of the
    /// deque. These workers were previously used by this client, but may
    /// safely be "stolen" for use by another client. A worker with a
    /// `last_used_ts` older than `ISOLATE_IDLE_TIMEOUT` has already been
    /// recreated and there will be no penalty for reassigning this worker to a
    /// new client.
    available_workers: HashMap<String, VecDeque<IdleWorkerState>>,
    /// Set of futures awaiting a response from an active worker.
    in_progress_workers: FuturesUnordered<oneshot::Receiver<ActiveWorkerState>>,
    /// Counts the number of active workers per client. Should only contain a
    /// key if the value is greater than 0.
    in_progress_count: HashMap<String, usize>,
    /// The max number of workers this scheduler is permitted to create.
    max_workers: usize,
    handles: Arc<Mutex<Vec<IsolateWorkerHandle>>>,
    max_percent_per_client: usize,
}

struct IdleWorkerState {
    worker_id: usize,
    last_used_ts: tokio::time::Instant,
}

struct ActiveWorkerState {
    worker_id: usize,
    client_id: String,
}

impl<RT: Runtime, W: IsolateWorker<RT>> SharedIsolateScheduler<RT, W> {
    pub fn new(
        rt: RT,
        worker: W,
        max_workers: usize,
        handles: Arc<Mutex<Vec<IsolateWorkerHandle>>>,
        max_percent_per_client: usize,
    ) -> Self {
        Self {
            rt,
            worker,
            worker_senders: Vec::new(),
            in_progress_workers: FuturesUnordered::new(),
            in_progress_count: HashMap::new(),
            available_workers: HashMap::new(),
            max_workers,
            handles,
            max_percent_per_client,
        }
    }

    fn handle_completed_worker(&mut self, completed_worker: ActiveWorkerState) {
        let new_count = match self
            .in_progress_count
            .remove_entry(&completed_worker.client_id)
        {
            Some((client_id, count)) if count > 1 => {
                self.in_progress_count.insert(client_id, count - 1);
                count - 1
            },
            Some((_, 1)) => {
                // Nothing to do; we've already removed the entry above.
                0
            },
            _ => panic!(
                "Inconsistent state in `in_progress_count` map; the count of active workers for \
                 client {} must be >= 1",
                completed_worker.client_id
            ),
        };
        log_pool_running_count(
            self.worker.config().name,
            new_count,
            &completed_worker.client_id,
        );
        self.available_workers
            .entry(completed_worker.client_id)
            .or_default()
            .push_front(IdleWorkerState {
                worker_id: completed_worker.worker_id,
                last_used_ts: self.rt.monotonic_now(),
            });
    }

    pub async fn run(mut self, receiver: CoDelQueueReceiver<RT, Request<RT>>) {
        log_pool_max(self.worker.config().name, self.max_workers);
        let mut receiver = receiver.fuse();
        let mut report_stats = self.rt.wait(*HEAP_WORKER_REPORT_INTERVAL_SECONDS);
        loop {
            select_biased! {
                completed_worker = self.in_progress_workers.select_next_some() => {
                    let Ok(completed_worker): Result<ActiveWorkerState, _> = completed_worker else {
                        tracing::warn!(
                            "Worker has shut down uncleanly. Shutting down {} scheduler.",
                            self.worker.config().name
                        );
                        return;
                    };
                    self.handle_completed_worker(completed_worker);
                }
                request = receiver.next() => {
                    let Some((request, expired)) = request else {
                        tracing::warn!(
                            "Request sender went away; {} scheduler shutting down",
                            self.worker.config().name
                        );
                        return
                    };
                    if let Some(expired) = expired {
                        request.expire(expired);
                        continue;
                    }
                    let Some(worker_id) = self.get_worker(&request.client_id) else {
                        request.reject();
                        continue;
                    };
                    let (done_sender, done_receiver) = oneshot::channel();
                    self.in_progress_workers.push(done_receiver);
                    let entry = self
                        .in_progress_count
                        .entry(request.client_id.clone())
                        .or_default();
                    *entry += 1;
                    log_pool_running_count(
                        self.worker.config().name,
                        *entry,
                        &request.client_id,
                    );
                    let client_id = request.client_id.clone();
                    if self.worker_senders[worker_id]
                        .try_send((
                            request,
                            done_sender,
                            ActiveWorkerState {
                                client_id,
                                worker_id,
                            },
                        ))
                        .is_err()
                    {
                        // Available worker should have an empty channel, so if we fail
                        // here it must be shut down. We should shut down too.
                        tracing::warn!(
                            "Worker died or dropped channel. Shutting down {} scheduler.",
                            self.worker.config().name
                        );
                        return;
                    }
                },
                _ = report_stats => {
                    let heap_stats = self.aggregate_heap_stats();
                    log_aggregated_heap_stats(&heap_stats);
                    report_stats = self.rt.wait(*HEAP_WORKER_REPORT_INTERVAL_SECONDS);
                },
            }
        }
    }

    /// Find a worker for the given `client_id`.
    /// Returns `None` if no worker could be allocated for this client (i.e.
    /// this client has reached its capacity with the scheduler).
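    ///
    /// For example (illustrative numbers, mirroring the tests at the bottom of
    /// this file): with `max_workers = 2` and `max_percent_per_client = 50`, a
    /// client with one request already running is at `(1 * 100) / 2 = 50 >= 50`,
    /// so a second concurrent request from that client is rejected, while a
    /// first request from a different client can still claim the other worker.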
    ///
    /// Note that the returned worker id is removed from the
    /// `self.available_workers` state, so the caller is responsible for using
    /// the worker and returning it back to `self.available_workers` after it
    /// is done.
    fn get_worker(&mut self, client_id: &str) -> Option<usize> {
        // Make sure this client isn't overloading the scheduler.
        let active_worker_count = self
            .in_progress_count
            .get(client_id)
            .copied()
            .unwrap_or_default();
        if (active_worker_count * 100) / (self.max_workers) >= self.max_percent_per_client {
            tracing::warn!(
                "Client {} is using >= {}% of scheduler capacity; rejecting new request",
                client_id,
                self.max_percent_per_client,
            );
            return None;
        }
        // Try to find an existing worker for this client.
        if let Some((client_id, mut workers)) = self.available_workers.remove_entry(client_id) {
            let worker = workers
                .pop_front()
                .expect("Available worker map should never contain an empty list");
            if !workers.is_empty() {
                self.available_workers.insert(client_id, workers);
            }
            return Some(worker.worker_id);
        }
        // If we've recently started up and haven't yet created `max_workers` threads,
        // create a new worker instead of "stealing" some other client's worker.
        if self.worker_senders.len() < self.max_workers {
            let new_worker = self.worker.clone();
            let heap_stats = SharedIsolateHeapStats::new();
            let heap_stats_ = heap_stats.clone();
            let (work_sender, work_receiver) = mpsc::channel(1);
            let handle = self.rt.spawn_thread("isolate", move || {
                new_worker.service_requests(work_receiver, heap_stats_)
            });
            self.worker_senders.push(work_sender);
            self.handles
                .lock()
                .push(IsolateWorkerHandle { handle, heap_stats });
            tracing::info!(
                "Created {} isolate worker {}",
                self.worker.config().name,
                self.worker_senders.len() - 1
            );
            return Some(self.worker_senders.len() - 1);
        }
        // No existing worker for this client and we've already started the max number
        // of workers -- just grab the least recently used worker. This worker is least
        // likely to be reused by its previous client.
        let Some((key, workers)) = self
            .available_workers
            .iter_mut()
            .min_by(|(_, workers1), (_, workers2)| {
                workers1
                    .back()
                    .expect("Available worker map should never contain an empty list")
                    .last_used_ts
                    .cmp(
                        &workers2
                            .back()
                            .expect("Available worker map should never contain an empty list")
                            .last_used_ts,
                    )
            })
        else {
            // No available workers.
            return None;
        };
        log_worker_stolen(
            workers
                .back()
                .expect("Available worker map should never contain an empty list")
                .last_used_ts
                .elapsed(),
        );
        let worker_id = workers
            .pop_back()
            .expect("Available worker map should never contain an empty list");
        if workers.is_empty() {
            // This variable shadowing drops the mutable reference to
            // `self.available_workers`.
            let key = key.clone();
            self.available_workers.remove(&key);
        }
        Some(worker_id.worker_id)
    }

    fn aggregate_heap_stats(&self) -> IsolateHeapStats {
        let mut total = IsolateHeapStats::default();
        for handle in self.handles.lock().iter() {
            total += handle.heap_stats.get();
        }
        total
    }
}

pub struct IsolateWorkerHandle {
    pub handle: Box<dyn SpawnHandle>,
    heap_stats: SharedIsolateHeapStats,
}

#[derive(Clone)]
pub struct SharedIsolateHeapStats(Arc<Mutex<IsolateHeapStats>>);

impl SharedIsolateHeapStats {
    pub(crate) fn new() -> Self {
        Self(Arc::new(Mutex::new(IsolateHeapStats::default())))
    }

    pub(crate) fn get(&self) -> IsolateHeapStats {
        *self.0.lock()
    }

    pub fn store(&self, stats: IsolateHeapStats) {
        *self.0.lock() = stats;
    }
}

#[async_trait(?Send)]
pub trait IsolateWorker<RT: Runtime>: Clone + Send + 'static {
    async fn service_requests<T>(
        self,
        reqs: mpsc::Receiver<(Request<RT>, oneshot::Sender<T>, T)>,
        heap_stats: SharedIsolateHeapStats,
    ) {
        let IsolateConfig {
            max_user_timeout,
            limiter,
            ..
        } = self.config();
        let mut reqs = std::pin::pin!(ReceiverStream::new(reqs).peekable());
        let mut ready: Option<(oneshot::Sender<_>, _)> = None;
        'recreate_isolate: loop {
            let mut last_client_id: Option<String> = None;
            let mut last_request: Option<String> = None;
            let mut isolate = Isolate::new(self.rt(), *max_user_timeout, limiter.clone());
            heap_stats.store(isolate.heap_stats());
            loop {
                let v8_context = {
                    let _create_context_timer = create_context_timer();
                    let mut scope = isolate.handle_scope();
                    let context = v8::Context::new(&mut scope, v8::ContextOptions::default());
                    v8::Global::new(&mut scope, context)
                };
                // Check again whether the isolate has enough free heap memory
                // before starting the next request
                if let Some(debug_str) = &last_request
                    && should_recreate_isolate(&mut isolate, debug_str)
                {
                    continue 'recreate_isolate;
                }
                heap_stats.store(isolate.heap_stats());
                if let Some((done, done_token)) = ready.take() {
                    // Inform the scheduler that this thread is ready to accept a new request.
                    let _ = done.send(done_token);
                }
                tokio::select! {
                    // If the isolate isn't "tainted", no need to wait for the idle timeout.
                    _ = self.rt().wait(*ISOLATE_IDLE_TIMEOUT), if last_client_id.is_some() => {
                        tracing::debug!("Restarting isolate for {last_client_id:?} due to idle timeout");
                        metrics::log_recreate_isolate("idle_timeout");
                        continue 'recreate_isolate;
                    },
                    // First peek the request to decide if we need to make a new isolate.
                    req = reqs.as_mut().peek() => {
                        let Some((req, ..)) = req else {
                            return;
                        };
                        let reused = last_client_id.is_some();
                        // If we receive a request from a different client (i.e. a different
                        // instance), recreate the isolate. We don't allow an isolate to be
                        // reused across clients for security isolation.
                        if last_client_id.get_or_insert_with(|| req.client_id.clone())
                            != &req.client_id
                        {
                            let pause_client = self.rt().pause_client();
                            pause_client.wait(PAUSE_RECREATE_CLIENT).await;
                            tracing::debug!(
                                "Restarting isolate due to client change, previous: {:?}, new: {:?}",
                                last_client_id,
                                req.client_id
                            );
                            metrics::log_recreate_isolate("client_id_changed");
                            continue 'recreate_isolate;
                        } else if reused {
                            tracing::debug!("Reusing isolate for client {}", req.client_id);
                        }
                        // Ok, we're ready to accept the request for real.
                        let Some((req, done, done_token)) = reqs.next().await else {
                            return
                        };
                        // Note that we won't reply to `done` until the next
                        // `v8_context` is created. This improves latency in the
                        // common case since requests will be routed to a thread
                        // that has a context ready to go.
                        ready = Some((done, done_token));
                        let root = initialize_root_from_parent(
                            func_path!(),
                            req.parent_trace.clone(),
                        );
                        root.add_property(|| ("reused_isolate", reused.as_label()));
                        // Require the layer below to opt into isolate reuse by setting
                        // `isolate_clean`.
                        let mut isolate_clean = false;
                        let debug_str = self
                            .handle_request(
                                &mut isolate,
                                v8_context,
                                &mut isolate_clean,
                                req,
                                heap_stats.clone(),
                            )
                            .in_span(root)
                            .await;
                        if !isolate_clean || should_recreate_isolate(&mut isolate, &debug_str) {
                            continue 'recreate_isolate;
                        }
                        last_request = Some(debug_str);
                    }
                }
            }
        }
    }

    async fn handle_request(
        &self,
        isolate: &mut Isolate<RT>,
        v8_context: v8::Global<v8::Context>,
        isolate_clean: &mut bool,
        req: Request<RT>,
        heap_stats: SharedIsolateHeapStats,
    ) -> String;

    fn config(&self) -> &IsolateConfig;

    fn rt(&self) -> RT;
}

pub(crate) fn should_recreate_isolate<RT: Runtime>(
    isolate: &mut Isolate<RT>,
    last_executed: &str,
) -> bool {
    if !*REUSE_ISOLATES {
        metrics::log_recreate_isolate("env_disabled");
        return true;
    }
    if let Err(e) = isolate.check_isolate_clean() {
        tracing::error!(
            "Restarting Isolate {}: {e:?}, last request: {last_executed:?}",
            e.reason()
        );
        metrics::log_recreate_isolate(e.reason());
        return true;
    }
    if isolate.created().elapsed() > *ISOLATE_MAX_LIFETIME {
        metrics::log_recreate_isolate("max_lifetime");
        return true;
    }
    false
}
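// Descriptive summary (not new logic) of when a worker recreates its isolate;
// the authoritative checks live in `should_recreate_isolate` and in the
// `service_requests` loop above:
// - "env_disabled": the REUSE_ISOLATES knob is off, so every request gets a
//   fresh isolate.
// - the `check_isolate_clean` reason: the isolate has leftover state from the
//   previous request.
// - "max_lifetime": the isolate has outlived ISOLATE_MAX_LIFETIME.
// - "idle_timeout": no request arrived within ISOLATE_IDLE_TIMEOUT.
// - "client_id_changed": the next request comes from a different client;
//   isolates are never shared across clients for security isolation.
// A worker must also explicitly set `isolate_clean` after each request to opt
// into reuse at all.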
#[cfg(test)]
mod tests {
    use cmd_util::env::env_config;
    use common::pause::PauseController;
    use database::test_helpers::DbFixtures;
    use errors::ErrorMetadataAnyhowExt;
    use model::test_helpers::DbFixturesWithModel;
    use pb::common::FunctionResult as FunctionResultProto;
    use proptest::prelude::*;
    use runtime::testing::TestRuntime;
    use sync_types::testing::assert_roundtrips;
    use tokio::sync::oneshot;

    use super::FunctionResult;
    use crate::{
        client::{initialize_v8, NO_AVAILABLE_WORKERS, PAUSE_REQUEST},
        test_helpers::bogus_udf_request,
        IsolateClient,
    };

    proptest! {
        #![proptest_config(
            ProptestConfig {
                cases: 256 * env_config("CONVEX_PROPTEST_MULTIPLIER", 1),
                failure_persistence: None,
                ..ProptestConfig::default()
            }
        )]

        #[test]
        fn test_function_result_proto_roundtrips(left in any::<FunctionResult>()) {
            assert_roundtrips::<FunctionResult, FunctionResultProto>(left);
        }
    }

    #[convex_macro::test_runtime]
    async fn test_scheduler_workers_limit_requests(
        rt: TestRuntime,
        pause1: PauseController,
    ) -> anyhow::Result<()> {
        initialize_v8();
        let function_runner_core = IsolateClient::new(rt.clone(), 100, 1, None)?;
        let DbFixtures { db, .. } = DbFixtures::new(&rt).await?;
        let client1 = "client1";
        let hold_guard = pause1.hold(PAUSE_REQUEST);
        let (sender, _rx1) = oneshot::channel();
        let request = bogus_udf_request(&db, client1, sender).await?;
        function_runner_core.send_request(request)?;
        // Pausing a request while being executed should make the next request be
        // rejected because there are no available workers.
        let _guard = hold_guard.wait_for_blocked().await.unwrap();
        let (sender, rx2) = oneshot::channel();
        let request2 = bogus_udf_request(&db, client1, sender).await?;
        function_runner_core.send_request(request2)?;
        let response = IsolateClient::<TestRuntime>::receive_response(rx2).await?;
        let err = response.unwrap_err();
        assert!(err.is_rejected_before_execution(), "{err:?}");
        assert!(err.to_string().contains(NO_AVAILABLE_WORKERS));
        Ok(())
    }

    #[convex_macro::test_runtime]
    async fn test_scheduler_does_not_throttle_different_clients(
        rt: TestRuntime,
        pause1: PauseController,
    ) -> anyhow::Result<()> {
        initialize_v8();
        let function_runner_core = IsolateClient::new(rt.clone(), 50, 2, None)?;
        let DbFixtures { db, .. } = DbFixtures::new_with_model(&rt).await?;
        let client1 = "client1";
        let hold_guard = pause1.hold(PAUSE_REQUEST);
        let (sender, _rx1) = oneshot::channel();
        let request = bogus_udf_request(&db, client1, sender).await?;
        function_runner_core.send_request(request)?;
        // Pausing a request should not affect the next one because we have 2 workers
        // and 2 requests from different clients.
        let _guard = hold_guard.wait_for_blocked().await.unwrap();
        let (sender, rx2) = oneshot::channel();
        let client2 = "client2";
        let request2 = bogus_udf_request(&db, client2, sender).await?;
        function_runner_core.send_request(request2)?;
        IsolateClient::<TestRuntime>::receive_response(rx2).await??;
        Ok(())
    }

    #[convex_macro::test_runtime]
    async fn test_scheduler_throttles_same_client(
        rt: TestRuntime,
        pause1: PauseController,
    ) -> anyhow::Result<()> {
        initialize_v8();
        let function_runner_core = IsolateClient::new(rt.clone(), 50, 2, None)?;
        let DbFixtures { db, .. } = DbFixtures::new_with_model(&rt).await?;
        let client = "client";
        let hold_guard = pause1.hold(PAUSE_REQUEST);
        let (sender, _rx1) = oneshot::channel();
        let request = bogus_udf_request(&db, client, sender).await?;
        function_runner_core.send_request(request)?;
        // Pausing the first request and sending a second should make the second fail
        // because there's only one worker left and it is reserved for other clients.
        let _guard = hold_guard.wait_for_blocked().await.unwrap();
        let (sender, rx2) = oneshot::channel();
        let request2 = bogus_udf_request(&db, client, sender).await?;
        function_runner_core.send_request(request2)?;
        let response = IsolateClient::<TestRuntime>::receive_response(rx2).await?;
        let err = response.unwrap_err();
        assert!(err.is_rejected_before_execution());
        assert!(err.to_string().contains(NO_AVAILABLE_WORKERS));
        Ok(())
    }
}
