Skip to main content
Glama

Convex MCP server

Official
by get-convex
broker.rs49.6 kB
use std::{ collections::{ BTreeMap, HashMap, }, fmt, time::{ Duration, SystemTime, }, }; use anyhow::Context; use biscuit::JWT; pub use common::types::SystemKey; use common::{ components::ComponentId, identity::{ IdentityCacheKey, InertIdentity, }, index::IndexKeyBytes, query::{ Cursor, CursorPosition, SerializedCursor, }, query_journal::QueryJournal, runtime::{ Runtime, UnixTimestamp, }, types::{ format_admin_key, remove_type_prefix_from_instance_name, split_admin_key, ActionCallbackToken, AdminKey, MemberId, PersistenceVersion, TeamId, UdfType, }, }; use errors::ErrorMetadata; use metrics::StaticMetricLabel; use openidconnect::{ core::{ CoreGenderClaim, CoreIdTokenVerifier, CoreJweContentEncryptionAlgorithm, CoreJwsSigningAlgorithm, }, AdditionalClaims, IdToken, Nonce, }; use pb::{ convex_actions::ActionCallbackToken as ActionCallbackTokenProto, convex_cursor::{ instance_cursor::Position as PositionProto, IndexKey as IndexKeyProto, InstanceCursor as InstanceCursorProto, }, convex_identity::{ unchecked_identity::Identity as UncheckedIdentityProto, ActingUser, UnknownIdentity, }, convex_keys::{ admin_key::Identity as AdminIdentityProto, storage_token::{ AuthorizationType as AuthorizationTypeProto, StoreFile as StoreFileProto, }, AdminKey as AdminKeyProto, StorageToken as StorageTokenProto, }, convex_query_journal::InstanceQueryJournal as InstanceQueryJournalProto, }; #[cfg(any(test, feature = "testing"))] use proptest::prelude::{ Arbitrary, Strategy, }; use serde::{ Deserialize, Serialize, }; use sync_types::{ AuthenticationToken, SerializedQueryJournal, UserIdentifier, UserIdentityAttributes, }; #[cfg(any(test, feature = "testing"))] use crate::testing::TestUserIdentity; use crate::{ encryptor::{ DeterministicEncryptor, Purpose, RandomEncryptor, }, legacy_encryptor::LegacyEncryptor, metrics::{ log_actions_token_expired, log_store_file_auth_expired, }, secret::InstanceSecret, }; const ACTION_KEY_VERSION: u8 = 2; const ADMIN_KEY_VERSION: u8 = 1; const CURSOR_VERSION: u8 = 7; const STORE_FILE_AUTHZ_VERSION: u8 = 1; const QUERY_JOURNAL_VERSION: u8 = 7; // Max delay from transaction start time -> key being issued that is tolerable. const MAX_TS_DELAY: Duration = Duration::from_secs(15); #[derive(Clone)] pub struct KeyBroker { instance_name: String, encryptor: LegacyEncryptor, admin_key_encryptor: RandomEncryptor, action_callback_encryptor: RandomEncryptor, cursor_encryptor: DeterministicEncryptor, journal_encryptor: RandomEncryptor, store_file_encryptor: RandomEncryptor, } // This enum encodes a successful authentication decision, and its nontrivial // variants cannot be constructed outside this crate. Since possession of this // value confers access permissions, don't store it persistently: Instead, use // [`common::identity::InertIdentity`] to store an "inert" version that records // the variant without representation authentication. #[derive(Clone, Debug)] #[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))] pub enum Identity { InstanceAdmin(AdminIdentity), System(SystemIdentity), User(UserIdentity), // ActingUser keeps track of the ID of the admin acting as a user, // and that user's fake attributes ActingUser(AdminIdentity, UserIdentityAttributes), // Unknown(None) means no identity was provided. // Unknown(Some(error_message)) means an error occurred while parsing the identity. // We allow the request to go through, but keep the error to throw when code tries to // access the identity (eg ctx.getUserIdentity()) Unknown(Option<ErrorMetadata>), // include an optional error message } impl From<Identity> for AuthenticationToken { fn from(i: Identity) -> Self { match i { Identity::User(identity) => AuthenticationToken::User(identity.original_token), Identity::ActingUser(identity, user) => { AuthenticationToken::Admin(identity.key, Some(user)) }, Identity::InstanceAdmin(identity) => AuthenticationToken::Admin(identity.key, None), _ => AuthenticationToken::None, } } } impl From<Identity> for pb::convex_identity::UncheckedIdentity { fn from(i: Identity) -> Self { let identity = match i { Identity::InstanceAdmin(admin_identity) => { UncheckedIdentityProto::AdminIdentity(admin_identity.into()) }, Identity::System(_) => UncheckedIdentityProto::System(()), Identity::User(user_identity) => { UncheckedIdentityProto::UserIdentity(user_identity.into()) }, Identity::ActingUser(admin_identity, attributes) => { UncheckedIdentityProto::ActingUser(ActingUser { admin_identity: Some(admin_identity.into()), attributes: Some(attributes.into()), }) }, Identity::Unknown(error_message) => UncheckedIdentityProto::Unknown(UnknownIdentity { error_message: error_message.map(|e| e.into()), }), }; Self { identity: Some(identity), } } } impl Identity { pub fn from_proto_unchecked( msg: pb::convex_identity::UncheckedIdentity, ) -> anyhow::Result<Self> { let identity = msg .identity .ok_or_else(|| anyhow::anyhow!("Missing nested identity"))?; match identity { UncheckedIdentityProto::AdminIdentity(admin_identity) => Ok(Identity::InstanceAdmin( AdminIdentity::from_proto_unchecked(admin_identity)?, )), UncheckedIdentityProto::System(()) => Ok(Identity::System(SystemIdentity)), UncheckedIdentityProto::UserIdentity(user_identity) => Ok(Identity::User( UserIdentity::from_proto_unchecked(user_identity)?, )), UncheckedIdentityProto::ActingUser(ActingUser { admin_identity, attributes, }) => { let admin_identity = AdminIdentity::from_proto_unchecked( admin_identity.ok_or_else(|| anyhow::anyhow!("Missing admin identity"))?, )?; let attributes = attributes.ok_or_else(|| anyhow::anyhow!("Missing user attributes"))?; Ok(Identity::ActingUser(admin_identity, attributes.try_into()?)) }, UncheckedIdentityProto::Unknown(UnknownIdentity { error_message }) => Ok( Identity::Unknown(error_message.map(|e| e.try_into()).transpose()?), ), } } pub fn ensure_can_run_function(&self, udf_type: UdfType) -> anyhow::Result<()> { // Everyone can run queries. if udf_type == UdfType::Query { return Ok(()); } match self { Identity::InstanceAdmin(admin_identity) | Identity::ActingUser(admin_identity, _) => { if admin_identity.is_read_only() { anyhow::bail!(ErrorMetadata::forbidden( "Unauthorized", format!("You do not have permission to run {udf_type} functions.") )); } }, _ => {}, } Ok(()) } pub fn tag(&self) -> StaticMetricLabel { InertIdentity::from(self.clone()).tag() } } impl From<Identity> for InertIdentity { fn from(i: Identity) -> Self { match i { Identity::InstanceAdmin(i) => InertIdentity::InstanceAdmin(i.instance_name), Identity::System(_) => InertIdentity::System, Identity::Unknown(_) => InertIdentity::Unknown, Identity::User(user) => InertIdentity::User(user.attributes.token_identifier), Identity::ActingUser(identity, user) => match identity.principal { AdminIdentityPrincipal::Member(member_id) => { InertIdentity::MemberActingUser(member_id, user.token_identifier) }, AdminIdentityPrincipal::Team(team_id) => { InertIdentity::TeamActingUser(team_id, user.token_identifier) }, }, } } } impl PartialEq for Identity { fn eq(&self, other: &Self) -> bool { match (self, other) { (Self::InstanceAdmin(l), Self::InstanceAdmin(r)) => l == r, (Self::System(..), Self::System(..)) => true, (Self::User(l), Self::User(r)) => { l.attributes.token_identifier == r.attributes.token_identifier }, (Self::Unknown(_), Self::Unknown(_)) => true, ( Self::ActingUser(l_admin_identity, l_attributes), Self::ActingUser(r_admin_identity, r_attributes), ) => l_admin_identity == r_admin_identity && l_attributes == r_attributes, (Self::InstanceAdmin(_), _) | (Self::System(_), _) | (Self::User(_), _) | (Self::Unknown(_), _) | (Self::ActingUser(..), _) => false, } } } impl Eq for Identity {} impl Identity { pub fn cache_key(&self) -> IdentityCacheKey { match self.clone() { Identity::InstanceAdmin(i) => IdentityCacheKey::InstanceAdmin(i.instance_name), Identity::System(_) => IdentityCacheKey::System, Identity::Unknown(error_message) => { IdentityCacheKey::Unknown(error_message.map(|e| e.to_string())) }, Identity::User(user) => IdentityCacheKey::User(user.attributes), // Identity of the impersonator not relevant for caching. Only the one being // impersonated. Identity::ActingUser(_identity, user) => IdentityCacheKey::User(user), } } /// Easy-to-audit entry point for creating a global system identity. pub fn system() -> Self { Identity::System(SystemIdentity) } /// Entry point for creating a User identity after authenticating the user. pub fn user(user: UserIdentity) -> Self { Identity::User(user) } pub fn is_system(&self) -> bool { matches!(self, Identity::System(..)) } pub fn is_admin(&self) -> bool { matches!(self, Identity::InstanceAdmin(..)) } pub fn is_user(&self) -> bool { matches!(self, Identity::User(..)) } /// Returns the admin's [`MemberId`] if this is an /// [`Identity::InstanceAdmin`] with a member principal pub fn member_id(&self) -> Option<MemberId> { if let Identity::InstanceAdmin(AdminIdentity { principal, .. }) = self { return if let AdminIdentityPrincipal::Member(member_id) = principal { Some(*member_id) } else { None }; } None } pub fn instance_admin_principal(&self) -> Option<AdminIdentityPrincipal> { if let Identity::InstanceAdmin(AdminIdentity { principal, .. }) = self { return Some(principal.clone()); } None } pub fn instance_name(&self) -> Option<String> { if let Identity::InstanceAdmin(AdminIdentity { instance_name, .. }) = self { return Some(instance_name.to_string()); } None } pub fn user_identity(&self) -> Option<UserIdentity> { if let Identity::User(id) = self { return Some(id.clone()); } None } pub fn assert_present(&self) -> anyhow::Result<()> { if matches!(self, Identity::Unknown(_)) { anyhow::bail!(ErrorMetadata::unauthenticated( "AuthorizationMissing", "This request requires the HTTP `Authorization` header.", )); } Ok(()) } } #[derive(Clone, Debug, PartialEq)] #[non_exhaustive] // Prevents creating this struct without calling `from_token` pub struct UserIdentity { pub subject: String, // Might be useful for developers to know which provider authenticated this user. pub issuer: String, pub expiration: SystemTime, pub attributes: UserIdentityAttributes, // The original token this user identity was created from. This may either by an // OIDC JWT or a custom JWT. pub original_token: String, } #[cfg(any(test, feature = "testing"))] impl Arbitrary for UserIdentity { type Parameters = (); type Strategy = impl Strategy<Value = UserIdentity>; fn arbitrary_with((): Self::Parameters) -> Self::Strategy { use proptest::prelude::*; // This is not randomized right now because there are many constraints on // string fields in UserIdentity. any::<()>().prop_map(|()| UserIdentity::test()) } } impl From<UserIdentity> for pb::convex_identity::UserIdentity { fn from( UserIdentity { subject, issuer, expiration, attributes, original_token, }: UserIdentity, ) -> Self { Self { subject: Some(subject), issuer: Some(issuer), expiration: Some(expiration.into()), attributes: Some(attributes.into()), original_token: Some(original_token), } } } macro_rules! get_string { ($claims: ident, $field: ident) => { $claims.$field().map(|v| v.to_string()) }; } macro_rules! get_localized_string { ($claims: ident, $field: ident) => { $claims .$field() .and_then(|loc| loc.get(None)) .map(|v| v.to_string()) }; } pub type CoreIdTokenWithCustomClaims = IdToken< CustomClaims, CoreGenderClaim, CoreJweContentEncryptionAlgorithm, CoreJwsSigningAlgorithm, >; #[derive(Deserialize, Serialize, Debug, Clone, Default, PartialEq, Eq)] pub struct CustomClaims(HashMap<String, serde_json::Value>); impl AdditionalClaims for CustomClaims {} impl UserIdentity { pub fn from_custom_jwt( token: JWT<serde_json::Value, biscuit::Empty>, original_token: String, ) -> Result<Self, anyhow::Error> { let payload = token.payload()?; let subject = payload.registered.subject.as_ref().ok_or_else(|| { ErrorMetadata::unauthenticated("InvalidAuthHeader", "Missing subject") })?; let issuer = payload.registered.issuer.as_ref().ok_or_else(|| { ErrorMetadata::unauthenticated("InvalidAuthHeader", "Missing issuer") })?; let Some(expiry) = payload.registered.expiry else { anyhow::bail!(ErrorMetadata::unauthenticated( "InvalidAuthHeader", "Missing expiry" )); }; let mut custom_claims = BTreeMap::new(); if let serde_json::Value::Object(ref properties) = payload.private { custom_claims = extract_custom_jwt_claims(properties); } Ok(UserIdentity { subject: subject.clone(), issuer: issuer.clone(), expiration: (*expiry).into(), attributes: UserIdentityAttributes { token_identifier: UserIdentifier::construct(issuer, subject), subject: Some(subject.clone()), issuer: Some(issuer.clone()), custom_claims, ..Default::default() }, original_token, }) } pub fn from_token( token: CoreIdTokenWithCustomClaims, verifier: CoreIdTokenVerifier, ) -> Result<Self, anyhow::Error> { // NB: Nonce verification is optional, and we'd need the developer to create and // store a nonce with their initial request to auth0 (or whatever // provider they use) and provide that nonce to us with the ID token. let nonce_verifier = |_: Option<&Nonce>| Ok(()); let binding = token.clone(); let claims = binding.claims(&verifier, nonce_verifier)?; let subject = claims.subject().to_string(); let issuer = claims.issuer().to_string(); let mut custom_claims = BTreeMap::new(); for claim in &claims.additional_claims().0 { // Filter out standard claims and claims set by auth providers match claim.0.as_str() { // Standard claims that we support: see https://docs.convex.dev/api/interfaces/server.UserIdentity "sub" | "iss" | "exp" | "name" | "given_name" | "family_name" | "nickname" | "preferred_username" | "profile" | "picture" | "website" | "email" | "email_verified" | "gender" | "birthdate" | "zoneinfo" | "locale" | "phone_number" | "phone_number_verified" | "address" | "updated_at" // Clerk claims: see https://clerk.com/docs/backend-requests/resources/session-tokens | "jti" | "nbf" => { continue; }, _ => { custom_claims.insert(claim.0.to_string(), claim.1.to_string()); }, } } Ok(UserIdentity { subject: subject.clone(), issuer: issuer.clone(), expiration: claims.expiration().into(), original_token: token.to_string(), attributes: UserIdentityAttributes { token_identifier: UserIdentifier::construct(&issuer, &subject), subject: Some(subject), issuer: Some(issuer), name: get_localized_string!(claims, name), given_name: get_localized_string!(claims, given_name), family_name: get_localized_string!(claims, family_name), nickname: get_localized_string!(claims, nickname), preferred_username: get_string!(claims, preferred_username), profile_url: get_localized_string!(claims, profile), picture_url: get_localized_string!(claims, picture), website_url: get_localized_string!(claims, website), email: get_string!(claims, email), email_verified: claims.email_verified(), gender: get_string!(claims, gender), birthday: get_string!(claims, birthday), timezone: get_string!(claims, zoneinfo), language: get_string!(claims, locale), phone_number: get_string!(claims, phone_number), phone_number_verified: claims.phone_number_verified(), address: claims .address() .and_then(|a| a.formatted.as_ref()) .map(|f| f.to_string()), updated_at: claims.updated_at().map(|dt| dt.to_rfc3339()), custom_claims, }, }) } // Decode an `Identity` serialized to protobuf *without* revalidating its // original token. This method assumes that the protobuf comes from a // trusted source, like an internal backend. pub fn from_proto_unchecked(msg: pb::convex_identity::UserIdentity) -> anyhow::Result<Self> { let subject = msg .subject .ok_or_else(|| anyhow::anyhow!("Missing subject"))?; let issuer = msg .issuer .ok_or_else(|| anyhow::anyhow!("Missing issuer"))?; let expiration = msg .expiration .ok_or_else(|| anyhow::anyhow!("Missing expiration"))? .try_into()?; let attributes = msg .attributes .ok_or_else(|| anyhow::anyhow!("Missing user identity attributes"))? .try_into()?; let original_token = msg .original_token .ok_or_else(|| anyhow::anyhow!("Missing original_token"))? .parse()?; Ok(Self { subject, issuer, expiration, attributes, original_token, }) } pub fn is_expired(&self, current_time: SystemTime) -> bool { current_time >= self.expiration } } fn extract_custom_jwt_claims( payload: &serde_json::Map<String, serde_json::Value>, ) -> BTreeMap<String, String> { let mut result = BTreeMap::new(); for (key, value) in payload { if let serde_json::Value::Object(nested_object) = value { for (nested_key, value) in extract_custom_jwt_claims(nested_object) { result.insert(format!("{key}.{nested_key}"), value); } } else { result.insert(key.clone(), value.to_string()); } } result } #[derive(Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Debug)] #[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))] pub enum AdminIdentityPrincipal { Member(MemberId), Team(TeamId), } // Token indicating the possessor has authenticated as the admin for an // instance. #[derive(Clone, PartialEq, Eq, Hash)] pub struct AdminIdentity { instance_name: String, principal: AdminIdentityPrincipal, key: String, // is_read_only being true implies that this identity should not be able to write data. // At the function level, read only admins are allowed to run queries but not mutations and // actions. At the database level, they are allowed to read data from user and system tables // but not write to them. is_read_only: bool, } impl From<AdminIdentity> for pb::convex_identity::AdminIdentity { fn from( AdminIdentity { instance_name, principal, key, is_read_only, }: AdminIdentity, ) -> Self { Self { instance_name: Some(instance_name), principal: match principal { AdminIdentityPrincipal::Member(member_id) => Some( pb::convex_identity::admin_identity::Principal::MemberId(member_id.0), ), AdminIdentityPrincipal::Team(team_id) => Some( pb::convex_identity::admin_identity::Principal::TeamId(team_id.0), ), }, key: Some(key), is_read_only, } } } impl AdminIdentity { pub fn from_proto_unchecked(msg: pb::convex_identity::AdminIdentity) -> anyhow::Result<Self> { let instance_name = msg .instance_name .ok_or_else(|| anyhow::anyhow!("Missing instance_name"))?; let principal = match msg.principal { Some(pb::convex_identity::admin_identity::Principal::MemberId(id)) => { AdminIdentityPrincipal::Member(id.into()) }, Some(pb::convex_identity::admin_identity::Principal::TeamId(id)) => { AdminIdentityPrincipal::Team(id.into()) }, None => anyhow::bail!("Missing principal"), }; let key = msg.key.ok_or_else(|| anyhow::anyhow!("Missing key"))?; let is_read_only: bool = msg.is_read_only; Ok(Self { instance_name, principal, key, is_read_only, }) } pub fn new_for_access_token( instance_name: String, principal: AdminIdentityPrincipal, access_token: String, is_read_only: bool, ) -> Self { Self { instance_name, principal, key: access_token, is_read_only, } } pub fn principal(&self) -> &AdminIdentityPrincipal { &self.principal } // is_read_only being true implies that this identity should not be able to // write data. At the function level, read only admins are allowed to run // queries but not mutations and actions. At the database level, they are // allowed to read data from user and system tables but not write to them. pub fn is_read_only(&self) -> bool { self.is_read_only } } #[cfg(any(test, feature = "testing"))] impl Arbitrary for AdminIdentity { type Parameters = (); type Strategy = impl proptest::strategy::Strategy<Value = AdminIdentity>; fn arbitrary_with((): Self::Parameters) -> Self::Strategy { use proptest::prelude::*; any::<(AdminIdentityPrincipal, String)>().prop_map(|(principal, key)| AdminIdentity { instance_name: "fake-instance-name".to_string(), principal, key, is_read_only: false, }) } } #[cfg(any(test, feature = "testing"))] impl AdminIdentity { pub fn new_for_test_only(instance_name: String, member_id: MemberId) -> AdminIdentity { AdminIdentity { instance_name, principal: AdminIdentityPrincipal::Member(member_id), key: "chocolate-charlies-cupcake".to_string(), is_read_only: false, } } pub fn instance_name(&self) -> &str { &self.instance_name } } impl fmt::Debug for AdminIdentity { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( f, "{}/{:?}/{}", self.instance_name, self.principal, self.key ) } } #[derive(Clone, PartialEq, Eq, Hash)] #[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))] pub struct SystemIdentity; impl fmt::Debug for SystemIdentity { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "SystemIdentity") } } /// Encrypted authorization to store a file #[derive(Debug, derive_more::Display)] pub struct StoreFileAuthorization(String); /// Encrypted authorization to get a file #[derive(Debug, derive_more::Display)] pub struct GetFileAuthorization(String); pub fn cursor_parse_error() -> ErrorMetadata { ErrorMetadata::bad_request("InvalidCursor", "Failed to parse cursor") } impl KeyBroker { pub fn new(instance_name: &str, instance_secret: InstanceSecret) -> anyhow::Result<Self> { Ok(Self { instance_name: instance_name.to_owned(), encryptor: LegacyEncryptor::new(instance_secret)?, admin_key_encryptor: RandomEncryptor::derive_from_secret( &instance_secret, Purpose::ADMIN_KEY, )?, action_callback_encryptor: RandomEncryptor::derive_from_secret( &instance_secret, Purpose::ACTION_CALLBACK_TOKEN, )?, cursor_encryptor: DeterministicEncryptor::derive_from_secret( &instance_secret, Purpose::CURSOR, )?, journal_encryptor: RandomEncryptor::derive_from_secret( &instance_secret, Purpose::QUERY_JOURNAL, )?, store_file_encryptor: RandomEncryptor::derive_from_secret( &instance_secret, Purpose::STORE_FILE_AUTHORIZATION, )?, }) } pub fn dev() -> Self { Self::new( crate::DEV_INSTANCE_NAME, InstanceSecret::try_from(crate::DEV_SECRET).unwrap(), ) .unwrap() } pub fn local_dev(instance_name: &str) -> Self { Self::new( instance_name, InstanceSecret::try_from(crate::DEV_SECRET).unwrap(), ) .unwrap() } pub fn issue_admin_key(&self, member_id: MemberId) -> AdminKey { AdminKey::new(self.issue_key(Some(member_id), false)) } pub fn issue_read_only_admin_key(&self, member_id: MemberId) -> AdminKey { AdminKey::new(self.issue_key(Some(member_id), true)) } pub fn issue_system_key(&self) -> SystemKey { SystemKey::new(self.issue_key(None, false)) } pub fn issue_store_file_authorization<RT: Runtime>( &self, rt: &RT, issued: UnixTimestamp, component: ComponentId, ) -> anyhow::Result<StoreFileAuthorization> { let now = rt.unix_timestamp(); if (now - issued) > MAX_TS_DELAY { anyhow::bail!("Could not issue authorization. Issued TS too far in past."); } let component_str = component.serialize_to_string(); Ok(StoreFileAuthorization( self.store_file_encryptor.encrypt_proto( STORE_FILE_AUTHZ_VERSION, &StorageTokenProto { instance_name: self.instance_name.clone(), issued_s: issued.as_secs(), authorization_type: Some(AuthorizationTypeProto::StoreFile(StoreFileProto {})), component_id: component_str, }, ), )) } /// Private helper method to generate an admin key. /// If `member_id` is None, it generates a system key, otherwise /// an admin key for the given user. fn issue_key(&self, member_id: Option<MemberId>, is_read_only: bool) -> String { let now = SystemTime::now(); let since_epoch = now .duration_since(SystemTime::UNIX_EPOCH) .expect("Failed to compute seconds since epoch?"); let identity = match member_id { Some(member_id) => AdminIdentityProto::MemberId(member_id.0), None => AdminIdentityProto::System(()), }; let proto = AdminKeyProto { instance_name: None, issued_s: since_epoch.as_secs(), identity: Some(identity), is_read_only, }; format_admin_key( &self.instance_name, &self .admin_key_encryptor .encrypt_proto(ADMIN_KEY_VERSION, &proto), ) } pub fn is_encrypted_admin_key(&self, key: &str) -> bool { let encrypted_part = split_admin_key(key).map(|(_, key)| key).unwrap_or(key); let admin_key: Result<AdminKeyProto, _> = self .encryptor .decode_proto(ADMIN_KEY_VERSION, encrypted_part) .or_else(|_| { self.admin_key_encryptor .decrypt_proto(ADMIN_KEY_VERSION, encrypted_part) }); admin_key.is_ok() } pub fn check_admin_key(&self, key: &str) -> anyhow::Result<Identity> { let (instance_name, encrypted_part) = split_admin_key(key) .map(|(name, key)| (Some(remove_type_prefix_from_instance_name(name)), key)) .unwrap_or((None, key)); let AdminKeyProto { instance_name: instance_name_from_encrypted_part, issued_s, identity, is_read_only, } = self .encryptor .decode_proto(ADMIN_KEY_VERSION, encrypted_part) .or_else(|_| { self.admin_key_encryptor .decrypt_proto(ADMIN_KEY_VERSION, encrypted_part) }) .with_context(|| format!("Couldn't decode the AdminKeyProto {key}"))?; let instance_name = instance_name .or(instance_name_from_encrypted_part.as_deref()) .context("Invalid admin key format")?; if instance_name != self.instance_name { return Err(anyhow::anyhow!( "Key is for invalid instance {instance_name}", )); } anyhow::ensure!(issued_s != 0, "Proto missing issued_s"); let identity = identity.context("Proto missing identity")?; Ok(match identity { AdminIdentityProto::MemberId(member_id) => Identity::InstanceAdmin(AdminIdentity { instance_name: self.instance_name.clone(), principal: AdminIdentityPrincipal::Member(MemberId(member_id)), key: key.to_string(), is_read_only, }), AdminIdentityProto::System(()) => Identity::system(), }) } pub fn check_store_file_authorization<RT: Runtime>( &self, rt: &RT, store_file_authorization: &str, validity: Duration, ) -> anyhow::Result<ComponentId> { let StorageTokenProto { instance_name, issued_s, authorization_type, component_id, } = self .encryptor .decode_proto(STORE_FILE_AUTHZ_VERSION, store_file_authorization) .or_else(|_| { self.store_file_encryptor .decrypt_proto(STORE_FILE_AUTHZ_VERSION, store_file_authorization) }) .context(ErrorMetadata::unauthenticated( "StorageTokenInvalid", "Couldn't decode the StoreFileAuthorization token", ))?; if instance_name != self.instance_name { anyhow::bail!(ErrorMetadata::unauthenticated( "InvalidStorageToken", "Storage token is for invalid instance {instance_name}" )); } anyhow::ensure!(issued_s != 0, "Proto missing issued_s"); let now = rt.unix_timestamp().as_secs(); if issued_s + validity.as_secs() <= now { log_store_file_auth_expired(); anyhow::bail!(ErrorMetadata::unauthenticated( "StorageTokenExpired", "Store File Authorization expired", )); } let Some(AuthorizationTypeProto::StoreFile(StoreFileProto {})) = authorization_type else { anyhow::bail!(ErrorMetadata::unauthenticated( "InvalidStorageToken", "Storage token is for invalid instance {instance_name}" )); }; let component = ComponentId::deserialize_from_string(component_id.as_deref()).context( ErrorMetadata::unauthenticated("InvalidStorageToken", "Invalid component ID"), )?; Ok(component) } fn cursor_to_proto(&self, cursor: &Cursor) -> InstanceCursorProto { let position = match cursor.position { CursorPosition::End => PositionProto::End(()), CursorPosition::After(ref key) => PositionProto::After(IndexKeyProto { values: key.clone().0, }), }; InstanceCursorProto { instance_name: self.instance_name.clone(), position: Some(position), query_fingerprint: cursor.query_fingerprint.clone(), } } fn proto_to_cursor(&self, proto: InstanceCursorProto) -> anyhow::Result<Cursor> { if proto.instance_name != self.instance_name { anyhow::bail!(ErrorMetadata::bad_request( "InvalidCursor", format!("Key is invalid for instance {:?}", proto.instance_name) )); } let cursor_position = match proto.position { Some(PositionProto::End(())) => CursorPosition::End, Some(PositionProto::After(IndexKeyProto { values: proto_values, })) => CursorPosition::After(IndexKeyBytes(proto_values)), None => anyhow::bail!(ErrorMetadata::bad_request( "InvalidCursor", "Missing position field" )), }; Ok(Cursor { position: cursor_position, query_fingerprint: proto.query_fingerprint, }) } /// Serializes and encrypts the provided Cursor for sending to clients. pub fn encrypt_cursor( &self, cursor: &Cursor, persistence_version: PersistenceVersion, ) -> SerializedCursor { let proto = self.cursor_to_proto(cursor); let cursor_version = persistence_version.index_key_version(CURSOR_VERSION); self.cursor_encryptor.encrypt_proto(cursor_version, &proto) } /// Attempts to decrypt and deserialize the EncryptedCursor. May fail if the /// client is sending up an old version. pub fn decrypt_cursor( &self, cursor: SerializedCursor, persistence_version: PersistenceVersion, ) -> anyhow::Result<Cursor> { let cursor_version = persistence_version.index_key_version(CURSOR_VERSION); let proto: InstanceCursorProto = self .encryptor .decode_proto(cursor_version, &cursor) .or_else(|_| self.cursor_encryptor.decrypt_proto(cursor_version, &cursor)) .with_context(cursor_parse_error)?; self.proto_to_cursor(proto) } pub fn encrypt_query_journal( &self, journal: &QueryJournal, persistence_version: PersistenceVersion, ) -> SerializedQueryJournal { let query_journal_version = persistence_version.index_key_version(QUERY_JOURNAL_VERSION); let cursor = match &journal.end_cursor { Some(cursor) => Some(self.cursor_to_proto(cursor)), None => return None, }; let proto = InstanceQueryJournalProto { end_cursor: cursor }; Some( self.journal_encryptor .encrypt_proto(query_journal_version, &proto), ) } pub fn decrypt_query_journal( &self, journal: SerializedQueryJournal, persistence_version: PersistenceVersion, ) -> anyhow::Result<QueryJournal> { let query_journal_version = persistence_version.index_key_version(QUERY_JOURNAL_VERSION); match journal { None => Ok(QueryJournal::new()), Some(journal) => { let proto: InstanceQueryJournalProto = self .encryptor .decode_proto(query_journal_version, &journal) .or_else(|_| { self.journal_encryptor .decrypt_proto(query_journal_version, &journal) }) .with_context(cursor_parse_error)?; let end_cursor = match proto.end_cursor { Some(cursor) => Some(self.proto_to_cursor(cursor)?), None => None, }; Ok(QueryJournal { end_cursor }) }, } } pub fn issue_action_token(&self, component_id: ComponentId) -> ActionCallbackToken { let now = SystemTime::now(); let since_epoch = now .duration_since(SystemTime::UNIX_EPOCH) .expect("Failed to compute seconds since epoch?"); let proto = ActionCallbackTokenProto { issued_s: since_epoch.as_secs(), component_id: component_id.serialize_to_string(), }; self.action_callback_encryptor .encrypt_proto(ACTION_KEY_VERSION, &proto) } // Checks the action token and returns its issue time. pub fn check_action_token( &self, token: &ActionCallbackToken, validity: Duration, ) -> anyhow::Result<(SystemTime, ComponentId)> { let ActionCallbackTokenProto { issued_s, component_id, } = self .encryptor .decode_proto(ACTION_KEY_VERSION, token) .or_else(|_| { self.action_callback_encryptor .decrypt_proto(ACTION_KEY_VERSION, token) }) .with_context(|| format!("Couldn't decode ActionCallbackTokenProto {token}"))?; anyhow::ensure!(issued_s != 0, "ActionCallbackTokenProto missing issued_s"); let now = SystemTime::now(); let since_epoch = now .duration_since(SystemTime::UNIX_EPOCH) .expect("Failed to compute seconds since epoch?") .as_secs(); if issued_s + validity.as_secs() <= since_epoch { log_actions_token_expired(); // Note we don't use .context(AuthError::TokenExpired) since this is // Convex infrastructure error and should be logged as internal error. return Err(anyhow::anyhow!("Action callback token expired")); } let system_time = SystemTime::UNIX_EPOCH + Duration::from_secs(issued_s); let component_id = ComponentId::deserialize_from_string(component_id.as_deref())?; Ok((system_time, component_id)) } } #[cfg(test)] mod tests { use std::{ str::FromStr, time::{ Duration, SystemTime, }, }; use cmd_util::env::env_config; use common::{ bootstrap_model::index::database_index::IndexedFields, components::ComponentId, index::IndexKey, query::{ Cursor, CursorPosition, Order, Query, }, query_journal::QueryJournal, runtime::Runtime, types::{ MemberId, PersistenceVersion, TableName, }, value::DeveloperDocumentId, }; use pb::convex_keys::{ admin_key::Identity as AdminIdentityProto, AdminKey as AdminKeyProto, }; use pretty_assertions::assert_eq; use proptest::prelude::*; use runtime::testing::TestDriver; use super::{ AdminKey, KeyBroker, ADMIN_KEY_VERSION, }; use crate::{ AdminIdentity, Identity, }; #[test] fn test_admin_keys() -> anyhow::Result<()> { let kb = KeyBroker::dev(); let key = kb.issue_admin_key(MemberId(0)); let admin = kb.check_admin_key(key.as_str()).unwrap(); assert!(admin.is_admin()); assert!(!admin.is_system()); Ok(()) } #[test] fn test_system_keys() -> anyhow::Result<()> { let kb = KeyBroker::dev(); let key = kb.issue_system_key(); let system = kb.check_admin_key(key.as_str())?; assert!(!system.is_admin()); assert!(system.is_system()); Ok(()) } #[test] fn test_admin_keys_with_prefix() -> anyhow::Result<()> { let kb = KeyBroker::dev(); let key = kb.issue_admin_key(MemberId(0)); let prefixed_key = format!("prod:{}", key.as_str()); let admin = kb.check_admin_key(&prefixed_key).unwrap(); assert!(admin.is_admin()); assert!(!admin.is_system()); Ok(()) } fn old_issue_key(kb: &KeyBroker, member_id: Option<MemberId>) -> String { let now = SystemTime::now(); let since_epoch = now .duration_since(SystemTime::UNIX_EPOCH) .expect("Failed to compute seconds since epoch?"); let identity = match member_id { Some(member_id) => AdminIdentityProto::MemberId(member_id.0), None => AdminIdentityProto::System(()), }; let proto = AdminKeyProto { instance_name: Some(kb.instance_name.clone()), issued_s: since_epoch.as_secs(), identity: Some(identity), is_read_only: false, }; kb.encryptor.encode_proto(ADMIN_KEY_VERSION, proto) } #[test] fn test_old_admin_keys() -> anyhow::Result<()> { let kb = KeyBroker::dev(); let key = AdminKey::new(old_issue_key(&kb, Some(MemberId(0)))); kb.check_admin_key(key.as_str()).unwrap(); Ok(()) } #[test] fn test_store_file_authorization() -> anyhow::Result<()> { let kb = KeyBroker::dev(); let td = TestDriver::new(); let now = td.rt().unix_timestamp(); let key = kb.issue_store_file_authorization(&td.rt(), now, ComponentId::test_user())?; let component = kb.check_store_file_authorization(&td.rt(), &key.to_string(), Duration::from_secs(60))?; assert_eq!(component, ComponentId::test_user()); Ok(()) } #[test] fn test_cant_issue_backwards_timestamps() -> anyhow::Result<()> { let kb = KeyBroker::dev(); let td = TestDriver::new(); let hour_ago = td.rt().unix_timestamp() - Duration::from_secs(3600); kb.issue_store_file_authorization(&td.rt(), hour_ago, ComponentId::test_user()) .unwrap_err(); Ok(()) } #[test] fn test_cursors() -> anyhow::Result<()> { let kb = KeyBroker::dev(); let cursor = Cursor { position: CursorPosition::End, query_fingerprint: vec![], }; let encrypted = kb.encrypt_cursor(&cursor, PersistenceVersion::default()); let echoed = kb.decrypt_cursor(encrypted, PersistenceVersion::default())?; assert_eq!(cursor, echoed); // Add this back if there's a PersistenceVersion that changes cursors // let encrypted_old_version = kb.encrypt_cursor(&cursor, // PersistenceVersion::V5); let result = kb // .decrypt_cursor(encrypted_old_version, PersistenceVersion::V5) // .unwrap_err(); // assert!(result.is::<InvalidCursor>()); Ok(()) } #[test] fn test_query_journal_size() -> anyhow::Result<()> { // Query journals are synced to the client along with every query // result. This test ensures they stay reasonably small. // Feel free to bump this values by a little bit, but rethink changes // that would increase them by a lot. let kb = KeyBroker::dev(); // Empty journal with no data. Serializes as None/null. let empty_journal = QueryJournal::new(); let serialized_empty_journal = kb.encrypt_query_journal(&empty_journal, PersistenceVersion::default()); assert_eq!(serialized_empty_journal, None); // Realistic journal with the end cursor from a paginated query. let query = Query::full_table_scan(TableName::from_str("documents")?, Order::Asc); let mut journal_with_cursor = QueryJournal::new(); journal_with_cursor.end_cursor = Some(Cursor { position: CursorPosition::After( IndexKey::new(vec![100.into()], DeveloperDocumentId::MIN).to_bytes(), ), query_fingerprint: query.fingerprint(&IndexedFields::creation_time())?, }); let serialized_journal_with_cursor = kb.encrypt_query_journal(&journal_with_cursor, PersistenceVersion::default()); assert_eq!(serialized_journal_with_cursor.unwrap().len(), 228); Ok(()) } #[test] fn test_action_token() -> anyhow::Result<()> { let kb = KeyBroker::dev(); let before_issue = SystemTime::now(); let token = kb.issue_action_token(ComponentId::test_user()); let after_issue = SystemTime::now(); // Should be valid if checked with validity of 1 minute. let (issue_time, component_id) = kb.check_action_token(&token, Duration::from_secs(60))?; // Note we round down the issue time to nearest second. assert!(issue_time > before_issue - Duration::from_secs(1)); assert!(issue_time < after_issue); assert_eq!(component_id, ComponentId::test_user()); // Should be invalid if checked with validity of 0. let err = kb .check_action_token(&token, Duration::from_secs(0)) .unwrap_err(); assert!(format!("{err}").contains("Action callback token expired")); // Try with completely invalid token. let err = kb .check_action_token(&"invalid-token".to_owned(), Duration::from_secs(60)) .unwrap_err(); assert!(format!("{err}").contains("Couldn't decode ActionCallbackTokenProto")); Ok(()) } proptest! { #![proptest_config(ProptestConfig { cases: 64 * env_config("CONVEX_PROPTEST_MULTIPLIER", 1), failure_persistence: None, .. ProptestConfig::default() })] #[test] fn test_cursor_roundtrips(cursor in any::<Cursor>()) { let kb = KeyBroker::dev(); let encrypted = kb.encrypt_cursor(&cursor, PersistenceVersion::default()); let decrypted = kb.decrypt_cursor(encrypted, PersistenceVersion::default()).unwrap(); assert_eq!(cursor, decrypted); } #[test] fn test_query_journal_roundtrips(journal in any::<QueryJournal>()) { let kb = KeyBroker::dev(); let encrypted = kb.encrypt_query_journal(&journal, PersistenceVersion::default()); let decrypted = kb.decrypt_query_journal( encrypted, PersistenceVersion::default(), ).unwrap(); assert_eq!(journal, decrypted); } #[test] fn test_identity_proto_roundtrips(identity in any::<Identity>()) { let proto: pb::convex_identity::UncheckedIdentity = identity.clone().into(); let roundtripped = Identity::from_proto_unchecked(proto).unwrap(); assert_eq!(identity, roundtripped); } #[test] fn test_admin_identity_proto_roundtrips(admin_identity in any::<AdminIdentity>()) { let proto: pb::convex_identity::AdminIdentity = admin_identity .clone() .into(); let roundtripped = AdminIdentity::from_proto_unchecked(proto).unwrap(); assert_eq!(admin_identity, roundtripped); } } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server