Skip to main content
Glama

Convex MCP server

Official
by get-convex
execution_scope.rs (26.1 kB)
use std::{
    collections::HashMap,
    ffi::c_char,
    marker::PhantomData,
    mem,
    ops::{
        Deref,
        DerefMut,
    },
    ptr,
    str,
    sync::Arc,
};

use anyhow::{
    anyhow,
    bail,
    Context as _,
};
use async_recursion::async_recursion;
use common::{
    errors::JsError,
    runtime::Runtime,
    static_span,
    types::UdfType,
};
use deno_core::{
    v8,
    ModuleResolutionError,
    ModuleSpecifier,
};
use errors::ErrorMetadata;
use model::modules::{
    module_versions::{
        FullModuleSource,
        ModuleSource,
    },
    user_error::{
        ModuleNotFoundError,
        SystemModuleNotFoundError,
    },
};
use serde_json::Value as JsonValue;
use value::heap_size::HeapSize;

use crate::{
    array_buffer_allocator::ArrayBufferMemoryLimit,
    bundled_js::system_udf_file,
    environment::{
        IsolateEnvironment,
        ModuleCodeCacheResult,
    },
    helpers::{
        self,
        to_rust_string,
    },
    isolate::{
        CONVEX_SCHEME,
        SYSTEM_PREFIX,
    },
    metrics,
    module_map::{
        ModuleId,
        ModuleMap,
    },
    request_scope::RequestState,
    termination::IsolateHandle,
    IsolateHeapStats,
};

/// V8 will invoke our promise_reject_callback when it determines that a
/// promise rejected without a handler. If there isn't a handler, we'd like to
/// crash the UDF and pass this error on to the user. However, there are
/// common situations where user code can only add a promise handler after the
/// promise rejects: since promises and async functions run synchronously until
/// their first suspend point, any handler registered with `.catch()` may be too
/// late!
///
/// ```js
/// function fetch(url): Promise<Response> {
///   if (!url) return Promise.reject("1 argument required");
///   ...
/// }
/// fetchWrapper().catch(x => console.log('caught', e));
/// ```
///
/// By the time the `fetchWrapper()` function above returns, the promise
/// returned has already rejected. To distinguish between promises rejected
/// with no rejection handling and promises which are handled soon enough we
/// fully drain the microtask queue to give the current task and give
/// other microtasks a chance to add a rejection handler. If at that point no
/// rejection handler has been added to a promise, it's time to crash the UDF.
///
/// This choice matches the behavior in Node.js and the HTML spec where this
/// is called an "unhandled promise rejection."
/// https://nodejs.org/api/process.html#event-unhandledrejection
/// https://html.spec.whatwg.org/multipage/webappapis.html#unhandled-promise-rejections
///
/// Although the promises in question are implicit in the async function syntax,
/// this more complex code will exhibit similar behavior.
///
/// ```js
/// (async () => {
///   try {
///     await (async () => {
///       throw new Error("will invoke promise_reject_callback")
///       await Promise.resolve();
///       throw new Error("will not invoke PromiseRejectWithNoHandler")
///     })();
///   } catch {}
/// })()
/// ```
pub struct PendingUnhandledPromiseRejections {
    // Map from the rejected promise to the exception value it rejected with,
    // held as V8 globals so they survive across handle scopes until we decide
    // whether a handler was attached in time.
    pub exceptions: HashMap<v8::Global<v8::Promise>, v8::Global<v8::Value>>,
}

impl PendingUnhandledPromiseRejections {
    /// Creates an empty set of pending (possibly-unhandled) rejections.
    pub fn new() -> Self {
        PendingUnhandledPromiseRejections {
            exceptions: HashMap::new(),
        }
    }
}

/// Queue of `import()` requests issued by running JS, paired with the
/// `PromiseResolver` that should be resolved once the module is loaded.
pub struct PendingDynamicImports {
    // Whether dynamic `import()` is permitted in this environment; consulted
    // by whoever processes this queue (not visible in this file).
    pub allow_dynamic_imports: bool,
    pub imports: Vec<(ModuleSpecifier, v8::Global<v8::PromiseResolver>)>,
}

impl PendingDynamicImports {
    /// Creates an empty queue with the given dynamic-import policy.
    pub fn new(allow_dynamic_imports: bool) -> Self {
        PendingDynamicImports {
            allow_dynamic_imports,
            imports: Vec::new(),
        }
    }

