// log_streaming.rs • 34.8 kB
use std::{
fmt,
fmt::Display,
str::FromStr,
time::Duration,
};
use serde::{
ser::SerializeMap,
Deserialize,
Serialize,
Serializer,
};
use serde_json::{
json,
Value as JsonValue,
};
use tonic::async_trait;
use value::heap_size::HeapSize;
use crate::{
components::ComponentPath,
errors::JsError,
execution_context::ExecutionContext,
log_lines::LogLineStructured,
runtime::{
Runtime,
UnixTimestamp,
},
types::{
ModuleEnvironment,
UdfType,
},
};
/// Public worker for the LogManager.
///
/// Implementors must be `Send + Sync`, as required by the supertrait bounds.
#[async_trait]
pub trait LogSender: Send + Sync {
/// Hands a batch of log events to the sender. The call is synchronous and
/// returns nothing, so the caller receives no delivery result from it.
fn send_logs(&self, logs: Vec<LogEvent>);
/// Asynchronously shuts the sender down, reporting failure as an
/// `anyhow` error.
/// NOTE(review): presumably pending logs are flushed first -- confirm
/// against the implementors.
async fn shutdown(&self) -> anyhow::Result<()>;
}
/// Structured log: a topic-specific payload paired with the time the event
/// was created.
#[derive(Debug, Clone)]
pub struct LogEvent {
/// Rough timestamp of when this event was created, for the user's benefit.
/// We provide no guarantees on the consistency of this timestamp across
/// topics and log sources - it's best-effort.
/// This timestamp is serialized to milliseconds.
/// Note that the V2 `console` serialization emits the log line's own
/// timestamp instead of this one.
pub timestamp: UnixTimestamp,
/// The topic-specific payload of this event.
pub event: StructuredLogEvent,
}
/// User-facing UDF stats, that is logged in the UDF execution log
/// and might be used for debugging purposes.
///
/// `LogEvent::to_json_serializer` emits only part of this struct: the V1
/// format writes the database and storage byte counters, while the V2 format
/// writes every field except `return_bytes`.
///
/// TODO(sarah) this is nearly identical to the type in the `usage_tracking`
/// crate, but there's a dependency cycle preventing us from using it directly.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct AggregatedFunctionUsageStats {
pub database_read_bytes: u64,
pub database_write_bytes: u64,
/// Emitted in the V2 format only.
pub database_read_documents: u64,
/// Emitted as `storageReadBytes` (V1) / `file_storage_read_bytes` (V2).
pub storage_read_bytes: u64,
/// Emitted as `storageWriteBytes` (V1) / `file_storage_write_bytes` (V2).
pub storage_write_bytes: u64,
/// Emitted as `vector_storage_read_bytes` in the V2 format only.
pub vector_index_read_bytes: u64,
/// Emitted as `vector_storage_write_bytes` in the V2 format only.
pub vector_index_write_bytes: u64,
/// Emitted in the V2 format only.
/// NOTE(review): presumably only `Some` for actions -- confirm with the
/// producer of these stats.
pub action_memory_used_mb: Option<u64>,
/// Not emitted by `LogEvent::to_json_serializer` in either format.
pub return_bytes: Option<u64>,
}
/// Conflict details attached to a function execution. Serialized through its
/// derived `Serialize` impl under the `occ_info` key of the V2
/// `function_execution` event; the V1 format ignores it.
/// NOTE(review): "OCC" presumably stands for optimistic concurrency control
/// -- confirm.
#[derive(Serialize, Debug, Clone)]
pub struct OccInfo {
pub table_name: Option<String>,
pub document_id: Option<String>,
pub write_source: Option<String>,
pub retry_count: u64,
}
/// Scheduler details attached to a function execution. Serialized through its
/// derived `Serialize` impl under the `scheduler_info` key of the V2
/// `function_execution` event; the V1 format ignores it.
// Currently this only carries the job id. Can add information like parent
// scheduled job, scheduler lag, etc.
#[derive(Serialize, Debug, Clone)]
pub struct SchedulerInfo {
pub job_id: String,
}
// When adding a new event type:
// - add a Schema type in the tests at the bottom of this file
// - consider adding formatting of it in the CLI
// - add it to the docs
//
// Also consider getting rid of the V1 format!
/// The payload of a [`LogEvent`], with one variant per log topic.
#[derive(Debug, Clone)]
pub enum StructuredLogEvent {
/// Topic for verification logs. These are issued on sink startup and are
/// used to test that the backend can authenticate with the sink.
Verification,
/// Topic for logs generated by `console.*` events. This is considered a
/// `SystemLogTopic` since the topic is generated by the backend.
Console {
source: FunctionEventSource,
log_line: LogLineStructured,
},
/// Topic that records UDF executions and provides information on the
/// execution.
FunctionExecution {
source: FunctionEventSource,
/// `None` is reported as status "success", `Some` as "failure".
error: Option<JsError>,
/// Serialized in whole milliseconds.
execution_time: Duration,
usage_stats: AggregatedFunctionUsageStats,
/// Only serialized in the V2 format.
occ_info: Option<OccInfo>,
/// Only serialized in the V2 format.
scheduler_info: Option<SchedulerInfo>,
},
/// Topic for exceptions. These happen when a UDF raises an exception from
/// JS
Exception {
error: JsError,
user_identifier: Option<sync_types::UserIdentifier>,
source: FunctionEventSource,
udf_server_version: Option<semver::Version>,
},
/// Topic for deployment audit logs. These are issued when developers
/// interact with a deployment.
DeploymentAuditLog {
action: String,
metadata: serde_json::Map<String, JsonValue>,
},
/// Topic for global stats from the scheduler. For function-specific stats,
/// look in FunctionExecution
SchedulerStats {
/// Serialized in whole seconds (`Duration::as_secs`).
lag_seconds: Duration,
num_running_jobs: u64,
},
/// Topic carrying a single scheduler lag value.
/// NOTE(review): presumably the lag of an individual scheduled job, going
/// by the name -- confirm with the emitter.
ScheduledJobLag {
/// Serialized in whole seconds (`Duration::as_secs`).
lag_seconds: Duration,
},
// User-specified topics -- not yet implemented.
// See here for more details: https://www.notion.so/Log-Streaming-in-Convex-19a1dfadd6924c33b29b2796b0f5b2e2
// User {
// topic: String,
// payload: serde_json::Map<String, JsonValue>
// },
}
/// Selects the JSON layout produced by `LogEvent::to_json_serializer`.
/// Parses from and displays as the bare version number ("1" or "2").
#[derive(Debug, Clone, Copy, PartialEq, Deserialize)]
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
pub enum LogEventFormatVersion {
/// Layout keyed by underscore-prefixed `_timestamp` / `_topic`.
V1,
/// Layout keyed by `timestamp` / `topic`, with function details nested
/// under `function`.
V2,
}
impl FromStr for LogEventFormatVersion {
    type Err = anyhow::Error;

