Skip to main content
Glama

Convex MCP server

Official
by get-convex
local_sink.rs5.74 kB
use std::{ fs::OpenOptions, io::Write, path::PathBuf, str::FromStr, sync::Arc, }; use common::{ backoff::Backoff, errors::report_error, log_streaming::{ LogEvent, LogEventFormatVersion, }, runtime::Runtime, }; use parking_lot::Mutex; use serde_json::Value as JsonValue; use tokio::sync::mpsc; use crate::{ consts, LogSinkClient, }; pub const LOG_EVENT_FORMAT_FOR_LOCAL_SINK: LogEventFormatVersion = LogEventFormatVersion::V2; pub struct LocalSink<RT: Runtime> { runtime: RT, events_receiver: mpsc::Receiver<Vec<Arc<LogEvent>>>, config: LocalSinkConfig, } #[derive(Clone, Debug)] pub struct LocalSinkConfig { path: PathBuf, } impl FromStr for LocalSinkConfig { type Err = anyhow::Error; fn from_str(s: &str) -> Result<Self, Self::Err> { Ok(Self { path: s.parse()? }) } } impl<RT: Runtime> LocalSink<RT> { pub async fn start(runtime: RT, config: LocalSinkConfig) -> anyhow::Result<LogSinkClient> { let (tx, rx) = mpsc::channel(consts::LOCAL_SINK_EVENTS_BUFFER_SIZE); let sink = Self { runtime: runtime.clone(), events_receiver: rx, config: config.clone(), }; let handle = Arc::new(Mutex::new(runtime.spawn("local_sink", sink.go()))); let client = LogSinkClient { _handle: handle, events_sender: tx, }; tracing::info!("Started LocalSink at {:?}", config.path); Ok(client) } async fn go(mut self) { let mut backoff = Backoff::new( consts::LOCAL_SINK_INITIAL_BACKOFF, consts::LOCAL_SINK_MAX_BACKOFF, ); loop { match self.events_receiver.recv().await { None => { // The sender was closed, event loop should shutdown tracing::warn!("Stopping LocalSink. Sender was closed."); return; }, Some(events) => { while let Err(mut e) = self.process_events(events.clone()).await { let delay = backoff.fail(&mut self.runtime.rng()); tracing::error!( "Error emitting event in LocalSink: {e:?}. Waiting {delay:?}ms before \ retrying" ); report_error(&mut e).await; self.runtime.wait(delay).await; } backoff.reset(); }, } } } async fn process_events(&mut self, events: Vec<Arc<LogEvent>>) -> anyhow::Result<()> { let mut file = OpenOptions::new() .append(true) .create(true) .open(self.config.path.clone())?; let num_events = events.len(); for event in events { let fields: serde_json::Map<String, JsonValue> = event.to_json_map(LOG_EVENT_FORMAT_FOR_LOCAL_SINK)?; let mut event = serde_json::to_vec(&fields)?; event.extend_from_slice("\n".as_bytes()); file.write_all(&event)?; } file.sync_all()?; tracing::debug!( "Wrote {} events to file: {:?}", num_events, self.config.path.clone() ); Ok(()) } } #[cfg(test)] mod test { use std::{ sync::Arc, time::Duration, }; use common::{ log_lines::{ LogLevel, LogLineStructured, }, log_streaming::{ FunctionEventSource, LogEvent, StructuredLogEvent, }, runtime::Runtime, }; use runtime::testing::TestRuntime; use tempfile::TempDir; use crate::sinks::local_sink::{ LocalSink, LocalSinkConfig, LOG_EVENT_FORMAT_FOR_LOCAL_SINK, }; #[convex_macro::test_runtime] async fn test_local_sink(rt: TestRuntime) -> anyhow::Result<()> { let tempdir = TempDir::new()?; let path = tempdir.path().join("test_sink.log"); let test_sink_config = LocalSinkConfig { path }; let client = LocalSink::start(rt.clone(), test_sink_config.clone()).await?; let event = LogEvent { timestamp: rt.unix_timestamp(), event: StructuredLogEvent::Console { source: FunctionEventSource::new_for_test(), log_line: LogLineStructured::new_developer_log_line( LogLevel::Log, vec!["This is a test log.".to_string()], rt.unix_timestamp(), ), }, }; client .events_sender .try_send(vec![Arc::new(event.clone())])?; rt.wait(Duration::from_secs(1)).await; let contents = std::fs::read_to_string(test_sink_config.path.clone())?; let expected_contents = serde_json::to_string(&event.to_json_map(LOG_EVENT_FORMAT_FOR_LOCAL_SINK)?)? + "\n"; assert_eq!(contents, expected_contents); // Sending another event should append to the file let event = LogEvent { timestamp: rt.unix_timestamp(), event: StructuredLogEvent::Verification, }; client .events_sender .try_send(vec![Arc::new(event.clone())])?; rt.wait(Duration::from_secs(1)).await; let contents = std::fs::read_to_string(test_sink_config.path)?; let log_contents_2 = serde_json::to_string(&event.to_json_map(LOG_EVENT_FORMAT_FOR_LOCAL_SINK)?)? + "\n"; assert_eq!(contents, expected_contents + &log_contents_2); Ok(()) } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server