    /// Enqueues one dynamic import request and the resolver to complete it with.
    pub fn push(&mut self, specifier: ModuleSpecifier, resolver: v8::Global<v8::PromiseResolver>) {
        self.imports.push((specifier, resolver));
    }

    /// Drains and returns all queued imports, leaving the queue empty.
    // NOTE(review): `mem::take(&mut self.imports)` would be the more idiomatic
    // drain than `split_off(0)`; behavior is equivalent.
    pub fn take(&mut self) -> Vec<(ModuleSpecifier, v8::Global<v8::PromiseResolver>)> {
        self.imports.split_off(0)
    }
}

/// Most functionality for executing JS and manipulating objects executes within
/// a [`v8::HandleScope`]. The [`ExecutionScope`] wrapper is a convenience
/// struct that represents executing code within a [`RequestScope`].
/// Wrapper around a [`v8::HandleScope`] for code running inside a request.
/// Derefs to the underlying handle scope, and provides typed access to the
/// per-isolate slots (request state, module map, pending rejections/imports)
/// plus module loading/evaluation and syscall plumbing.
pub struct ExecutionScope<'a, 'b: 'a, RT: Runtime, E: IsolateEnvironment<RT>> {
    v8_scope: &'a mut v8::HandleScope<'b>,
    // Held only to keep the current context alive for the scope's lifetime;
    // never read after construction.
    _v8_context: v8::Local<'b, v8::Context>,
    // Ties the runtime/environment type parameters to this scope without
    // storing values of those types.
    _pd: PhantomData<(RT, E)>,
}

impl<'a, 'b: 'a, RT: Runtime, E: IsolateEnvironment<RT>> Deref for ExecutionScope<'a, 'b, RT, E> {
    type Target = v8::HandleScope<'b>;

    fn deref(&self) -> &v8::HandleScope<'b> {
        self.v8_scope
    }
}

impl<'a, 'b: 'a, RT: Runtime, E: IsolateEnvironment<RT>> DerefMut
    for ExecutionScope<'a, 'b, RT, E>
{
    fn deref_mut(&mut self) -> &mut v8::HandleScope<'b> {
        self.v8_scope
    }
}

