Skip to main content
Glama

Convex MCP server

Official
by get-convex
runner.rs34 kB
use std::{
    cmp::Ordering,
    collections::BTreeMap,
    sync::Arc,
    time::Duration,
};

// Aliased so it doesn't collide with the isolate `Context` imported from `super::context`.
use anyhow::Context as AnyhowContext;
use common::{
    bootstrap_model::components::handles::FunctionHandle,
    components::{
        CanonicalizedComponentFunctionPath,
        CanonicalizedComponentModulePath,
        ComponentId,
        ComponentPath,
        Reference,
        ResolvedComponentFunctionPath,
        Resource,
    },
    errors::JsError,
    execution_context::ExecutionContext,
    knobs::ISOLATE_MAX_USER_HEAP_SIZE,
    log_lines::{
        LogLevel,
        LogLine,
    },
    query::Query,
    query_journal::QueryJournal,
    runtime::{
        Runtime,
        UnixTimestamp,
    },
    sync::spsc,
    types::{
        PersistenceVersion,
        UdfType,
    },
    version::Version,
};
use database::{
    query::TableFilter,
    DeveloperQuery,
    Transaction,
};
use errors::ErrorMetadata;
use keybroker::KeyBroker;
use model::{
    config::module_loader::ModuleLoader,
    environment_variables::{
        EnvironmentVariablesModel,
        PreloadedEnvironmentVariables,
    },
    file_storage::{
        types::FileStorageEntry,
        BatchKey,
        FileStorageId,
    },
    modules::user_error::ModuleNotFoundError,
    udf_config::UdfConfigModel,
    virtual_system_mapping,
};
use parking_lot::Mutex;
use rand::SeedableRng;
use rand_chacha::ChaCha12Rng;
use serde_json::Value as JsonValue;
use tokio::sync::{
    mpsc::{
        self,
        error::TrySendError,
    },
    oneshot,
    Semaphore,
};
use udf::{
    validation::{
        validate_schedule_args,
        ValidatedPathAndArgs,
    },
    SyscallTrace,
    UdfOutcome,
};
use value::{
    ConvexArray,
    ConvexObject,
    ConvexValue,
    JsonPackedValue,
    NamespacedTableMapping,
    TableMapping,
    TableName,
    TableNamespace,
    TableNumber,
    TabletIdAndTableNumber,
};

use super::{
    client::{
        AsyncOpCompletion,
        AsyncSyscallCompletion,
        Completions,
        EvaluateResult,
        IsolateThreadClient,
        IsolateThreadRequest,
        PendingAsyncSyscall,
        QueryId,
    },
    context::Context,
    environment::{
        Environment,
        EnvironmentOutcome,
    },
    session::Session,
    thread::Thread,
};
use crate::{
    client::initialize_v8,
    environment::{
        helpers::{
            module_loader::{
                module_specifier_from_path,
                path_from_module_specifier,
            },
            MAX_LOG_LINES,
        },
        udf::{
            async_syscall::{
                AsyncSyscallBatch,
                AsyncSyscallProvider,
                DatabaseSyscallsV1,
                ManagedQuery,
            },
            syscall::{
                syscall_impl,
                SyscallProvider,
            },
            DatabaseUdfEnvironment,
        },
    },
};

/// Dispatches a single request received from the Tokio side onto the V8
/// `Session`/`Context`, replying on the request's embedded oneshot channel.
///
/// Each arm enters the context (or calls a context method directly), runs the
/// operation, and sends the result back. A failed `send` means the requester
/// dropped its receiver, which is surfaced as a "Canceled" error so the V8
/// thread's request loop terminates.
fn handle_request(
    session: &mut Session,
    context: &mut Context,
    request: IsolateThreadRequest,
) -> anyhow::Result<()> {
    match request {
        IsolateThreadRequest::RegisterModule {
            name,
            source,
            source_map,
            response,
        } => {
            let result = context.enter(session, |mut ctx| {
                ctx.register_module(&name, &source, source_map)
            });
            response
                .send(result)
                .map_err(|_| anyhow::anyhow!("Canceled"))?;
        },
        IsolateThreadRequest::EvaluateModule { name, response } => {
            let result = context.enter(session, |mut ctx| {
                ctx.evaluate_module(&name)?;
                anyhow::Ok(())
            });
            response
                .send(result)
                .map_err(|_| anyhow::anyhow!("Canceled"))?;
        },
        IsolateThreadRequest::StartFunction {
            udf_type,
            udf_path,
            arguments,
            response,
        } => {
            let r = context.start_function(session, udf_type, udf_path, arguments);
            response.send(r).map_err(|_| anyhow::anyhow!("Canceled"))?;
        },
        IsolateThreadRequest::PollFunction {
            function_id,
            completions,
            response,
        } => {
            let r = context.poll_function(session, function_id, completions);
            response.send(r).map_err(|_| anyhow::anyhow!("Canceled"))?;
        },
        IsolateThreadRequest::Shutdown { response } => {
            let r = context.enter(session, |mut ctx| ctx.shutdown());
            response.send(r).map_err(|_| anyhow::anyhow!("Canceled"))?;
        },
    }
    Ok(())
}

