Convex MCP server

Official, by get-convex

main.rs (22.7 kB)
#![feature(exit_status_error)]
#![feature(let_chains)]
#![feature(try_blocks)]
#![feature(error_iter)]

use std::{
    io::ErrorKind,
    net::SocketAddr,
    path::{
        Path,
        PathBuf,
    },
    sync::{
        Arc,
        LazyLock,
    },
    time::Duration,
};

use ::metrics::{
    StaticMetricLabel,
    SERVER_VERSION_STR,
};
use anyhow::Context;
use axum::{
    extract::{
        ws::{
            Message,
            WebSocket,
        },
        State,
        WebSocketUpgrade,
    },
    response::IntoResponse,
    routing::get,
    Router,
};
use backend_harness::{
    with_provision,
    BackendProvisioner,
    ProvisionHostCredentials,
    ProvisionRequest,
};
use clap::Parser;
use cmd_util::env::config_service;
use common::{
    errors::{
        report_error,
        MainError,
    },
    http::{
        ConvexHttpService,
        HttpResponseError,
        NoopRouteMapper,
    },
};
use event_receiver::Event;
use futures::{
    stream::SplitSink,
    FutureExt,
    SinkExt,
    StreamExt,
};
use health_check::wait_for_http_health;
use log_interleaver::LogInterleaver;
use performance_stats::exporter::register_prometheus_exporter;
use runtime::prod::ProdRuntime;
use serde::{
    Deserialize,
    Serialize,
};
use strum::Display;
use tokio::{
    process::Command,
    sync::mpsc,
    time::sleep,
};

use crate::setup::setup;

mod event_receiver;
mod metrics;
mod setup;
mod stats;
#[cfg(test)]
mod tests;

use crate::{
    event_receiver::EventProcessor,
    stats::Stats,
};

static SCENARIO_RUNNER_PATH: LazyLock<&'static Path> =
    LazyLock::new(|| Path::new("npm-packages/scenario-runner"));
static MAX_JITTER_SECONDS: u64 = 30;

fn default_num_rows() -> u64 {
    500
}

#[derive(Clone, Debug, Deserialize)]
struct Workload {
    name: String,
    scenarios: Vec<ScenarioConfig>,
    /// Number of vector rows for the initial setup mutation
    #[serde(default)]
    num_vector_rows: u64,
    /// Number of messages rows for the initial setup mutation
    #[serde(default = "default_num_rows")]
    num_rows: u64,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
struct ScenarioConfig {
    #[serde(flatten)]
    scenario: Scenario,
    #[serde(flatten)]
    mode: Mode,
}

#[derive(Debug, Clone, Deserialize, Serialize, Display)]
#[serde(rename_all = "snake_case")]
enum FunctionType {
    Query,
    Mutation,
    Action,
}

#[derive(Deserialize, Serialize, Debug, Clone, Display)]
#[serde(tag = "name")]
enum Scenario {
    RunFunction { path: String, fn_type: FunctionType },
    ObserveInsert { search_indexes: bool },
    Search,
    VectorSearch,
    SnapshotExport,
    ManyIntersections { num_subscriptions: i32 },
    CloudBackup,
    RunHttpAction { path: String, method: String },
}

impl Scenario {
    fn includes_action(&self) -> bool {
        match self {
            Scenario::RunFunction {
                fn_type: FunctionType::Action,
                ..
            } => true,
            Scenario::RunFunction { .. }
            | Scenario::ObserveInsert { .. }
            | Scenario::Search
            | Scenario::VectorSearch
            | Scenario::SnapshotExport
            | Scenario::CloudBackup
            | Scenario::RunHttpAction { .. } => false,
            Scenario::ManyIntersections { .. } => false,
        }
    }

    fn path(&self) -> Option<String> {
        match self {
            Scenario::RunFunction { path, .. } => Some(path.clone()),
            _ => None,
        }
    }
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
enum Mode {
    /// Number of requests to send per second
    Rate(f64),
    /// Number of threads to run in benchmark mode. Each thread will run
    /// requests serially, waiting for a response before sending the next
    /// request.
    Benchmark(u32),
}

fn parse_workload_config(path: &str) -> anyhow::Result<Workload> {
    let s = std::fs::read_to_string(path)
        .with_context(|| format!("Failed to read workload config file at {path}"))?;
    let workload = serde_json::from_str(&s)?;
    Ok(workload)
}

#[derive(Parser, Debug)]
#[clap(group(clap::ArgGroup::new("provision").multiple(false)))]
pub struct Config {
    /// Host interface to bind to
    #[clap(short, long, default_value = "0.0.0.0")]
    pub interface: ::std::net::Ipv4Addr,

    /// Host port HTTP server should use
    #[clap(short, long, default_value = "8010")]
    pub port: u16,

    /// address:port that metrics will be served on
    /// (0.0.0.0:9100 is usually what you want in production)
    #[clap(short, long)]
    pub metrics_addr: Option<SocketAddr>,

    /// Seconds LoadGenerator should run for
    #[clap(long)]
    duration: u64,

    /// Path to workload config
    #[clap(value_parser = parse_workload_config)]
    workload: Workload,

    /// Where to provision a backend from for load generation
    #[clap(
        long,
        value_enum,
        group = "provision",
        required_unless_present = "existing_instance_url"
    )]
    provisioner: Option<BackendProvisioner>,

    /// Print out a stats report after duration has passed
    #[clap(long)]
    stats_report: bool,

    /// If set, skip rebuilding artifacts (scenario-runner)
    #[clap(long)]
    skip_build: bool,

    #[clap(long)]
    once: bool,

    #[clap(long)]
    existing_project_slug: Option<String>,

    #[clap(long, group = "provision", requires = "existing_instance_admin_key")]
    existing_instance_url: Option<String>,

    #[clap(long)]
    existing_instance_admin_key: Option<String>,

    // This is a somewhat hacky flag that allows us to skip creating actions.
    // We need it to avoid creating too many AWS Lambdas in production before
    // we have a way to vacuum them.
    #[clap(long)]
    skip_actions_deploy: bool,

    /// Preview deployments -- We'll create `num_preview_deployments` and push
    /// them `num_preview_deployment_pushes` times in a loop.
    #[clap(long)]
    use_preview_deployments: bool,

    #[clap(long, requires = "use_preview_deployments")]
    num_preview_deployments: Option<u64>,

    #[clap(long, requires = "use_preview_deployments")]
    num_preview_deployment_pushes: Option<u64>,

    #[clap(long)]
    use_usher_test_url: bool,
}

impl Config {
    fn load_description(&self) -> String {
        format!("{}_{}s", self.workload.name, self.duration)
    }
}

async fn run(config: &Config) -> anyhow::Result<()> {
    let logs = LogInterleaver::new();
    let load_description = config.load_description();
    let load_description_label = StaticMetricLabel::new("load_description", load_description);
    tracing::info!("provisioning with {:?}", config.provisioner);

    // HACK: Allow running load generator from any directory in our repo.
    if let Ok(current_workspace) = std::env::var("CARGO_WORKSPACE_ROOT") {
        std::env::set_current_dir(current_workspace)
            .context("Failed to set current dir to repo root")?;
    }
    if !config.skip_build {
        tracing::info!("building scenario-runner");
        logs.spawn_with_prefixed_logs(
            "rush build".into(),
            Command::new("just")
                .arg("rush")
                .arg("build")
                .arg("-t")
                .arg("scenario-runner")
                .current_dir("npm-packages"),
        )
        .context("Couldn't spawn rush build in npm-packages/. Run from repo root?")?
        .wait()
        .await?
        .exit_ok()?;
    }

    // The --skip-actions-deploy flag avoids deploying node actions.
    // Since we can't tell them apart, we ensure that no scenarios include any
    // actions, even non-node actions.
    if config.skip_actions_deploy {
        anyhow::ensure!(
            config
                .workload
                .scenarios
                .iter()
                .all(|scenario_config| !scenario_config.scenario.includes_action()),
            "Can't skip actions deploy and perform actions at the same time!"
        );
        tracing::info!("Deleting the convex/actions folder");
        if let Err(e) =
            tokio::fs::remove_dir_all(SCENARIO_RUNNER_PATH.join("convex/actions")).await
        {
            if e.kind() != ErrorKind::NotFound {
                return Err(e.into());
            }
        };
    };

    loop {
        if let Some(ref backend_url) = config.existing_instance_url
            && let Some(ref admin_key) = config.existing_instance_admin_key
        {
            run_workload(
                None,
                backend_url.clone(),
                admin_key.clone(),
                logs.clone(),
                config,
                None,
            )
            .await?;
        } else {
            let backend_provisioner = config
                .provisioner
                .expect("Required argument if backend url wasn't present above");
            if backend_provisioner == BackendProvisioner::Production {
                // Add jitter for provisioning to avoid claiming the same instance when load
                // generator restarts
                let jitter = rand::random::<f32>();
                let sleep_dur = Duration::from_secs(MAX_JITTER_SECONDS).mul_f32(jitter);
                tracing::info!("Sleeping for jitter {sleep_dur:?}");
                sleep(sleep_dur).await;
            }
            if config.use_preview_deployments {
                run_preview_deployment_workload(config, &logs, load_description_label.clone())
                    .await?
            } else {
                let provision_request = match &config.existing_project_slug {
                    Some(project_slug) => ProvisionRequest::ExistingProject {
                        project_slug: project_slug.to_string(),
                    },
                    None => ProvisionRequest::NewProject,
                };
                let provision_host_credentials = backend_provisioner.provision_host_credentials();
                with_provision(
                    &logs,
                    backend_provisioner,
                    &provision_request,
                    &SCENARIO_RUNNER_PATH,
                    load_description_label.clone(),
                    |mut backend_url, admin_key, local_log_sink| {
                        if config.use_usher_test_url {
                            backend_url = backend_url.replace("convex.cloud", "test.convex.cloud");
                        }
                        run_workload(
                            provision_host_credentials,
                            backend_url,
                            admin_key,
                            logs.clone(),
                            config,
                            local_log_sink,
                        )
                    },
                )
                .await?;
            };
        }
        if config.once {
            break;
        }
    }
    Ok(())
}

async fn run_preview_deployment_workload(
    config: &Config,
    logs: &LogInterleaver,
    metric_label: StaticMetricLabel,
) -> anyhow::Result<()> {
    let backend_provisioner = config
        .provisioner
        .expect("Required argument if backend url wasn't present above");
    let provision_request = match &config.existing_project_slug {
        Some(project_slug) => ProvisionRequest::ExistingProject {
            project_slug: project_slug.clone(),
        },
        None => ProvisionRequest::NewProject,
    };
    let provision_host_credentials = backend_provisioner.provision_host_credentials();
    with_provision(
        logs,
        backend_provisioner,
        &provision_request,
        &SCENARIO_RUNNER_PATH,
        metric_label.clone(),
        |_a, _b, _c| async move {
            for _ in 0..config
                .num_preview_deployment_pushes
                .expect("Required argument if use_preview_deployments set")
            {
                for i in 0..config
                    .num_preview_deployments
                    .expect("Required argument if use_preview_deployments set")
                {
                    let identifier = format!("test-preview-{i}");
                    let provision_request = ProvisionRequest::Preview {
                        identifier: identifier.clone(),
                    };
                    with_provision(
                        logs,
                        backend_provisioner,
                        &provision_request,
                        &SCENARIO_RUNNER_PATH,
                        metric_label.clone(),
                        |mut backend_url, admin_key, local_log_sink| {
                            if config.use_usher_test_url {
                                backend_url =
                                    backend_url.replace("convex.cloud", "test.convex.cloud");
                            }
                            run_workload(
                                provision_host_credentials.clone(),
                                backend_url,
                                admin_key,
                                logs.clone(),
                                config,
                                local_log_sink,
                            )
                        },
                    )
                    .await?;
                }
            }
            Ok(())
        },
    )
    .await?;
    Ok(())
}

fn main() -> Result<(), MainError> {
    config_service();
    tracing::info!("starting up");
    let sentry = sentry::init(sentry::ClientOptions {
        release: Some(format!("load-generator@{}", *SERVER_VERSION_STR).into()),
        ..Default::default()
    });
    if sentry.is_enabled() {
        tracing::info!(
            "Sentry is enabled! Check the load-generator project for errors: \
             https://sentry.io/organizations/convex-dev/projects/load-generator/?project=6505624"
        );
    } else {
        tracing::info!("Sentry is not enabled.")
    }

    let config = Config::parse();
    let tokio = ProdRuntime::init_tokio()?;
    let runtime = ProdRuntime::new(&tokio);

    let maybe_metrics = config
        .metrics_addr
        .map(|addr| register_prometheus_exporter(runtime.clone(), addr));

    let load_generator = async move {
        run(&config).await?;
        if let Some((handle, flush)) = maybe_metrics {
            flush().await;
            handle.shutdown_and_join().await?;
        }
        Ok::<_, MainError>(())
    };
    runtime.block_on("load_generator", load_generator)?;
    Ok(())
}

async fn run_workload(
    provision_host_credentials: Option<ProvisionHostCredentials>,
    backend_url: String,
    admin_key: String,
    logs: LogInterleaver,
    config: &Config,
    local_log_sink: Option<PathBuf>,
) -> anyhow::Result<()> {
    // Get the backend version - only retry twice since the instance should be up
    // by this point.
    let backend_version = wait_for_http_health(
        &backend_url.parse()?,
        None,
        None,
        2,
        Duration::from_millis(250),
    )
    .await?;
    let backend_version_label = StaticMetricLabel::new("backend_version", backend_version);
    setup(
        &backend_url,
        config.workload.num_rows,
        config.workload.num_vector_rows,
    )
    .await?;

    #[derive(Serialize)]
    struct Scenarios {
        scenarios: Vec<ScenarioMessage>,
    }

    // Must directly spawn node. Spawning with `npm start` means that the
    // subprocess cannot be killed directly from the parent due to
    // https://github.com/npm/npm/issues/4603
    //
    // There might be a workaround, but I haven't figured it out.
    tracing::info!("spawning a scenario-runner");
    let scenarios = Scenarios {
        scenarios: config
            .workload
            .scenarios
            .clone()
            .into_iter()
            .map(ScenarioMessage::from)
            .collect(),
    };
    let mut scenario_runner_cmd = Command::new("node");
    let mut cmd = scenario_runner_cmd
        .arg("--enable-source-maps")
        .arg("dist/scenario-runner.js")
        .arg("--deployment-url")
        .arg(backend_url)
        .arg("--admin-key")
        .arg(admin_key)
        .arg("--load-generator-port")
        .arg(config.port.to_string())
        .arg("--scenarios")
        .arg(serde_json::to_string(&scenarios).context("Failed to serialize scenarios")?)
        .current_dir(*SCENARIO_RUNNER_PATH)
        .kill_on_drop(true);
    if let Some(ProvisionHostCredentials {
        provision_host,
        access_token,
    }) = provision_host_credentials
    {
        cmd = cmd
            .arg("--provision-host")
            .arg(provision_host)
            .arg("--access-token")
            .arg(access_token)
    }
    let scenario_runner_handle = logs.spawn_with_prefixed_logs("scenario runner".into(), cmd)?;

    let load_description = config.load_description();
    let load_description_label = StaticMetricLabel::new("load_description", load_description);
    let mut load_generator = LoadGenerator::new(
        config,
        vec![load_description_label.clone(), backend_version_label],
        local_log_sink,
    )?;
    tracing::info!("Running workload: {:?}", config.workload);
    load_generator
        .run(
            Duration::from_secs(config.duration),
            config.stats_report,
            (config.interface, config.port).into(),
            vec![scenario_runner_handle],
        )
        .await?;
    Ok(())
}

#[derive(Clone)]
struct LoadGeneratorState {
    /// Transmitter to send events to the [EventProcessor]
    event_sender: mpsc::Sender<Result<Event, serde_json::Error>>,
    /// List of websocket connections with scenario-runners
    websocket_connections: Arc<tokio::sync::Mutex<Vec<SplitSink<WebSocket, Message>>>>,
}

/// LoadGenerator provisions instances and runs scenario-runner against them.
/// It has an [EventProcessor] for processing metric events from ScenarioRunner
/// and generating a stats report.
struct LoadGenerator {
    event_processor: EventProcessor,
    /// Transmitter to send events to the [EventProcessor]
    tx: mpsc::Sender<Result<Event, serde_json::Error>>,
}

impl LoadGenerator {
    fn new(
        config: &Config,
        metric_labels: Vec<StaticMetricLabel>,
        local_log_sink: Option<PathBuf>,
    ) -> anyhow::Result<Self> {
        let duration = Duration::from_secs(config.duration);
        let stats = Stats::new(duration, local_log_sink);
        let (tx, rx) = mpsc::channel(200);
        Ok(Self {
            event_processor: EventProcessor {
                rx,
                stats,
                metric_labels,
            },
            tx,
        })
    }

    async fn sync_handler(
        ws: WebSocketUpgrade,
        state: State<LoadGeneratorState>,
    ) -> Result<impl IntoResponse, HttpResponseError> {
        Ok(ws.on_upgrade(|ws| Self::process_events(ws, state)))
    }

    async fn process_events(ws: WebSocket, state: State<LoadGeneratorState>) {
        let (tx, mut rx) = ws.split();
        state.websocket_connections.lock().await.push(tx);
        while let Some(message) = rx.next().await {
            match message {
                Ok(m) => match m {
                    Message::Text(s) => {
                        let event: Result<Event, _> = serde_json::from_str(&s);
                        if let Err(e) = state.event_sender.send(event).await {
                            report_error(&mut e.into()).await;
                        }
                    },
                    Message::Pong(_) | Message::Ping(_) => {
                        continue;
                    },
                    Message::Close(_) => {
                        break;
                    },
                    Message::Binary(_) => {
                        tracing::error!("Unexpected binary message");
                    },
                },
                Err(e) => {
                    report_error(&mut e.into()).await;
                },
            }
        }
    }

    async fn run(
        &mut self,
        duration: Duration,
        stats_report: bool,
        ws_server_addr: SocketAddr,
        scenario_runner_handles: Vec<tokio::process::Child>,
    ) -> anyhow::Result<()> {
        let state = LoadGeneratorState {
            event_sender: self.tx.clone(),
            websocket_connections: Arc::new(tokio::sync::Mutex::new(Vec::new())),
        };
        let router = Router::new()
            .route("/sync", get(Self::sync_handler))
            .with_state(state.clone());
        let http_service = ConvexHttpService::new(
            router,
            "load-generator",
            SERVER_VERSION_STR.clone(),
            100,
            Duration::from_secs(60),
            NoopRouteMapper,
        );
        let serve_http_future = http_service.serve(
            ws_server_addr,
            tokio::signal::ctrl_c().map(|_| {
                tracing::info!("Shutting down load_generator http server");
            }),
        );
        let receive_fut = self.event_processor.receive_events();
        tokio::select! {
            _ = serve_http_future => {
                tracing::error!("Http server completed unexpectedly")
            },
            _ = receive_fut => {
                tracing::error!("Event processor completed unexpectedly")
            },
            _ = tokio::time::sleep(duration) => {
                tracing::info!("{duration:?} has passed. Shutting down load generator");
                let mut websocket_connections = state.websocket_connections.lock().await;
                for mut tx in websocket_connections.drain(..) {
                    let _ = tx.send(Message::Close(None)).await;
                }
                for mut handle in scenario_runner_handles {
                    handle.kill().await?;
                }
            },
        }

        // Wait to give backend time to process any remaining log events.
        tokio::time::sleep(Duration::from_secs(10)).await;
        if stats_report {
            self.event_processor.stats.report();
        }
        self.event_processor.stats.fail_if_too_many_errors()?;
        Ok(())
    }
}

#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct ScenarioMessage {
    scenario: Scenario,
    rate: Option<f64>,
    threads: Option<u32>,
}

impl From<ScenarioConfig> for ScenarioMessage {
    fn from(ScenarioConfig { scenario, mode }: ScenarioConfig) -> Self {
        let (rate, threads) = match mode {
            Mode::Benchmark(threads) => (None, Some(threads)),
            Mode::Rate(rate) => {
                metrics::log_target_qps(&scenario.to_string(), rate, scenario.path());
                (Some(rate), None)
            },
        };
        Self {
            scenario,
            rate,
            threads,
        }
    }
}
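
For reference, a workload config is plain JSON parsed by `parse_workload_config` above. The test-style sketch below shows a shape the serde derives should accept: `name` is the internally-tagged `Scenario` discriminant, while a `rate` or `benchmark` key selects the flattened `Mode` variant. The function path, scenario mix, and row counts are hypothetical placeholders, not values from this repo.

// A sketch only: exercises the serde derives above with a hypothetical
// workload config. Could live alongside the existing `mod tests`.
#[cfg(test)]
mod workload_format_example {
    use super::*;

    #[test]
    fn parses_example_workload() -> anyhow::Result<()> {
        let json = r#"{
            "name": "example_load",
            "num_rows": 1000,
            "scenarios": [
                {"name": "RunFunction", "path": "messages:list", "fn_type": "query", "rate": 10.0},
                {"name": "ObserveInsert", "search_indexes": false, "rate": 1.0},
                {"name": "Search", "benchmark": 4}
            ]
        }"#;
        let workload: Workload = serde_json::from_str(json)?;
        assert_eq!(workload.num_rows, 1000);
        // Omitted above, so it falls back to `#[serde(default)]`, i.e. 0.
        assert_eq!(workload.num_vector_rows, 0);
        assert_eq!(workload.scenarios.len(), 3);
        Ok(())
    }
}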
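On the CLI side, the `Config` derives imply an invocation roughly like `load-generator --duration 300 --stats-report --once --existing-instance-url http://127.0.0.1:8000 --existing-instance-admin-key $ADMIN_KEY workload.json` (binary name, URL, and key are placeholders). Per the clap group, `--provisioner` and `--existing-instance-url` are mutually exclusive, and the latter requires `--existing-instance-admin-key`.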