impl<'a, 'b: 'a, RT: Runtime, E: IsolateEnvironment<RT>> ExecutionScope<'a, 'b, RT, E> {
    /// Wraps an existing handle scope, capturing its current context.
    pub fn new(v8_scope: &'a mut v8::HandleScope<'b>) -> Self {
        let v8_context = v8_scope.get_current_context();
        Self {
            v8_scope,
            _v8_context: v8_context,
            _pd: PhantomData,
        }
    }

    /// Returns the isolate's termination handle from its slot.
    /// Panics if the slot is missing (an invariant violation).
    pub fn handle(&self) -> &IsolateHandle {
        self.v8_scope
            .get_slot()
            .expect("IsolateHandle disappeared?")
    }

    /// Shared access to the per-request state stored in an isolate slot.
    pub fn state(&mut self) -> anyhow::Result<&RequestState<RT, E>> {
        self.v8_scope
            .get_slot()
            .ok_or_else(|| anyhow!("ContextState disappeared?"))
    }

    // TODO: Delete this method and use with_state_mut everywhere instead in
    // order to make it impossible to hold state across await points.
    pub fn state_mut(&mut self) -> anyhow::Result<&mut RequestState<RT, E>> {
        self.v8_scope
            .get_slot_mut()
            .ok_or_else(|| anyhow!("ContextState disappeared?"))
    }

    /// Runs `f` with mutable request state; the closure form keeps the
    /// borrow from being held across an await point.
    pub fn with_state_mut<T>(
        &mut self,
        f: impl FnOnce(&mut RequestState<RT, E>) -> T,
    ) -> anyhow::Result<T> {
        let state = self.state_mut()?;
        Ok(f(state))
    }

    /// Snapshots V8 heap statistics plus our own blob/stream/array-buffer
    /// accounting and reports them to the environment.
    pub fn record_heap_stats(&mut self) -> anyhow::Result<()> {
        let stats = self.get_heap_statistics();
        let array_buffer_size = self
            .get_slot::<Arc<ArrayBufferMemoryLimit>>()
            .context("missing ArrayBufferMemoryLimit?")?
            .used();
        self.with_state_mut(|state| {
            let blobs_heap_size = state.blob_parts.heap_size();
            let streams_heap_size = state.streams.heap_size() + state.stream_listeners.heap_size();
            state.environment.record_heap_stats(IsolateHeapStats::new(
                stats,
                blobs_heap_size,
                streams_heap_size,
                array_buffer_size,
            ));
        })
    }

    /// Shared access to the module map slot. Panics if missing.
    pub fn module_map(&mut self) -> &ModuleMap {
        self.v8_scope.get_slot().expect("ModuleMap disappeared?")
    }

    /// Mutable access to the module map slot. Panics if missing.
    pub fn module_map_mut(&mut self) -> &mut ModuleMap {
        self.v8_scope
            .get_slot_mut()
            .expect("ModuleMap disappeared?")
    }

    #[allow(unused)]
    pub fn pending_unhandled_promise_rejections(&mut self) -> &PendingUnhandledPromiseRejections {
        self.v8_scope
            .get_slot_mut()
            .expect("No PendingUnhandledPromiseRejections found")
    }

    pub fn pending_unhandled_promise_rejections_mut(
        &mut self,
    ) -> &mut PendingUnhandledPromiseRejections {
        self.v8_scope
            .get_slot_mut()
            .expect("No PendingUnhandledPromiseRejections found")
    }

    pub fn pending_dynamic_imports_mut(&mut self) -> &mut PendingDynamicImports {
        self.v8_scope
            .get_slot_mut()
            .expect("No PendingDynamicImports found")
    }

    /// Runs `f` under a `v8::TryCatch`. A caught JS exception is converted to
    /// a `JsError` (inner `Err`); the outer `Result` carries system errors.
    pub fn with_try_catch<R>(
        &mut self,
        f: impl FnOnce(&mut v8::HandleScope<'b>) -> R,
    ) -> anyhow::Result<Result<R, JsError>> {
        let mut tc_scope = v8::TryCatch::new(self.v8_scope);
        let r = f(&mut tc_scope);
        if let Some(e) = tc_scope.exception() {
            // Drop the TryCatch scope before reborrowing `self` to format
            // the traceback.
            drop(tc_scope);
            return Ok(Err(self.format_traceback(e)?));
        }
        Ok(Ok(r))
    }

    /// Evaluates a user module, translating expected load failures
    /// (module-not-found, resolution errors) into user-facing `JsError`s
    /// rather than system errors.
    pub async fn eval_user_module(
        &mut self,
        udf_type: UdfType,
        is_dynamic: bool,
        name: &ModuleSpecifier,
    ) -> anyhow::Result<Result<v8::Local<'a, v8::Module>, JsError>> {
        let timer = metrics::eval_user_module_timer(udf_type, is_dynamic);
        let module = match self.eval_module(name).await {
            Ok(id) => id,
            Err(e) => {
                // TODO: It's a bit awkward that we're calling these "JsError"s, since they
                // don't originate from JavaScript.
                if let Some(e) = e.downcast_ref::<ModuleNotFoundError>() {
                    return Ok(Err(JsError::from_message(format!("{e}"))));
                }
                if let Some(e) = e.downcast_ref::<SystemModuleNotFoundError>() {
                    return Ok(Err(JsError::from_message(format!("{e}"))));
                }
                if let Some(e) = e.downcast_ref::<ModuleResolutionError>() {
                    return Ok(Err(JsError::from_message(format!("{e}"))));
                }
                match e.downcast::<JsError>() {
                    Ok(e) => return Ok(Err(e)),
                    Err(e) => return Err(e),
                }
            },
        };
        // Note: timer only finishes on success; error paths above return early.
        timer.finish();
        Ok(Ok(module))
    }

    /// Registers, instantiates, and evaluates the module graph rooted at
    /// `name`, returning a local handle to the root module.
    #[fastrace::trace]
    pub async fn eval_module(
        &mut self,
        name: &ModuleSpecifier,
    ) -> anyhow::Result<v8::Local<'a, v8::Module>> {
        let _s = static_span!();

        // These first two steps of registering and then instantiating the module
        // correspond to `JsRuntime::load_module`. This function is idempotent,
        // so it's safe to rerun.
        let id = self.register_module(name).await?;

        // NB: This part is separate from `self.register_module()` since module
        // registration is recursive, compiling and registering dependencies,
        // where instantiation and evaluation are not.
        self.instantiate_and_eval_module(id)?;

        let module_map = self.module_map();
        let module = module_map
            .handle_by_id(id)
            .ok_or_else(|| anyhow!("Non-existent module ID {id}"))?;
        let module = v8::Local::new(self, module);
        Ok(module)
    }

    /// Compiles `name` (consuming a code cache entry when available), records
    /// it in the module map, then recursively registers its imports.
    /// Idempotent: returns the existing id if the module is already registered.
    #[async_recursion(?Send)]
    async fn register_module(&mut self, name: &ModuleSpecifier) -> anyhow::Result<ModuleId> {
        let _s = static_span!();
        {
            let module_map = self.module_map();
            if let Some(id) = module_map.get_by_name(name) {
                return Ok(id);
            }
        }
        let (id, import_specifiers) = {
            let (module_source, code_cache) = self.lookup_source(name).await?;

            // Step 1: Compile the module and discover its imports.
            let timer = metrics::compile_module_timer(matches!(
                &code_cache,
                ModuleCodeCacheResult::Cached(..)
            ));

            // Create a nested scope so that objects can be GC'd
            let mut scope = v8::HandleScope::new(&mut **self);
            let mut scope = ExecutionScope::<RT, E>::new(&mut scope);

            let name_str = v8::String::new(&mut scope, name.as_str())
                .ok_or_else(|| anyhow!("Failed to create name string"))?;
            let source_str = make_source_string(&mut scope, &module_source.source)?;

            let origin = helpers::module_origin(&mut scope, name_str);
            // Choose compile options based on whether we have cached bytecode.
            let (mut v8_source, options) = match &code_cache {
                ModuleCodeCacheResult::Cached(data) => (
                    v8::script_compiler::Source::new_with_cached_data(
                        source_str,
                        Some(&origin),
                        v8::CachedData::new(data),
                    ),
                    v8::script_compiler::CompileOptions::ConsumeCodeCache,
                ),
                ModuleCodeCacheResult::Uncached(_) => (
                    v8::script_compiler::Source::new(source_str, Some(&origin)),
                    v8::script_compiler::CompileOptions::NoCompileOptions,
                ),
            };

            let module = scope
                .with_try_catch(|s| {
                    v8::script_compiler::compile_module2(
                        s,
                        &mut v8_source,
                        options,
                        v8::script_compiler::NoCacheReason::NoReason,
                    )
                })??
                .ok_or_else(|| anyhow!("Unexpected module compilation error"))?;

            match code_cache {
                ModuleCodeCacheResult::Cached(data) => {
                    // N.B.: this is not reflected in rusty-v8's lifetimes,
                    // but the pointer behind the `v8::CachedData` passed to
                    // `v8::Source` must stay alive through the call to
                    // `compile_module2`.
                    // At this point however it's already been deserialized and
                    // is safe to drop.
                    let _: Arc<[u8]> = data;
                },
                ModuleCodeCacheResult::Uncached(callback) => {
                    // No cache entry existed; produce one and hand it to the
                    // environment-provided callback.
                    let timer = metrics::create_code_cache_timer();
                    let module_script = module.get_unbound_module_script(&mut scope);
                    if let Some(cached_data) = module_script.create_code_cache() {
                        callback(cached_data[..].into());
                        timer.finish();
                    }
                },
            }

            assert_eq!(module.get_status(), v8::ModuleStatus::Uninstantiated);

            // Collect each static import's resolved specifier plus its source
            // location (for error attribution during recursive registration).
            let mut import_specifiers = vec![];
            let module_requests = module.get_module_requests();
            for i in 0..module_requests.length() {
                let module_request: v8::Local<v8::ModuleRequest> = module_requests
                    .get(&mut scope, i)
                    .ok_or_else(|| anyhow!("Module request {} out of bounds", i))?
                    .try_into()?;
                let import_specifier =
                    helpers::to_rust_string(&mut scope, &module_request.get_specifier())?;
                let module_specifier = deno_core::resolve_import(&import_specifier, name.as_str())?;
                let offset = module_request.get_source_offset();
                let location = module.source_offset_to_location(offset);
                import_specifiers.push((module_specifier, location));
            }
            timer.finish();

            // Step 2: Register the module with the module map.
            let id = {
                let module_v8 = v8::Global::<v8::Module>::new(&mut scope, module);
                let module_map = scope.module_map_mut();
                module_map.register(name, module_v8, module_source)
            };
            (id, import_specifiers)
        };

        // Step 3: Recursively load the dependencies. Since we've already registered
        // ourselves, this won't create an infinite loop on import cycles.
        for (import_specifier, location) in import_specifiers {
            self.register_module(&import_specifier).await.map_err(|e| {
                // NOTE(review): the irrefutable `let Err(e)` implies
                // `nicely_show_line_number_on_error` (defined elsewhere) has an
                // uninhabited `Ok` type (e.g. `Result<Infallible, _>`); this
                // only compiles on toolchains accepting uninhabited-variant
                // omission (Rust >= 1.82) — confirm against the helper's
                // signature.
                let Err(e) = self.nicely_show_line_number_on_error(name, location, e);
                e
            })?;
        }
        Ok(id)
    }

    /// Validates a `convex:` module URL and fetches its source: system
    /// modules come from bundled files, user modules from the environment.
    async fn lookup_source(
        &mut self,
        module_specifier: &ModuleSpecifier,
    ) -> anyhow::Result<(Arc<FullModuleSource>, ModuleCodeCacheResult)> {
        let _s = static_span!();
        if module_specifier.scheme() != CONVEX_SCHEME {
            anyhow::bail!(ErrorMetadata::bad_request(
                "UnsupportedScheme",
                format!(
                    "Unsupported scheme ({}) in {}",
                    module_specifier.scheme(),
                    module_specifier
                ),
            ));
        }
        if module_specifier.has_authority() {
            anyhow::bail!(ErrorMetadata::bad_request(
                "UnsupportedAuthority",
                format!(
                    "Module URL {} must not have an authority. Has {}",
                    module_specifier,
                    module_specifier.authority()
                ),
            ));
        }
        if module_specifier.cannot_be_a_base() {
            anyhow::bail!(ErrorMetadata::bad_request(
                "CannotBeABase",
                format!(
                    "Module URL {module_specifier} is a cannot-be-a-base URL which is disallowed."
                ),
            ));
        }
        let module_path = module_specifier
            .path()
            .strip_prefix('/')
            .ok_or_else(|| anyhow!("Path for {:?} did not start with a slash", module_specifier))?;
        let timer = metrics::lookup_source_timer(module_path.starts_with(SYSTEM_PREFIX));

        // Overlay our "_system/" files on top of the user's UDFs.
        if let Some(system_path) = module_path.strip_prefix(SYSTEM_PREFIX) {
            let (source, source_map) = system_udf_file(system_path)
                .ok_or_else(|| SystemModuleNotFoundError::new(system_path))?;
            let result = FullModuleSource {
                source: source.into(),
                source_map: source_map.as_ref().map(|s| s.to_string()),
            };
            timer.finish();
            // TODO: should we code-cache system UDFs?
            return Ok((Arc::new(result), ModuleCodeCacheResult::noop()));
        }

        let state = self.state_mut()?;
        let result = state
            .environment
            .lookup_source(module_path, &mut state.timeout, &mut state.permit)
            .await?
            .ok_or_else(|| ModuleNotFoundError::new(module_path))?;
        timer.finish();
        Ok(result)
    }

    /// Instantiates (linking imports via `module_resolve_callback`) and then
    /// evaluates an already-registered module. A pending top-level-await
    /// promise is rejected as unsupported; a rejected one becomes a `JsError`.
    #[fastrace::trace]
    fn instantiate_and_eval_module(&mut self, id: ModuleId) -> anyhow::Result<()> {
        let _s = static_span!();

        let module = {
            let module_map = self.module_map();
            let handle = module_map
                .handle_by_id(id)
                .ok_or_else(|| anyhow!("ModuleInfo not found for {}", id))?;
            v8::Local::new(self, handle)
        };
        match module.get_status() {
            v8::ModuleStatus::Errored => bail!("Module {} is in errored state", id),
            // Already evaluated: nothing to do (this path makes the caller
            // idempotent).
            v8::ModuleStatus::Evaluated => return Ok(()),
            _ => (),
        }

        // Instantiate the module, loading its dependencies.
        {
            let timer = metrics::instantiate_module_timer();
            let result = self.with_try_catch(|s| {
                module.instantiate_module(s, Self::module_resolve_callback)
            })??;
            if matches!(result, Some(false) | None) {
                // NOTE(review): this branch fires on a *failed* instantiate
                // (false/None), so "successful" in the message reads inverted —
                // verify against upstream before changing the string.
                anyhow::bail!("Unexpected successful instantiate result: {result:?}");
            }
            anyhow::ensure!(module.get_status() == v8::ModuleStatus::Instantiated);
            timer.finish();
        };

        let value = {
            let timer = metrics::evaluate_module_timer();
            let result = self
                .with_try_catch(|s| module.evaluate(s))??
                .ok_or_else(|| anyhow!("Missing result from successful module evaluation"))?;
            // TODO: Check if we have a terminating error here.
            timer.finish();
            result
        };

        let status = module.get_status();
        anyhow::ensure!(
            status == v8::ModuleStatus::Evaluated || status == v8::ModuleStatus::Errored
        );
        // Module evaluation yields a promise; inspect its state to classify
        // the outcome.
        let promise = v8::Local::<v8::Promise>::try_from(value)
            .map_err(|e| anyhow!("Module evaluation did not return a promise: {:?}", e))?;
        match promise.state() {
            v8::PromiseState::Pending => {
                bail!(JsError::from_message(
                    "Top-level awaits in source files are unsupported".to_string()
                ))
            },
            v8::PromiseState::Fulfilled => {
                anyhow::ensure!(status == v8::ModuleStatus::Evaluated);
            },
            v8::PromiseState::Rejected => {
                let e = promise.result(self.v8_scope);
                return Err(self.format_traceback(e)?.into());
            },
        }
        Ok(())
    }

    /// V8 module-resolution trampoline: adapts `_module_resolve_callback` to
    /// V8's C callback convention, reporting failures as JS `TypeError`s.
    fn module_resolve_callback<'c>(
        context: v8::Local<'c, v8::Context>,
        specifier: v8::Local<'c, v8::String>,
        _import_assertions: v8::Local<'c, v8::FixedArray>,
        referrer: v8::Local<'c, v8::Module>,
    ) -> Option<v8::Local<'c, v8::Module>> {
        // SAFETY note: `CallbackScope::new` is unsafe; V8 invokes this
        // callback with a live context, which is the documented precondition.
        let scope = &mut unsafe { v8::CallbackScope::new(context) };
        match Self::_module_resolve_callback(scope, referrer, specifier) {
            Ok(m) => Some(m),
            Err(e) => {
                helpers::throw_type_error(scope, format!("{e:?}"));
                None
            },
        }
    }

    /// Resolves `specifier` relative to the referrer's registered name and
    /// looks the target module up in the module map. All imports must have
    /// been registered ahead of time by `register_module`.
    fn _module_resolve_callback<'c>(
        scope: &mut v8::CallbackScope<'c>,
        referrer: v8::Local<'c, v8::Module>,
        specifier: v8::Local<'c, v8::String>,
    ) -> anyhow::Result<v8::Local<'c, v8::Module>> {
        let mut scope = ExecutionScope::<RT, E>::new(scope);
        let referrer_global = v8::Global::new(&mut scope, referrer);
        let specifier_str = helpers::to_rust_string(&mut scope, &specifier)?;
        let module_map = scope.module_map();
        let referrer_name = module_map
            .name_by_handle(&referrer_global)
            .ok_or_else(|| anyhow::anyhow!("Couldn't find referring module"))?
            .to_string();
        let resolved_specifier = deno_core::resolve_import(&specifier_str, &referrer_name)?;
        let id = module_map
            .get_by_name(&resolved_specifier)
            .ok_or_else(|| anyhow!("Couldn't find {resolved_specifier}"))?;
        let handle = module_map
            .handle_by_id(id)
            .ok_or_else(|| anyhow!("Couldn't find {specifier_str} in {referrer_name}"))?;
        Ok(v8::Local::new(&mut scope, handle))
    }

    /// Handles a synchronous `Convex.syscall(op, argsJson)` from JS: parses
    /// the JSON argument, dispatches to the environment, and returns the
    /// result serialized back to a JSON string.
    pub fn syscall(
        &mut self,
        args: v8::FunctionCallbackArguments,
        mut rv: v8::ReturnValue,
    ) -> anyhow::Result<()> {
        let _s = static_span!();
        if args.length() != 2 {
            // There's not really an expected developer mistake that would lead to them
            // calling Convex.syscall incorrectly -- the bug must be in our
            // convex/server code. Treat this as a system error.
            anyhow::bail!("syscall(op, arg_object) takes two arguments");
        }
        let op_name: v8::Local<v8::String> = args.get(0).try_into()?;
        let op_name = to_rust_string(self, &op_name)?;

        let timer = metrics::syscall_timer(&op_name);

        let args_v8: v8::Local<v8::String> = args.get(1).try_into()?;
        let args_s = to_rust_string(self, &args_v8)?;
        let args_v: JsonValue = serde_json::from_str(&args_s).map_err(|e| {
            anyhow::anyhow!(ErrorMetadata::bad_request(
                "SyscallArgsInvalidJson",
                format!("Received invalid json: {e}"),
            ))
        })?;

        let state = self.state_mut()?;
        let result = state.environment.syscall(&op_name[..], args_v)?;

        let value_s = serde_json::to_string(&result)?;
        let value_v8 = v8::String::new(self, &value_s[..])
            .ok_or_else(|| anyhow!("Failed to create result string"))?;
        rv.set(value_v8.into());
        timer.finish();
        Ok(())
    }

    /// Handles `asyncSyscall(op, argsJson)`: kicks off the operation in the
    /// environment and immediately returns a promise that the environment
    /// will settle via the handed-off resolver.
    pub fn async_syscall(
        &mut self,
        args: v8::FunctionCallbackArguments,
        mut rv: v8::ReturnValue,
    ) -> anyhow::Result<()> {
        if args.length() != 2 {
            anyhow::bail!("asyncSyscall(op, arg_object) takes two arguments");
        }
        let op_name: v8::Local<v8::String> = args.get(0).try_into()?;
        let op_name = to_rust_string(self, &op_name)?;
        let args_v8: v8::Local<v8::String> = args.get(1).try_into()?;
        let args_s = to_rust_string(self, &args_v8)?;
        let args_v: JsonValue = serde_json::from_str(&args_s).map_err(|e| {
            anyhow::anyhow!(ErrorMetadata::bad_request(
                "SyscallArgsInvalidJson",
                format!("Received invalid json: {e}"),
            ))
        })?;

        let resolver = v8::PromiseResolver::new(self)
            .ok_or_else(|| anyhow!("Failed to create PromiseResolver"))?;
        let promise = resolver.get_promise(self);
        let resolver = v8::Global::new(self, resolver);
        {
            let state = self.state_mut()?;
            state
                .environment
                .start_async_syscall(op_name, args_v, resolver)?;
        }
        rv.set(promise.into());
        Ok(())
    }
}

