Skip to main content
Glama

Convex MCP server

Official
by get-convex
mock_sink.rs3.26 kB
use std::sync::{ Arc, LazyLock, }; use common::{ backoff::Backoff, errors::report_error, log_streaming::LogEvent, runtime::Runtime, }; use parking_lot::{ Mutex, RwLock, }; use tokio::sync::mpsc; use crate::{ consts, LogSinkClient, }; /// MockSink directs all logs to a static buffer pub struct MockSink<RT: Runtime> { runtime: RT, events_receiver: mpsc::Receiver<Vec<Arc<LogEvent>>>, } /// The buffer MockSink writes events to. This is explicitly a static global /// since there is no effective and simple way to dependency inject a shared /// vector into the MockSink that would be accessible to test code: /// /// Passing a shared vector to MockSink::start is ineffective since this is only /// invoked by LogManager::config_to_log_sink_client which is not directly /// invoked by test code but instead by a database listener, making it difficult /// to obtain a reference to the shared vector outside of the sink. /// /// Creating a shared vector and passing to LogSinkClient would also be /// ineffective since testing interfaces through an Application instance and /// Application only owns a LogManagerClient, not a LogManager itself. /// LogManagerClient does not have direct access to LogManager's LogSinkClients. /// /// For safety, this module is only compiled in testing. pub static MOCK_SINK_EVENTS_BUFFER: LazyLock<Arc<RwLock<Vec<Arc<LogEvent>>>>> = LazyLock::new(|| Arc::new(RwLock::new(vec![]))); impl<RT: Runtime> MockSink<RT> { pub async fn start(runtime: RT) -> anyhow::Result<LogSinkClient> { let (tx, rx) = mpsc::channel(consts::MOCK_SINK_EVENTS_BUFFER_SIZE); let sink = Self { runtime: runtime.clone(), events_receiver: rx, }; let handle = Arc::new(Mutex::new(runtime.spawn("mock_sink", sink.go()))); let client = LogSinkClient { _handle: handle, events_sender: tx, }; Ok(client) } async fn go(mut self) { let mut backoff = Backoff::new( consts::MOCK_SINK_INITIAL_BACKOFF, consts::MOCK_SINK_MAX_BACKOFF, ); loop { match self.events_receiver.recv().await { None => { // The sender was closed, event loop should shutdown tracing::warn!("Stopping MockSink. Sender was closed."); return; }, Some(events) => { while let Err(mut e) = self.process_events(events.clone()).await { let delay = backoff.fail(&mut self.runtime.rng()); tracing::error!( "Error emitting event in MockSink: {e:?}. Waiting {delay:?}ms before \ retrying" ); report_error(&mut e).await; self.runtime.wait(delay).await; } backoff.reset(); }, } } } async fn process_events(&mut self, mut events: Vec<Arc<LogEvent>>) -> anyhow::Result<()> { let mut buf = MOCK_SINK_EVENTS_BUFFER.write(); buf.append(&mut events); Ok(()) } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server