/// Body of the dedicated V8 isolate thread: constructs the `Thread`,
/// `Session`, and `Context`, then serves requests until the channel closes
/// (i.e. the Tokio-side client is dropped).
///
/// The explicit `drop`s tear the three objects down in reverse construction
/// order before returning.
async fn v8_thread(
    mut receiver: mpsc::Receiver<IsolateThreadRequest>,
    environment: Box<dyn Environment>,
) -> anyhow::Result<()> {
    let mut thread = Thread::new();
    let mut session = Session::new(&mut thread);
    let mut context = Context::new(&mut session, environment)?;
    while let Some(request) = receiver.recv().await {
        handle_request(&mut session, &mut context, request)?;
    }
    drop(context);
    drop(session);
    drop(thread);
    Ok(())
}

/// Deterministic seed material for one UDF phase: the RNG seed and the
/// timestamp that will be externalized to user code during that phase.
#[derive(Debug, Copy, Clone)]
pub struct SeedData {
    pub rng_seed: [u8; 32],
    pub unix_timestamp: UnixTimestamp,
}

/// Lifecycle of a single UDF evaluation.
///
/// Import and execution each carry their own `ChaCha12Rng`, seeded from the
/// corresponding `SeedData`, so randomness is deterministic per phase. During
/// `Executing`, the `observed_*` flags record whether user code actually read
/// the RNG or the timestamp (reported in `EnvironmentOutcome`).
#[derive(Debug)]
enum UdfPhase {
    Importing {
        rng: ChaCha12Rng,
    },
    Executing {
        rng: ChaCha12Rng,
        observed_time: bool,
        observed_rng: bool,
    },
    Finalized,
}

/// Per-UDF environment handed to the V8 isolate: serves syscalls, log lines,
/// RNG access, and timestamps, while enforcing the import/execute/finalize
/// phase rules.
struct UdfEnvironment<RT: Runtime> {
    rt: RT,
    // Whether this is a system UDF; controls private system table visibility.
    is_system: bool,
    // Log lines are streamed to the Tokio thread over this bounded channel.
    log_line_sender: spsc::Sender<LogLine>,
    // Number of lines emitted so far; capped at MAX_LOG_LINES.
    lines_logged: usize,
    import_time_seed: SeedData,
    execution_time_seed: SeedData,
    phase: UdfPhase,
    // Table-mapping/query state shared with the Tokio-side syscall provider.
    shared: UdfShared<RT>,
    #[allow(unused)]
    env_vars: PreloadedEnvironmentVariables,
}

impl<RT: Runtime> UdfEnvironment<RT> {
    /// Creates an environment starting in the `Importing` phase, with the
    /// import-phase RNG seeded from `import_time_seed`.
    pub fn new(
        rt: RT,
        is_system: bool,
        import_time_seed: SeedData,
        execution_time_seed: SeedData,
        shared: UdfShared<RT>,
        env_vars: PreloadedEnvironmentVariables,
        log_line_sender: spsc::Sender<LogLine>,
    ) -> Self {
        let rng = ChaCha12Rng::from_seed(import_time_seed.rng_seed);
        Self {
            rt,
            is_system,
            log_line_sender,
            lines_logged: 0,
            import_time_seed,
            execution_time_seed,
            phase: UdfPhase::Importing { rng },
            shared,
            env_vars,
        }
    }

    /// Errors with a `NoDbDuringImport` user error unless the environment is
    /// in the `Executing` phase (database access is forbidden at import time
    /// and after finalization).
    fn check_executing(&self) -> anyhow::Result<()> {
        let UdfPhase::Executing { .. } = self.phase else {
            // TODO: Is this right? Should we just be using JsError?
            anyhow::bail!(ErrorMetadata::bad_request(
                "NoDbDuringImport",
                "Can't use database at import time",
            ))
        };
        Ok(())
    }

    /// Forwards one log line to the Tokio-side receiver.
    ///
    /// Fails if the MAX_LOG_LINES cap has been hit (callers gate on the count
    /// first via `trace`), if the receiver disconnected, or if the bounded
    /// channel is full — each case is a reason to stop executing JS.
    fn emit_log_line(&mut self, line: LogLine) -> anyhow::Result<()> {
        anyhow::ensure!(self.lines_logged < MAX_LOG_LINES);
        self.lines_logged += 1;
        if let Err(e) = self.log_line_sender.try_send(line) {
            match e {
                // In this case it's not much use to continue executing JS since the Tokio
                // thread has gone away.
                TrySendError::Closed(..) => anyhow::bail!("Log line receiver disconnected"),
                // If the Tokio thread is processing messages slower than we're streaming them
                // out, fail with a system error to shed load.
                TrySendError::Full(_) => {
                    anyhow::bail!("Log lines produced faster than Tokio thread can consume them")
                },
            }
        }
        Ok(())
    }
}

