Skip to main content
Glama

Convex MCP server

Official
by get-convex
async_syscall.rs17 kB
#![allow(non_snake_case)] use anyhow::Context; use common::{ bootstrap_model::components::handles::FunctionHandle, components::{ ComponentId, Reference, }, runtime::{ Runtime, UnixTimestamp, }, }; use errors::{ ErrorMetadata, ErrorMetadataAnyhowExt, }; use model::{ components::auth::propagate_component_auth, file_storage::FileStorageId, }; use serde::{ Deserialize, Serialize, }; use serde_json::{ json, Value as JsonValue, }; use value::{ id_v6::DeveloperDocumentId, JsonPackedValue, }; use vector::{ VectorSearchJson, VectorSearchRequest, }; use super::task_executor::TaskExecutor; use crate::{ environment::helpers::{ remove_rejected_before_execution, with_argument_error, ArgName, }, helpers::UdfArgsJson, metrics::async_syscall_timer, }; impl<RT: Runtime> TaskExecutor<RT> { pub fn component_id(&self) -> ComponentId { self.component_id } #[fastrace::trace] pub async fn run_async_syscall(&self, name: String, args: JsonValue) -> anyhow::Result<String> { let start = self.rt.monotonic_now(); let timer = async_syscall_timer(&name); let result: anyhow::Result<_> = try { match &name[..] { "1.0/actions/query" => self.async_syscall_actions_runQuery(args).await?.into(), "1.0/actions/mutation" => { self.async_syscall_actions_runMutation(args).await?.into() }, "1.0/actions/action" => self.async_syscall_actions_runAction(args).await?.into(), "1.0/actions/schedule" => self.async_syscall_schedule(args).await?.into(), "1.0/actions/cancel_job" => self.async_syscall_cancel_job(args).await?.into(), "1.0/actions/vectorSearch" => self.async_syscall_vectorSearch(args).await?.into(), "1.0/getUserIdentity" => self.async_syscall_getUserIdentity(args).await?.into(), "1.0/storageDelete" => self.async_syscall_storageDelete(args).await?.into(), "1.0/storageGetMetadata" => { self.async_syscall_storageGetMetadata(args).await?.into() }, "1.0/storageGenerateUploadUrl" => self .async_syscall_storageGenerateUploadUrl(args) .await? .into(), "1.0/storageGetUrl" => self.async_syscall_storageGetUrl(args).await?.into(), "1.0/createFunctionHandle" => { self.async_syscall_createFunctionHandle(args).await?.into() }, _ => { anyhow::bail!(ErrorMetadata::bad_request( "UnknownAsyncOperation", format!("Unknown async operation {name}") )); }, } }; self.syscall_trace .lock() .log_async_syscall(name, start.elapsed(), result.is_ok()); match &result { Ok(_) => timer.finish(), Err(e) => timer.finish_with(e.metric_status_label_value()), }; struct JsonString(String); impl From<JsonPackedValue> for JsonString { fn from(v: JsonPackedValue) -> Self { Self(v.as_str().to_owned()) } } impl From<JsonValue> for JsonString { fn from(v: JsonValue) -> Self { Self(v.to_string()) } } result.map(|JsonString(s)| s) } #[convex_macro::instrument_future] async fn async_syscall_actions_runQuery( &self, args: JsonValue, ) -> anyhow::Result<JsonPackedValue> { #[derive(Deserialize)] #[serde(rename_all = "camelCase")] struct RunQueryArgs { name: Option<String>, reference: Option<String>, function_handle: Option<String>, args: UdfArgsJson, } let RunQueryArgs { name, reference, function_handle, args, } = with_argument_error("runQuery", || Ok(serde_json::from_value(args)?))?; let function_path = match function_handle { Some(function_handle) => { let handle: FunctionHandle = with_argument_error("runQuery", || function_handle.parse())?; self.action_callbacks .lookup_function_handle(self.identity.clone(), handle) .await? }, None => { let reference = parse_name_or_reference("runQuery", name, reference)?; self.resolve_function(&reference)? }, }; let value = self .action_callbacks .execute_query( propagate_component_auth( &self.identity, self.component_id, function_path.component.is_root(), ), function_path, args.into_serialized_args()?, self.context.clone(), ) .await .map_err(remove_rejected_before_execution)? .result?; Ok(value) } #[convex_macro::instrument_future] async fn async_syscall_actions_runMutation( &self, args: JsonValue, ) -> anyhow::Result<JsonPackedValue> { #[derive(Deserialize)] #[serde(rename_all = "camelCase")] struct RunMutationArgs { name: Option<String>, reference: Option<String>, function_handle: Option<String>, args: UdfArgsJson, } let RunMutationArgs { name, reference, function_handle, args, } = with_argument_error("runMutation", || Ok(serde_json::from_value(args)?))?; let function_path = match function_handle { Some(function_handle) => { let handle: FunctionHandle = with_argument_error("runMutation", || function_handle.parse())?; self.action_callbacks .lookup_function_handle(self.identity.clone(), handle) .await? }, None => { let reference = parse_name_or_reference("runMutation", name, reference)?; self.resolve_function(&reference)? }, }; let value = self .action_callbacks .execute_mutation( propagate_component_auth( &self.identity, self.component_id, function_path.component.is_root(), ), function_path, args.into_serialized_args()?, self.context.clone(), ) .await .map_err(remove_rejected_before_execution)? .result?; Ok(value) } #[convex_macro::instrument_future] async fn async_syscall_actions_runAction( &self, args: JsonValue, ) -> anyhow::Result<JsonPackedValue> { #[derive(Deserialize)] #[serde(rename_all = "camelCase")] struct RunActionArgs { name: Option<String>, reference: Option<String>, function_handle: Option<String>, args: UdfArgsJson, } let RunActionArgs { name, reference, function_handle, args, } = with_argument_error("runAction", || Ok(serde_json::from_value(args)?))?; let function_path = match function_handle { Some(function_handle) => { let handle: FunctionHandle = with_argument_error("runAction", || function_handle.parse())?; self.action_callbacks .lookup_function_handle(self.identity.clone(), handle) .await? }, None => { let reference = parse_name_or_reference("runAction", name, reference)?; self.resolve_function(&reference)? }, }; let value = self .action_callbacks .execute_action( propagate_component_auth( &self.identity, self.component_id, function_path.component.is_root(), ), function_path, args.into_serialized_args()?, self.context.clone(), ) .await .map_err(remove_rejected_before_execution)? .result?; Ok(value) } #[convex_macro::instrument_future] async fn async_syscall_schedule(&self, args: JsonValue) -> anyhow::Result<JsonValue> { #[derive(Deserialize)] #[serde(rename_all = "camelCase")] struct ScheduleArgs { name: Option<String>, reference: Option<String>, function_handle: Option<String>, ts: f64, args: UdfArgsJson, } let ScheduleArgs { name, reference, function_handle, ts, args, }: ScheduleArgs = with_argument_error("scheduler", || Ok(serde_json::from_value(args)?))?; let path = match function_handle { Some(h) => { let handle: FunctionHandle = with_argument_error("scheduler", || h.parse())?; self.action_callbacks .lookup_function_handle(self.identity.clone(), handle) .await? }, None => { let reference = parse_name_or_reference("scheduler", name, reference)?; self.resolve_function(&reference)? }, }; let scheduled_ts = with_argument_error("ts", || UnixTimestamp::from_secs_f64(ts))?; let virtual_id = self .action_callbacks .schedule_job( self.identity.clone(), self.component_id(), path, args.into_serialized_args()?, scheduled_ts, self.context.clone(), ) .await?; Ok(JsonValue::from(virtual_id)) } #[convex_macro::instrument_future] async fn async_syscall_cancel_job(&self, args: JsonValue) -> anyhow::Result<JsonValue> { #[derive(Deserialize)] #[serde(rename_all = "camelCase")] struct CancelJobArgs { id: String, } let virtual_id_v6 = with_argument_error("db.cancel_job", || { let args: CancelJobArgs = serde_json::from_value(args)?; let id = DeveloperDocumentId::decode(&args.id).context(ArgName("id"))?; Ok(id) })?; self.action_callbacks .cancel_job(self.identity.clone(), virtual_id_v6) .await?; Ok(JsonValue::Null) } #[convex_macro::instrument_future] async fn async_syscall_vectorSearch(&self, args: JsonValue) -> anyhow::Result<JsonValue> { let VectorSearchRequest { query } = serde_json::from_value(args)?; let component_id = self.component_id(); let mut vector_search_query: VectorSearchJson = serde_json::from_value(query)?; vector_search_query.insert_component_id(component_id); let (results, usage_stats) = self .action_callbacks .vector_search( self.identity.clone(), serde_json::to_value(vector_search_query)?, ) .await?; self.usage_tracker.add(usage_stats); let results: Vec<_> = results.into_iter().map(JsonValue::from).collect(); Ok(json!({ "results": results })) } #[convex_macro::instrument_future] async fn async_syscall_getUserIdentity(&self, _args: JsonValue) -> anyhow::Result<JsonValue> { self.user_identity() } async fn async_syscall_storageGenerateUploadUrl( &self, _args: JsonValue, ) -> anyhow::Result<JsonValue> { let issued_ts = self.rt.unix_timestamp(); let component = self.component_id(); let postUrl = self.file_storage.generate_upload_url_with_origin( self.convex_origin_override.lock().clone(), &self.key_broker, issued_ts, component, )?; Ok(serde_json::to_value(postUrl)?) } #[convex_macro::instrument_future] async fn async_syscall_storageGetUrl(&self, args: JsonValue) -> anyhow::Result<JsonValue> { #[derive(Deserialize)] #[serde(rename_all = "camelCase")] struct GetUrlArgs { storage_id: String, } let storage_id: FileStorageId = with_argument_error("storage.getUrl", || { let GetUrlArgs { storage_id } = serde_json::from_value(args)?; storage_id.parse().context(ArgName("storageId")) })?; let url = self .action_callbacks .storage_get_url(self.identity.clone(), self.component_id(), storage_id) .await?; Ok(url.into()) } #[convex_macro::instrument_future] async fn async_syscall_storageDelete(&self, args: JsonValue) -> anyhow::Result<JsonValue> { #[derive(Deserialize)] #[serde(rename_all = "camelCase")] struct StorageDeleteArgs { storage_id: String, } let storage_id: FileStorageId = with_argument_error("storage.delete", || { let StorageDeleteArgs { storage_id } = serde_json::from_value(args)?; storage_id.parse().context(ArgName("storageId")) })?; self.action_callbacks .storage_delete(self.identity.clone(), self.component_id(), storage_id) .await?; Ok(JsonValue::Null) } #[convex_macro::instrument_future] async fn async_syscall_storageGetMetadata(&self, args: JsonValue) -> anyhow::Result<JsonValue> { #[derive(Deserialize)] #[serde(rename_all = "camelCase")] struct StorageGetMetadataArgs { storage_id: String, } let storage_id: FileStorageId = with_argument_error("storage.getMetadata", || { let StorageGetMetadataArgs { storage_id } = serde_json::from_value(args)?; storage_id.parse().context(ArgName("storageId")) })?; #[derive(Serialize)] #[serde(rename_all = "camelCase")] struct FileMetadataJson { storage_id: String, sha256: String, size: i64, content_type: Option<String>, } let file_metadata = self .action_callbacks .storage_get_file_entry(self.identity.clone(), self.component_id(), storage_id) .await? .map(|(_, entry)| { // NB: `storage_key is an internal field that we shouldn't return in syscalls. FileMetadataJson { storage_id: entry.storage_id.to_string(), // TODO(CX-5533) use base64 for consistency. sha256: entry.sha256.as_hex(), size: entry.size, content_type: entry.content_type, } }); Ok(serde_json::to_value(file_metadata)?) } async fn async_syscall_createFunctionHandle( &self, args: JsonValue, ) -> anyhow::Result<JsonValue> { #[derive(Deserialize)] #[serde(rename_all = "camelCase")] struct CreateFunctionHandleArgs { name: Option<String>, function_handle: Option<String>, reference: Option<String>, } let CreateFunctionHandleArgs { name, function_handle, reference, } = with_argument_error("createFunctionHandle", || Ok(serde_json::from_value(args)?))?; let function_path = match function_handle { Some(function_handle) => { return Ok(serde_json::to_value(function_handle)?); }, None => { let reference = parse_name_or_reference("createFunctionHandle", name, reference)?; self.resolve_function(&reference)? }, }; let handle = self .action_callbacks .create_function_handle(self.identity.clone(), function_path) .await?; Ok(serde_json::to_value(String::from(handle))?) } } pub fn parse_name_or_reference( function_name: &str, name: Option<String>, reference: Option<String>, ) -> anyhow::Result<Reference> { with_argument_error(function_name, || match (name, reference) { (Some(name), _) => Ok(Reference::Function(name.parse()?)), (_, Some(reference)) => Ok(reference.parse()?), _ => anyhow::bail!("Missing required argument 'name'"), }) }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server