log_lines.rs•25.5 kB
//! Common types representing database identifiers.
use std::{
collections::BTreeMap,
fmt::Display,
ops::{
Deref,
DerefMut,
},
str::FromStr,
};
use futures::future::BoxFuture;
use itertools::Itertools;
#[cfg(any(test, feature = "testing"))]
use proptest::prelude::*;
use serde::{
Deserialize,
Serialize,
};
use serde_json::Value as JsonValue;
pub use sync_types::{
SessionId,
SessionRequestSeqNumber,
Timestamp,
};
use tokio::sync::mpsc;
use value::{
heap_size::{
HeapSize,
WithHeapSize,
},
obj,
remove_boolean,
remove_int64,
remove_nullable_object,
remove_nullable_string,
remove_string,
remove_vec,
remove_vec_of_strings,
ConvexObject,
ConvexValue,
};
use crate::{
components::CanonicalizedComponentFunctionPath,
runtime::UnixTimestamp,
};
pub const TRUNCATED_LINE_SUFFIX: &str = " (truncated due to length)";
pub const MAX_LOG_LINE_LENGTH: usize = 32768;
/// List of log lines from a Convex function execution.
#[derive(Default, Clone, Debug, PartialEq)]
pub struct LogLines(WithHeapSize<Vec<LogLine>>);
pub type RawLogLines = WithHeapSize<Vec<String>>;
#[derive(Clone, Debug, PartialEq)]
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
pub enum LogLevel {
Debug,
Error,
Warn,
Info,
Log,
}
impl Display for LogLevel {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
LogLevel::Debug => f.write_str("DEBUG"),
LogLevel::Error => f.write_str("ERROR"),
LogLevel::Warn => f.write_str("WARN"),
LogLevel::Info => f.write_str("INFO"),
LogLevel::Log => f.write_str("LOG"),
}
}
}
impl FromStr for LogLevel {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let v = match s {
"DEBUG" => LogLevel::Debug,
"ERROR" => LogLevel::Error,
"WARN" => LogLevel::Warn,
"INFO" => LogLevel::Info,
"LOG" => LogLevel::Log,
_ => anyhow::bail!("Unknown log level"),
};
Ok(v)
}
}
impl HeapSize for LogLevel {
fn heap_size(&self) -> usize {
0
}
}
#[derive(Clone, Debug, PartialEq)]
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
pub struct SystemLogMetadata {
pub code: String,
}
impl HeapSize for SystemLogMetadata {
fn heap_size(&self) -> usize {
self.code.heap_size()
}
}
impl TryFrom<ConvexObject> for SystemLogMetadata {
type Error = anyhow::Error;
fn try_from(value: ConvexObject) -> Result<Self, Self::Error> {
let mut fields = BTreeMap::from(value);
let code = remove_string(&mut fields, "code")?;
Ok(SystemLogMetadata { code })
}
}
impl TryFrom<SystemLogMetadata> for ConvexValue {
type Error = anyhow::Error;
fn try_from(value: SystemLogMetadata) -> Result<Self, Self::Error> {
Ok(ConvexValue::Object(obj!("code" => value.code)?))
}
}
#[derive(Clone, Debug, PartialEq)]
pub struct LogLineStructured {
// Each of these is a string representation of an argument to `console.log`
// or similar
pub messages: WithHeapSize<Vec<String>>,
pub level: LogLevel,
pub is_truncated: bool,
pub timestamp: UnixTimestamp,
pub system_metadata: Option<SystemLogMetadata>,
}
impl LogLineStructured {
pub fn to_pretty_string(&self) -> String {
let LogLineStructured {
messages,
level,
is_truncated,
timestamp: _timestamp,
system_metadata: _system_metadata,
} = self;
if *is_truncated {
format!("[{level}] {}{TRUNCATED_LINE_SUFFIX}", messages.join(" "))
} else {
format!("[{level}] {}", messages.join(" "))
}
}
pub fn to_json(
self,
sub_function_path: Option<CanonicalizedComponentFunctionPath>,
allow_structured: bool,
include_system_metadata: bool,
) -> anyhow::Result<JsonValue> {
if !allow_structured {
Ok(JsonValue::String(self.to_pretty_string()))
} else {
let LogLineStructured {
messages,
level,
is_truncated,
timestamp,
system_metadata,
} = self;
let system_metadata = if include_system_metadata {
system_metadata
} else {
None
};
let log_line_json = LogLineJson {
messages: messages.into(),
is_truncated,
timestamp: timestamp.as_ms_since_epoch()?,
level: level.to_string(),
system_metadata: system_metadata.map(SystemLogMetadataJson::from),
component_path: sub_function_path.as_ref().map(|p| p.component.to_string()),
udf_path: sub_function_path.map(|p| p.udf_path.to_string()),
};
Ok(serde_json::to_value(log_line_json)?)
}
}
pub fn new_developer_log_line(
level: LogLevel,
messages: Vec<String>,
timestamp: UnixTimestamp,
) -> Self {
// total length of messages joined by a space
let total_length = messages
.iter()
.map(|m| m.len() + 1)
.sum::<usize>()
.saturating_sub(1);
if total_length <= MAX_LOG_LINE_LENGTH {
return Self {
messages: messages.into(),
level,
is_truncated: false,
timestamp,
system_metadata: None,
};
}
let mut total_length = 0;
let mut truncated_messages: Vec<String> = vec![];
for message in messages {
let remaining_space = MAX_LOG_LINE_LENGTH - total_length;
if message.len() <= remaining_space {
total_length += message.len() + 1;
truncated_messages.push(message);
} else {
let last_message =
message[..message.floor_char_boundary(remaining_space)].to_string();
truncated_messages.push(last_message);
break;
}
}
Self {
messages: truncated_messages.into(),
level,
is_truncated: true,
timestamp,
system_metadata: None,
}
}
}
impl Deref for LogLines {
type Target = WithHeapSize<Vec<LogLine>>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for LogLines {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl From<Vec<LogLine>> for LogLines {
fn from(value: Vec<LogLine>) -> Self {
Self(value.into())
}
}
impl IntoIterator for LogLines {
type IntoIter = <Vec<LogLine> as IntoIterator>::IntoIter;
type Item = LogLine;
fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
}
}
impl FromIterator<LogLine> for LogLines {
fn from_iter<T: IntoIterator<Item = LogLine>>(iter: T) -> Self {
Self(iter.into_iter().collect::<Vec<_>>().into())
}
}
impl LogLines {
pub fn to_jsons(
self,
allow_structured: bool,
include_system_metadata: bool,
) -> anyhow::Result<Vec<JsonValue>> {
self.into_iter()
.map(|log_line| log_line.to_jsons(None, allow_structured, include_system_metadata))
.flatten_ok()
.collect()
}
pub fn len(&self) -> usize {
self.0
.iter()
.map(|log_line| match log_line {
LogLine::Structured(_) => 1,
LogLine::SubFunction { log_lines, .. } => log_lines.len(),
})
.sum()
}
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
pub fn truncated(self, mut len: usize) -> Self {
let mut log_lines = Self::default();
for log_line in self {
if len == 0 {
break;
}
match log_line {
LogLine::Structured(line) => {
log_lines.push(LogLine::Structured(line));
len -= 1;
},
LogLine::SubFunction {
path,
log_lines: sub_log_lines,
} => {
let sub_log_lines = sub_log_lines.truncated(len);
len -= sub_log_lines.len();
log_lines.push(LogLine::SubFunction {
path,
log_lines: sub_log_lines,
});
},
}
}
log_lines
}
}
#[cfg(any(test, feature = "testing"))]
impl Arbitrary for LogLines {
type Parameters = ();
type Strategy = impl proptest::strategy::Strategy<Value = LogLines>;
fn arbitrary_with((): Self::Parameters) -> Self::Strategy {
prop::collection::vec(any::<LogLine>(), 0..6).prop_map(LogLines::from)
}
}
#[derive(Clone, Debug, PartialEq)]
pub enum LogLine {
Structured(LogLineStructured),
SubFunction {
path: CanonicalizedComponentFunctionPath,
log_lines: LogLines,
},
}
#[cfg(any(test, feature = "testing"))]
impl Arbitrary for LogLineStructured {
type Parameters = ();
type Strategy = impl proptest::strategy::Strategy<Value = LogLineStructured>;
fn arbitrary_with((): Self::Parameters) -> Self::Strategy {
(
prop::collection::vec(any::<String>(), 1..4),
any::<LogLevel>(),
any::<bool>(),
(u64::MIN..(i64::MAX as u64)),
any::<Option<SystemLogMetadata>>(),
)
.prop_map(
|(messages, level, is_truncated, timestamp_ms, system_metadata)| {
LogLineStructured {
messages: messages.into(),
level,
is_truncated,
timestamp: UnixTimestamp::from_millis(timestamp_ms),
system_metadata,
}
},
)
}
}
#[cfg(any(test, feature = "testing"))]
impl Arbitrary for LogLine {
type Parameters = ();
type Strategy = impl proptest::strategy::Strategy<Value = LogLine>;
fn arbitrary_with((): Self::Parameters) -> Self::Strategy {
let leaf = any::<LogLineStructured>().prop_map(LogLine::Structured);
leaf.prop_recursive(3, 8, 2, |inner| {
prop_oneof![
(
any::<CanonicalizedComponentFunctionPath>(),
prop::collection::vec(inner.clone(), 1..4)
)
.prop_map(|(path, log_lines)| LogLine::SubFunction {
path,
log_lines: log_lines.into()
}),
inner
]
})
}
}
impl LogLine {
pub fn to_pretty_strings(self) -> Vec<String> {
match self {
LogLine::Structured(log_line) => vec![log_line.to_pretty_string()],
LogLine::SubFunction { path: _, log_lines } => log_lines
.into_iter()
.flat_map(LogLine::to_pretty_strings)
.collect::<Vec<_>>(),
}
}
#[cfg(any(test, feature = "testing"))]
pub fn to_pretty_string_test_only(self) -> String {
self.to_pretty_strings().join("\n")
}
pub fn to_jsons(
self,
sub_function_path: Option<CanonicalizedComponentFunctionPath>,
allow_structured: bool,
include_system_metadata: bool,
) -> anyhow::Result<Vec<JsonValue>> {
if !allow_structured {
Ok(self
.to_pretty_strings()
.into_iter()
.map(JsonValue::String)
.collect())
} else {
match self {
LogLine::Structured(log_line) => {
let log_line_json = log_line.to_json(
sub_function_path,
allow_structured,
include_system_metadata,
)?;
Ok(vec![log_line_json])
},
LogLine::SubFunction { path, log_lines } => log_lines
.into_iter()
.map(|log_line| {
log_line.to_jsons(
Some(path.clone()),
allow_structured,
include_system_metadata,
)
})
.flatten_ok()
.collect(),
}
}
}
pub fn new_developer_log_line(
level: LogLevel,
messages: Vec<String>,
timestamp: UnixTimestamp,
) -> Self {
LogLine::Structured(LogLineStructured::new_developer_log_line(
level, messages, timestamp,
))
}
pub fn new_system_log_line(
level: LogLevel,
messages: Vec<String>,
timestamp: UnixTimestamp,
system_log_metadata: SystemLogMetadata,
) -> Self {
// Never truncate system log lines
LogLine::Structured(LogLineStructured {
messages: messages.into(),
level,
is_truncated: false,
timestamp,
system_metadata: Some(system_log_metadata),
})
}
}
impl HeapSize for LogLine {
fn heap_size(&self) -> usize {
match self {
LogLine::Structured(LogLineStructured {
messages,
level,
timestamp,
is_truncated,
system_metadata,
}) => {
messages.heap_size()
+ level.heap_size()
+ timestamp.heap_size()
+ is_truncated.heap_size()
+ system_metadata.heap_size()
},
LogLine::SubFunction { path, log_lines } => path.heap_size() + log_lines.heap_size(),
}
}
}
impl TryFrom<ConvexValue> for LogLine {
type Error = anyhow::Error;
fn try_from(value: ConvexValue) -> Result<Self, Self::Error> {
let result = match value {
ConvexValue::Object(o) => {
let mut fields = BTreeMap::from(o);
match remove_nullable_string(&mut fields, "kind")?.as_deref() {
None => {
let messages = remove_vec_of_strings(&mut fields, "messages")?;
let is_truncated = remove_boolean(&mut fields, "is_truncated")?;
let level = remove_string(&mut fields, "level")?;
let timestamp = remove_int64(&mut fields, "timestamp")?;
let system_metadata: Option<SystemLogMetadata> =
remove_nullable_object(&mut fields, "system_metadata")?;
LogLine::Structured(LogLineStructured {
messages: messages.into(),
is_truncated,
level: LogLevel::from_str(&level)?,
timestamp: UnixTimestamp::from_millis(timestamp.try_into()?),
system_metadata,
})
},
Some("SubFunction") => {
let component = remove_string(&mut fields, "component")?;
let udf_path = remove_string(&mut fields, "udf_path")?;
let log_lines = remove_vec(&mut fields, "log_lines")?;
LogLine::SubFunction {
path: CanonicalizedComponentFunctionPath {
component: component.parse()?,
udf_path: udf_path.parse()?,
},
log_lines: log_lines.into_iter().map(Self::try_from).collect::<Result<
LogLines,
_,
>>(
)?,
}
},
_ => anyhow::bail!("unrecognized kind of log line"),
}
},
_ => anyhow::bail!("Invalid value type for log line"),
};
Ok(result)
}
}
impl TryFrom<LogLine> for ConvexValue {
type Error = anyhow::Error;
fn try_from(value: LogLine) -> Result<Self, Self::Error> {
let result = match value {
LogLine::Structured(LogLineStructured {
messages,
level,
is_truncated,
timestamp,
system_metadata,
}) => {
let timestamp_ms: i64 = timestamp.as_ms_since_epoch()?.try_into()?;
let system_metadata_value = match system_metadata {
Some(m) => ConvexValue::try_from(m)?,
None => ConvexValue::Null,
};
ConvexValue::Object(obj!(
"messages" => messages.into_iter().map(ConvexValue::try_from).try_collect::<_, Vec<_>, _>()?,
"level" => level.to_string(),
"is_truncated" => is_truncated,
"timestamp" => timestamp_ms,
"system_metadata" => system_metadata_value,
)?)
},
LogLine::SubFunction { path, log_lines } => ConvexValue::Object(obj!(
"kind" => "SubFunction",
"component" => path.component.to_string(),
"udf_path" => path.udf_path.to_string(),
"log_lines" => log_lines.into_iter().map(ConvexValue::try_from).try_collect::<_, Vec<_>, _>()?,
)?),
};
Ok(result)
}
}
#[derive(Deserialize, Serialize, Debug, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct LogLineJson {
messages: Vec<String>,
is_truncated: bool,
timestamp: u64,
level: String,
system_metadata: Option<SystemLogMetadataJson>,
#[serde(skip_serializing_if = "Option::is_none")]
component_path: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
udf_path: Option<String>,
}
#[derive(Deserialize, Serialize, Debug, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct SystemLogMetadataJson {
code: String,
}
impl From<SystemLogMetadataJson> for SystemLogMetadata {
fn from(value: SystemLogMetadataJson) -> SystemLogMetadata {
SystemLogMetadata { code: value.code }
}
}
impl From<SystemLogMetadata> for SystemLogMetadataJson {
fn from(value: SystemLogMetadata) -> SystemLogMetadataJson {
SystemLogMetadataJson { code: value.code }
}
}
impl TryFrom<JsonValue> for SystemLogMetadata {
type Error = anyhow::Error;
fn try_from(value: JsonValue) -> Result<Self, Self::Error> {
let json_val: SystemLogMetadataJson = serde_json::from_value(value)?;
Ok(SystemLogMetadata::from(json_val))
}
}
impl TryFrom<SystemLogMetadata> for JsonValue {
type Error = anyhow::Error;
fn try_from(value: SystemLogMetadata) -> Result<Self, Self::Error> {
Ok(serde_json::to_value(SystemLogMetadataJson::from(value))?)
}
}
impl TryFrom<JsonValue> for LogLineStructured {
type Error = anyhow::Error;
fn try_from(value: JsonValue) -> Result<Self, Self::Error> {
let log_line_json: LogLineJson = serde_json::from_value(value)?;
// Conversion from JsonValue -> LogLineStructured only happens when
// we know it's not within a SubFunction.
// Contrast with conversion from LogLineStructured -> JsonValue
// which *can* happen within a SubFunction.
anyhow::ensure!(
log_line_json.component_path.is_none() && log_line_json.udf_path.is_none(),
"SubFunction not supported"
);
Ok(LogLineStructured {
messages: log_line_json.messages.into(),
is_truncated: log_line_json.is_truncated,
timestamp: UnixTimestamp::from_millis(log_line_json.timestamp),
level: log_line_json.level.parse()?,
system_metadata: log_line_json.system_metadata.map(SystemLogMetadata::from),
})
}
}
impl TryFrom<LogLineStructured> for JsonValue {
type Error = anyhow::Error;
fn try_from(value: LogLineStructured) -> Result<Self, Self::Error> {
value.to_json(None, true, true)
}
}
impl From<LogLine> for pb::outcome::LogLine {
fn from(value: LogLine) -> Self {
match value {
LogLine::Structured(LogLineStructured {
messages,
level,
is_truncated,
timestamp,
system_metadata,
}) => pb::outcome::LogLine {
log_type: Some(pb::outcome::log_line::LogType::Line(
pb::outcome::StructuredLogLine {
messages: messages.into(),
level: level.to_string(),
is_truncated,
timestamp: Some(timestamp.into()),
system_metadata: system_metadata
.map(|m| pb::outcome::SystemLogMetadata { code: m.code }),
},
)),
},
LogLine::SubFunction { path, log_lines } => pb::outcome::LogLine {
log_type: Some(pb::outcome::log_line::LogType::SubFunction(
pb::outcome::SubFunctionLogLines {
component_path: path.component.to_string(),
udf_path: path.udf_path.to_string(),
log_lines: log_lines
.into_iter()
.map(pb::outcome::LogLine::from)
.collect(),
},
)),
},
}
}
}
impl TryFrom<pb::outcome::LogLine> for LogLine {
type Error = anyhow::Error;
fn try_from(value: pb::outcome::LogLine) -> Result<Self, Self::Error> {
let result = match value.log_type {
Some(pb::outcome::log_line::LogType::Line(line)) => {
LogLine::Structured(LogLineStructured {
messages: line.messages.into(),
is_truncated: line.is_truncated,
level: line.level.parse()?,
timestamp: line
.timestamp
.ok_or_else(|| anyhow::anyhow!("Missing timestamp"))?
.try_into()?,
system_metadata: line
.system_metadata
.map(|m| SystemLogMetadata { code: m.code }),
})
},
Some(pb::outcome::log_line::LogType::SubFunction(sub_function)) => {
LogLine::SubFunction {
path: CanonicalizedComponentFunctionPath {
component: sub_function.component_path.parse()?,
udf_path: sub_function.udf_path.parse()?,
},
log_lines: sub_function
.log_lines
.into_iter()
.map(LogLine::try_from)
.collect::<Result<LogLines, _>>()?,
}
},
None => anyhow::bail!("`log_type` missing in LogLine proto"),
};
Ok(result)
}
}
pub async fn run_function_and_collect_log_lines<Outcome>(
get_outcome: BoxFuture<'_, Outcome>,
mut log_line_receiver: mpsc::UnboundedReceiver<LogLine>,
on_log_line: impl Fn(LogLine),
) -> (Outcome, LogLines) {
let log_line_consumer = async move {
let mut full_log_lines = vec![];
while let Some(log_line) = log_line_receiver.recv().await {
on_log_line(log_line.clone());
full_log_lines.push(log_line)
}
LogLines::from(full_log_lines)
};
tokio::join!(get_outcome, log_line_consumer)
}
#[cfg(test)]
mod tests {
use cmd_util::env::env_config;
use proptest::prelude::*;
use serde_json::Value as JsonValue;
use value::{
testing::assert_roundtrips,
ConvexValue,
};
use crate::{
log_lines::{
LogLine,
LogLineStructured,
},
runtime::UnixTimestamp,
};
proptest! {
#![proptest_config(
ProptestConfig { cases: 256 * env_config("CONVEX_PROPTEST_MULTIPLIER", 1), failure_persistence: None, ..ProptestConfig::default() }
)]
#[test]
fn test_structured_round_trips(log_line in any::<LogLine>()) {
assert_roundtrips::<LogLine, ConvexValue>(log_line);
}
#[test]
fn test_json_round_trips(log_line in any::<LogLineStructured>()) {
assert_roundtrips::<LogLineStructured, JsonValue>(log_line);
}
}
#[test]
pub fn empty_log_line() {
// This used to panic due to a underflow bug when calculating message length.
LogLineStructured::new_developer_log_line(
super::LogLevel::Info,
vec![],
UnixTimestamp::from_secs_f64(1733952824.).unwrap(),
);
}
}