/// Synchronous syscall surface used from inside the isolate. Table lookups and
/// query starts are only legal during execution; query cleanup is always
/// allowed.
impl<RT: Runtime> SyscallProvider<RT> for UdfEnvironment<RT> {
    fn table_filter(&self) -> TableFilter {
        if self.is_system {
            TableFilter::IncludePrivateSystemTables
        } else {
            TableFilter::ExcludePrivateSystemTables
        }
    }

    fn lookup_table(&mut self, name: &TableName) -> anyhow::Result<Option<TabletIdAndTableNumber>> {
        self.check_executing()?;
        self.shared.lookup_table(name)
    }

    fn lookup_virtual_table(&mut self, name: &TableName) -> anyhow::Result<Option<TableNumber>> {
        self.check_executing()?;
        self.shared.lookup_virtual_table(name)
    }

    // Component arguments are not implemented in isolate2 yet.
    fn component_argument(&self, _name: &str) -> anyhow::Result<Option<ConvexValue>> {
        todo!();
    }

    fn start_query(&mut self, query: Query, version: Option<Version>) -> anyhow::Result<QueryId> {
        self.check_executing()?;
        let query_id = self.shared.start_query(query, version);
        Ok(query_id)
    }

    fn cleanup_query(&mut self, query_id: u32) -> bool {
        self.shared.cleanup_query(query_id)
    }
}

impl<RT: Runtime> Environment for UdfEnvironment<RT> {
    fn syscall(&mut self, name: &str, args: JsonValue) -> anyhow::Result<JsonValue> {
        syscall_impl(self, name, args)
    }

    /// Records a developer log line. The first MAX_LOG_LINES - 1 lines pass
    /// through; the next one is replaced by a single overflow marker; anything
    /// after that is silently dropped.
    fn trace(
        &mut self,
        level: common::log_lines::LogLevel,
        messages: Vec<String>,
    ) -> anyhow::Result<()> {
        let line = match self.lines_logged.cmp(&(MAX_LOG_LINES - 1)) {
            Ordering::Less => {
                LogLine::new_developer_log_line(
                    level,
                    messages,
                    // Note: accessing the current time here is still deterministic since
                    // we don't externalize the time to the function.
                    self.rt.unix_timestamp(),
                )
            },
            Ordering::Equal => {
                // Add a message about omitting log lines once
                LogLine::new_developer_log_line(
                    LogLevel::Error,
                    vec![format!(
                        "Log overflow (maximum {MAX_LOG_LINES}). Remaining log lines omitted."
                    )],
                    // Note: accessing the current time here is still deterministic since
                    // we don't externalize the time to the function.
                    self.rt.unix_timestamp(),
                )
            },
            Ordering::Greater => {
                return Ok(());
            },
        };
        self.emit_log_line(line)
    }

    /// Records a system log line; not subject to the overflow marker logic
    /// above, but still counted against the cap by `emit_log_line`.
    fn trace_system(
        &mut self,
        level: common::log_lines::LogLevel,
        messages: Vec<String>,
        system_log_metadata: common::log_lines::SystemLogMetadata,
    ) -> anyhow::Result<()> {
        let line = LogLine::new_system_log_line(
            level,
            messages,
            // Note: accessing the current time here is still deterministic since
            // we don't externalize the time to the function.
            self.rt.unix_timestamp(),
            system_log_metadata,
        );
        self.emit_log_line(line)
    }

    /// Returns the phase-appropriate deterministic RNG, marking it observed
    /// when user code draws from it during execution.
    fn rng(&mut self) -> anyhow::Result<&mut rand_chacha::ChaCha12Rng> {
        match self.phase {
            UdfPhase::Importing { ref mut rng } => Ok(rng),
            UdfPhase::Executing {
                ref mut rng,
                ref mut observed_rng,
                ..
            } => {
                *observed_rng = true;
                Ok(rng)
            },
            UdfPhase::Finalized => anyhow::bail!("RNG not available in finalized phase"),
        }
    }

    /// Returns the phase-appropriate seeded timestamp (not wall-clock time),
    /// marking time observed during execution.
    fn unix_timestamp(&mut self) -> anyhow::Result<UnixTimestamp> {
        let result = match self.phase {
            UdfPhase::Importing { .. } => self.import_time_seed.unix_timestamp,
            UdfPhase::Executing {
                ref mut observed_time,
                ..
            } => {
                *observed_time = true;
                self.execution_time_seed.unix_timestamp
            },
            UdfPhase::Finalized => anyhow::bail!("Time not available in finalized phase"),
        };
        Ok(result)
    }

    /// Real wall-clock time; bypasses the determinism bookkeeping.
    fn unix_timestamp_non_deterministic(&mut self) -> anyhow::Result<UnixTimestamp> {
        Ok(self.rt.unix_timestamp())
    }

    // Environment variables are preloaded (`env_vars`) but not wired up yet.
    fn get_environment_variable(
        &mut self,
        _name: common::types::EnvVarName,
    ) -> anyhow::Result<Option<common::types::EnvVarValue>> {
        todo!()
    }

