Skip to main content
Glama

Convex MCP server

Official
by get-convex
log_streaming.rs34.8 kB
use std::{ fmt, fmt::Display, str::FromStr, time::Duration, }; use serde::{ ser::SerializeMap, Deserialize, Serialize, Serializer, }; use serde_json::{ json, Value as JsonValue, }; use tonic::async_trait; use value::heap_size::HeapSize; use crate::{ components::ComponentPath, errors::JsError, execution_context::ExecutionContext, log_lines::LogLineStructured, runtime::{ Runtime, UnixTimestamp, }, types::{ ModuleEnvironment, UdfType, }, }; /// Public worker for the LogManager. #[async_trait] pub trait LogSender: Send + Sync { fn send_logs(&self, logs: Vec<LogEvent>); async fn shutdown(&self) -> anyhow::Result<()>; } /// Structured log #[derive(Debug, Clone)] pub struct LogEvent { /// Rough timestamp of when this event was created, for the user's benefit. /// We provide no guarantees on the consistency of this timestamp across /// topics and log sources - it's best-effort. /// This timestamp is serialized to milliseconds. pub timestamp: UnixTimestamp, pub event: StructuredLogEvent, } /// User-facing UDF stats, that is logged in the UDF execution log /// and might be used for debugging purposes. /// /// TODO(sarah) this is nearly identical to the type in the `usage_tracking` /// crate, but there's a dependency cycle preventing us from using it directly. #[derive(Debug, Clone, PartialEq, Eq, Default)] pub struct AggregatedFunctionUsageStats { pub database_read_bytes: u64, pub database_write_bytes: u64, pub database_read_documents: u64, pub storage_read_bytes: u64, pub storage_write_bytes: u64, pub vector_index_read_bytes: u64, pub vector_index_write_bytes: u64, pub action_memory_used_mb: Option<u64>, pub return_bytes: Option<u64>, } #[derive(Serialize, Debug, Clone)] pub struct OccInfo { pub table_name: Option<String>, pub document_id: Option<String>, pub write_source: Option<String>, pub retry_count: u64, } // Nothing yet. Can add information like parent scheduled job, scheduler lag, // etc. #[derive(Serialize, Debug, Clone)] pub struct SchedulerInfo { pub job_id: String, } // When adding a new event type: // - add a Schema type in the tests at the bottom of this file // - consider adding formatting of it in the CLI // - add it to the docs // // Also consider getting rid of the V1 format! #[derive(Debug, Clone)] pub enum StructuredLogEvent { /// Topic for verification logs. These are issued on sink startup and are /// used to test that the backend can authenticate with the sink. Verification, /// Topic for logs generated by `console.*` events. This is considered a /// `SystemLogTopic` since the topic is generated by the backend. Console { source: FunctionEventSource, log_line: LogLineStructured, }, /// Topic that records UDF executions and provides information on the /// execution. FunctionExecution { source: FunctionEventSource, error: Option<JsError>, execution_time: Duration, usage_stats: AggregatedFunctionUsageStats, occ_info: Option<OccInfo>, scheduler_info: Option<SchedulerInfo>, }, /// Topic for exceptions. These happen when a UDF raises an exception from /// JS Exception { error: JsError, user_identifier: Option<sync_types::UserIdentifier>, source: FunctionEventSource, udf_server_version: Option<semver::Version>, }, /// Topic for deployment audit logs. These are issued when developers /// interact with a deployment. DeploymentAuditLog { action: String, metadata: serde_json::Map<String, JsonValue>, }, /// Topic for global stats from the scheduler. For function-specific stats, /// look in FunctionExecution SchedulerStats { lag_seconds: Duration, num_running_jobs: u64, }, ScheduledJobLag { lag_seconds: Duration, }, // User-specified topics -- not yet implemented. // See here for more details: https://www.notion.so/Log-Streaming-in-Convex-19a1dfadd6924c33b29b2796b0f5b2e2 // User { // topic: String, // payload: serde_json::Map<String, JsonValue> // }, } #[derive(Debug, Clone, Copy, PartialEq, Deserialize)] #[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))] pub enum LogEventFormatVersion { V1, V2, } impl FromStr for LogEventFormatVersion { type Err = anyhow::Error; fn from_str(s: &str) -> Result<Self, Self::Err> { match s { "1" => Ok(Self::V1), "2" => Ok(Self::V2), v => anyhow::bail!("Invalid LogEventFormatVersion: {v}"), } } } impl Display for LogEventFormatVersion { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::V1 => write!(f, "1"), Self::V2 => write!(f, "2"), } } } #[cfg(any(test, feature = "testing"))] impl Default for LogEventFormatVersion { fn default() -> Self { Self::V2 } } /// Structured log impl LogEvent { pub fn default_for_verification<RT: Runtime>(runtime: &RT) -> anyhow::Result<Self> { Ok(Self { event: StructuredLogEvent::Verification, timestamp: runtime.unix_timestamp(), }) } #[cfg(any(test, feature = "testing"))] pub fn sample_exception<RT: Runtime>(runtime: &RT) -> anyhow::Result<Self> { use sync_types::UserIdentifier; let source = FunctionEventSource { context: ExecutionContext::new_for_test(), component_path: ComponentPath::test_user(), udf_path: "test".to_string(), udf_type: UdfType::Action, module_environment: ModuleEnvironment::Isolate, cached: None, mutation_queue_length: None, mutation_retry_count: None, }; Ok(Self { timestamp: runtime.unix_timestamp(), event: StructuredLogEvent::Exception { error: JsError::from_frames_for_test( "test_message", vec!["test_frame_1", "test_frame_2"], ), user_identifier: Some(UserIdentifier("test|user".to_string())), source, udf_server_version: Some(semver::Version::new(1, 5, 1)), }, }) } pub fn to_json_map( &self, format: LogEventFormatVersion, ) -> anyhow::Result<serde_json::Map<String, JsonValue>> { let object = self.to_json_serializer(format, serde_json::value::Serializer)?; let JsonValue::Object(fields) = object else { unreachable!(); }; Ok(fields) } pub fn to_json_serializer<S: Serializer>( &self, format: LogEventFormatVersion, serializer: S, ) -> Result<S::Ok, S::Error> { let ms = self .timestamp .as_ms_since_epoch() .map_err(serde::ser::Error::custom)?; macro_rules! serialize_map { ({$( $key:literal: $value:expr ),* $(,)?}) => {{ let mut map_builder = serializer.serialize_map(None)?; $(map_builder.serialize_entry($key, &$value)?;)* map_builder.end() }} } match format { LogEventFormatVersion::V1 => match &self.event { StructuredLogEvent::Verification => { serialize_map!({ "_timestamp": ms, "_topic": "_verification", "message": "Convex connection test" }) }, StructuredLogEvent::Console { source, log_line } => { serialize_map!({ "_timestamp": ms, "_topic": "_console", "_functionPath": source.udf_path, "_functionType": source.udf_type, "_functionCached": source.cached, "message": log_line.to_pretty_string() }) }, StructuredLogEvent::FunctionExecution { source, error, execution_time, usage_stats, occ_info: _, scheduler_info: _, } => { let (reason, status) = match error { Some(err) => (Some(err.to_string()), "failure"), None => (None, "success"), }; let execution_time_ms = execution_time.as_millis(); serialize_map!({ "_timestamp": ms, "_topic": "_execution_record", "_functionPath": source.udf_path, "_functionType": source.udf_type, "_functionCached": source.cached, "status": status, "reason": reason, "executionTimeMs": execution_time_ms, "databaseReadBytes": usage_stats.database_read_bytes, "databaseWriteBytes": usage_stats.database_write_bytes, "storageReadBytes": usage_stats.storage_read_bytes, "storageWriteBytes": usage_stats.storage_write_bytes, }) }, StructuredLogEvent::Exception { error, user_identifier, source, udf_server_version, } => { let message = &error.message; let frames: Option<Vec<String>> = error .frames .as_ref() .map(|frames| frames.0.iter().map(|frame| frame.to_string()).collect()); serialize_map!({ "_timestamp": ms, "_topic": "_exception", "_functionPath": source.udf_path, "_functionType": source.udf_type, "_functionCached": source.cached, "message": message, "frames": frames, "udfServerVersion": udf_server_version, "userIdentifier": user_identifier, }) }, StructuredLogEvent::DeploymentAuditLog { action, metadata } => { serialize_map!({ "_timestamp": ms, "_topic": "_audit_log", "action": action, "actionMetadata": metadata }) }, StructuredLogEvent::SchedulerStats { lag_seconds, num_running_jobs, } => serialize_map!({ "_timestamp": ms, "_topic": "_scheduler_stats", "lag_seconds": lag_seconds.as_secs(), "num_running_jobs": num_running_jobs }), StructuredLogEvent::ScheduledJobLag { lag_seconds } => { serialize_map!({ "_timestamp": ms, "_topic": "_scheduled_job_lag", "lag_seconds": lag_seconds.as_secs() }) }, }, LogEventFormatVersion::V2 => match &self.event { StructuredLogEvent::Verification => { serialize_map!({ "timestamp": ms, "topic": "verification", "message": "Convex connection test" }) }, StructuredLogEvent::Console { source, log_line } => { let function_source = source.to_json_map(); let LogLineStructured { messages, level, timestamp, is_truncated, system_metadata, } = log_line; let timestamp_ms = timestamp .as_ms_since_epoch() .map_err(serde::ser::Error::custom)?; serialize_map!({ "timestamp": timestamp_ms, "topic": "console", "function": function_source, "log_level": level.to_string(), "message": messages.join(" "), "is_truncated": is_truncated, "system_code": system_metadata.as_ref().map(|s| &s.code) }) }, StructuredLogEvent::FunctionExecution { source, error, execution_time, usage_stats, occ_info, scheduler_info, } => { let function_source = source.to_json_map(); let (status, error_message) = match error { Some(error) => ("failure", Some(error.to_string())), None => ("success", None), }; #[derive(Serialize)] struct Usage { database_read_bytes: u64, database_write_bytes: u64, database_read_documents: u64, file_storage_read_bytes: u64, file_storage_write_bytes: u64, vector_storage_read_bytes: u64, vector_storage_write_bytes: u64, action_memory_used_mb: Option<u64>, } serialize_map!({ "timestamp": ms, "topic": "function_execution", "function": function_source, "execution_time_ms": execution_time.as_millis(), "status": status, "error_message": error_message, "occ_info": occ_info, "scheduler_info": scheduler_info, "usage": Usage { database_read_bytes: usage_stats.database_read_bytes, database_write_bytes: usage_stats.database_write_bytes, database_read_documents: usage_stats.database_read_documents, file_storage_read_bytes: usage_stats.storage_read_bytes, file_storage_write_bytes: usage_stats.storage_write_bytes, vector_storage_read_bytes: usage_stats.vector_index_read_bytes, vector_storage_write_bytes: usage_stats.vector_index_write_bytes, action_memory_used_mb: usage_stats.action_memory_used_mb } }) }, // This codepath is unused because we filter out logs in default_log_filter and // construct exception logs in the Sentry sink StructuredLogEvent::Exception { error, user_identifier, source, udf_server_version, } => { let message = &error.message; let frames: Option<Vec<String>> = error .frames .as_ref() .map(|frames| frames.0.iter().map(|frame| frame.to_string()).collect()); serialize_map!({ "_timestamp": ms, "_topic": "_exception", "_functionPath": source.udf_path, "_functionType": source.udf_type, "_functionCached": source.cached, "message": message, "frames": frames, "udfServerVersion": udf_server_version, "userIdentifier": user_identifier, }) }, StructuredLogEvent::DeploymentAuditLog { action, metadata } => { serialize_map!({ "timestamp": ms, "topic": "audit_log", "audit_log_action": action, // stringified JSON to avoid "audit_log_metadata": serde_json::to_string(metadata).map_err(serde::ser::Error::custom)? }) }, StructuredLogEvent::SchedulerStats { lag_seconds, num_running_jobs, } => { serialize_map!({ "topic": "scheduler_stats", "timestamp": ms, "lag_seconds": lag_seconds.as_secs(), "num_running_jobs": num_running_jobs }) }, StructuredLogEvent::ScheduledJobLag { lag_seconds } => { serialize_map!({ "timestamp": ms, "topic": "scheduled_job_lag", "lag_seconds": lag_seconds.as_secs() }) }, }, } } } #[derive(Debug, Clone, PartialEq, Eq)] #[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))] pub enum EventSource { Function(FunctionEventSource), System, } #[derive(Debug, Clone, PartialEq, Eq)] #[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))] pub struct FunctionEventSource { pub context: ExecutionContext, pub component_path: ComponentPath, pub udf_path: String, pub udf_type: UdfType, pub module_environment: ModuleEnvironment, // Only queries can be cached, so this is only Some for queries. This is important // information to transmit to the client to distinguish from logs users explicitly created // and logs that we created for by redoing a query when its readset changes. pub cached: Option<bool>, // For mutations, this is the length of the mutation queue at the time the mutation was // executed. This is useful for monitoring and debugging mutation queue backlogs. pub mutation_queue_length: Option<usize>, // For mutations, this is the number of previous failed executions before a successful one. pub mutation_retry_count: Option<usize>, } impl FunctionEventSource { #[cfg(any(test, feature = "testing"))] pub fn new_for_test() -> Self { Self { context: ExecutionContext::new_for_test(), component_path: ComponentPath::test_user(), udf_path: "path/to/file:myFunction".to_string(), udf_type: UdfType::Mutation, module_environment: ModuleEnvironment::Isolate, cached: None, mutation_queue_length: None, mutation_retry_count: None, } } pub fn to_json_map(&self) -> serde_json::Map<String, JsonValue> { let udf_type = match self.udf_type { UdfType::Query => "query", UdfType::Mutation => "mutation", UdfType::Action => "action", UdfType::HttpAction => "http_action", }; let JsonValue::Object(mut fields) = json!({ "path": self.udf_path, "type": udf_type, "cached": self.cached, "request_id": self.context.request_id.to_string(), "mutation_queue_length": self.mutation_queue_length, "mutation_retry_count": self.mutation_retry_count, }) else { unreachable!() }; if let Some(component_path_str) = self.component_path.clone().serialize() { fields.insert( "component_path".to_string(), JsonValue::String(component_path_str), ); } fields } } impl HeapSize for FunctionEventSource { fn heap_size(&self) -> usize { self.component_path.heap_size() + self.udf_path.heap_size() + self.udf_type.heap_size() + self.cached.heap_size() + self.mutation_queue_length.heap_size() } } #[cfg(test)] mod tests { use serde::{ Deserialize, Serialize, }; use serde_json::{ json, Value as JsonValue, }; use utoipa::{ OpenApi, ToSchema, }; use crate::{ components::ComponentPath, execution_context::ExecutionContext, log_lines::{ LogLevel, LogLineStructured, }, log_streaming::{ AggregatedFunctionUsageStats, FunctionEventSource, LogEvent, LogEventFormatVersion, OccInfo, SchedulerInfo, StructuredLogEvent, }, runtime::UnixTimestamp, types::{ ModuleEnvironment, UdfType, }, }; #[test] fn test_serialization_of_console_log_event() -> anyhow::Result<()> { let timestamp = UnixTimestamp::from_millis(1000); let context = ExecutionContext::new_for_test(); let request_id = context.request_id.clone(); let event = LogEvent { timestamp, event: StructuredLogEvent::Console { source: FunctionEventSource { context, component_path: ComponentPath::test_user(), udf_path: "test:test".to_string(), udf_type: UdfType::Query, module_environment: ModuleEnvironment::Isolate, cached: Some(true), mutation_queue_length: None, mutation_retry_count: None, }, log_line: LogLineStructured::new_developer_log_line( LogLevel::Log, vec!["my test log".to_string()], timestamp, ), }, }; // Test serialization let fields: serde_json::Map<String, JsonValue> = event.to_json_map(LogEventFormatVersion::default())?; let value = serde_json::to_value(&fields)?; assert_eq!( value, json!({ "topic": "console", "timestamp": 1000, "function": json!({ "path": "test:test", "type": "query", "cached": true, "request_id": request_id.to_string(), "mutation_queue_length": null, "mutation_retry_count": null }), "log_level": "LOG", "message": "my test log", "is_truncated": false, "system_code": JsonValue::Null }) ); Ok(()) } // Utoipa schemas for log stream events which are for documentation only. // They are cursorily tested to check they can be used to parse some // event log output. // // These types need to be updated manually. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)] #[allow(dead_code)] struct ConsoleLogEvent { timestamp: u64, #[schema(inline)] function: SchemaFunctionEventSource, log_level: String, message: String, is_truncated: bool, system_code: Option<String>, } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)] #[allow(dead_code)] struct SchemaFunctionEventSource { path: String, r#type: String, cached: Option<bool>, request_id: String, mutation_queue_length: Option<usize>, mutation_retry_count: Option<usize>, component_path: Option<String>, } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)] #[allow(dead_code)] struct SchemaOccInfo { table_name: Option<String>, document_id: Option<String>, write_source: Option<String>, retry_count: u64, } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)] #[allow(dead_code)] struct SchemaSchedulerInfo { job_id: String, } // Additional log event schemas #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)] #[allow(dead_code)] struct VerificationEvent { timestamp: u64, message: String, // "Convex connection test" } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)] #[allow(dead_code)] struct FunctionExecutionEvent { timestamp: u64, #[schema(inline)] function: SchemaFunctionEventSource, execution_time_ms: u64, status: String, // "success" or "failure" error_message: Option<String>, #[schema(inline)] occ_info: Option<SchemaOccInfo>, #[schema(inline)] scheduler_info: Option<SchemaSchedulerInfo>, #[schema(inline)] usage: UsageStats, } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)] #[allow(dead_code)] struct UsageStats { database_read_bytes: u64, database_write_bytes: u64, database_read_documents: u64, file_storage_read_bytes: u64, file_storage_write_bytes: u64, vector_storage_read_bytes: u64, vector_storage_write_bytes: u64, action_memory_used_mb: Option<u64>, } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)] #[allow(dead_code)] struct DeploymentAuditLogEvent { timestamp: u64, audit_log_action: String, audit_log_metadata: String, // JSON-stringified metadata } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)] #[allow(dead_code)] struct SchedulerStatsEvent { timestamp: u64, lag_seconds: u64, num_running_jobs: u64, } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)] #[allow(dead_code)] struct ScheduledJobLagEvent { timestamp: u64, lag_seconds: u64, } // Union type for all log events, discriminated by topic field #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)] #[serde(tag = "topic")] #[allow(dead_code)] enum LogStreamEvent { #[serde(rename = "console")] Console(ConsoleLogEvent), #[serde(rename = "verification")] Verification(VerificationEvent), #[serde(rename = "function_execution")] FunctionExecution(FunctionExecutionEvent), #[serde(rename = "audit_log")] DeploymentAuditLog(DeploymentAuditLogEvent), #[serde(rename = "scheduler_stats")] SchedulerStats(SchedulerStatsEvent), #[serde(rename = "scheduled_job_lag")] ScheduledJobLag(ScheduledJobLagEvent), } // OpenAPI document for log stream schemas #[derive(OpenApi)] #[openapi( info( title = "Convex Log Stream Events", version = "2.0.0", description = "Schema definitions for Convex log stream events (V2 format)" ), components(schemas(LogStreamEvent)) )] struct LogStreamApiDoc; #[test] fn test_v2_events_deserialize_to_schemas() -> anyhow::Result<()> { let verification_json = serde_json::to_value( &LogEvent { timestamp: UnixTimestamp::from_millis(1000), event: StructuredLogEvent::Verification, } .to_json_map(LogEventFormatVersion::V2)?, )?; let _: LogStreamEvent = serde_json::from_value(verification_json)?; let console_json = serde_json::to_value( &LogEvent { timestamp: UnixTimestamp::from_millis(2000), event: StructuredLogEvent::Console { source: FunctionEventSource { context: ExecutionContext::new_for_test(), component_path: ComponentPath::test_user(), udf_path: "test:console".to_string(), udf_type: UdfType::Query, module_environment: ModuleEnvironment::Isolate, cached: Some(true), mutation_queue_length: None, mutation_retry_count: None, }, log_line: LogLineStructured { messages: vec!["test console log".to_string()].into(), level: LogLevel::Log, is_truncated: false, timestamp: UnixTimestamp::from_millis(2000), system_metadata: None, }, }, } .to_json_map(LogEventFormatVersion::V2)?, )?; let _: LogStreamEvent = serde_json::from_value(console_json)?; let function_execution_json = serde_json::to_value( &LogEvent { timestamp: UnixTimestamp::from_millis(3000), event: StructuredLogEvent::FunctionExecution { source: FunctionEventSource { context: ExecutionContext::new_for_test(), component_path: ComponentPath::test_user(), udf_path: "test:function".to_string(), udf_type: UdfType::Mutation, module_environment: ModuleEnvironment::Isolate, cached: None, mutation_queue_length: Some(2), mutation_retry_count: Some(0), }, error: None, execution_time: std::time::Duration::from_millis(100), usage_stats: AggregatedFunctionUsageStats { database_read_bytes: 512, database_write_bytes: 256, database_read_documents: 3, storage_read_bytes: 0, storage_write_bytes: 0, vector_index_read_bytes: 0, vector_index_write_bytes: 0, action_memory_used_mb: None, return_bytes: Some(64), }, occ_info: Some(OccInfo { table_name: Some("test_table".to_string()), document_id: Some("doc123".to_string()), write_source: Some("mutation".to_string()), retry_count: 1, }), scheduler_info: Some(SchedulerInfo { job_id: "scheduled_job_456".to_string(), }), }, } .to_json_map(LogEventFormatVersion::V2)?, )?; let _: LogStreamEvent = serde_json::from_value(function_execution_json)?; let mut metadata = serde_json::Map::new(); metadata.insert( "action".to_string(), serde_json::Value::String("deploy".to_string()), ); let audit_log_json = serde_json::to_value( &LogEvent { timestamp: UnixTimestamp::from_millis(4000), event: StructuredLogEvent::DeploymentAuditLog { action: "schema_push".to_string(), metadata, }, } .to_json_map(LogEventFormatVersion::V2)?, )?; let _: LogStreamEvent = serde_json::from_value(audit_log_json)?; let scheduler_stats_json = serde_json::to_value( &LogEvent { timestamp: UnixTimestamp::from_millis(5000), event: StructuredLogEvent::SchedulerStats { lag_seconds: std::time::Duration::from_secs(10), num_running_jobs: 25, }, } .to_json_map(LogEventFormatVersion::V2)?, )?; let _: LogStreamEvent = serde_json::from_value(scheduler_stats_json)?; let job_lag_json = serde_json::to_value( &LogEvent { timestamp: UnixTimestamp::from_millis(6000), event: StructuredLogEvent::ScheduledJobLag { lag_seconds: std::time::Duration::from_secs(5), }, } .to_json_map(LogEventFormatVersion::V2)?, )?; let _: LogStreamEvent = serde_json::from_value(job_lag_json)?; Ok(()) } #[test] fn test_log_stream_schema_matches() -> anyhow::Result<()> { use std::{ fs, path::Path, }; const LOG_STREAM_SCHEMA_FILE: &str = "../../npm-packages/convex/log-stream-openapi.json"; // Generate OpenAPI spec using utoipa let openapi_spec = LogStreamApiDoc::openapi(); let current_schema = openapi_spec.to_pretty_json()?; // Check if file exists and compare if Path::new(LOG_STREAM_SCHEMA_FILE).exists() { let existing_schema = fs::read_to_string(LOG_STREAM_SCHEMA_FILE)?; if existing_schema.trim() != current_schema.trim() { // Write updated schema fs::write(LOG_STREAM_SCHEMA_FILE, &current_schema)?; panic!( "{LOG_STREAM_SCHEMA_FILE} does not match current schema. This test \ automatically updated the file so you can run again: `cargo test -p common \ test_log_stream_schema_matches`" ); } } else { // Create directory if it doesn't exist if let Some(parent) = Path::new(LOG_STREAM_SCHEMA_FILE).parent() { fs::create_dir_all(parent)?; } fs::write(LOG_STREAM_SCHEMA_FILE, &current_schema)?; panic!( "Created new {LOG_STREAM_SCHEMA_FILE}. Run the test again to verify: `cargo test \ -p common test_log_stream_schema_matches`" ); } Ok(()) } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server