/// Builds a V8 string for module source. ASCII sources are wrapped as
/// external one-byte strings backed by the existing `Arc<str>` (no copy onto
/// the V8 heap); non-ASCII sources fall back to a regular copied string.
fn make_source_string<'s>(
    scope: &mut v8::HandleScope<'s, ()>,
    module_source: &ModuleSource,
) -> anyhow::Result<v8::Local<'s, v8::String>> {
    if module_source.is_ascii() {
        // Common case: we can use an external string and skip copying the
        // module to the V8 heap
        let owned_source: Arc<str> = module_source.source_arc().clone();
        // SAFETY: we know that `module_source` is ASCII and we have bumped the
        // refcount, so the string will not be mutated or freed until we call
        // the destructor
        let ptr = owned_source.as_ptr();
        let len = owned_source.len();
        // Leak the Arc here; `destroy` reconstructs and drops it when V8
        // frees the external string.
        mem::forget(owned_source);
        unsafe extern "C" fn destroy(ptr: *mut c_char, len: usize) {
            drop(Arc::from_raw(ptr::from_raw_parts::<str>(
                ptr.cast::<u8>().cast_const(),
                len,
            )));
        }
        // N.B.: new_external_onebyte_raw takes a mut pointer but it does not mutate it
        unsafe {
            v8::String::new_external_onebyte_raw(
                scope,
                ptr.cast::<c_char>().cast_mut(),
                len,
                destroy,
            )
        }
    } else {
        v8::String::new(scope, module_source)
    }
    .ok_or_else(|| anyhow!("Failed to create source string"))
}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.