    /// Transitions `Importing` -> `Executing`, reseeding the RNG from the
    /// execution-time seed. Fails if execution already started or finished.
    fn start_execution(&mut self) -> anyhow::Result<()> {
        let UdfPhase::Importing { .. } = self.phase else {
            anyhow::bail!("Phase was already {:?}", self.phase)
        };
        self.phase = UdfPhase::Executing {
            rng: ChaCha12Rng::from_seed(self.execution_time_seed.rng_seed),
            observed_time: false,
            observed_rng: false,
        };
        Ok(())
    }

    /// Transitions into `Finalized`, closes the log channel (letting the log
    /// collector task finish), and reports whether time/RNG were observed.
    /// Finishing straight from `Importing` reports neither as observed.
    fn finish_execution(&mut self) -> anyhow::Result<EnvironmentOutcome> {
        let (observed_time, observed_rng) = match self.phase {
            UdfPhase::Importing { .. } => (false, false),
            UdfPhase::Executing {
                observed_time,
                observed_rng,
                ..
            } => (observed_time, observed_rng),
            UdfPhase::Finalized => {
                anyhow::bail!("Phase was already finalized")
            },
        };
        self.phase = UdfPhase::Finalized;
        self.log_line_sender.close();
        Ok(EnvironmentOutcome {
            observed_rng,
            observed_time,
        })
    }

    fn get_all_table_mappings(&mut self) -> anyhow::Result<NamespacedTableMapping> {
        self.check_executing()?;
        Ok(self.shared.get_all_table_mappings())
    }
}

