Skip to main content
Glama

Convex MCP server

Official
by get-convex
api.rs20.7 kB
use std::{ ops::Bound, sync::Arc, time::Duration, }; use async_trait::async_trait; use bytes::Bytes; use common::{ components::{ CanonicalizedComponentFunctionPath, ComponentId, ExportPath, PublicFunctionPath, }, http::ResolvedHostname, runtime::Runtime, types::{ AllowedVisibility, ConvexOrigin, FunctionCaller, RepeatableTimestamp, }, RequestId, }; use database::{ Database, LogReader, ReadSet, Subscription, Token, }; use file_storage::{ FileRangeStream, FileStream, }; use futures::{ future::BoxFuture, stream::BoxStream, FutureExt, }; use headers::{ ContentLength, ContentType, }; use keybroker::Identity; use model::{ file_storage::FileStorageId, session_requests::types::SessionRequestIdentifier, }; use sync_types::{ types::SerializedArgs, AuthenticationToken, SerializedQueryJournal, Timestamp, }; use udf::{ HttpActionRequest, HttpActionResponseStreamer, }; use value::{ sha256::Sha256Digest, DeveloperDocumentId, }; use crate::{ Application, FunctionError, FunctionReturn, RedactedActionError, RedactedActionReturn, RedactedMutationError, RedactedMutationReturn, RedactedQueryReturn, }; #[cfg_attr( any(test, feature = "testing"), derive(proptest_derive::Arbitrary, Debug, Clone, PartialEq) )] pub enum ExecuteQueryTimestamp { // Execute the query at the latest timestamp. Latest, // Execute the query at a given timestamp. At(Timestamp), } // A trait that abstracts the backend API. It all state and validation logic // so http routes can be kept thin and stateless. The implementor is also // responsible for routing the request to the appropriate backend in the hosted // version of Convex. #[async_trait] pub trait ApplicationApi: Send + Sync { async fn authenticate( &self, host: &ResolvedHostname, request_id: RequestId, auth_token: AuthenticationToken, ) -> anyhow::Result<Identity>; /// Execute a public query on the root app. This method is used by the sync /// worker and HTTP API for the majority of traffic as the main entry point /// for queries. async fn execute_public_query( &self, host: &ResolvedHostname, request_id: RequestId, identity: Identity, path: ExportPath, args: SerializedArgs, caller: FunctionCaller, ts: ExecuteQueryTimestamp, journal: Option<SerializedQueryJournal>, ) -> anyhow::Result<RedactedQueryReturn>; /// Execute an admin query for a particular component. This method is used /// by the sync worker for running queries for the dashboard and only works /// for admin or system identity. async fn execute_admin_query( &self, host: &ResolvedHostname, request_id: RequestId, identity: Identity, path: CanonicalizedComponentFunctionPath, args: SerializedArgs, caller: FunctionCaller, ts: ExecuteQueryTimestamp, journal: Option<SerializedQueryJournal>, ) -> anyhow::Result<RedactedQueryReturn>; /// Execute a public mutation on the root app. async fn execute_public_mutation( &self, host: &ResolvedHostname, request_id: RequestId, identity: Identity, path: ExportPath, args: SerializedArgs, caller: FunctionCaller, // Identifier used to make this mutation idempotent. mutation_identifier: Option<SessionRequestIdentifier>, // The length of the mutation queue at the time the mutation was executed. mutation_queue_length: Option<usize>, ) -> anyhow::Result<Result<RedactedMutationReturn, RedactedMutationError>>; /// Execute an admin mutation for a particular component for the dashboard. async fn execute_admin_mutation( &self, host: &ResolvedHostname, request_id: RequestId, identity: Identity, path: CanonicalizedComponentFunctionPath, args: SerializedArgs, caller: FunctionCaller, mutation_identifier: Option<SessionRequestIdentifier>, // The length of the mutation queue at the time the mutation was executed. mutation_queue_length: Option<usize>, ) -> anyhow::Result<Result<RedactedMutationReturn, RedactedMutationError>>; /// Execute a public action on the root app. async fn execute_public_action( &self, host: &ResolvedHostname, request_id: RequestId, identity: Identity, path: ExportPath, args: SerializedArgs, caller: FunctionCaller, ) -> anyhow::Result<Result<RedactedActionReturn, RedactedActionError>>; /// Execute an admin action for a particular component for the dashboard. async fn execute_admin_action( &self, host: &ResolvedHostname, request_id: RequestId, identity: Identity, path: CanonicalizedComponentFunctionPath, args: SerializedArgs, caller: FunctionCaller, ) -> anyhow::Result<Result<RedactedActionReturn, RedactedActionError>>; /// Execute an HTTP action on the root app. async fn execute_http_action( &self, host: &ResolvedHostname, request_id: RequestId, http_request_metadata: HttpActionRequest, identity: Identity, caller: FunctionCaller, response_streamer: HttpActionResponseStreamer, ) -> anyhow::Result<()>; /// For the dashboard (and the CLI), run any function in any component /// without knowing its type. This function requires admin identity for /// calling functions outside the root component. async fn execute_any_function( &self, host: &ResolvedHostname, request_id: RequestId, identity: Identity, path: CanonicalizedComponentFunctionPath, args: SerializedArgs, caller: FunctionCaller, ) -> anyhow::Result<Result<FunctionReturn, FunctionError>>; async fn latest_timestamp( &self, host: &ResolvedHostname, request_id: RequestId, ) -> anyhow::Result<RepeatableTimestamp>; async fn check_store_file_authorization( &self, host: &ResolvedHostname, request_id: RequestId, token: &str, validity: Duration, ) -> anyhow::Result<ComponentId>; async fn store_file( &self, host: &ResolvedHostname, request_id: RequestId, origin: ConvexOrigin, component: ComponentId, content_length: Option<ContentLength>, content_type: Option<ContentType>, expected_sha256: Option<Sha256Digest>, body: BoxStream<'_, anyhow::Result<Bytes>>, ) -> anyhow::Result<DeveloperDocumentId>; async fn get_file_range( &self, host: &ResolvedHostname, request_id: RequestId, origin: ConvexOrigin, component: ComponentId, file_storage_id: FileStorageId, range: (Bound<u64>, Bound<u64>), ) -> anyhow::Result<FileRangeStream>; async fn get_file( &self, host: &ResolvedHostname, request_id: RequestId, origin: ConvexOrigin, component: ComponentId, file_storage_id: FileStorageId, ) -> anyhow::Result<FileStream>; // Returns a fallible subscription client. The implementation is not required to // recover from transient errors with the underlying connection or stream. The // client is responsible to Drop the client and create a new one on any system // errors. The intended use is to create a single client for every new web // socket connection. NOTE: We might eventually strengthen the requirement for // the implementation and require it to reconnect internally but easier to // start this way. async fn subscription_client( &self, host: &ResolvedHostname, ) -> anyhow::Result<Box<dyn SubscriptionClient>>; /// To be used for metrics only. async fn partition_id(&self, host: &ResolvedHostname) -> anyhow::Result<u64>; } // Implements ApplicationApi via Application. #[async_trait] impl<RT: Runtime> ApplicationApi for Application<RT> { async fn authenticate( &self, _host: &ResolvedHostname, _request_id: RequestId, auth_token: AuthenticationToken, ) -> anyhow::Result<Identity> { let validate_time = self.runtime().system_time(); self.authenticate(auth_token, validate_time).await } async fn execute_public_query( &self, _host: &ResolvedHostname, request_id: RequestId, identity: Identity, path: ExportPath, args: SerializedArgs, caller: FunctionCaller, ts: ExecuteQueryTimestamp, journal: Option<SerializedQueryJournal>, ) -> anyhow::Result<RedactedQueryReturn> { anyhow::ensure!( caller.allowed_visibility() == AllowedVisibility::PublicOnly, "This method should not be used by internal callers." ); let ts = match ts { ExecuteQueryTimestamp::Latest => *self.now_ts_for_reads(), ExecuteQueryTimestamp::At(ts) => ts, }; self.read_only_udf_at_ts( request_id, PublicFunctionPath::RootExport(path), args.into_args()?, identity, ts, journal, caller, ) .await } async fn execute_admin_query( &self, _host: &ResolvedHostname, request_id: RequestId, identity: Identity, path: CanonicalizedComponentFunctionPath, args: SerializedArgs, caller: FunctionCaller, ts: ExecuteQueryTimestamp, journal: Option<SerializedQueryJournal>, ) -> anyhow::Result<RedactedQueryReturn> { anyhow::ensure!( path.component.is_root() || identity.is_admin() || identity.is_system(), "Only admin or system users can call functions on non-root components directly" ); let ts = match ts { ExecuteQueryTimestamp::Latest => *self.now_ts_for_reads(), ExecuteQueryTimestamp::At(ts) => ts, }; self.read_only_udf_at_ts( request_id, PublicFunctionPath::Component(path), args.into_args()?, identity, ts, journal, caller, ) .await } async fn execute_public_mutation( &self, _host: &ResolvedHostname, request_id: RequestId, identity: Identity, path: ExportPath, args: SerializedArgs, caller: FunctionCaller, // Identifier used to make this mutation idempotent. mutation_identifier: Option<SessionRequestIdentifier>, mutation_queue_length: Option<usize>, ) -> anyhow::Result<Result<RedactedMutationReturn, RedactedMutationError>> { anyhow::ensure!( caller.allowed_visibility() == AllowedVisibility::PublicOnly, "This method should not be used by internal callers." ); self.mutation_udf( request_id, PublicFunctionPath::RootExport(path), args.into_args()?, identity, mutation_identifier, caller, mutation_queue_length, ) .await } async fn execute_admin_mutation( &self, _host: &ResolvedHostname, request_id: RequestId, identity: Identity, path: CanonicalizedComponentFunctionPath, args: SerializedArgs, caller: FunctionCaller, mutation_identifier: Option<SessionRequestIdentifier>, mutation_queue_length: Option<usize>, ) -> anyhow::Result<Result<RedactedMutationReturn, RedactedMutationError>> { anyhow::ensure!( path.component.is_root() || identity.is_admin() || identity.is_system(), "Only admin or system users can call functions on non-root components directly" ); self.mutation_udf( request_id, PublicFunctionPath::Component(path), args.into_args()?, identity, mutation_identifier, caller, mutation_queue_length, ) .await } async fn execute_public_action( &self, _host: &ResolvedHostname, request_id: RequestId, identity: Identity, path: ExportPath, args: SerializedArgs, caller: FunctionCaller, ) -> anyhow::Result<Result<RedactedActionReturn, RedactedActionError>> { anyhow::ensure!( caller.allowed_visibility() == AllowedVisibility::PublicOnly, "This method should not be used by internal callers." ); self.action_udf( request_id, PublicFunctionPath::RootExport(path), args.into_args()?, identity, caller, ) .await } async fn execute_admin_action( &self, _host: &ResolvedHostname, request_id: RequestId, identity: Identity, path: CanonicalizedComponentFunctionPath, args: SerializedArgs, caller: FunctionCaller, ) -> anyhow::Result<Result<RedactedActionReturn, RedactedActionError>> { anyhow::ensure!( path.component.is_root() || identity.is_admin() || identity.is_system(), "Only admin or system users can call functions on non-root components directly" ); self.action_udf( request_id, PublicFunctionPath::Component(path), args.into_args()?, identity, caller, ) .await } async fn execute_any_function( &self, _host: &ResolvedHostname, request_id: RequestId, identity: Identity, path: CanonicalizedComponentFunctionPath, args: SerializedArgs, caller: FunctionCaller, ) -> anyhow::Result<Result<FunctionReturn, FunctionError>> { anyhow::ensure!( path.component.is_root() || identity.is_admin() || identity.is_system(), "Only admin or system users can call functions on non-root components directly" ); self.any_udf(request_id, path, args.into_args()?, identity, caller) .await } async fn latest_timestamp( &self, _host: &ResolvedHostname, _request_id: RequestId, ) -> anyhow::Result<RepeatableTimestamp> { Ok(self.now_ts_for_reads()) } async fn execute_http_action( &self, _host: &ResolvedHostname, request_id: RequestId, http_request_metadata: HttpActionRequest, identity: Identity, caller: FunctionCaller, response_streamer: HttpActionResponseStreamer, ) -> anyhow::Result<()> { self.http_action_udf( request_id, http_request_metadata, identity, caller, response_streamer, ) .await } async fn check_store_file_authorization( &self, _host: &ResolvedHostname, _request_id: RequestId, token: &str, validity: Duration, ) -> anyhow::Result<ComponentId> { self.key_broker() .check_store_file_authorization(&self.runtime, token, validity) } async fn store_file( &self, _host: &ResolvedHostname, _request_id: RequestId, _origin: ConvexOrigin, component: ComponentId, content_length: Option<ContentLength>, content_type: Option<ContentType>, expected_sha256: Option<Sha256Digest>, body: BoxStream<'_, anyhow::Result<Bytes>>, ) -> anyhow::Result<DeveloperDocumentId> { self.store_file( component, content_length, content_type, expected_sha256, body, ) .await } async fn get_file_range( &self, _host: &ResolvedHostname, _request_id: RequestId, _origin: ConvexOrigin, component: ComponentId, file_storage_id: FileStorageId, range: (Bound<u64>, Bound<u64>), ) -> anyhow::Result<FileRangeStream> { self.get_file_range(component, file_storage_id, range).await } async fn get_file( &self, _host: &ResolvedHostname, _request_id: RequestId, _origin: ConvexOrigin, component: ComponentId, file_storage_id: FileStorageId, ) -> anyhow::Result<FileStream> { self.get_file(component, file_storage_id).await } async fn subscription_client( &self, _host: &ResolvedHostname, ) -> anyhow::Result<Box<dyn SubscriptionClient>> { Ok(Box::new(ApplicationSubscriptionClient { database: self.database.clone(), })) } async fn partition_id(&self, _host: &ResolvedHostname) -> anyhow::Result<u64> { // Not relevant; just return something Ok(0) } } #[async_trait] pub trait SubscriptionClient: Send + Sync { async fn subscribe(&self, token: Token) -> anyhow::Result<Box<dyn SubscriptionTrait>>; } struct ApplicationSubscriptionClient<RT: Runtime> { database: Database<RT>, } #[async_trait] impl<RT: Runtime> SubscriptionClient for ApplicationSubscriptionClient<RT> { async fn subscribe(&self, token: Token) -> anyhow::Result<Box<dyn SubscriptionTrait>> { let inner = self.database.subscribe(token.clone()).await?; Ok(Box::new(ApplicationSubscription { initial_ts: token.ts(), reads: token.reads_owned(), inner, log: self.database.log().clone(), })) } } pub enum SubscriptionValidity { /// The subscription is valid. Valid, /// The subscription may no longer be valid. /// This result can be returned spuriously even if there were no conflicting /// writes. Invalid { /// The earliest conflicting timestamp, if known. This is not guaranteed /// to be known. invalid_ts: Option<Timestamp>, }, } #[async_trait] pub trait SubscriptionTrait: Send + Sync { /// Returns a future that completes after the subscription becomes invalid. /// The future yields the invalidation timestamp, if known; this is the same /// thing as [`SubscriptionValidity`]'s `invalid_ts`. fn wait_for_invalidation(&self) -> BoxFuture<'static, anyhow::Result<Option<Timestamp>>>; /// Checks if the subscription is still valid as of `new_ts`. See comments /// on [`SubscriptionValidity`]. async fn extend_validity(&self, new_ts: Timestamp) -> anyhow::Result<SubscriptionValidity>; } struct ApplicationSubscription { inner: Subscription, log: LogReader, reads: Arc<ReadSet>, // The initial timestamp the subscription was created at. This is known // to be valid. initial_ts: Timestamp, } #[async_trait] impl SubscriptionTrait for ApplicationSubscription { fn wait_for_invalidation(&self) -> BoxFuture<'static, anyhow::Result<Option<Timestamp>>> { self.inner.wait_for_invalidation().map(Ok).boxed() } #[fastrace::trace] async fn extend_validity(&self, new_ts: Timestamp) -> anyhow::Result<SubscriptionValidity> { if new_ts < self.initial_ts { // new_ts is before the initial subscription timestamp. return Ok(SubscriptionValidity::Invalid { invalid_ts: None }); } // The inner subscription is periodically updated by the subscription // worker. let Some(current_ts) = self.inner.current_ts() else { // Subscription is no longer valid. We could check validity from end_ts // to new_ts, but this is likely to fail and is potentially unbounded amount of // work, so we return false here. This is valid per the function contract. return Ok(SubscriptionValidity::Invalid { invalid_ts: self.inner.invalid_ts(), }); }; let current_token = Token::new(self.reads.clone(), current_ts); Ok(match self.log.refresh_token(current_token, new_ts)? { Ok(_new_token) => SubscriptionValidity::Valid, Err(invalid_ts) => { // Subscription validity can't be extended. Note that returning false // here also doesn't mean there is a conflict. SubscriptionValidity::Invalid { invalid_ts } }, }) } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server