Skip to main content
Glama

Convex MCP server

Official
by get-convex
mod.rs16.8 kB
//! This module contains the implementation of both synchronous and //! async ops. Unlike syscalls, these functions are present in *every* //! environment, but the environment may decide not to implement their //! functionality, causing a runtime error. mod blob; mod console; mod crypto; mod database; mod environment_variables; mod errors; mod http; mod random; mod storage; mod stream; mod structured_clone; mod text; mod time; mod validate_args; mod validate_returns; use std::{ collections::BTreeMap, ops::DerefMut, }; use ::errors::ErrorMetadata; use anyhow::anyhow; use bytes::Bytes; use common::{ log_lines::LogLevel, runtime::{ Runtime, UnixTimestamp, }, types::{ EnvVarName, EnvVarValue, }, }; use crypto::{ op_crypto_decrypt, op_crypto_encrypt, }; use deno_core::{ v8, ModuleSpecifier, }; use rand_chacha::ChaCha12Rng; use sourcemap::SourceMap; use structured_clone::op_structured_clone; use uuid::Uuid; use validate_returns::op_validate_returns; use value::{ heap_size::WithHeapSize, NamespacedTableMapping, }; use self::{ blob::{ op_blob_create_part, op_blob_read_part, op_blob_slice_part, }, console::{ op_console_message, op_console_time_end, op_console_time_log, op_console_time_start, op_console_trace, }, crypto::{ op_crypto_base64_url_decode, op_crypto_base64_url_encode, op_crypto_derive_bits, op_crypto_digest, op_crypto_export_key, op_crypto_export_pkcs8_ed25519, op_crypto_export_pkcs8_x25519, op_crypto_export_spki_ed25519, op_crypto_export_spki_x25519, op_crypto_generate_key_bytes, op_crypto_generate_keypair, op_crypto_get_random_values, op_crypto_import_key, op_crypto_import_pkcs8_ed25519, op_crypto_import_pkcs8_x25519, op_crypto_import_spki_ed25519, op_crypto_import_spki_x25519, op_crypto_jwk_x_ed25519, op_crypto_random_uuid, op_crypto_sign, op_crypto_sign_ed25519, op_crypto_verify, op_crypto_verify_ed25519, }, database::op_get_table_mapping, environment_variables::op_environment_variables_get, errors::{ op_error_stack, op_throw_uncatchable_developer_error, }, http::{ async_op_fetch, async_op_parse_multi_part, op_headers_get_mime_type, op_headers_normalize_name, op_url_get_url_info, op_url_get_url_search_param_pairs, op_url_stringify_url_search_params, op_url_update_url_info, }, storage::{ async_op_storage_get, async_op_storage_store, }, stream::{ async_op_stream_read_part, op_stream_create, op_stream_extend, }, text::{ op_atob, op_btoa, op_text_encoder_cleanup, op_text_encoder_decode, op_text_encoder_decode_single, op_text_encoder_encode, op_text_encoder_encode_into, op_text_encoder_new_decoder, op_text_encoder_normalize_label, }, time::{ async_op_sleep, op_now, }, validate_args::op_validate_args, }; pub use self::{ crypto::CryptoOps, random::op_random, }; use crate::{ environment::{ crypto_rng::CryptoRng, AsyncOpRequest, IsolateEnvironment, }, execution_scope::ExecutionScope, helpers::to_rust_string, metrics, request_scope::{ ReadableStream, StreamListener, TextDecoderResource, }, }; pub trait OpProvider<'b> { fn rng(&mut self) -> anyhow::Result<&mut ChaCha12Rng>; fn crypto_rng(&mut self) -> anyhow::Result<CryptoRng>; fn scope(&mut self) -> &mut v8::HandleScope<'b>; fn lookup_source_map( &mut self, specifier: &ModuleSpecifier, ) -> anyhow::Result<Option<SourceMap>>; fn trace(&mut self, level: LogLevel, messages: Vec<String>) -> anyhow::Result<()>; fn console_timers( &mut self, ) -> anyhow::Result<&mut WithHeapSize<BTreeMap<String, UnixTimestamp>>>; fn unix_timestamp(&mut self) -> anyhow::Result<UnixTimestamp>; fn unix_timestamp_non_deterministic(&mut self) -> anyhow::Result<UnixTimestamp>; fn start_async_op( &mut self, request: AsyncOpRequest, resolver: v8::Global<v8::PromiseResolver>, ) -> anyhow::Result<()>; fn create_blob_part(&mut self, bytes: Bytes) -> anyhow::Result<Uuid>; fn get_blob_part(&mut self, uuid: &Uuid) -> anyhow::Result<Option<Bytes>>; fn create_stream(&mut self) -> anyhow::Result<Uuid>; fn extend_stream( &mut self, id: Uuid, bytes: Option<Bytes>, new_done: bool, ) -> anyhow::Result<()>; fn new_stream_listener( &mut self, stream_id: Uuid, listener: StreamListener, ) -> anyhow::Result<()>; fn create_text_decoder(&mut self, decoder: TextDecoderResource) -> anyhow::Result<Uuid>; fn get_text_decoder(&mut self, uuid: &Uuid) -> anyhow::Result<&mut TextDecoderResource>; fn remove_text_decoder(&mut self, uuid: &Uuid) -> anyhow::Result<TextDecoderResource>; fn get_environment_variable(&mut self, name: EnvVarName) -> anyhow::Result<Option<EnvVarValue>>; fn get_all_table_mappings(&mut self) -> anyhow::Result<NamespacedTableMapping>; } impl<'a, 'b: 'a, RT: Runtime, E: IsolateEnvironment<RT>> OpProvider<'b> for ExecutionScope<'a, 'b, RT, E> { fn rng(&mut self) -> anyhow::Result<&mut ChaCha12Rng> { let state = self.state_mut()?; state.environment.rng() } fn crypto_rng(&mut self) -> anyhow::Result<CryptoRng> { let state = self.state_mut()?; state.environment.crypto_rng() } fn lookup_source_map( &mut self, specifier: &ModuleSpecifier, ) -> anyhow::Result<Option<SourceMap>> { ExecutionScope::lookup_source_map(self, specifier) } fn scope(&mut self) -> &mut v8::HandleScope<'b> { self.deref_mut() } fn trace(&mut self, level: LogLevel, messages: Vec<String>) -> anyhow::Result<()> { let state = self.state_mut()?; state.environment.trace(level, messages)?; Ok(()) } fn console_timers( &mut self, ) -> anyhow::Result<&mut WithHeapSize<BTreeMap<String, UnixTimestamp>>> { let state = self.state_mut()?; Ok(&mut state.console_timers) } fn unix_timestamp(&mut self) -> anyhow::Result<UnixTimestamp> { let state = self.state_mut()?; state.environment.unix_timestamp() } fn unix_timestamp_non_deterministic(&mut self) -> anyhow::Result<UnixTimestamp> { let state = self.state_mut()?; Ok(state.unix_timestamp_non_deterministic()) } fn start_async_op( &mut self, request: AsyncOpRequest, resolver: v8::Global<v8::PromiseResolver>, ) -> anyhow::Result<()> { let state = self.state_mut()?; state.environment.start_async_op(request, resolver) } fn create_blob_part(&mut self, bytes: Bytes) -> anyhow::Result<Uuid> { let state = self.state_mut()?; state.create_blob_part(bytes) } fn get_blob_part(&mut self, uuid: &Uuid) -> anyhow::Result<Option<Bytes>> { let state = self.state_mut()?; Ok(state.blob_parts.get(uuid).cloned()) } fn create_stream(&mut self) -> anyhow::Result<Uuid> { self.state_mut()?.create_stream() } fn extend_stream( &mut self, id: Uuid, bytes: Option<Bytes>, new_done: bool, ) -> anyhow::Result<()> { let state = self.state_mut()?; let new_part_id = match bytes { Some(bytes) => { if let Some(request_stream_state) = state.request_stream_state.as_mut() && request_stream_state.stream_id() == id { request_stream_state.track_bytes_read(bytes.len()); } Some(state.create_blob_part(bytes)?) }, None => None, }; state.streams.mutate(&id, |stream| -> anyhow::Result<()> { let Some(Ok(ReadableStream { parts, done })) = stream else { anyhow::bail!("unrecognized stream id {id}"); }; if *done { anyhow::bail!("stream {id} is already done"); } if let Some(new_part_id) = new_part_id { parts.push_back(new_part_id); } if new_done { *done = true; } Ok(()) })?; self.update_stream_listeners()?; Ok(()) } fn new_stream_listener( &mut self, stream_id: Uuid, listener: StreamListener, ) -> anyhow::Result<()> { if self .state_mut()? .stream_listeners .insert(stream_id, listener) .is_some() { anyhow::bail!("cannot read from the same stream twice"); } self.update_stream_listeners() } fn create_text_decoder(&mut self, decoder: TextDecoderResource) -> anyhow::Result<Uuid> { self.state_mut()?.create_text_decoder(decoder) } fn get_text_decoder(&mut self, uuid: &Uuid) -> anyhow::Result<&mut TextDecoderResource> { self.state_mut()?.get_text_decoder(uuid) } fn remove_text_decoder(&mut self, uuid: &Uuid) -> anyhow::Result<TextDecoderResource> { self.state_mut()?.remove_text_decoder(uuid) } fn get_environment_variable( &mut self, name: EnvVarName, ) -> anyhow::Result<Option<EnvVarValue>> { let state = self.state_mut()?; state.environment.get_environment_variable(name) } fn get_all_table_mappings(&mut self) -> anyhow::Result<NamespacedTableMapping> { let state = self.state_mut()?; state.environment.get_all_table_mappings() } } pub fn run_op<'b, P: OpProvider<'b>>( provider: &mut P, args: v8::FunctionCallbackArguments, rv: v8::ReturnValue, ) -> anyhow::Result<()> { if args.length() < 1 { // This must be a bug in our `udf-runtime` code, not a developer error. anyhow::bail!("op(op_name, ...) takes at least one argument"); } let op_name: v8::Local<v8::String> = args.get(0).try_into()?; let op_name = to_rust_string(provider.scope(), &op_name)?; let timer = metrics::op_timer(&op_name); match &op_name[..] { "throwUncatchableDeveloperError" => { op_throw_uncatchable_developer_error(provider, args, rv)? }, "console/message" => op_console_message(provider, args, rv)?, "console/trace" => op_console_trace(provider, args, rv)?, "console/timeStart" => op_console_time_start(provider, args, rv)?, "console/timeLog" => op_console_time_log(provider, args, rv)?, "console/timeEnd" => op_console_time_end(provider, args, rv)?, "error/stack" => op_error_stack(provider, args, rv)?, "random" => op_random(provider, args, rv)?, "now" => op_now(provider, args, rv)?, "url/getUrlInfo" => op_url_get_url_info(provider, args, rv)?, "url/getUrlSearchParamPairs" => op_url_get_url_search_param_pairs(provider, args, rv)?, "url/stringifyUrlSearchParams" => op_url_stringify_url_search_params(provider, args, rv)?, "url/updateUrlInfo" => op_url_update_url_info(provider, args, rv)?, "blob/createPart" => op_blob_create_part(provider, args, rv)?, "blob/slicePart" => op_blob_slice_part(provider, args, rv)?, "blob/readPart" => op_blob_read_part(provider, args, rv)?, "headers/getMimeType" => op_headers_get_mime_type(provider, args, rv)?, "headers/normalizeName" => op_headers_normalize_name(provider, args, rv)?, "stream/create" => op_stream_create(provider, args, rv)?, "stream/extend" => op_stream_extend(provider, args, rv)?, "textEncoder/encode" => op_text_encoder_encode(provider, args, rv)?, "textEncoder/encodeInto" => op_text_encoder_encode_into(provider, args, rv)?, "textEncoder/decodeSingle" => op_text_encoder_decode_single(provider, args, rv)?, "textEncoder/decode" => op_text_encoder_decode(provider, args, rv)?, "textEncoder/newDecoder" => op_text_encoder_new_decoder(provider, args, rv)?, "textEncoder/cleanup" => op_text_encoder_cleanup(provider, args, rv)?, "textEncoder/normalizeLabel" => op_text_encoder_normalize_label(provider, args, rv)?, "atob" => op_atob(provider, args, rv)?, "btoa" => op_btoa(provider, args, rv)?, "structuredClone" => op_structured_clone(provider, args.get(1), rv)?, "environmentVariables/get" => op_environment_variables_get(provider, args, rv)?, "getTableMapping" => op_get_table_mapping(provider, args, rv)?, "validateArgs" => op_validate_args(provider, args, rv)?, "validateReturns" => op_validate_returns(provider, args, rv)?, "crypto/randomUUID" => op_crypto_random_uuid(provider, args, rv)?, "crypto/getRandomValues" => op_crypto_get_random_values(provider, args, rv)?, "crypto/sign" => op_crypto_sign(provider, args, rv)?, "crypto/signEd25519" => op_crypto_sign_ed25519(provider, args, rv)?, "crypto/verify" => op_crypto_verify(provider, args, rv)?, "crypto/verifyEd25519" => op_crypto_verify_ed25519(provider, args, rv)?, "crypto/deriveBits" => op_crypto_derive_bits(provider, args, rv)?, "crypto/digest" => op_crypto_digest(provider, args, rv)?, "crypto/encrypt" => op_crypto_encrypt(provider, args, rv)?, "crypto/decrypt" => op_crypto_decrypt(provider, args, rv)?, "crypto/importKey" => op_crypto_import_key(provider, args, rv)?, "crypto/importSpkiEd25519" => op_crypto_import_spki_ed25519(provider, args, rv)?, "crypto/importPkcs8Ed25519" => op_crypto_import_pkcs8_ed25519(provider, args, rv)?, "crypto/importSpkiX25519" => op_crypto_import_spki_x25519(provider, args, rv)?, "crypto/importPkcs8X25519" => op_crypto_import_pkcs8_x25519(provider, args, rv)?, "crypto/base64UrlEncode" => op_crypto_base64_url_encode(provider, args, rv)?, "crypto/base64UrlDecode" => op_crypto_base64_url_decode(provider, args, rv)?, "crypto/exportKey" => op_crypto_export_key(provider, args, rv)?, "crypto/exportSpkiEd25519" => op_crypto_export_spki_ed25519(provider, args, rv)?, "crypto/exportPkcs8Ed25519" => op_crypto_export_pkcs8_ed25519(provider, args, rv)?, "crypto/JwkXEd25519" => op_crypto_jwk_x_ed25519(provider, args, rv)?, "crypto/exportSpkiX25519" => op_crypto_export_spki_x25519(provider, args, rv)?, "crypto/exportPkcs8X25519" => op_crypto_export_pkcs8_x25519(provider, args, rv)?, "crypto/generateKeyPair" => op_crypto_generate_keypair(provider, args, rv)?, "crypto/generateKeyBytes" => op_crypto_generate_key_bytes(provider, args, rv)?, _ => { anyhow::bail!(ErrorMetadata::bad_request( "UnknownOperation", format!("Unknown operation {op_name}") )); }, } timer.finish(); Ok(()) } pub fn start_async_op<'b, P: OpProvider<'b>>( provider: &mut P, args: v8::FunctionCallbackArguments, mut rv: v8::ReturnValue, ) -> anyhow::Result<()> { if args.length() < 1 { anyhow::bail!("asyncOp(op, ...args) takes at least one argument"); } let op_name: v8::Local<v8::String> = args.get(0).try_into()?; let op_name = to_rust_string(provider.scope(), &op_name)?; let resolver = v8::PromiseResolver::new(provider.scope()) .ok_or_else(|| anyhow!("Failed to create PromiseResolver"))?; let promise = resolver.get_promise(provider.scope()); let resolver = v8::Global::new(provider.scope(), resolver); match &op_name[..] { "fetch" => async_op_fetch(provider, args, resolver)?, "form/parseMultiPart" => async_op_parse_multi_part(provider, args, resolver)?, "sleep" => async_op_sleep(provider, args, resolver)?, "storage/store" => async_op_storage_store(provider, args, resolver)?, "storage/get" => async_op_storage_get(provider, args, resolver)?, "stream/readPart" => async_op_stream_read_part(provider, args, resolver)?, _ => { anyhow::bail!(ErrorMetadata::bad_request( "UnknownAsyncOperation", format!("Unknown async operation {op_name}") )); }, }; rv.set(promise.into()); Ok(()) }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server