/// Runs one UDF to completion on the isolate client.
///
/// Phase 1 walks the module dependency graph (depth-first from the UDF's
/// module), registering each module's source in the isolate and then
/// evaluating the entry module. A `JsError` here short-circuits into a failed
/// `UdfOutcome` with empty journal/trace. Phase 2 starts the function and
/// loops: each `Pending` result's async syscalls are batched (adjacent
/// batchable syscalls are coalesced via `AsyncSyscallBatch`), executed against
/// the transaction, and their completions polled back into JS. Async ops and
/// dynamic imports are rejected — UDFs (queries/mutations) don't support them.
/// Log lines stream to a side task whose collected output is merged with
/// execution warnings into the final outcome.
async fn run_request<RT: Runtime>(
    rt: RT,
    tx: &mut Transaction<RT>,
    module_loader: Arc<dyn ModuleLoader<RT>>,
    execution_time_seed: SeedData,
    client: &mut IsolateThreadClient<RT>,
    udf_type: UdfType,
    path_and_args: ValidatedPathAndArgs,
    shared: UdfShared<RT>,
    mut log_line_receiver: spsc::Receiver<LogLine>,
    key_broker: KeyBroker,
    execution_context: ExecutionContext,
    query_journal: QueryJournal,
) -> anyhow::Result<UdfOutcome> {
    let (path, arguments, udf_server_version) = path_and_args.consume();
    anyhow::ensure!(
        path.component.is_root(),
        "TODO: non-root components not supported yet"
    );
    let udf_path = &path.udf_path;

    // Spawn a separate Tokio thread to receive log lines.
    let (log_line_tx, log_line_rx) = oneshot::channel();
    let log_line_processor = rt.spawn("log_line_processor", async move {
        let mut log_lines: Vec<LogLine> = vec![];
        while let Some(line) = log_line_receiver.recv().await {
            log_lines.push(line);
        }
        let _ = log_line_tx.send(log_lines);
    });

    // Phase 1: Load and register all source needed, and evaluate the UDF's module.
    let r: anyhow::Result<_> = try {
        let mut stack = vec![udf_path.module().clone()];
        while let Some(module_path) = stack.pop() {
            let module_specifier = module_specifier_from_path(&module_path)?;
            let path = CanonicalizedComponentModulePath {
                component: ComponentId::Root,
                module_path: module_path.clone(),
            };
            let Some(module_metadata) = module_loader.get_module(tx, path).await? else {
                // Missing module surfaces as a JsError (user-visible), not a system error.
                let err = ModuleNotFoundError::new(module_path.as_str());
                Err(JsError::from_message(format!("{err}")))?
            };
            // Registration returns the specifiers this module imports; push them
            // to continue the traversal.
            let requests = client
                .register_module(
                    module_specifier,
                    module_metadata.source.to_string(),
                    module_metadata.source_map.clone(),
                )
                .await?;
            for requested_module_specifier in requests {
                let module_path = path_from_module_specifier(&requested_module_specifier)?;
                stack.push(module_path);
            }
        }
        let udf_module_specifier = module_specifier_from_path(udf_path.module())?;
        client.evaluate_module(udf_module_specifier.clone()).await?;
        anyhow::Ok(())
    };
    if let Err(e) = r {
        // Only JsErrors are recoverable into an outcome; anything else
        // propagates as a system error via the downcast `?`.
        let js_error = e.downcast::<JsError>()?;
        client.shutdown().await?;
        log_line_processor.join().await?;
        let log_lines = log_line_rx.await?.into();
        let outcome = UdfOutcome {
            path: path.for_logging(),
            arguments,
            identity: tx.inert_identity(),
            observed_identity: false,
            rng_seed: execution_time_seed.rng_seed,
            observed_rng: false,
            unix_timestamp: execution_time_seed.unix_timestamp,
            observed_time: false,
            log_lines,
            journal: QueryJournal::new(),
            result: Err(js_error),
            syscall_trace: SyscallTrace::new(),
            udf_server_version,
            memory_in_mb: 0,
        };
        return Ok(outcome);
    }

    // Phase 2: Start the UDF, execute its async syscalls, and poll until
    // completion.
    let mut provider = Isolate2SyscallProvider::new(
        tx,
        rt.clone(),
        execution_time_seed.unix_timestamp,
        query_journal,
        udf_path.is_system(),
        shared,
        key_broker,
        execution_context,
    );
    let r: anyhow::Result<_> = try {
        // Update our shared state with the updated table mappings before reentering
        // user code.
        provider.shared.update_table_mappings(provider.tx);
        let (function_id, mut result) = client
            .start_function(udf_type, udf_path.clone(), arguments.clone())
            .await?;
        loop {
            let pending = match result {
                EvaluateResult::Ready(r) => break r,
                EvaluateResult::Pending(p) => p,
            };
            let mut completions = Completions::new();

            // TODO: The current implementation returns control to JS after each batch.
            let mut syscall_batch: Option<AsyncSyscallBatch> = None;
            let mut batch_promise_ids = vec![];
            for PendingAsyncSyscall {
                promise_id,
                name,
                args,
            } in pending.async_syscalls
            {
                // Coalesce into the current batch when permitted.
                if let Some(ref mut batch) = syscall_batch
                    && batch.can_push(&name, &args)
                {
                    batch.push(name, args)?;
                    batch_promise_ids.push(promise_id);
                    continue;
                }
                // Flush the previous batch before starting a new one; results
                // pair positionally with the drained promise IDs.
                if let Some(batch) = syscall_batch.take() {
                    let results =
                        DatabaseSyscallsV1::run_async_syscall_batch(&mut provider, batch).await;
                    assert_eq!(results.len(), batch_promise_ids.len());
                    for (promise_id, result) in batch_promise_ids.drain(..).zip(results) {
                        completions
                            .async_syscalls
                            .push(AsyncSyscallCompletion { promise_id, result });
                    }
                }
                syscall_batch = Some(AsyncSyscallBatch::new(name, args));
                assert!(batch_promise_ids.is_empty());
                batch_promise_ids.push(promise_id);
            }
            // Flush the final batch, if any.
            if let Some(batch) = syscall_batch {
                let results =
                    DatabaseSyscallsV1::run_async_syscall_batch(&mut provider, batch).await;
                assert_eq!(results.len(), batch_promise_ids.len());
                for (promise_id, result) in batch_promise_ids.into_iter().zip(results) {
                    completions
                        .async_syscalls
                        .push(AsyncSyscallCompletion { promise_id, result });
                }
            }

            // Async ops don't do anything within UDFs.
            for async_op in pending.async_ops {
                let err = ErrorMetadata::bad_request(
                    format!("No{}InQueriesOrMutations", async_op.request.name_for_error()),
                    format!(
                        "Can't use {} in queries and mutations. Please consider using an action. See https://docs.convex.dev/functions/actions for more details.",
                        async_op.request.description_for_error()
                    ),
                );
                completions.async_ops.push(AsyncOpCompletion {
                    promise_id: async_op.promise_id,
                    result: Err(err.into()),
                });
            }

            // Dynamic imports aren't allowed in UDFs either.
            if !pending.dynamic_imports.is_empty() {
                anyhow::bail!("TODO: Propagate error to dynamic import");
            }

            // Syscalls may have created tables; refresh before reentering JS.
            provider.shared.update_table_mappings(provider.tx);
            result = client.poll_function(function_id, completions).await?;
        }
    };
    // As in phase 1, only JsErrors become a failed result; other errors
    // propagate as system errors.
    let result = match r {
        Ok(result) => Ok(result),
        Err(e) => {
            let js_error = e.downcast::<JsError>()?;
            Err(js_error)
        },
    };
    let outcome = client.shutdown().await?;

    log_line_processor.join().await?;
    let mut log_lines = log_line_rx.await?;
    // Append system warnings (execution time, write sizes, etc.) to the log.
    DatabaseUdfEnvironment::<RT>::add_warnings_to_log_lines(
        &path.clone().for_logging(),
        &arguments,
        client.execution_time()?,
        provider.tx.execution_size(),
        provider.tx.biggest_document_writes(),
        result.as_ref().ok(),
        |warning| {
            log_lines.push(LogLine::new_system_log_line(
                warning.level,
                warning.messages,
                // Note: accessing the current time here is still deterministic since
                // we don't externalize the time to the function.
                rt.unix_timestamp(),
                warning.system_log_metadata,
            ));
        },
    )?;

    let outcome = UdfOutcome {
        path: path.for_logging(),
        arguments,
        identity: provider.tx.inert_identity(),
        observed_identity: provider.observed_identity,
        rng_seed: execution_time_seed.rng_seed,
        observed_rng: outcome.observed_rng,
        unix_timestamp: execution_time_seed.unix_timestamp,
        observed_time: outcome.observed_time,
        log_lines: log_lines.into(),
        journal: provider.next_journal,
        result: result.map(JsonPackedValue::pack),
        syscall_trace: provider.syscall_trace,
        udf_server_version,
        // Reported as the heap cap, not measured usage. Bytes -> MiB.
        memory_in_mb: (*ISOLATE_MAX_USER_HEAP_SIZE / (1 << 20))
            .try_into()
            .unwrap(),
    };
    Ok(outcome)
}

