Skip to main content
Glama

Convex MCP server

Official
by get-convex
mod.rs5.26 kB
use std::sync::LazyLock; use common::{ document::{ ParseDocument, ParsedDocument, }, query::{ IndexRange, IndexRangeExpression, Order, Query, }, runtime::Runtime, types::WriteTimestamp, }; use database::{ unauthorized_error, ResolvedQuery, SystemMetadataModel, Transaction, }; use keybroker::Identity; use sync_types::Timestamp; use value::{ ConvexValue, FieldPath, TableName, TableNamespace, }; pub mod types; use types::{ SessionRequestIdentifier, SessionRequestOutcome, SessionRequestRecord, }; use crate::{ SystemIndex, SystemTable, }; /// Table name for the sync protocol requests. /// This is used to make session requests idempotent pub static SESSION_REQUESTS_TABLE: LazyLock<TableName> = LazyLock::new(|| { "_session_requests" .parse() .expect("Invalid built-in session metadata table") }); static SESSION_ID_FIELD: LazyLock<FieldPath> = LazyLock::new(|| "sessionId".parse().expect("Invalid built-in field")); static REQUEST_ID_FIELD: LazyLock<FieldPath> = LazyLock::new(|| "requestId".parse().expect("Invalid built-in field")); pub static SESSION_REQUESTS_INDEX: LazyLock<SystemIndex<SessionRequestsTable>> = LazyLock::new(|| { SystemIndex::new( "by_session_id_and_request_id", [&SESSION_ID_FIELD, &REQUEST_ID_FIELD], ) .unwrap() }); pub struct SessionRequestsTable; impl SystemTable for SessionRequestsTable { type Metadata = SessionRequestRecord; fn table_name() -> &'static TableName { &SESSION_REQUESTS_TABLE } fn indexes() -> Vec<SystemIndex<Self>> { vec![SESSION_REQUESTS_INDEX.clone()] } } pub struct SessionRequestModel<'a, RT: Runtime> { tx: &'a mut Transaction<RT>, } impl<'a, RT: Runtime> SessionRequestModel<'a, RT> { pub fn new(tx: &'a mut Transaction<RT>) -> Self { Self { tx } } pub async fn get_session_request_record( &mut self, request_identifier: &SessionRequestIdentifier, identity: Identity, ) -> anyhow::Result<Option<(Timestamp, SessionRequestOutcome)>> { // We only expect this function to be called by the framework as part // of a mutation UDF. We require passing in a system identity to confirm // that the caller isn't letting a user call this directly. if !identity.is_system() { anyhow::bail!(unauthorized_error("get_session_request_record")) } // Query whether this request has been seen by the system already. It's // important we scan over the session request index and include it // in our read set so we can ensure the request happens exactly once. let index_range_query = IndexRange { index_name: SESSION_REQUESTS_INDEX.name(), range: vec![ IndexRangeExpression::Eq( SESSION_ID_FIELD.clone(), ConvexValue::try_from(request_identifier.session_id.to_string())?.into(), ), IndexRangeExpression::Eq( REQUEST_ID_FIELD.clone(), ConvexValue::from(request_identifier.request_id as i64).into(), ), ], order: Order::Asc, }; let query = Query::index_range(index_range_query); let (doc, ts): (ParsedDocument<SessionRequestRecord>, Timestamp) = { // Get the timestamp of when the record was committed. // document by ID. // Note that this does not check the // transaction's outstanding writes. This is ok since we know we didn't // commit in the current transaction. let mut query_stream = ResolvedQuery::new(self.tx, TableNamespace::Global, query)?; let Some((doc, ts)) = query_stream.next_with_ts(self.tx, None).await? else { return Ok(None); }; anyhow::ensure!( query_stream.next(self.tx, Some(1)).await?.is_none(), "Expected at most one session request record." ); let WriteTimestamp::Committed(ts) = ts else { anyhow::bail!( "Wrote a session request record in the same transaction as the get? Not \ supported." ); }; (doc.parse()?, ts) }; let outcome = doc.into_value().outcome; Ok(Some((ts, outcome))) } pub async fn record_session_request( &mut self, record: SessionRequestRecord, identity: Identity, ) -> anyhow::Result<()> { // We only expect this function to be called by the framework as part // of executing a sync protocol session UDF. We require passing in a system // identity to confirm that the caller isn't letting a user call this // directly if !identity.is_system() { anyhow::bail!(unauthorized_error("record_session_request")) } SystemMetadataModel::new_global(self.tx) .insert_metadata(&SESSION_REQUESTS_TABLE, record.try_into()?) .await?; Ok(()) } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server