Skip to main content
Glama

Convex MCP server

Official
by get-convex
convex_api.rs (7.08 kB)
use std::{ collections::HashMap, fmt::Display, sync::LazyLock, }; use anyhow::Context; use async_trait::async_trait; use chrono::{ DateTime, Utc, }; use common::{ json::JsonForm as _, schemas::{ DatabaseSchema, TableDefinition, }, value::TableName, }; use convex_fivetran_common::config::Config; use convex_fivetran_destination::api_types::{ BatchWriteRow, CreateTableArgs, DeleteType, TruncateTableArgs, }; use serde::{ de::DeserializeOwned, Serialize, }; use serde_json::Value as JsonValue; use tonic::codegen::http::{ HeaderName, HeaderValue, }; #[allow(clippy::declare_interior_mutable_const)] const CONVEX_CLIENT_HEADER: HeaderName = HeaderName::from_static("convex-client"); static CONVEX_CLIENT_HEADER_VALUE: LazyLock<HeaderValue> = LazyLock::new(|| { let destination_version = env!("CARGO_PKG_VERSION"); HeaderValue::from_str(&format!("fivetran-import-{destination_version}")).unwrap() }); /// The APIs exposed by a Convex backend for streaming export. #[async_trait] pub trait Destination: Display + Send { /// An endpoint that confirms the Convex backend is accessible with /// streaming import enabled async fn test_streaming_import_connection(&self) -> anyhow::Result<()>; async fn get_schema(&self) -> anyhow::Result<Option<DatabaseSchema>>; async fn create_table(&self, table_definition: TableDefinition) -> anyhow::Result<()>; async fn truncate_table( &self, table_name: TableName, delete_type: DeleteType, delete_before: Option<DateTime<Utc>>, ) -> anyhow::Result<()>; async fn batch_write(&self, rows: Vec<BatchWriteRow>) -> anyhow::Result<()>; } /// Implementation of [`Destination`] accessing a real Convex deployment over /// HTTP. pub struct ConvexApi { pub config: Config, } impl ConvexApi { /// Performs a GET HTTP request to a given endpoint of the Convex API using /// the given query parameters. 
async fn get<T: DeserializeOwned>( &self, endpoint: &str, parameters: HashMap<&str, Option<String>>, ) -> anyhow::Result<T> { let non_null_parameters: HashMap<&str, String> = parameters .into_iter() .filter_map(|(key, value)| value.map(|value| (key, value))) .collect(); let mut url = self.config.deploy_url.join(endpoint).unwrap(); url.query_pairs_mut().extend_pairs(non_null_parameters); match reqwest::Client::new() .get(url) .header(CONVEX_CLIENT_HEADER, &*CONVEX_CLIENT_HEADER_VALUE) .header( reqwest::header::AUTHORIZATION, format!("Convex {}", self.config.deploy_key), ) .send() .await { Ok(resp) if resp.status().is_success() => Ok(resp .json::<T>() .await .context("Failed to deserialize query result")?), Ok(resp) => { let status = resp.status().as_str().to_string(); if let Ok(text) = resp.text().await { anyhow::bail!( "Call to {endpoint} on {} returned an unsuccessful response ({status}): \ {text}", self.config.deploy_url ) } else { anyhow::bail!( "Call to {endpoint} on {} returned an unsuccessful response with no \ content ({status})", self.config.deploy_url ) } }, Err(e) => anyhow::bail!(e.to_string()), } } /// Performs a POST HTTP request to a given endpoint of the Convex API /// using the given query parameters. 
async fn post<T: Serialize>(&self, endpoint: &str, args: T) -> anyhow::Result<()> { let url = self .config .deploy_url .join("api/") .unwrap() .join(endpoint) .unwrap(); match reqwest::Client::new() .post(url) .json(&args) .header(CONVEX_CLIENT_HEADER, &*CONVEX_CLIENT_HEADER_VALUE) .header( reqwest::header::AUTHORIZATION, format!("Convex {}", self.config.deploy_key), ) .send() .await { Ok(resp) if resp.status().is_success() => Ok(()), Ok(resp) => { let status = resp.status().as_str().to_string(); if let Ok(text) = resp.text().await { anyhow::bail!( "Call to {endpoint} on {} returned an unsuccessful response ({status}): \ {text}", self.config.deploy_url ) } else { anyhow::bail!( "Call to {endpoint} on {} returned an unsuccessful response with no \ content ({status})", self.config.deploy_url ) } }, Err(e) => anyhow::bail!(e.to_string()), } } } #[async_trait] impl Destination for ConvexApi { async fn test_streaming_import_connection(&self) -> anyhow::Result<()> { self.get_schema().await?; Ok(()) } async fn get_schema(&self) -> anyhow::Result<Option<DatabaseSchema>> { let value: JsonValue = self .get("/api/streaming_import/get_schema", HashMap::default()) .await?; if value == JsonValue::Null { return Ok(None); }; let schema = DatabaseSchema::json_deserialize_value(value) .context("Can’t deserialize the retrived schema")?; Ok(Some(schema)) } async fn create_table(&self, table_definition: TableDefinition) -> anyhow::Result<()> { self.post( "/api/streaming_import/fivetran_create_table", CreateTableArgs { table_definition: table_definition.try_into()?, }, ) .await?; Ok(()) } async fn truncate_table( &self, table_name: TableName, delete_type: DeleteType, delete_before: Option<DateTime<Utc>>, ) -> anyhow::Result<()> { self.post( "/api/streaming_import/fivetran_truncate_table", TruncateTableArgs { table_name: table_name.to_string(), delete_type, delete_before, }, ) .await?; Ok(()) } async fn batch_write(&self, rows: Vec<BatchWriteRow>) -> anyhow::Result<()> { 
self.post("/api/streaming_import/apply_fivetran_operations", rows) .await?; Ok(()) } } impl Display for ConvexApi { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.write_str(self.config.deploy_url.as_ref()) } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.