/// State shared (via `Arc<Mutex<..>>`) between the V8 isolate environment and
/// the Tokio-side syscall provider: the current table mapping snapshot plus
/// the table of in-flight queries keyed by `QueryId`.
struct UdfShared<RT: Runtime> {
    inner: Arc<Mutex<UdfSharedInner<RT>>>,
}

// Manual impl: a derived Clone would require RT: Clone; this just bumps the Arc.
impl<RT: Runtime> Clone for UdfShared<RT> {
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
        }
    }
}

impl<RT: Runtime> UdfShared<RT> {
    pub fn new(table_mapping: TableMapping) -> Self {
        Self {
            inner: Arc::new(Mutex::new(UdfSharedInner {
                next_query_id: 0,
                queries: BTreeMap::new(),
                table_mapping,
            })),
        }
    }

    /// Refreshes the cached table mapping from the transaction's current view.
    fn update_table_mappings(&self, tx: &mut Transaction<RT>) {
        let mut inner = self.inner.lock();
        // TODO: Avoid cloning here if the table mapping hasn't changed.
        inner.table_mapping = tx.table_mapping().clone();
    }

    /// Resolves a table name in the root component's namespace, if it exists.
    fn lookup_table(&self, name: &TableName) -> anyhow::Result<Option<TabletIdAndTableNumber>> {
        let inner = self.inner.lock();
        Ok(inner
            .table_mapping
            .namespace(TableNamespace::root_component())
            .id_and_number_if_exists(name))
    }

    /// Maps a virtual table name to its backing system table's number;
    /// `None` if the name isn't a known virtual table.
    fn lookup_virtual_table(&self, name: &TableName) -> anyhow::Result<Option<TableNumber>> {
        let virtual_mapping = virtual_system_mapping();
        let Ok(physical_table_name) = virtual_mapping.virtual_to_system_table(name) else {
            return Ok(None);
        };
        self.lookup_table(physical_table_name)
            .map(|r| r.map(|t| t.table_number))
    }

    /// Registers a new pending query and returns its freshly allocated ID.
    fn start_query(&self, query: Query, version: Option<Version>) -> QueryId {
        let mut inner = self.inner.lock();
        let query_id = inner.next_query_id;
        inner.next_query_id += 1;
        inner
            .queries
            .insert(query_id, ManagedQuery::Pending { query, version });
        query_id
    }

    /// Removes and returns the query so the caller can drive it.
    fn take_query(&self, query_id: QueryId) -> Option<ManagedQuery<RT>> {
        let mut inner = self.inner.lock();
        inner.queries.remove(&query_id)
    }

    /// Puts a (now active) query back after `take_query`.
    fn insert_query(&self, query_id: QueryId, query: DeveloperQuery<RT>) {
        let mut inner = self.inner.lock();
        inner.queries.insert(query_id, ManagedQuery::Active(query));
    }

    /// Drops a query; returns whether one was actually registered.
    fn cleanup_query(&self, query_id: u32) -> bool {
        let mut inner = self.inner.lock();
        inner.queries.remove(&query_id).is_some()
    }

    fn get_all_table_mappings(&self) -> NamespacedTableMapping {
        let inner = self.inner.lock();
        inner
            .table_mapping
            .namespace(TableNamespace::by_component_TODO())
    }
}

struct UdfSharedInner<RT: Runtime> {
    next_query_id: QueryId,
    queries: BTreeMap<QueryId, ManagedQuery<RT>>,
    table_mapping: TableMapping,
}

/// Tokio-side async-syscall provider for isolate2 UDFs: owns the transaction
/// borrow for the duration of the run and accumulates the syscall trace, the
/// next query journal, and whether the identity was observed.
struct Isolate2SyscallProvider<'a, RT: Runtime> {
    tx: &'a mut Transaction<RT>,
    rt: RT,
    shared: UdfShared<RT>,
    // Execution-phase seeded timestamp, used for schedule validation.
    unix_timestamp: UnixTimestamp,
    observed_identity: bool,
    prev_journal: QueryJournal,
    next_journal: QueryJournal,
    is_system: bool,
    syscall_trace: SyscallTrace,
    key_broker: KeyBroker,
    context: ExecutionContext,
}
impl<'a, RT: Runtime> Isolate2SyscallProvider<'a, RT> {
    /// Creates a provider with empty trace/next-journal and no identity
    /// observed yet.
    fn new(
        tx: &'a mut Transaction<RT>,
        rt: RT,
        unix_timestamp: UnixTimestamp,
        prev_journal: QueryJournal,
        is_system: bool,
        shared: UdfShared<RT>,
        key_broker: KeyBroker,
        context: ExecutionContext,
    ) -> Self {
        Self {
            tx,
            rt,
            shared,
            unix_timestamp,
            observed_identity: false,
            prev_journal,
            next_journal: QueryJournal::new(),
            is_system,
            syscall_trace: SyscallTrace::new(),
            key_broker,
            context,
        }
    }
}