    /// Parses the bare version number: `"1"` is [`Self::V1`] and `"2"` is
    /// [`Self::V2`]. Any other input (including surrounding whitespace) is
    /// rejected with an error that echoes the input.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let parsed = match s {
            "1" => Self::V1,
            "2" => Self::V2,
            v => anyhow::bail!("Invalid LogEventFormatVersion: {v}"),
        };
        Ok(parsed)
    }
}
impl Display for LogEventFormatVersion {
    /// Writes the bare version number (`1` or `2`), the same text that
    /// `FromStr` accepts.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let digit = match self {
            Self::V1 => "1",
            Self::V2 => "2",
        };
        f.write_str(digit)
    }
}
#[cfg(any(test, feature = "testing"))]
impl Default for LogEventFormatVersion {
    /// Test-only default: the V2 format.
    fn default() -> Self {
        LogEventFormatVersion::V2
    }
}
/// Structured log
impl LogEvent {
/// Builds a [`StructuredLogEvent::Verification`] event stamped with the
/// runtime's current Unix timestamp.
///
/// The body has no failing path; it always returns `Ok`.
pub fn default_for_verification<RT: Runtime>(runtime: &RT) -> anyhow::Result<Self> {
    let timestamp = runtime.unix_timestamp();
    let verification = Self {
        timestamp,
        event: StructuredLogEvent::Verification,
    };
    Ok(verification)
}
/// Test-only helper: builds an [`StructuredLogEvent::Exception`] event with
/// fixed sample values (an action named "test", a two-frame error, a sample
/// user identifier and UDF server version 1.5.1), stamped with the runtime's
/// current Unix timestamp.
#[cfg(any(test, feature = "testing"))]
pub fn sample_exception<RT: Runtime>(runtime: &RT) -> anyhow::Result<Self> {
    use sync_types::UserIdentifier;

    let function_source = FunctionEventSource {
        context: ExecutionContext::new_for_test(),
        component_path: ComponentPath::test_user(),
        udf_path: "test".to_string(),
        udf_type: UdfType::Action,
        module_environment: ModuleEnvironment::Isolate,
        cached: None,
        mutation_queue_length: None,
        mutation_retry_count: None,
    };
    let timestamp = runtime.unix_timestamp();
    let error =
        JsError::from_frames_for_test("test_message", vec!["test_frame_1", "test_frame_2"]);
    let event = StructuredLogEvent::Exception {
        error,
        user_identifier: Some(UserIdentifier("test|user".to_string())),
        source: function_source,
        udf_server_version: Some(semver::Version::new(1, 5, 1)),
    };
    Ok(Self { timestamp, event })
}
/// Serializes this event in the given `format` and returns the resulting
/// JSON object's fields.
///
/// # Errors
/// Propagates any error from [`Self::to_json_serializer`].
///
/// # Panics
/// Hits `unreachable!()` if serialization yields anything other than a JSON
/// object; every arm of `to_json_serializer` serializes a map.
pub fn to_json_map(
    &self,
    format: LogEventFormatVersion,
) -> anyhow::Result<serde_json::Map<String, JsonValue>> {
    match self.to_json_serializer(format, serde_json::value::Serializer)? {
        JsonValue::Object(fields) => Ok(fields),
        _ => unreachable!(),
    }
}
/// Serializes this event as a single map into `serializer`, using the key
/// layout selected by `format`.
///
/// Each arm writes its entries with `serialize_entry` in the order they are
/// listed below, so for serializers that preserve order the output key order
/// follows the source order.
///
/// # Errors
/// Returns a custom serializer error if a timestamp cannot be converted to
/// milliseconds or (V2 audit log only) the metadata cannot be stringified,
/// and propagates any error raised by `serializer` itself.
pub fn to_json_serializer<S: Serializer>(
&self,
format: LogEventFormatVersion,
serializer: S,
) -> Result<S::Ok, S::Error> {
// Event timestamp in milliseconds since the epoch; used by every arm except
// the V2 console arm, which uses the log line's own timestamp.
let ms = self
.timestamp
.as_ms_since_epoch()
.map_err(serde::ser::Error::custom)?;
// Serializes a `{ "key": value, ... }` literal as a map of unspecified
// length (`serialize_map(None)`), writing one entry per pair and then
// finishing the map. It moves `serializer`, so each match arm invokes it
// exactly once as its final expression.
macro_rules! serialize_map {
({$(
$key:literal: $value:expr
),* $(,)?}) => {{
let mut map_builder = serializer.serialize_map(None)?;
$(map_builder.serialize_entry($key, &$value)?;)*
map_builder.end()
}}
}
match format {
// V1: flat maps keyed by underscore-prefixed `_timestamp` / `_topic`.
LogEventFormatVersion::V1 => match &self.event {
StructuredLogEvent::Verification => {
serialize_map!({
"_timestamp": ms,
"_topic": "_verification",
"message": "Convex connection test"
})
},
StructuredLogEvent::Console { source, log_line } => {
serialize_map!({
"_timestamp": ms,
"_topic": "_console",
"_functionPath": source.udf_path,
"_functionType": source.udf_type,
"_functionCached": source.cached,
"message": log_line.to_pretty_string()
})
},
// V1 does not report `occ_info` or `scheduler_info`, and only the
// database/storage byte counters from `usage_stats`.
StructuredLogEvent::FunctionExecution {
source,
error,
execution_time,
usage_stats,
occ_info: _,
scheduler_info: _,
} => {
let (reason, status) = match error {
Some(err) => (Some(err.to_string()), "failure"),
None => (None, "success"),
};
let execution_time_ms = execution_time.as_millis();
serialize_map!({
"_timestamp": ms,
"_topic": "_execution_record",
"_functionPath": source.udf_path,
"_functionType": source.udf_type,
"_functionCached": source.cached,
"status": status,
"reason": reason,
"executionTimeMs": execution_time_ms,
"databaseReadBytes": usage_stats.database_read_bytes,
"databaseWriteBytes": usage_stats.database_write_bytes,
"storageReadBytes": usage_stats.storage_read_bytes,
"storageWriteBytes": usage_stats.storage_write_bytes,
})
},
StructuredLogEvent::Exception {
error,
user_identifier,
source,
udf_server_version,
} => {
let message = &error.message;
// Each stack frame is rendered through its `to_string()`; `None` when
// the error carries no frames.
let frames: Option<Vec<String>> = error
.frames
.as_ref()
.map(|frames| frames.0.iter().map(|frame| frame.to_string()).collect());
serialize_map!({
"_timestamp": ms,
"_topic": "_exception",
"_functionPath": source.udf_path,
"_functionType": source.udf_type,
"_functionCached": source.cached,
"message": message,
"frames": frames,
"udfServerVersion": udf_server_version,
"userIdentifier": user_identifier,
})
},
StructuredLogEvent::DeploymentAuditLog { action, metadata } => {
serialize_map!({
"_timestamp": ms,
"_topic": "_audit_log",
"action": action,
"actionMetadata": metadata
})
},
StructuredLogEvent::SchedulerStats {
lag_seconds,
num_running_jobs,
} => serialize_map!({
"_timestamp": ms,
"_topic": "_scheduler_stats",
"lag_seconds": lag_seconds.as_secs(), "num_running_jobs": num_running_jobs
}),
StructuredLogEvent::ScheduledJobLag { lag_seconds } => {
serialize_map!({
"_timestamp": ms,
"_topic": "_scheduled_job_lag",
"lag_seconds": lag_seconds.as_secs()
})
},
},
// V2: maps keyed by `timestamp` / `topic`, with function details nested
// under `function` (see `FunctionEventSource::to_json_map`).
LogEventFormatVersion::V2 => match &self.event {
StructuredLogEvent::Verification => {
serialize_map!({
"timestamp": ms,
"topic": "verification",
"message": "Convex connection test"
})
},
StructuredLogEvent::Console { source, log_line } => {
let function_source = source.to_json_map();
// Exhaustive destructuring: adding a field to `LogLineStructured`
// forces this arm to be revisited.
let LogLineStructured {
messages,
level,
timestamp,
is_truncated,
system_metadata,
} = log_line;
// The console event reports the log line's own timestamp, not the
// enclosing event's `ms`.
let timestamp_ms = timestamp
.as_ms_since_epoch()
.map_err(serde::ser::Error::custom)?;
serialize_map!({
"timestamp": timestamp_ms,
"topic": "console",
"function": function_source,
"log_level": level.to_string(),
"message": messages.join(" "),
"is_truncated": is_truncated,
"system_code": system_metadata.as_ref().map(|s| &s.code)
})
},
StructuredLogEvent::FunctionExecution {
source,
error,
execution_time,
usage_stats,
occ_info,
scheduler_info,
} => {
let function_source = source.to_json_map();
let (status, error_message) = match error {
Some(error) => ("failure", Some(error.to_string())),
None => ("success", None),
};
// Local serialization shape for the nested `usage` object. It renames
// the storage and vector-index counters and leaves out `return_bytes`.
#[derive(Serialize)]
struct Usage {
database_read_bytes: u64,
database_write_bytes: u64,
database_read_documents: u64,
file_storage_read_bytes: u64,
file_storage_write_bytes: u64,
vector_storage_read_bytes: u64,
vector_storage_write_bytes: u64,
action_memory_used_mb: Option<u64>,
}
serialize_map!({
"timestamp": ms,
"topic": "function_execution",
"function": function_source,
"execution_time_ms": execution_time.as_millis(),
"status": status,
"error_message": error_message,
"occ_info": occ_info,
"scheduler_info": scheduler_info,
"usage": Usage {
database_read_bytes: usage_stats.database_read_bytes,
database_write_bytes: usage_stats.database_write_bytes,
database_read_documents: usage_stats.database_read_documents,
file_storage_read_bytes: usage_stats.storage_read_bytes,
file_storage_write_bytes: usage_stats.storage_write_bytes,
vector_storage_read_bytes: usage_stats.vector_index_read_bytes,
vector_storage_write_bytes: usage_stats.vector_index_write_bytes,
action_memory_used_mb: usage_stats.action_memory_used_mb
}
})
},
// This codepath is unused because we filter out logs in default_log_filter and
// construct exception logs in the Sentry sink
// NOTE(review): unlike the other V2 arms, this one emits the V1 key names
// (`_timestamp`, `_topic`, `_functionPath`, ...) -- confirm that is intended
// before relying on it.
StructuredLogEvent::Exception {
error,
user_identifier,
source,
udf_server_version,
} => {
let message = &error.message;
let frames: Option<Vec<String>> = error
.frames
.as_ref()
.map(|frames| frames.0.iter().map(|frame| frame.to_string()).collect());
serialize_map!({
"_timestamp": ms,
"_topic": "_exception",
"_functionPath": source.udf_path,
"_functionType": source.udf_type,
"_functionCached": source.cached,
"message": message,
"frames": frames,
"udfServerVersion": udf_server_version,
"userIdentifier": user_identifier,
})
},
StructuredLogEvent::DeploymentAuditLog { action, metadata } => {
serialize_map!({
"timestamp": ms,
"topic": "audit_log",
"audit_log_action": action,
// The metadata is emitted as a JSON-encoded string rather than as a
// nested object (V1 emits the object itself).
// NOTE(review): the original comment here was cut off after
// "stringified JSON to avoid"; the rationale is not recorded.
"audit_log_metadata": serde_json::to_string(metadata).map_err(serde::ser::Error::custom)?
})
},
StructuredLogEvent::SchedulerStats {
lag_seconds,
num_running_jobs,
} => {
serialize_map!({
"topic": "scheduler_stats",
"timestamp": ms,
"lag_seconds": lag_seconds.as_secs(),
"num_running_jobs": num_running_jobs
})
},
StructuredLogEvent::ScheduledJobLag { lag_seconds } => {
serialize_map!({
"timestamp": ms,
"topic": "scheduled_job_lag",
"lag_seconds": lag_seconds.as_secs()
})
},
},
}
}
}
/// Where a log event originated: a specific function invocation, or the
/// system itself.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
pub enum EventSource {
/// The event is attributed to the described function invocation.
Function(FunctionEventSource),
/// The event is not attributed to any function.
System,
}
/// Identifies the function invocation that a log event belongs to.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
pub struct FunctionEventSource {
// Execution context of the invocation; its `request_id` is what
// `to_json_map` reports.
pub context: ExecutionContext,
pub component_path: ComponentPath,
pub udf_path: String,
pub udf_type: UdfType,
pub module_environment: ModuleEnvironment,
// Only queries can be cached, so this is only Some for queries. This is important
// information to transmit to the client to distinguish between logs users explicitly
// created and logs that we created by redoing a query when its readset changes.
pub cached: Option<bool>,
// For mutations, this is the length of the mutation queue at the time the mutation was
// executed. This is useful for monitoring and debugging mutation queue backlogs.
pub mutation_queue_length: Option<usize>,
// For mutations, this is the number of previous failed executions before a successful one.
pub mutation_retry_count: Option<usize>,
}
impl FunctionEventSource {
/// Test-only constructor: an uncached mutation at
/// `path/to/file:myFunction` in the test user component, running in the
/// isolate environment, with no mutation queue information.
#[cfg(any(test, feature = "testing"))]
pub fn new_for_test() -> Self {
    let context = ExecutionContext::new_for_test();
    let component_path = ComponentPath::test_user();
    let udf_path = "path/to/file:myFunction".to_string();
    Self {
        context,
        component_path,
        udf_path,
        udf_type: UdfType::Mutation,
        module_environment: ModuleEnvironment::Isolate,
        cached: None,
        mutation_queue_length: None,
        mutation_retry_count: None,
    }
}
/// Renders this source as the `function` object of the V2 log format.
///
/// The object always contains `path`, `type`, `cached`, `request_id`,
/// `mutation_queue_length` and `mutation_retry_count` (absent options become
/// JSON `null`). `component_path` is added only when the component path
/// serializes to `Some`.
pub fn to_json_map(&self) -> serde_json::Map<String, JsonValue> {
    let type_name = match self.udf_type {
        UdfType::Query => "query",
        UdfType::Mutation => "mutation",
        UdfType::Action => "action",
        UdfType::HttpAction => "http_action",
    };

    let mut fields = serde_json::Map::new();
    fields.insert("path".to_string(), json!(self.udf_path));
    fields.insert("type".to_string(), json!(type_name));
    fields.insert("cached".to_string(), json!(self.cached));
    fields.insert(
        "request_id".to_string(),
        json!(self.context.request_id.to_string()),
    );
    fields.insert(
        "mutation_queue_length".to_string(),
        json!(self.mutation_queue_length),
    );
    fields.insert(
        "mutation_retry_count".to_string(),
        json!(self.mutation_retry_count),
    );

    // `Option` yields zero or one item, so this inserts the key only when the
    // component path has a serialized form.
    fields.extend(
        self.component_path
            .clone()
            .serialize()
            .map(|serialized| ("component_path".to_string(), JsonValue::String(serialized))),
    );
    fields
}
}
impl HeapSize for FunctionEventSource {
    /// Sums the heap usage reported by the individual fields.
    ///
    /// `mutation_retry_count` is now included so that both mutation counters
    /// (which share the type `Option<usize>`) are accounted for consistently.
    ///
    /// NOTE(review): `context` and `module_environment` are not counted here;
    /// confirm whether they own heap data and implement `HeapSize`.
    fn heap_size(&self) -> usize {
        self.component_path.heap_size()
            + self.udf_path.heap_size()
            + self.udf_type.heap_size()
            + self.cached.heap_size()
            + self.mutation_queue_length.heap_size()
            + self.mutation_retry_count.heap_size()
    }
}
#[cfg(test)]
mod tests {
use serde::{
Deserialize,
Serialize,
};
use serde_json::{
json,
Value as JsonValue,
};
use utoipa::{
OpenApi,
ToSchema,
};
use crate::{
components::ComponentPath,
execution_context::ExecutionContext,
log_lines::{
LogLevel,
LogLineStructured,
},
log_streaming::{
AggregatedFunctionUsageStats,
FunctionEventSource,
LogEvent,
LogEventFormatVersion,
OccInfo,
SchedulerInfo,
StructuredLogEvent,
},
runtime::UnixTimestamp,
types::{
ModuleEnvironment,
UdfType,
},
};
/// A console event serialized with the default (V2) format has exactly the
/// expected JSON shape; in particular no `component_path` key is present for
/// the test user component.
#[test]
fn test_serialization_of_console_log_event() -> anyhow::Result<()> {
    let created_at = UnixTimestamp::from_millis(1000);
    let execution_context = ExecutionContext::new_for_test();
    // Capture the request id before the context is moved into the source.
    let expected_request_id = execution_context.request_id.to_string();

    let source = FunctionEventSource {
        context: execution_context,
        component_path: ComponentPath::test_user(),
        udf_path: "test:test".to_string(),
        udf_type: UdfType::Query,
        module_environment: ModuleEnvironment::Isolate,
        cached: Some(true),
        mutation_queue_length: None,
        mutation_retry_count: None,
    };
    let log_line = LogLineStructured::new_developer_log_line(
        LogLevel::Log,
        vec!["my test log".to_string()],
        created_at,
    );
    let log_event = LogEvent {
        timestamp: created_at,
        event: StructuredLogEvent::Console { source, log_line },
    };

    // Test serialization
    let serialized = JsonValue::Object(log_event.to_json_map(LogEventFormatVersion::default())?);
    let expected = json!({
        "topic": "console",
        "timestamp": 1000,
        "function": {
            "path": "test:test",
            "type": "query",
            "cached": true,
            "request_id": expected_request_id,
            "mutation_queue_length": null,
            "mutation_retry_count": null
        },
        "log_level": "LOG",
        "message": "my test log",
        "is_truncated": false,
        "system_code": null
    });
    assert_eq!(serialized, expected);
    Ok(())
}
// Utoipa schemas for log stream events which are for documentation only.
// They are cursorily tested to check they can be used to parse some
// event log output.
//
// These types need to be updated manually.
//
// Schema for the V2 `console` event. Plain `//` comments are used on these
// schema types on purpose: doc comments on `ToSchema` types would end up as
// descriptions in the generated OpenAPI document.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
#[allow(dead_code)]
struct ConsoleLogEvent {
// Milliseconds; for console events this is the log line's own timestamp.
timestamp: u64,
#[schema(inline)]
function: SchemaFunctionEventSource,
log_level: String,
// The log line's messages joined with single spaces.
message: String,
is_truncated: bool,
system_code: Option<String>,
}
// Schema mirror of the object built by `FunctionEventSource::to_json_map`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
#[allow(dead_code)]
struct SchemaFunctionEventSource {
path: String,
// One of "query", "mutation", "action" or "http_action".
r#type: String,
cached: Option<bool>,
request_id: String,
mutation_queue_length: Option<usize>,
mutation_retry_count: Option<usize>,
// The key is only present when the component path has a serialized form.
component_path: Option<String>,
}
// Schema mirror of `OccInfo` (same field names and types).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
#[allow(dead_code)]
struct SchemaOccInfo {
table_name: Option<String>,
document_id: Option<String>,
write_source: Option<String>,
retry_count: u64,
}
// Schema mirror of `SchedulerInfo` (same field name and type).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
#[allow(dead_code)]
struct SchemaSchedulerInfo {
job_id: String,
}
// Additional log event schemas
//
// Schema for the V2 `verification` event.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
#[allow(dead_code)]
struct VerificationEvent {
// Event timestamp in milliseconds.
timestamp: u64,
message: String, // "Convex connection test"
}
// Schema for the V2 `function_execution` event.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
#[allow(dead_code)]
struct FunctionExecutionEvent {
// Event timestamp in milliseconds.
timestamp: u64,
#[schema(inline)]
function: SchemaFunctionEventSource,
execution_time_ms: u64,
status: String, // "success" or "failure"
// The error rendered as a string; null on success.
error_message: Option<String>,
#[schema(inline)]
occ_info: Option<SchemaOccInfo>,
#[schema(inline)]
scheduler_info: Option<SchemaSchedulerInfo>,
#[schema(inline)]
usage: UsageStats,
}
// Schema mirror of the `usage` object in the V2 `function_execution` event
// (the local `Usage` struct in `LogEvent::to_json_serializer`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
#[allow(dead_code)]
struct UsageStats {
database_read_bytes: u64,
database_write_bytes: u64,
database_read_documents: u64,
file_storage_read_bytes: u64,
file_storage_write_bytes: u64,
vector_storage_read_bytes: u64,
vector_storage_write_bytes: u64,
action_memory_used_mb: Option<u64>,
}
// Schema for the V2 `audit_log` event.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
#[allow(dead_code)]
struct DeploymentAuditLogEvent {
// Event timestamp in milliseconds.
timestamp: u64,
audit_log_action: String,
audit_log_metadata: String, // JSON-stringified metadata
}
// Schema for the V2 `scheduler_stats` event.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
#[allow(dead_code)]
struct SchedulerStatsEvent {
// Event timestamp in milliseconds.
timestamp: u64,
// Whole seconds.
lag_seconds: u64,
num_running_jobs: u64,
}
// Schema for the V2 `scheduled_job_lag` event.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
#[allow(dead_code)]
struct ScheduledJobLagEvent {
// Event timestamp in milliseconds.
timestamp: u64,
// Whole seconds.
lag_seconds: u64,
}
// Union type for all log events, discriminated by topic field
//
// The `rename` values are the V2 topic strings emitted by
// `LogEvent::to_json_serializer`. There is no variant for the exception
// topic.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "topic")]
#[allow(dead_code)]
enum LogStreamEvent {
#[serde(rename = "console")]
Console(ConsoleLogEvent),
#[serde(rename = "verification")]
Verification(VerificationEvent),
#[serde(rename = "function_execution")]
FunctionExecution(FunctionExecutionEvent),
#[serde(rename = "audit_log")]
DeploymentAuditLog(DeploymentAuditLogEvent),
#[serde(rename = "scheduler_stats")]
SchedulerStats(SchedulerStatsEvent),
#[serde(rename = "scheduled_job_lag")]
ScheduledJobLag(ScheduledJobLagEvent),
}
// OpenAPI document for log stream schemas
//
// Only `LogStreamEvent` is registered as a component; the nested schema
// types are marked `#[schema(inline)]` where they are used. The generated
// document is compared against a checked-in file by
// `test_log_stream_schema_matches`.
#[derive(OpenApi)]
#[openapi(
info(
title = "Convex Log Stream Events",
version = "2.0.0",
description = "Schema definitions for Convex log stream events (V2 format)"
),
components(schemas(LogStreamEvent))
)]
struct LogStreamApiDoc;
/// Every V2 event topic covered by `LogStreamEvent` serializes into JSON that
/// the corresponding documentation schema can parse back.
#[test]
fn test_v2_events_deserialize_to_schemas() -> anyhow::Result<()> {
    // Serializes one event in the V2 format and checks that the result
    // deserializes into the schema union.
    let assert_parses =
        |timestamp: UnixTimestamp, event: StructuredLogEvent| -> anyhow::Result<()> {
            let log_event = LogEvent { timestamp, event };
            let fields = log_event.to_json_map(LogEventFormatVersion::V2)?;
            let as_json = serde_json::to_value(&fields)?;
            let _: LogStreamEvent = serde_json::from_value(as_json)?;
            Ok(())
        };

    // verification
    assert_parses(
        UnixTimestamp::from_millis(1000),
        StructuredLogEvent::Verification,
    )?;

    // console
    assert_parses(
        UnixTimestamp::from_millis(2000),
        StructuredLogEvent::Console {
            source: FunctionEventSource {
                context: ExecutionContext::new_for_test(),
                component_path: ComponentPath::test_user(),
                udf_path: "test:console".to_string(),
                udf_type: UdfType::Query,
                module_environment: ModuleEnvironment::Isolate,
                cached: Some(true),
                mutation_queue_length: None,
                mutation_retry_count: None,
            },
            log_line: LogLineStructured {
                messages: vec!["test console log".to_string()].into(),
                level: LogLevel::Log,
                is_truncated: false,
                timestamp: UnixTimestamp::from_millis(2000),
                system_metadata: None,
            },
        },
    )?;

    // function_execution
    assert_parses(
        UnixTimestamp::from_millis(3000),
        StructuredLogEvent::FunctionExecution {
            source: FunctionEventSource {
                context: ExecutionContext::new_for_test(),
                component_path: ComponentPath::test_user(),
                udf_path: "test:function".to_string(),
                udf_type: UdfType::Mutation,
                module_environment: ModuleEnvironment::Isolate,
                cached: None,
                mutation_queue_length: Some(2),
                mutation_retry_count: Some(0),
            },
            error: None,
            execution_time: std::time::Duration::from_millis(100),
            usage_stats: AggregatedFunctionUsageStats {
                database_read_bytes: 512,
                database_write_bytes: 256,
                database_read_documents: 3,
                storage_read_bytes: 0,
                storage_write_bytes: 0,
                vector_index_read_bytes: 0,
                vector_index_write_bytes: 0,
                action_memory_used_mb: None,
                return_bytes: Some(64),
            },
            occ_info: Some(OccInfo {
                table_name: Some("test_table".to_string()),
                document_id: Some("doc123".to_string()),
                write_source: Some("mutation".to_string()),
                retry_count: 1,
            }),
            scheduler_info: Some(SchedulerInfo {
                job_id: "scheduled_job_456".to_string(),
            }),
        },
    )?;

    // audit_log
    let mut audit_metadata = serde_json::Map::new();
    audit_metadata.insert(
        "action".to_string(),
        serde_json::Value::String("deploy".to_string()),
    );
    assert_parses(
        UnixTimestamp::from_millis(4000),
        StructuredLogEvent::DeploymentAuditLog {
            action: "schema_push".to_string(),
            metadata: audit_metadata,
        },
    )?;

    // scheduler_stats
    assert_parses(
        UnixTimestamp::from_millis(5000),
        StructuredLogEvent::SchedulerStats {
            lag_seconds: std::time::Duration::from_secs(10),
            num_running_jobs: 25,
        },
    )?;

    // scheduled_job_lag
    assert_parses(
        UnixTimestamp::from_millis(6000),
        StructuredLogEvent::ScheduledJobLag {
            lag_seconds: std::time::Duration::from_secs(5),
        },
    )?;

    Ok(())
}
/// Checks that the checked-in OpenAPI file matches the schema generated from
/// `LogStreamApiDoc`.
///
/// When the file is missing or differs (ignoring leading and trailing
/// whitespace), the test rewrites it with the current schema and then panics,
/// so a second run passes once the regenerated file is in place.
#[test]
fn test_log_stream_schema_matches() -> anyhow::Result<()> {
    use std::{
        fs,
        path::Path,
    };
    const LOG_STREAM_SCHEMA_FILE: &str = "../../npm-packages/convex/log-stream-openapi.json";
    // Generate OpenAPI spec using utoipa
    let openapi_spec = LogStreamApiDoc::openapi();
    let current_schema = openapi_spec.to_pretty_json()?;
    // Check if file exists and compare
    if Path::new(LOG_STREAM_SCHEMA_FILE).exists() {
        let existing_schema = fs::read_to_string(LOG_STREAM_SCHEMA_FILE)?;
        if existing_schema.trim() != current_schema.trim() {
            // Write updated schema. The argument was previously the mis-decoded
            // token `¤t_schema`, which does not compile.
            fs::write(LOG_STREAM_SCHEMA_FILE, &current_schema)?;
            panic!(
                "{LOG_STREAM_SCHEMA_FILE} does not match current schema. This test \
                 automatically updated the file so you can run again: `cargo test -p common \
                 test_log_stream_schema_matches`"
            );
        }
    } else {
        // Create directory if it doesn't exist
        if let Some(parent) = Path::new(LOG_STREAM_SCHEMA_FILE).parent() {
            fs::create_dir_all(parent)?;
        }
        fs::write(LOG_STREAM_SCHEMA_FILE, &current_schema)?;
        panic!(
            "Created new {LOG_STREAM_SCHEMA_FILE}. Run the test again to verify: `cargo test \
             -p common test_log_stream_schema_matches`"
        );
    }
    Ok(())
}
}