Skip to main content
Glama

Convex MCP server

Official
by get-convex
mod.rs16.8 kB
use std::{ collections::{ BTreeMap, BTreeSet, }, sync::{ Arc, LazyLock, }, }; use anyhow::Context; use common::{ components::{ CanonicalizedComponentFunctionPath, CanonicalizedComponentModulePath, ComponentId, ResolvedComponentFunctionPath, }, document::ParsedDocument, runtime::Runtime, types::ModuleEnvironment, value::ResolvedDocumentId, }; use database::{ unauthorized_error, BootstrapComponentsModel, SystemMetadataModel, Transaction, }; use errors::ErrorMetadata; use metrics::get_module_metadata_timer; use sync_types::CanonicalizedModulePath; use value::{ sha256::{ Sha256, Sha256Digest, }, FieldPath, TableName, }; use self::{ module_versions::{ AnalyzedFunction, AnalyzedModule, ModuleSource, SourceMap, }, types::ModuleMetadata, user_error::{ FunctionNotFoundError, ModuleNotFoundError, }, }; use crate::{ config::{ module_loader::ModuleLoader, types::{ ModuleConfig, ModuleDiff, }, }, source_packages::types::SourcePackageId, SystemIndex, SystemTable, }; pub mod function_validators; mod metrics; pub mod module_versions; pub mod types; pub mod user_error; /// Table name for user modules. pub static MODULES_TABLE: LazyLock<TableName> = LazyLock::new(|| "_modules".parse().expect("Invalid built-in module table")); /// Field for a module's path in `ModuleMetadata`. static PATH_FIELD: LazyLock<FieldPath> = LazyLock::new(|| "path".parse().expect("Invalid built-in field")); /// Field for a module's deleted flag in `ModuleMetadata`. static DELETED_FIELD: LazyLock<FieldPath> = LazyLock::new(|| "deleted".parse().expect("Invalid built-in field")); pub static MODULE_INDEX_BY_PATH: LazyLock<SystemIndex<ModulesTable>> = LazyLock::new(|| SystemIndex::new("by_path", [&PATH_FIELD]).unwrap()); pub static MODULE_INDEX_BY_DELETED: LazyLock<SystemIndex<ModulesTable>> = LazyLock::new(|| SystemIndex::new("by_deleted", [&DELETED_FIELD, &PATH_FIELD]).unwrap()); pub static HTTP_MODULE_PATH: LazyLock<CanonicalizedModulePath> = LazyLock::new(|| "http.js".parse().unwrap()); pub struct ModulesTable; impl SystemTable for ModulesTable { type Metadata = ModuleMetadata; fn table_name() -> &'static TableName { &MODULES_TABLE } fn indexes() -> Vec<SystemIndex<Self>> { vec![ MODULE_INDEX_BY_PATH.clone(), MODULE_INDEX_BY_DELETED.clone(), ] } } pub struct ModuleModel<'a, RT: Runtime> { tx: &'a mut Transaction<RT>, } impl<'a, RT: Runtime> ModuleModel<'a, RT> { pub fn new(tx: &'a mut Transaction<RT>) -> Self { Self { tx } } #[fastrace::trace] pub async fn apply( &mut self, component: ComponentId, modules: Vec<ModuleConfig>, source_package_id: Option<SourcePackageId>, mut analyze_results: BTreeMap<CanonicalizedModulePath, AnalyzedModule>, ) -> anyhow::Result<ModuleDiff> { if modules.iter().any(|c| c.path.is_system()) { anyhow::bail!("You cannot push functions under the '_system/' directory."); } let mut added_modules = BTreeSet::new(); // Add new modules. let mut remaining_modules: BTreeMap<_, _> = self .get_application_metadata(component) .await? .into_iter() .map(|module| (module.path.clone(), module.id())) .collect(); for module in modules { let path = module.path.canonicalize(); let existing_module_id = remaining_modules.remove(&path); if existing_module_id.is_none() { added_modules.insert(path.clone()); } let analyze_result = if !path.is_deps() { // We expect AnalyzeResult to always be set for non-dependency modules. let analyze_result = analyze_results.remove(&path).context(format!( "Missing analyze result for module {}", path.as_str() ))?; Some(analyze_result) } else { // We don't analyze dependencies. None }; self.put( existing_module_id, CanonicalizedComponentModulePath { component, module_path: path.clone(), }, module.source, source_package_id.context("missing source_package_id")?, module.source_map, analyze_result, module.environment, ) .await?; } let mut removed_modules = BTreeSet::new(); for (path, module_id) in remaining_modules { removed_modules.insert(path.clone()); self.delete(component, module_id).await?; } ModuleDiff::new(added_modules, removed_modules) } /// Returns the registered modules metadata, including system modules. #[fastrace::trace] pub async fn get_all_metadata( &mut self, component: ComponentId, ) -> anyhow::Result<Vec<ParsedDocument<ModuleMetadata>>> { // Hacky: It's important that we scan the _by_id index instead of the // _by_creation_time index (which is used by `Query::full_table_scan`). // This prevents creating too many read ranges in the transaction later // if we need to replace many documents by-id. let modules = self .tx .query_system(component.into(), &SystemIndex::<ModulesTable>::by_id())? .all() .await?; // TODO: thread Arc out of this function Ok(modules.into_iter().map(Arc::unwrap_or_clone).collect()) } pub async fn get_application_metadata( &mut self, component: ComponentId, ) -> anyhow::Result<Vec<ParsedDocument<ModuleMetadata>>> { let modules = self .get_all_metadata(component) .await? .into_iter() .filter(|metadata| !metadata.path.is_system()) .collect(); Ok(modules) } /// Returns all registered modules that aren't system modules. pub async fn get_application_modules( &mut self, component: ComponentId, module_loader: &dyn ModuleLoader<RT>, ) -> anyhow::Result<BTreeMap<CanonicalizedModulePath, ModuleConfig>> { let mut modules = BTreeMap::new(); for metadata in self.get_all_metadata(component).await? { let path = metadata.path.clone(); if !path.is_system() { let environment = metadata.environment; let full_source = module_loader .get_module( self.tx, CanonicalizedComponentModulePath { component, module_path: metadata.path.clone(), }, ) .await? .context("Module source does not exist")?; let module_config = ModuleConfig { path: path.clone().into(), source: full_source.source.clone(), source_map: full_source.source_map.clone(), environment, }; if modules.insert(path.clone(), module_config).is_some() { panic!("Duplicate application module at {path:?}"); } } } Ok(modules) } pub async fn get_metadata_for_function( &mut self, path: CanonicalizedComponentFunctionPath, ) -> anyhow::Result<Option<ParsedDocument<ModuleMetadata>>> { let module_path = BootstrapComponentsModel::new(self.tx).function_path_to_module(&path)?; let module_metadata = self.get_metadata(module_path).await?; Ok(module_metadata) } pub async fn get_metadata_for_function_by_id( &mut self, path: &ResolvedComponentFunctionPath, ) -> anyhow::Result<Option<ParsedDocument<ModuleMetadata>>> { let module_path = CanonicalizedComponentModulePath { component: path.component, module_path: path.udf_path.module().clone(), }; let module_metadata = self.get_metadata(module_path).await?; Ok(module_metadata) } /// Helper function to get a module at the latest version. pub async fn get_metadata( &mut self, path: CanonicalizedComponentModulePath, ) -> anyhow::Result<Option<ParsedDocument<ModuleMetadata>>> { let timer = get_module_metadata_timer(); let is_system = path.module_path.is_system(); if is_system && !(self.tx.identity().is_admin() || self.tx.identity().is_system()) { anyhow::bail!(unauthorized_error("get_module")) } let module_metadata = match self.module_metadata(path).await? { Some(r) => r, None => return Ok(None), }; timer.finish(); Ok(Some(module_metadata)) } /// Put a module's source at a given path. /// `module_id` is the existing module at this `path`. pub async fn put( &mut self, module_id: Option<ResolvedDocumentId>, path: CanonicalizedComponentModulePath, source: ModuleSource, source_package_id: SourcePackageId, source_map: Option<SourceMap>, analyze_result: Option<AnalyzedModule>, environment: ModuleEnvironment, ) -> anyhow::Result<()> { if !(self.tx.identity().is_admin() || self.tx.identity().is_system()) { anyhow::bail!(unauthorized_error("put_module")); } if path.module_path.is_system() { anyhow::bail!("You cannot push a function under '_system/'"); } anyhow::ensure!( path.module_path.is_deps() || analyze_result.is_some(), "AnalyzedModule is required for non-dependency modules" ); let sha256 = hash_module_source(&source, source_map.as_ref()); self.put_module_metadata( module_id, path, source_package_id, analyze_result, environment, sha256, ) .await?; Ok(()) } async fn put_module_metadata( &mut self, module_id: Option<ResolvedDocumentId>, path: CanonicalizedComponentModulePath, source_package_id: SourcePackageId, analyze_result: Option<AnalyzedModule>, environment: ModuleEnvironment, sha256: Sha256Digest, ) -> anyhow::Result<ResolvedDocumentId> { let new_metadata = ModuleMetadata { path: path.module_path, source_package_id, environment, analyze_result: analyze_result.clone(), sha256, }; let module_id = match module_id { Some(module_id) => { SystemMetadataModel::new(self.tx, path.component.into()) .replace(module_id, new_metadata.try_into()?) .await?; module_id }, None => { SystemMetadataModel::new(self.tx, path.component.into()) .insert(&MODULES_TABLE, new_metadata.try_into()?) .await? }, }; Ok(module_id) } /// Delete a module, making it inaccessible for subsequent transactions. pub async fn delete( &mut self, component: ComponentId, module_id: ResolvedDocumentId, ) -> anyhow::Result<()> { if !(self.tx.identity().is_admin() || self.tx.identity().is_system()) { anyhow::bail!(unauthorized_error("delete_module")); } let namespace = component.into(); SystemMetadataModel::new(self.tx, namespace) .delete(module_id) .await?; Ok(()) } #[convex_macro::instrument_future] async fn module_metadata( &mut self, path: CanonicalizedComponentModulePath, ) -> anyhow::Result<Option<ParsedDocument<ModuleMetadata>>> { let namespace = path.component.into(); let module_document = self .tx .query_system(namespace, &MODULE_INDEX_BY_PATH)? .eq(&[path.module_path.as_str()])? .unique() .await?; // TODO: thread Arc out of this function Ok(module_document.map(Arc::unwrap_or_clone)) } // Helper method that returns the AnalyzedFunction for the specified path. // It returns a user error if the module or function does not exist. // Note that using this method will error if AnalyzedResult is not backfilled, pub async fn get_analyzed_function( &mut self, path: &CanonicalizedComponentFunctionPath, ) -> anyhow::Result<anyhow::Result<AnalyzedFunction>> { let udf_path = &path.udf_path; let Some(module) = self.get_metadata_for_function(path.clone()).await? else { let err = ModuleNotFoundError::new(udf_path.module().as_str()); return Ok(Err(ErrorMetadata::bad_request( "ModuleNotFound", err.to_string(), ) .into())); }; // Dependency modules don't have AnalyzedModule. if !udf_path.module().is_deps() { let analyzed_module = module .analyze_result .as_ref() .ok_or_else(|| anyhow::anyhow!("Expected analyze result for {udf_path:?}"))?; for function in &analyzed_module.functions { if &function.name == udf_path.function_name() { return Ok(Ok(function.clone())); } } } Ok(Err(ErrorMetadata::bad_request( "FunctionNotFound", FunctionNotFoundError::new(udf_path.function_name(), udf_path.module().as_str()) .to_string(), ) .into())) } // Helper method that returns the AnalyzedFunction for the specified path. // It returns a user error if the module or function does not exist. // Note that using this method will error if AnalyzedResult is not backfilled, pub async fn get_analyzed_function_by_id( &mut self, path: &ResolvedComponentFunctionPath, ) -> anyhow::Result<anyhow::Result<AnalyzedFunction>> { let udf_path = &path.udf_path; let Some(module) = self.get_metadata_for_function_by_id(path).await? else { let err = ModuleNotFoundError::new(udf_path.module().as_str()); return Ok(Err(ErrorMetadata::bad_request( "ModuleNotFound", err.to_string(), ) .into())); }; // Dependency modules don't have AnalyzedModule. if !udf_path.module().is_deps() { let analyzed_module = module .analyze_result .as_ref() .ok_or_else(|| anyhow::anyhow!("Expected analyze result for {udf_path:?}"))?; for function in &analyzed_module.functions { if &function.name == udf_path.function_name() { return Ok(Ok(function.clone())); } } } Ok(Err(ErrorMetadata::bad_request( "FunctionNotFound", FunctionNotFoundError::new(udf_path.function_name(), udf_path.module().as_str()) .to_string(), ) .into())) } pub async fn get_http( &mut self, component: ComponentId, ) -> anyhow::Result<Option<ParsedDocument<ModuleMetadata>>> { let path = CanonicalizedComponentModulePath { component, module_path: HTTP_MODULE_PATH.clone(), }; self.get_metadata(path).await } pub async fn has_http(&mut self, component: ComponentId) -> anyhow::Result<bool> { Ok(self.get_http(component).await?.is_some()) } } /// Hash a module's source and source map. This same hash is also computed in /// the CLI to determine if a module has changed. Therefore this algorithm /// can never be changed (if you want a new algorithm, we need a new API /// endpoint and a new CLI version to call it). pub fn hash_module_source(source: &ModuleSource, source_map: Option<&SourceMap>) -> Sha256Digest { let mut hasher = Sha256::new(); hasher.update(source.as_bytes()); if let Some(source_map) = source_map { hasher.update(source_map.as_bytes()); } hasher.finalize() }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server