/// Async syscall surface for isolate2 UDFs. File storage, component, and
/// function-handle operations are still `todo!()` stubs here.
impl<RT: Runtime> AsyncSyscallProvider<RT> for Isolate2SyscallProvider<'_, RT> {
    fn rt(&self) -> &RT {
        &self.rt
    }

    fn tx(&mut self) -> anyhow::Result<&mut Transaction<RT>> {
        // We only process syscalls during the execution phase.
        Ok(self.tx)
    }

    // Only root-component execution is supported in isolate2 so far.
    fn component(&self) -> anyhow::Result<ComponentId> {
        Ok(ComponentId::Root)
    }

    fn key_broker(&self) -> &KeyBroker {
        &self.key_broker
    }

    fn context(&self) -> &ExecutionContext {
        &self.context
    }

    /// Marks that user code looked at the authenticated identity; surfaced in
    /// the final `UdfOutcome`.
    fn observe_identity(&mut self) -> anyhow::Result<()> {
        self.observed_identity = true;
        Ok(())
    }

    fn persistence_version(&self) -> PersistenceVersion {
        self.tx.persistence_version()
    }

    fn is_system(&self) -> bool {
        self.is_system
    }

    fn table_filter(&self) -> TableFilter {
        if self.is_system {
            TableFilter::IncludePrivateSystemTables
        } else {
            TableFilter::ExcludePrivateSystemTables
        }
    }

    fn log_async_syscall(&mut self, name: String, duration: Duration, is_success: bool) {
        self.syscall_trace
            .log_async_syscall(name, duration, is_success);
    }

    fn prev_journal(&mut self) -> &mut QueryJournal {
        &mut self.prev_journal
    }

    fn next_journal(&mut self) -> &mut QueryJournal {
        &mut self.next_journal
    }

    /// Validates scheduler arguments against the execution-phase seeded
    /// timestamp (not wall-clock time) to keep validation deterministic.
    async fn validate_schedule_args(
        &mut self,
        path: CanonicalizedComponentFunctionPath,
        args: Vec<JsonValue>,
        scheduled_ts: UnixTimestamp,
    ) -> anyhow::Result<(CanonicalizedComponentFunctionPath, ConvexArray)> {
        validate_schedule_args(path, args, scheduled_ts, self.unix_timestamp, self.tx).await
    }

    // File storage syscalls are not implemented for isolate2 yet.
    async fn file_storage_generate_upload_url(&mut self) -> anyhow::Result<String> {
        todo!()
    }

    async fn file_storage_get_url_batch(
        &mut self,
        _storage_ids: BTreeMap<BatchKey, FileStorageId>,
    ) -> BTreeMap<BatchKey, anyhow::Result<Option<String>>> {
        todo!()
    }

    async fn file_storage_delete(&mut self, _storage_id: FileStorageId) -> anyhow::Result<()> {
        todo!()
    }

    async fn file_storage_get_entry(
        &mut self,
        _storage_id: FileStorageId,
    ) -> anyhow::Result<Option<FileStorageEntry>> {
        todo!()
    }

    // Query management delegates to the shared state used by the isolate side.
    fn insert_query(&mut self, query_id: QueryId, query: DeveloperQuery<RT>) {
        self.shared.insert_query(query_id, query)
    }

    fn take_query(&mut self, query_id: QueryId) -> Option<ManagedQuery<RT>> {
        self.shared.take_query(query_id)
    }

    fn cleanup_query(&mut self, query_id: u32) -> bool {
        self.shared.cleanup_query(query_id)
    }

    async fn run_udf(
        &mut self,
        _udf_type: UdfType,
        _path: ResolvedComponentFunctionPath,
        _args: ConvexObject,
    ) -> anyhow::Result<ConvexValue> {
        todo!();
    }

    async fn create_function_handle(
        &mut self,
        _path: CanonicalizedComponentFunctionPath,
    ) -> anyhow::Result<FunctionHandle> {
        todo!();
    }

    /// Resolves a reference to a resource. Only plain function references are
    /// handled (mapped into the root component); the other variants are stubs.
    async fn resolve(&mut self, reference: Reference) -> anyhow::Result<Resource> {
        let resource = match reference {
            Reference::ComponentArgument { .. } => todo!(),
            Reference::Function(udf_path) => {
                Resource::Function(CanonicalizedComponentFunctionPath {
                    component: ComponentPath::root(),
                    udf_path,
                })
            },
            Reference::ChildComponent { .. } => todo!(),
            Reference::CurrentSystemUdfInComponent { .. } => todo!(),
        };
        Ok(resource)
    }

    async fn lookup_function_handle(
        &mut self,
        _handle: FunctionHandle,
    ) -> anyhow::Result<CanonicalizedComponentFunctionPath> {
        todo!()
    }
}

