Skip to main content
Glama

Convex MCP server

Official
by get-convex
main.rs6.67 kB
#![feature(let_chains)] use std::time::Duration; use clap::Parser; use cmd_util::env::config_service; use common::{ errors::MainError, http::ConvexHttpService, runtime::Runtime, shutdown::ShutdownSignal, version::SERVER_VERSION_STR, }; use db_connection::{ connect_persistence, ConnectPersistenceFlags, }; use futures::{ future::{ self, Either, }, FutureExt, }; use local_backend::{ config::LocalConfig, make_app, proxy::dev_site_proxy, router::router, HttpActionRouteMapper, MAX_CONCURRENT_REQUESTS, }; use runtime::prod::ProdRuntime; use tokio::{ signal::{ self, }, sync::oneshot, }; fn main() -> Result<(), MainError> { let _guard = config_service(); let config = LocalConfig::parse(); tracing::info!("Starting a Convex backend"); if !config.disable_beacon { tracing::info!( "The self-host Convex backend will periodically communicate with a remote beacon \ server. This is to help Convex understand and improve the product. You can disable \ this telemetry by setting the --disable-beacon flag or the DISABLE_BEACON \ environment variable." ); } let sentry = sentry::init(sentry::ClientOptions { release: Some(format!("local-backend@{}", *SERVER_VERSION_STR).into()), ..Default::default() }); if sentry.is_enabled() { tracing::info!( "Sentry is enabled. Errors will be reported to project with ID {}", sentry .dsn() .map(|dsn| dsn.project_id().to_string()) .unwrap_or("unknown".to_string()) ); if let Some(sentry_identifier) = config.sentry_identifier.clone() { sentry::configure_scope(|scope| { scope.set_user(Some(sentry::User { id: Some(sentry_identifier), ..Default::default() })); }); } } else { tracing::info!("Sentry is not enabled.") } let tokio = ProdRuntime::init_tokio()?; let runtime = ProdRuntime::new(&tokio); let runtime_ = runtime.clone(); let server_future = async { run_server(runtime_, config).await?; Ok(()) }; runtime.block_on("main", server_future) } async fn run_server(runtime: ProdRuntime, config: LocalConfig) -> anyhow::Result<()> { let serve_future = async move { run_server_inner(runtime, config).await }.fuse(); futures::pin_mut!(serve_future); futures::select! { r = serve_future => { r?; tracing::info!("Done") }, }; Ok(()) } async fn run_server_inner(runtime: ProdRuntime, config: LocalConfig) -> anyhow::Result<()> { // Used to receive fatal errors from the database or /preempt endpoint. let (preempt_tx, preempt_rx) = oneshot::channel(); let preempt_signal = ShutdownSignal::new(preempt_tx); // Use to signal to the http service to stop. let (shutdown_tx, shutdown_rx) = async_broadcast::broadcast(1); let persistence = connect_persistence( config.db, &config.db_spec, ConnectPersistenceFlags { require_ssl: !config.do_not_require_ssl, allow_read_only: false, skip_index_creation: false, }, &config.name(), runtime.clone(), preempt_signal.clone(), ) .await?; let st = make_app( runtime.clone(), config.clone(), persistence, shutdown_rx.clone(), preempt_signal.clone(), ) .await?; let router = router(st.clone()); let mut shutdown_rx_ = shutdown_rx.clone(); let http_service = ConvexHttpService::new( router, "backend", SERVER_VERSION_STR.to_string(), MAX_CONCURRENT_REQUESTS, Duration::from_secs(125), HttpActionRouteMapper, ); let serve_http_future = http_service.serve(config.http_bind_address().into(), async move { let _ = shutdown_rx_.recv().await; }); let proxy_future = dev_site_proxy( config.site_bind_address(), config.site_forward_prefix(), shutdown_rx, ); let serve_future = future::try_join(serve_http_future, proxy_future).fuse(); futures::pin_mut!(serve_future); // Start shutdown when we get a manual shutdown signal or with the first // ctrl-c. let mut force_exit_duration = None; futures::select! { r = serve_future => { r?; panic!("Serve future stopped unexpectedly!") }, _err = preempt_rx.fuse() => { // If we fail with a fatal error, we want to exit immediately. tracing::info!("Received a fatal error. Shutting down immediately"); force_exit_duration = Some(Duration::from_secs(0)); let _: Result<_, _> = shutdown_tx.broadcast(()).await; } r = signal::ctrl_c().fuse() => { tracing::info!("Received Ctrl-C signal!"); r?; let _: Result<_, _> = shutdown_tx.broadcast(()).await; }, } let shutdown = async move { // First, drain all in-progress requests; tracing::info!("Shutdown initiated, draining existing requests..."); serve_future.await?; // Next, shutdown all of our asynchronous workers. tracing::info!("Shutting down application..."); st.shutdown().await?; Ok::<_, anyhow::Error>(()) } .fuse(); futures::pin_mut!(shutdown); let mut force_exit_future = match force_exit_duration { Some(force_exit_duration) => Either::Left(runtime.wait(force_exit_duration)), None => Either::Right(std::future::pending()), } .fuse(); loop { futures::select! { r = shutdown => { r?; tracing::info!("Server successfully shut down."); // If we are not preempted we exit as soon as the requests are // drained. Otherwise, we have to wait for the cool down. if force_exit_duration.is_none() { break; } }, // Forcibly shutdown when the cool down expires _ = force_exit_future => { tracing::info!("Cool down expired. Shutting down"); break; } // Forcibly shutdown with second ctrl-c. r = signal::ctrl_c().fuse() => { r?; tracing::warn!("Forcibly shutting down!"); break; }, } } Ok(()) }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server