Skip to main content
Glama

Convex MCP server

Official
by get-convex
syscall.rs8.48 kB
#![allow(non_snake_case)] use std::str::FromStr; use anyhow::Context; use common::{ query::Query, runtime::Runtime, static_span, version::Version, }; use database::{ query::TableFilter, DeveloperQuery, }; use errors::ErrorMetadata; use model::virtual_system_mapping; use serde::{ Deserialize, Serialize, }; use serde_json::{ json, Value as JsonValue, }; use value::{ id_v6::DeveloperDocumentId, identifier::Identifier, ConvexValue, InternalId, TableName, TableNumber, TabletIdAndTableNumber, }; use super::{ async_syscall::AsyncSyscallProvider, DatabaseUdfEnvironment, }; use crate::environment::helpers::{ parse_version, with_argument_error, ArgName, }; pub trait SyscallProvider<RT: Runtime> { fn table_filter(&self) -> TableFilter; fn lookup_table(&mut self, name: &TableName) -> anyhow::Result<Option<TabletIdAndTableNumber>>; fn lookup_virtual_table(&mut self, name: &TableName) -> anyhow::Result<Option<TableNumber>>; fn component_argument(&self, name: &str) -> anyhow::Result<Option<ConvexValue>>; fn start_query(&mut self, query: Query, version: Option<Version>) -> anyhow::Result<u32>; fn cleanup_query(&mut self, query_id: u32) -> bool; } impl<RT: Runtime> SyscallProvider<RT> for DatabaseUdfEnvironment<RT> { fn table_filter(&self) -> TableFilter { if self.path.udf_path.is_system() { TableFilter::IncludePrivateSystemTables } else { TableFilter::ExcludePrivateSystemTables } } fn lookup_table(&mut self, name: &TableName) -> anyhow::Result<Option<TabletIdAndTableNumber>> { let namespace = self.phase.component()?.into(); let table_mapping = self.phase.tx()?.table_mapping().namespace(namespace); Ok(table_mapping.id_and_number_if_exists(name)) } fn lookup_virtual_table(&mut self, name: &TableName) -> anyhow::Result<Option<TableNumber>> { let virtual_mapping = virtual_system_mapping(); let Ok(physical_table_name) = virtual_mapping.virtual_to_system_table(name) else { return Ok(None); }; self.lookup_table(physical_table_name) .map(|r| r.map(|t| t.table_number)) } fn component_argument(&self, name: &str) -> anyhow::Result<Option<ConvexValue>> { let component_arguments = self.phase.component_arguments()?; let result = match Identifier::from_str(name) { Ok(identifier) => component_arguments.get(&identifier).cloned(), Err(_) => None, }; Ok(result) } fn start_query(&mut self, query: Query, version: Option<Version>) -> anyhow::Result<u32> { let table_filter = SyscallProvider::<RT>::table_filter(self); let component = self.component()?; let tx = self.phase.tx()?; // TODO: Are all invalid query pipelines developer errors? These could be bugs // in convex/server. let compiled_query = { DeveloperQuery::new_with_version(tx, component.into(), query, version, table_filter)? }; let query_id = self.query_manager.put_developer(compiled_query); Ok(query_id) } fn cleanup_query(&mut self, query_id: u32) -> bool { self.query_manager.cleanup_developer(query_id) } } pub fn syscall_impl<RT: Runtime, P: SyscallProvider<RT>>( provider: &mut P, name: &str, args: JsonValue, ) -> anyhow::Result<JsonValue> { match name { "1.0/queryCleanup" => syscall_query_cleanup(provider, args), "1.0/queryStream" => syscall_query_stream(provider, args), "1.0/db/normalizeId" => syscall_normalize_id(provider, args), "1.0/componentArgument" => syscall_component_argument(provider, args), #[cfg(any(test, feature = "testing"))] "throwSystemError" => anyhow::bail!("I can't go for that."), "throwOcc" => anyhow::bail!(ErrorMetadata::user_occ(None, None, None, None)), "throwOverloaded" => { anyhow::bail!(ErrorMetadata::overloaded("Busy", "I'm a bit busy.")) }, #[cfg(test)] "slowSyscall" => { std::thread::sleep(std::time::Duration::from_secs(1)); Ok(JsonValue::Number(1017.into())) }, #[cfg(test)] "reallySlowSyscall" => { std::thread::sleep(std::time::Duration::from_secs(3)); Ok(JsonValue::Number(1017.into())) }, _ => { anyhow::bail!(ErrorMetadata::bad_request( "UnknownOperation", format!("Unknown operation {name}") )); }, } } fn syscall_normalize_id<RT: Runtime, P: SyscallProvider<RT>>( provider: &mut P, args: JsonValue, ) -> anyhow::Result<JsonValue> { #[derive(Deserialize)] #[serde(rename_all = "camelCase")] struct NormalizeIdArgs { table: String, id_string: String, } let (table_name, id_string) = with_argument_error("db.normalizeId", || { let args: NormalizeIdArgs = serde_json::from_value(args)?; let table_name: TableName = args.table.parse().context(ArgName("table"))?; Ok((table_name, args.id_string)) })?; let virtual_table_number = provider.lookup_virtual_table(&table_name)?; let table_number = match virtual_table_number { Some(table_number) => Some(table_number), None => { let physical_table_number = provider.lookup_table(&table_name)?.map(|t| t.table_number); match provider.table_filter() { TableFilter::IncludePrivateSystemTables => physical_table_number, TableFilter::ExcludePrivateSystemTables if table_name.is_system() => None, TableFilter::ExcludePrivateSystemTables => physical_table_number, } }, }; let normalized_id = match table_number { Some(table_number) => { if let Ok(id_v6) = DeveloperDocumentId::decode(&id_string) && id_v6.table() == table_number { Some(id_v6) } else if let Ok(internal_id) = InternalId::from_developer_str(&id_string) { let id_v6 = DeveloperDocumentId::new(table_number, internal_id); Some(id_v6) } else { None } }, None => None, }; match normalized_id { Some(id_v6) => Ok(json!({ "id": id_v6.encode() })), None => Ok(json!({ "id": JsonValue::Null })), } } fn syscall_component_argument<RT: Runtime, P: SyscallProvider<RT>>( provider: &mut P, args: JsonValue, ) -> anyhow::Result<JsonValue> { #[derive(Deserialize)] struct ComponentArgumentArgs { name: String, } let arg_name = with_argument_error("componentArgument", || { let ComponentArgumentArgs { name } = serde_json::from_value(args)?; Ok(name) })?; let result = match provider.component_argument(&arg_name)? { Some(value) => json!({ "value": value }), None => json!({}), }; Ok(result) } fn syscall_query_stream<RT: Runtime, P: SyscallProvider<RT>>( provider: &mut P, args: JsonValue, ) -> anyhow::Result<JsonValue> { let _s = static_span!(); #[derive(Deserialize)] struct QueryStreamArgs { query: JsonValue, version: Option<String>, } let (parsed_query, version) = with_argument_error("queryStream", || { let args: QueryStreamArgs = serde_json::from_value(args)?; let parsed_query = Query::try_from(args.query).context(ArgName("query"))?; let version = parse_version(args.version)?; Ok((parsed_query, version)) })?; let query_id = provider.start_query(parsed_query, version)?; #[derive(Serialize)] #[serde(rename_all = "camelCase")] struct QueryStreamResult { query_id: u32, } Ok(serde_json::to_value(QueryStreamResult { query_id })?) } fn syscall_query_cleanup<RT: Runtime, P: SyscallProvider<RT>>( provider: &mut P, args: JsonValue, ) -> anyhow::Result<JsonValue> { let _s = static_span!(); #[derive(Deserialize)] #[serde(rename_all = "camelCase")] struct QueryCleanupArgs { query_id: u32, } let args: QueryCleanupArgs = with_argument_error("queryCleanup", || Ok(serde_json::from_value(args)?))?; let cleaned_up = provider.cleanup_query(args.query_id); Ok(serde_json::to_value(cleaned_up)?) }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server