/// Tokio-side driver for a UDF run: races `run_request` against the total
/// timeout and caller cancellation (the result channel closing), then sends
/// the transaction + outcome back over `sender`.
async fn tokio_thread<RT: Runtime>(
    rt: RT,
    mut tx: Transaction<RT>,
    module_loader: Arc<dyn ModuleLoader<RT>>,
    execution_time_seed: SeedData,
    mut client: IsolateThreadClient<RT>,
    total_timeout: Duration,
    mut sender: oneshot::Sender<anyhow::Result<(Transaction<RT>, UdfOutcome)>>,
    udf_type: UdfType,
    path_and_args: ValidatedPathAndArgs,
    shared: UdfShared<RT>,
    log_line_receiver: spsc::Receiver<LogLine>,
    key_broker: KeyBroker,
    execution_context: ExecutionContext,
    query_journal: QueryJournal,
) {
    let request = run_request(
        rt.clone(),
        &mut tx,
        module_loader,
        execution_time_seed,
        &mut client,
        udf_type,
        path_and_args,
        shared,
        log_line_receiver,
        key_broker,
        execution_context,
        query_journal,
    );
    let r = tokio::select! {
        r = request => r,
        // Eventually we'll attempt to cleanup the isolate thread in these conditions.
        _ = rt.wait(total_timeout) => Err(anyhow::anyhow!("Total timeout exceeded")),
        _ = sender.closed() => Err(anyhow::anyhow!("Cancelled")),
    };
    // Receiver may be gone (cancellation); ignore the send error in that case.
    let _ = sender.send(r.map(|r| (tx, r)));
    // Dropping the client closes the request channel, letting the V8 thread exit.
    drop(client);
}

/// Public entry point: runs a UDF on the experimental isolate2 runtime.
///
/// Loads the import-phase seed from the UDF config and preloads environment
/// variables, then wires up three cooperating pieces: a dedicated V8 thread
/// (`v8_thread`), a Tokio driver (`tokio_thread`), and a bounded spsc channel
/// for log lines between them. Returns the transaction (for commit by the
/// caller) together with the UDF outcome.
pub async fn run_isolate_v2_udf<RT: Runtime>(
    rt: RT,
    mut tx: Transaction<RT>,
    module_loader: Arc<dyn ModuleLoader<RT>>,
    execution_time_seed: SeedData,
    udf_type: UdfType,
    path_and_args: ValidatedPathAndArgs,
    key_broker: KeyBroker,
    context: ExecutionContext,
    query_journal: QueryJournal,
) -> anyhow::Result<(Transaction<RT>, UdfOutcome)> {
    initialize_v8();

    let semaphore = Arc::new(Semaphore::new(8));
    let user_timeout = Duration::from_secs(5);

    // We actually don't really care about "system timeout" but rather "total
    // timeout", both for how long we're tying up a request thread + serving
    // based on a tx timestamp that may be out of retention.
    // TODO: Decrease this for prod, maybe disable it entirely for tests?
    let total_timeout = Duration::from_secs(128);

    // TODO: Move these into the timeout.
    let udf_config = UdfConfigModel::new(&mut tx, TableNamespace::TODO())
        .get()
        .await?;
    let import_time_seed = SeedData {
        rng_seed: udf_config
            .as_ref()
            .map(|c| c.import_phase_rng_seed)
            .context("Missing import phase RNG seed")?,
        unix_timestamp: udf_config
            .as_ref()
            .map(|c| c.import_phase_unix_timestamp)
            .context("Missing import phase unix timestamp")?,
    };
    let env_vars = EnvironmentVariablesModel::new(&mut tx).preload().await?;

    // TODO: This unconditionally takes a table mapping dep.
    let shared = UdfShared::new(tx.table_mapping().clone());
    let (log_line_sender, log_line_receiver) = spsc::channel(32);
    let environment = UdfEnvironment::new(
        rt.clone(),
        path_and_args.path().udf_path.is_system(),
        import_time_seed,
        execution_time_seed,
        shared.clone(),
        env_vars,
        log_line_sender,
    );

    // The protocol is synchronous, so there should never be more than
    // one pending request at a time.
    let (sender, receiver) = mpsc::channel(1);
    let v8_handle = rt.spawn_thread("isolate2", || async {
        if let Err(e) = v8_thread(receiver, Box::new(environment)).await {
            println!("Error in isolate thread: {e:?}");
        }
    });

    let client = IsolateThreadClient::new(rt.clone(), sender, user_timeout, semaphore);
    let (sender, receiver) = oneshot::channel();
    let tokio_handle = rt.spawn(
        "tokio_thread",
        tokio_thread(
            rt.clone(),
            tx,
            module_loader,
            execution_time_seed,
            client,
            total_timeout,
            sender,
            udf_type,
            path_and_args,
            shared,
            log_line_receiver,
            key_broker,
            context,
            query_journal,
        ),
    );
    // First `?` is the channel receive; second is the run's own result.
    let r = receiver.await??;
    tokio_handle.join().await?;
    v8_handle.join().await?;
    Ok(r)
}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.