use std::{
collections::HashMap,
path::PathBuf,
sync::Arc,
};
use chrono::Utc;
use serde::{
Deserialize,
Serialize,
};
use si_data_nats::NatsClient;
use si_data_pg::PgPool;
use telemetry::prelude::*;
use telemetry_utils::{
histogram,
metric,
monotonic,
};
use tokio::{
join,
sync::{
mpsc::{
self,
},
oneshot,
},
};
use tokio_util::{
sync::CancellationToken,
task::TaskTracker,
};
use ulid::Ulid;
use crate::{
BackendType,
db::{
cas,
change_batch,
encrypted_secret,
func_run::{
self,
FuncRunDb,
},
func_run_log,
rebase_batch,
split_snapshot_rebase_batch,
split_snapshot_subgraph,
split_snapshot_supergraph,
workspace_snapshot,
},
error::{
LayerDbError,
LayerDbResult,
},
event::{
LayeredEvent,
LayeredEventClient,
LayeredEventKind,
},
nats::layerdb_events_stream,
pg::PgLayer,
s3::S3Layer,
};
/// Controls which storage backend(s) are used for reads and writes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum PersisterMode {
/// Write and read from PostgreSQL only.
#[default]
PostgresOnly,
/// Write to both PostgreSQL and S3, read from PostgreSQL.
DualWrite,
/// Write to both PostgreSQL and S3, read from S3 with PostgreSQL fallback.
S3Primary,
/// Write and read from S3 only.
S3Only,
}
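/// A persistence request sent from a [`PersisterClient`] to the [`PersisterTask`], paired with
/// the status writer used to report its outcome: `Write` persists and publishes an event,
/// `Evict` deletes from PG and publishes the eviction, and `EvictMemoryOnly` publishes the
/// eviction without touching PG.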
#[derive(Debug)]
pub enum PersistMessage {
Write((LayeredEvent, PersisterStatusWriter)),
Evict((LayeredEvent, PersisterStatusWriter)),
EvictMemoryOnly((LayeredEvent, PersisterStatusWriter)),
}
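/// Terminal status of a persistence request, delivered over the oneshot status channel.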
#[derive(Debug)]
pub enum PersistStatus {
Finished,
Error(LayerDbError),
}
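/// Sending half of the per-request status channel; consumed when the outcome is reported.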
#[derive(Debug)]
pub struct PersisterStatusWriter {
tx: oneshot::Sender<PersistStatus>,
}
impl PersisterStatusWriter {
pub fn new(tx: oneshot::Sender<PersistStatus>) -> Self {
Self { tx }
}
pub fn send(self, msg: PersistStatus) {
// If the other end isn't listening, we really don't care!
let _ = self.tx.send(msg);
}
}
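/// Receiving half of the per-request status channel; awaited by callers that need to know when
/// the write or eviction has completed.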
#[derive(Debug)]
pub struct PersisterStatusReader {
rx: oneshot::Receiver<PersistStatus>,
}
impl PersisterStatusReader {
pub fn new(rx: oneshot::Receiver<PersistStatus>) -> Self {
Self { rx }
}
pub async fn get_status(self) -> LayerDbResult<PersistStatus> {
Ok(self.rx.await?)
}
}
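/// Cheaply cloneable handle for submitting write and eviction requests to the persister task
/// over an unbounded channel. Each request returns a [`PersisterStatusReader`] that resolves
/// once the work has finished (or failed).
///
/// A minimal usage sketch, assuming a running [`PersisterTask`] and an existing `event`:
///
/// ```ignore
/// let reader = client.write_event(event)?;
/// match reader.get_status().await? {
///     PersistStatus::Finished => { /* durably written and published */ }
///     PersistStatus::Error(err) => return Err(err),
/// }
/// ```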
#[derive(Debug, Clone)]
pub struct PersisterClient {
tx: mpsc::UnboundedSender<PersistMessage>,
}
impl PersisterClient {
pub fn new(tx: mpsc::UnboundedSender<PersistMessage>) -> PersisterClient {
PersisterClient { tx }
}
fn get_status_channels(&self) -> (PersisterStatusWriter, PersisterStatusReader) {
let (status_tx, status_rx) = oneshot::channel();
(
PersisterStatusWriter::new(status_tx),
PersisterStatusReader::new(status_rx),
)
}
pub fn write_event(&self, event: LayeredEvent) -> LayerDbResult<PersisterStatusReader> {
let (status_write, status_read) = self.get_status_channels();
self.tx
.send(PersistMessage::Write((event, status_write)))
.map_err(Box::new)?;
Ok(status_read)
}
pub fn evict_event(&self, event: LayeredEvent) -> LayerDbResult<PersisterStatusReader> {
let (status_write, status_read) = self.get_status_channels();
self.tx
.send(PersistMessage::Evict((event, status_write)))
.map_err(Box::new)?;
Ok(status_read)
}
pub fn evict_memory_only_event(
&self,
event: LayeredEvent,
) -> LayerDbResult<PersisterStatusReader> {
let (status_write, status_read) = self.get_status_channels();
self.tx
.send(PersistMessage::EvictMemoryOnly((event, status_write)))
.map_err(Box::new)?;
Ok(status_read)
}
}
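/// Long-running task that drains [`PersistMessage`]s, writes events to the backend(s) selected
/// by [`PersisterMode`], publishes them to NATS, and feeds retryable failures into the on-disk
/// retry queue managed by the `RetryQueueManager`.
///
/// A wiring sketch, assuming `pg_pool`, `nats_client`, and the remaining arguments are already
/// in scope:
///
/// ```ignore
/// let (tx, rx) = mpsc::unbounded_channel();
/// let client = PersisterClient::new(tx);
/// let task = PersisterTask::create(
///     rx, pg_pool, &nats_client, instance_id, retry_path, token.clone(), s3_layers, mode,
/// )
/// .await?;
/// tokio::spawn(task.run());
/// ```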
#[derive(Debug)]
pub struct PersisterTask {
messages: mpsc::UnboundedReceiver<PersistMessage>,
pg_pool: PgPool,
layered_event_client: LayeredEventClient,
tracker: TaskTracker,
shutdown_token: CancellationToken,
retry_queue_command_tx: mpsc::UnboundedSender<crate::retry_queue::RetryQueueMessage>,
pending_retry_rx:
mpsc::UnboundedReceiver<(crate::event::LayeredEvent, crate::retry_queue::RetryHandle)>,
s3_layers: Option<Arc<HashMap<&'static str, S3Layer>>>,
mode: PersisterMode,
}
impl PersisterTask {
const NAME: &'static str = "LayerDB::PersisterTask";
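/// Writes `event` to the backend(s) selected by `mode`, enqueues retryable failures onto the
/// retry queue, records success/failure metrics, and finally publishes the event to NATS.
/// In `PostgresOnly` and `S3Only` modes a backend failure is returned to the caller; in the
/// dual-write modes each backend's failure is handled independently and the publish still runs.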
async fn do_persist_event(
event: &LayeredEvent,
mode: PersisterMode,
pg_pool: &PgPool,
s3_layers: &Option<Arc<HashMap<&'static str, S3Layer>>>,
layered_event_client: &LayeredEventClient,
retry_queue_command_tx: &mpsc::UnboundedSender<crate::retry_queue::RetryQueueMessage>,
) -> LayerDbResult<()> {
let cache_name = event.payload.db_name.as_ref();
match mode {
PersisterMode::PostgresOnly => {
// Write only to PG
if let Err(e) =
Self::do_write_to_backend(event, BackendType::Postgres, pg_pool, s3_layers)
.await
{
if Self::do_is_retryable(&e, BackendType::Postgres) {
retry_queue_command_tx
.send(crate::retry_queue::RetryQueueMessage::Enqueue {
event: event.clone(),
backend: BackendType::Postgres,
})
.map_err(|e| LayerDbError::RetryQueueSend(e.to_string()))?;
}
return Err(e);
}
monotonic!(
layer_cache_persister_write_success = 1,
cache_name = cache_name,
backend = BackendType::Postgres.as_ref()
);
}
PersisterMode::DualWrite => {
// Write to both backends in parallel
let (pg_result, s3_result) = tokio::join!(
Self::do_write_to_backend(event, BackendType::Postgres, pg_pool, s3_layers),
Self::do_write_to_backend(event, BackendType::S3, pg_pool, s3_layers)
);
// Handle PG result
if let Err(e) = pg_result {
error!(
error = ?e,
backend = "postgres",
cache_name = cache_name,
"dual write failed for postgres backend"
);
if Self::do_is_retryable(&e, BackendType::Postgres) {
retry_queue_command_tx
.send(crate::retry_queue::RetryQueueMessage::Enqueue {
event: event.clone(),
backend: BackendType::Postgres,
})
.map_err(|e| LayerDbError::RetryQueueSend(e.to_string()))?;
monotonic!(
layer_cache_persister_write_failed_retryable = 1,
cache_name = cache_name,
backend = BackendType::Postgres.as_ref(),
event_kind = event.event_kind.as_ref()
);
}
} else {
monotonic!(
layer_cache_persister_write_success = 1,
cache_name = cache_name,
backend = BackendType::Postgres.as_ref()
);
}
// Handle S3 result
if let Err(e) = s3_result {
// Extract error_kind for structured logging/metrics
let (error_kind, error_key) = match &e {
LayerDbError::S3(s3_err) => (s3_err.kind(), s3_err.key()),
_ => ("unknown", ""),
};
error!(
error = ?e,
backend = "s3",
cache_name = cache_name,
error_kind = error_kind,
key = error_key, // Only in logs, not metrics
"dual write failed for s3 backend"
);
if Self::do_is_retryable(&e, BackendType::S3) {
retry_queue_command_tx
.send(crate::retry_queue::RetryQueueMessage::Enqueue {
event: event.clone(),
backend: BackendType::S3,
})
.map_err(|e| LayerDbError::RetryQueueSend(e.to_string()))?;
monotonic!(
layer_cache_persister_write_failed_retryable = 1,
cache_name = cache_name,
backend = BackendType::S3.as_ref(),
event_kind = event.event_kind.as_ref(),
error_kind = error_kind // bounded cardinality (6 values)
);
}
} else {
monotonic!(
layer_cache_persister_write_success = 1,
cache_name = cache_name,
backend = BackendType::S3.as_ref()
);
}
}
PersisterMode::S3Primary => {
// Write to both backends in parallel (same as DualWrite)
// Read preference is S3 first, but we keep PG updated for fallback
let (pg_result, s3_result) = tokio::join!(
Self::do_write_to_backend(event, BackendType::Postgres, pg_pool, s3_layers),
Self::do_write_to_backend(event, BackendType::S3, pg_pool, s3_layers)
);
// Handle PG result
if let Err(e) = pg_result {
if Self::do_is_retryable(&e, BackendType::Postgres) {
retry_queue_command_tx
.send(crate::retry_queue::RetryQueueMessage::Enqueue {
event: event.clone(),
backend: BackendType::Postgres,
})
.map_err(|e| LayerDbError::RetryQueueSend(e.to_string()))?;
}
} else {
monotonic!(
layer_cache_persister_write_success = 1,
cache_name = cache_name,
backend = BackendType::Postgres.as_ref()
);
}
// Handle S3 result
if let Err(e) = s3_result {
// Extract error_kind for structured logging/metrics
let (error_kind, error_key) = match &e {
LayerDbError::S3(s3_err) => (s3_err.kind(), s3_err.key()),
_ => ("unknown", ""),
};
error!(
error = ?e,
backend = "s3",
cache_name = cache_name,
error_kind = error_kind,
key = error_key,
"s3 primary write failed for s3 backend"
);
if Self::do_is_retryable(&e, BackendType::S3) {
retry_queue_command_tx
.send(crate::retry_queue::RetryQueueMessage::Enqueue {
event: event.clone(),
backend: BackendType::S3,
})
.map_err(|e| LayerDbError::RetryQueueSend(e.to_string()))?;
monotonic!(
layer_cache_persister_write_failed_retryable = 1,
cache_name = cache_name,
backend = BackendType::S3.as_ref(),
event_kind = event.event_kind.as_ref(),
error_kind = error_kind
);
}
} else {
monotonic!(
layer_cache_persister_write_success = 1,
cache_name = cache_name,
backend = BackendType::S3.as_ref()
);
}
}
PersisterMode::S3Only => {
// Write only to S3
if let Err(e) =
Self::do_write_to_backend(event, BackendType::S3, pg_pool, s3_layers).await
{
// Extract error_kind for structured logging/metrics
let (error_kind, error_key) = match &e {
LayerDbError::S3(s3_err) => (s3_err.kind(), s3_err.key()),
_ => ("unknown", ""),
};
error!(
error = ?e,
backend = "s3",
cache_name = cache_name,
error_kind = error_kind,
key = error_key,
"s3 only write failed"
);
if Self::do_is_retryable(&e, BackendType::S3) {
retry_queue_command_tx
.send(crate::retry_queue::RetryQueueMessage::Enqueue {
event: event.clone(),
backend: BackendType::S3,
})
.map_err(|e| LayerDbError::RetryQueueSend(e.to_string()))?;
monotonic!(
layer_cache_persister_write_failed_retryable = 1,
cache_name = cache_name,
backend = BackendType::S3.as_ref(),
event_kind = event.event_kind.as_ref(),
error_kind = error_kind
);
}
return Err(e);
}
monotonic!(
layer_cache_persister_write_success = 1,
cache_name = cache_name,
backend = BackendType::S3.as_ref()
);
}
}
// Publish to NATS regardless of backend mode; `publish` returns a handle that is
// awaited in turn to confirm the publish completed
let _ = layered_event_client
.publish(Arc::new(event.clone()))
.await?
.await?;
Ok(())
}
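/// Classifies an error as retryable for the given backend: Postgres errors go through the
/// retry queue's heuristics, while any S3 error is treated as retryable.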
fn do_is_retryable(error: &LayerDbError, backend: BackendType) -> bool {
match backend {
BackendType::Postgres => crate::retry_queue::is_retryable_error(error),
BackendType::S3 => {
// S3 errors are generally retryable (network, throttling, etc)
matches!(error, LayerDbError::S3(_))
}
}
}
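/// Performs a single write against one backend and records a write-duration histogram tagged
/// with cache name, status, backend, and event kind.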
async fn do_write_to_backend(
event: &LayeredEvent,
backend: BackendType,
pg_pool: &PgPool,
s3_layers: &Option<Arc<HashMap<&'static str, S3Layer>>>,
) -> LayerDbResult<()> {
let cache_name = event.payload.db_name.as_ref();
let event_kind = event.event_kind.as_ref();
let write_start = std::time::Instant::now();
let result = match backend {
BackendType::Postgres => {
let pg_layer = PgLayer::new(pg_pool.clone(), event.payload.db_name.as_ref());
match event.event_kind {
LayeredEventKind::CasInsertion
| LayeredEventKind::ChangeBatchEvict
| LayeredEventKind::ChangeBatchWrite
| LayeredEventKind::EncryptedSecretInsertion
| LayeredEventKind::Raw
| LayeredEventKind::RebaseBatchEvict
| LayeredEventKind::RebaseBatchWrite
| LayeredEventKind::SnapshotEvict
| LayeredEventKind::SnapshotWrite
| LayeredEventKind::SplitSnapshotSubGraphEvict
| LayeredEventKind::SplitSnapshotSubGraphWrite
| LayeredEventKind::SplitSnapshotSuperGraphEvict
| LayeredEventKind::SplitSnapshotSuperGraphWrite
| LayeredEventKind::SplitRebaseBatchEvict
| LayeredEventKind::SplitRebaseBatchWrite => {
pg_layer
.insert(
&event.payload.key,
event.payload.sort_key.as_ref(),
&event.payload.value[..],
)
.await
}
LayeredEventKind::FuncRunLogWrite => {
// Skip the write here; it happens in the FuncRunLog write method directly, to
// ensure we write to PG in order.
Ok(())
}
LayeredEventKind::FuncRunWrite => {
FuncRunDb::insert_to_pg(&pg_layer, &event.payload).await
}
}
}
BackendType::S3 => {
let s3_layers = s3_layers.as_ref().ok_or(LayerDbError::S3NotConfigured)?;
let s3_layer = s3_layers
.get(event.payload.db_name.as_str())
.ok_or(LayerDbError::S3NotConfigured)?;
s3_layer.insert(event)
}
};
let write_duration = write_start.elapsed().as_millis() as f64;
let status = if result.is_ok() { "success" } else { "error" };
histogram!(
layer_cache_persister.write_duration_ms = write_duration,
cache_name = cache_name,
status = status,
backend = backend.as_ref(),
event_kind = event_kind
);
result
}
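/// Builds the persister task: ensures the layerdb events JetStream stream exists, constructs
/// the [`LayeredEventClient`], scans any retry queues left on disk from a previous run, and
/// spawns the `RetryQueueManager` on the task tracker.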
#[allow(clippy::too_many_arguments)]
pub async fn create(
messages: mpsc::UnboundedReceiver<PersistMessage>,
pg_pool: PgPool,
nats_client: &NatsClient,
instance_id: Ulid,
retry_queue_base_path: PathBuf,
shutdown_token: CancellationToken,
s3_layers: Option<Arc<HashMap<&'static str, S3Layer>>>,
mode: PersisterMode,
) -> LayerDbResult<Self> {
use crate::retry_queue::{
RetryQueueConfig,
RetryQueueManager,
};
let tracker = TaskTracker::new();
let context = si_data_nats::jetstream::new(nats_client.clone());
// Ensure the JetStream stream is created
let _stream =
layerdb_events_stream(&context, nats_client.metadata().subject_prefix()).await?;
let layered_event_client = LayeredEventClient::new(
nats_client
.metadata()
.subject_prefix()
.map(|s| s.to_owned()),
instance_id,
context.clone(),
);
// Create channels for RetryQueueManager communication
let (retry_queue_command_tx, retry_queue_command_rx) = mpsc::unbounded_channel();
let (pending_retry_tx, pending_retry_rx) = mpsc::unbounded_channel();
// Initialize retry queue manager with provided path
let retry_queue_config = RetryQueueConfig {
base_path: retry_queue_base_path,
..Default::default()
};
let mut retry_queue_manager = RetryQueueManager::new(retry_queue_config);
// Scan for existing retry queues on startup
const CACHE_NAMES: &[&str] = &[
cas::CACHE_NAME,
change_batch::CACHE_NAME,
encrypted_secret::CACHE_NAME,
func_run::CACHE_NAME,
func_run_log::CACHE_NAME,
rebase_batch::CACHE_NAME,
workspace_snapshot::CACHE_NAME,
split_snapshot_subgraph::CACHE_NAME,
split_snapshot_supergraph::CACHE_NAME,
split_snapshot_rebase_batch::CACHE_NAME,
];
retry_queue_manager
.scan_existing_queues(CACHE_NAMES)
.await?;
// Spawn RetryQueueManager as independent task
tracker.spawn(retry_queue_manager.run(
retry_queue_command_rx,
pending_retry_tx,
shutdown_token.clone(),
));
Ok(Self {
messages,
pg_pool,
layered_event_client,
tracker,
shutdown_token,
retry_queue_command_tx,
pending_retry_rx,
s3_layers,
mode,
})
}
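/// Main loop. The biased select observes shutdown first, then new persistence messages, then
/// ready retries from the `RetryQueueManager`. On cancellation the message channel is closed
/// and drained, and the task tracker is awaited so in-flight work finishes before returning.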
pub async fn run(mut self) {
let shutdown_token = self.shutdown_token.clone();
loop {
tokio::select! {
biased;
// Priority 1: Shutdown signal (highest)
_ = shutdown_token.cancelled() => {
debug!(task = Self::NAME, "received cancellation");
// Close receiver channel to ensure no further values can be received
self.messages.close();
break;
}
// Priority 2: New messages from channel
Some(msg) = self.messages.recv() => {
self.spawn_persist_task(msg);
}
// Priority 3: Ready retries from RetryQueueManager
Some((event, handle)) = self.pending_retry_rx.recv() => {
self.spawn_retry_task(event, handle);
}
}
}
// Drain remaining messages but don't process retry queue during shutdown
while let Some(msg) = self.messages.recv().await {
self.spawn_persist_task(msg);
}
// All remaining work has been dispatched, so no further tasks will be spawned
self.tracker.close();
// Wait for all in-flight write work to complete
self.tracker.wait().await;
debug!(task = Self::NAME, "shutdown complete");
}
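/// Spawns a tracked task for a single message: writes go through [`Self::do_persist_event`],
/// evictions through [`PersistEventTask`]. Retryable eviction failures are pushed onto the
/// retry queue, and end-to-end persistence latency is recorded on success.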
fn spawn_persist_task(&mut self, msg: PersistMessage) {
match msg {
PersistMessage::Write((event, status_tx)) => {
let layered_event_client = self.layered_event_client.clone();
let retry_queue_command_tx = self.retry_queue_command_tx.clone();
let cache_name = event.payload.db_name.to_string();
let mode = self.mode;
let pg_pool = self.pg_pool.clone();
let s3_layers = self.s3_layers.clone();
let backend = match self.mode {
PersisterMode::PostgresOnly => BackendType::Postgres,
PersisterMode::DualWrite => BackendType::Postgres, // Primary is PG
PersisterMode::S3Primary | PersisterMode::S3Only => BackendType::S3,
};
metric!(
counter.layer_cache_persister_write_attempted = 1,
cache_name = &cache_name,
backend = backend.as_ref(),
event_kind = event.event_kind.as_ref()
);
self.tracker.spawn(async move {
let result = Self::do_persist_event(
&event,
mode,
&pg_pool,
&s3_layers,
&layered_event_client,
&retry_queue_command_tx,
)
.await;
match result {
Ok(_) => {
// Emit end-to-end persistence latency
let latency = Utc::now()
.signed_duration_since(event.metadata.timestamp)
.to_std()
.unwrap_or_default();
metric!(
histogram.layer_cache_persistence_latency_seconds =
latency.as_secs_f64(),
cache_name = &cache_name,
operation = "write",
event_kind = event.event_kind.as_ref()
);
status_tx.send(PersistStatus::Finished)
}
Err(err) => status_tx.send(PersistStatus::Error(err)),
}
});
}
PersistMessage::Evict((event, status_tx)) => {
let task =
PersistEventTask::new(self.pg_pool.clone(), self.layered_event_client.clone());
let retry_queue_command_tx = self.retry_queue_command_tx.clone();
let cache_name = event.payload.db_name.to_string();
monotonic!(
layer_cache_persister_evict_attempted = 1,
cache_name = &cache_name,
backend = BackendType::Postgres.as_ref(),
event_kind = event.event_kind.as_ref()
);
self.tracker.spawn(async move {
match task.try_evict_layers(event.clone()).await {
Ok(_) => {
monotonic!(
layer_cache_persister_evict_success = 1,
cache_name = &cache_name,
backend = BackendType::Postgres.as_ref(),
event_kind = event.event_kind.as_ref()
);
// Emit end-to-end persistence latency
let latency = Utc::now()
.signed_duration_since(event.metadata.timestamp)
.to_std()
.unwrap_or_default();
metric!(
histogram.layer_cache_persistence_latency_seconds = latency.as_secs_f64(),
cache_name = &cache_name,
backend = BackendType::Postgres.as_ref(),
operation = "evict",
event_kind = event.event_kind.as_ref()
);
status_tx.send(PersistStatus::Finished)
}
Err(err) => {
// Check if error is retryable and enqueue if so
if crate::retry_queue::is_retryable_error(&err) {
monotonic!(
layer_cache_persister_evict_failed_retryable = 1,
cache_name = &cache_name,
backend = BackendType::Postgres.as_ref(),
event_kind = event.event_kind.as_ref()
);
let _ = retry_queue_command_tx
.send(crate::retry_queue::RetryQueueMessage::Enqueue {
event,
backend: BackendType::Postgres,
});
} else {
monotonic!(
layer_cache_persister_evict_failed_permanent = 1,
cache_name = &cache_name,
backend = BackendType::Postgres.as_ref(),
event_kind = event.event_kind.as_ref()
);
error!(error = ?err, "persister evict task failed with non-retryable error");
}
status_tx.send(PersistStatus::Error(err));
}
}
});
}
PersistMessage::EvictMemoryOnly((event, status_tx)) => {
let task =
PersistEventTask::new(self.pg_pool.clone(), self.layered_event_client.clone());
let retry_queue_command_tx = self.retry_queue_command_tx.clone();
let cache_name = event.payload.db_name.to_string();
monotonic!(
layer_cache_persister_evict_memory_only_attempted = 1,
cache_name = &cache_name,
backend = BackendType::Postgres.as_ref(),
event_kind = event.event_kind.as_ref()
);
self.tracker.spawn(async move {
match task.try_evict_memory_only(event.clone()).await {
Ok(_) => {
monotonic!(
layer_cache_persister_evict_memory_only_success = 1,
cache_name = &cache_name,
backend = BackendType::Postgres.as_ref(),
event_kind = event.event_kind.as_ref()
);
// Emit end-to-end persistence latency
let latency = Utc::now()
.signed_duration_since(event.metadata.timestamp)
.to_std()
.unwrap_or_default();
metric!(
histogram.layer_cache_persistence_latency_seconds = latency.as_secs_f64(),
cache_name = &cache_name,
backend = BackendType::Postgres.as_ref(),
operation = "evict_memory_only",
event_kind = event.event_kind.as_ref()
);
status_tx.send(PersistStatus::Finished)
}
Err(err) => {
// Check if error is retryable and enqueue if so
if crate::retry_queue::is_retryable_error(&err) {
monotonic!(
layer_cache_persister_evict_memory_only_failed_retryable = 1,
cache_name = &cache_name,
backend = BackendType::Postgres.as_ref(),
event_kind = event.event_kind.as_ref()
);
let _ = retry_queue_command_tx
.send(crate::retry_queue::RetryQueueMessage::Enqueue {
event,
backend: BackendType::Postgres,
});
} else {
monotonic!(
layer_cache_persister_evict_memory_only_failed_permanent = 1,
cache_name = &cache_name,
backend = BackendType::Postgres.as_ref(),
event_kind = event.event_kind.as_ref()
);
error!(error = ?err, "persister evict memory only task failed with non-retryable error");
}
status_tx.send(PersistStatus::Error(err));
}
}
});
}
}
}
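/// Spawns a tracked task that retries a single backend write from the retry queue and reports
/// `MarkSuccess`, `MarkRetryableFailure`, or `MarkPermanentFailure` back to the
/// `RetryQueueManager`.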
fn spawn_retry_task(&mut self, event: LayeredEvent, handle: crate::retry_queue::RetryHandle) {
let pg_pool = self.pg_pool.clone();
let s3_layers = self.s3_layers.clone();
let retry_queue_command_tx = self.retry_queue_command_tx.clone();
let cache_name = handle.cache_name.clone();
let backend = handle.backend;
monotonic!(
layer_cache_persister_retry_attempted = 1,
cache_name = &cache_name,
backend = backend.as_ref(),
event_kind = event.event_kind.as_ref()
);
self.tracker.spawn(async move {
let start = std::time::Instant::now();
// Capture event_kind and timestamp before moving the event
let event_kind = event.event_kind;
let event_timestamp = event.metadata.timestamp;
// Attempt the retry using the backend-specific write
let result = Self::do_write_to_backend(&event, backend, &pg_pool, &s3_layers).await;
let duration = start.elapsed().as_secs_f64();
metric!(
histogram.layer_cache_persister_retry_duration_seconds = duration,
cache_name = &cache_name,
backend = backend.as_ref(),
event_kind = event_kind.as_ref()
);
match result {
Ok(_) => {
// Emit end-to-end persistence latency including retry time
let latency = Utc::now()
.signed_duration_since(event_timestamp)
.to_std()
.unwrap_or_default();
metric!(
histogram.layer_cache_persistence_latency_seconds = latency.as_secs_f64(),
cache_name = &cache_name,
backend = backend.as_ref(),
operation = "retry",
event_kind = event_kind.as_ref()
);
// Success - tell manager to remove from queue
let _ = retry_queue_command_tx
.send(crate::retry_queue::RetryQueueMessage::MarkSuccess(handle));
}
Err(err) if Self::do_is_retryable(&err, backend) => {
// Retryable failure - update backoff
let _ = retry_queue_command_tx.send(
crate::retry_queue::RetryQueueMessage::MarkRetryableFailure(handle, err),
);
}
Err(err) => {
// Permanent failure - remove from queue
error!(
cache.name = %cache_name,
backend = ?backend,
error = ?err,
"retry failed with non-retryable error"
);
let _ = retry_queue_command_tx.send(
crate::retry_queue::RetryQueueMessage::MarkPermanentFailure(handle),
);
}
}
});
}
}
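/// Which kind of persister operation produced a [`PersisterTaskError`].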
#[derive(Debug, Clone)]
pub enum PersisterTaskErrorKind {
Write,
Evict,
}
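/// Aggregated failure from a persister operation, capturing the PG and NATS error messages
/// separately so callers can see which side failed.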
#[derive(Debug, Clone)]
pub struct PersisterTaskError {
pub kind: PersisterTaskErrorKind,
pub pg_error: Option<String>,
pub nats_error: Option<String>,
}
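/// Helper that performs eviction (and direct PG write) work for a single event against PG and
/// NATS.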
#[derive(Debug, Clone)]
pub struct PersistEventTask {
pg_pool: PgPool,
layered_event_client: LayeredEventClient,
}
impl PersistEventTask {
pub fn new(pg_pool: PgPool, layered_event_client: LayeredEventClient) -> Self {
PersistEventTask {
pg_pool,
layered_event_client,
}
}
#[instrument(level = "debug", skip_all)]
pub async fn evict_layers(self, event: LayeredEvent, status_tx: PersisterStatusWriter) {
match self.try_evict_layers(event).await {
Ok(_) => status_tx.send(PersistStatus::Finished),
Err(err) => {
error!(error = ?err, "persister evict task failed");
status_tx.send(PersistStatus::Error(err));
}
}
}
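/// Publishes the eviction to NATS and deletes the row from PG in parallel, recording which side
/// failed and an overall duration histogram.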
#[instrument(level = "debug", skip_all)]
pub async fn try_evict_layers(&self, event: LayeredEvent) -> LayerDbResult<()> {
let start = std::time::Instant::now();
let cache_name = event.payload.db_name.to_string();
let event = Arc::new(event);
// Publish the eviction to NATS
let nats_join = self.layered_event_client.publish(event.clone()).await?;
// Evict from pg
let pg_self = self.clone();
let pg_event = event.clone();
let pg_join = tokio::task::spawn(async move { pg_self.evict_from_pg(pg_event).await });
let result = match join![pg_join, nats_join] {
(Ok(Ok(_)), Ok(Ok(_))) => Ok(()),
(pg_res, nats_res) => {
let kind = PersisterTaskErrorKind::Evict;
let pg_error = match pg_res {
Ok(Err(e)) => Some(e.to_string()),
Err(e) => Some(e.to_string()),
_ => None,
};
let nats_error = match nats_res {
Ok(Err(e)) => Some(e.to_string()),
Err(e) => Some(e.to_string()),
_ => None,
};
// Track which component failed
if pg_error.is_some() && nats_error.is_some() {
info!(
metrics = true,
counter.layer_cache_persister_both_error = 1,
cache_name = &cache_name,
operation = "evict",
backend = BackendType::Postgres.as_ref(),
event_kind = event.event_kind.as_ref()
);
} else if pg_error.is_some() {
info!(
metrics = true,
counter.layer_cache_persister_pg_error = 1,
cache_name = &cache_name,
operation = "evict",
backend = BackendType::Postgres.as_ref(),
event_kind = event.event_kind.as_ref()
);
} else if nats_error.is_some() {
info!(
metrics = true,
counter.layer_cache_persister_nats_error = 1,
cache_name = &cache_name,
operation = "evict",
backend = BackendType::Postgres.as_ref(),
event_kind = event.event_kind.as_ref()
);
}
Err(LayerDbError::PersisterTaskFailed(PersisterTaskError {
kind,
pg_error,
nats_error,
}))
}
};
let duration = start.elapsed().as_secs_f64();
let status = if result.is_ok() { "success" } else { "error" };
let event_kind = event.event_kind.as_ref();
info!(
metrics = true,
histogram.layer_cache_persister_evict_duration_seconds = duration,
cache_name = &cache_name,
status = status,
backend = BackendType::Postgres.as_ref(),
event_kind = event_kind
);
result
}
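/// Publishes the eviction event to NATS only; the PG row is left in place.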
#[instrument(level = "debug", skip_all)]
pub async fn try_evict_memory_only(&self, event: LayeredEvent) -> LayerDbResult<()> {
let start = std::time::Instant::now();
let cache_name = event.payload.db_name.to_string();
let event = Arc::new(event);
// Only publish to NATS, skip PostgreSQL deletion
let nats_join = self.layered_event_client.publish(event.clone()).await?;
let result = nats_join.await.map_err(|e| {
info!(
metrics = true,
counter.layer_cache_persister_nats_error = 1,
cache_name = &cache_name,
operation = "evict_memory_only",
backend = BackendType::Postgres.as_ref(),
event_kind = event.event_kind.as_ref()
);
LayerDbError::PersisterTaskFailed(PersisterTaskError {
kind: PersisterTaskErrorKind::Evict,
pg_error: None,
nats_error: Some(e.to_string()),
})
})?;
let duration = start.elapsed().as_secs_f64();
let status = if result.is_ok() { "success" } else { "error" };
let event_kind = event.event_kind.as_ref();
info!(
metrics = true,
histogram.layer_cache_persister_evict_memory_only_duration_seconds = duration,
cache_name = &cache_name,
status = status,
backend = BackendType::Postgres.as_ref(),
event_kind = event_kind
);
result
}
#[instrument(level = "debug", skip_all)]
pub async fn evict_from_pg(&self, event: Arc<LayeredEvent>) -> LayerDbResult<()> {
let pg_layer = PgLayer::new(self.pg_pool.clone(), event.payload.db_name.as_ref());
pg_layer.delete(&event.payload.key).await?;
Ok(())
}
/// Writes an event to the PG layer.
#[instrument(level = "debug", skip_all)]
pub async fn write_to_pg(&self, event: Arc<LayeredEvent>) -> LayerDbResult<()> {
let cache_name = event.payload.db_name.to_string();
let event_kind = format!("{:?}", event.event_kind);
info!(
metrics = true,
counter.layer_cache_persister_event_by_kind = 1,
cache_name = &cache_name,
event_kind = &event_kind,
backend = BackendType::Postgres.as_ref()
);
let pg_layer = PgLayer::new(self.pg_pool.clone(), event.payload.db_name.as_ref());
match event.event_kind {
LayeredEventKind::CasInsertion
| LayeredEventKind::ChangeBatchEvict
| LayeredEventKind::ChangeBatchWrite
| LayeredEventKind::EncryptedSecretInsertion
| LayeredEventKind::Raw
| LayeredEventKind::RebaseBatchEvict
| LayeredEventKind::RebaseBatchWrite
| LayeredEventKind::SnapshotEvict
| LayeredEventKind::SnapshotWrite
| LayeredEventKind::SplitSnapshotSubGraphEvict
| LayeredEventKind::SplitSnapshotSubGraphWrite
| LayeredEventKind::SplitSnapshotSuperGraphEvict
| LayeredEventKind::SplitSnapshotSuperGraphWrite
| LayeredEventKind::SplitRebaseBatchEvict
| LayeredEventKind::SplitRebaseBatchWrite => {
pg_layer
.insert(
&event.payload.key,
event.payload.sort_key.as_ref(),
&event.payload.value[..],
)
.await?;
}
LayeredEventKind::FuncRunLogWrite => {
// Skip the write here; it happens in the FuncRunLog write method directly, to
// ensure we write to PG in order.
//
// FuncRunLogDb::insert_to_pg(&pg_layer, &event.payload).await?
}
LayeredEventKind::FuncRunWrite => {
FuncRunDb::insert_to_pg(&pg_layer, &event.payload).await?
}
}
Ok(())
}
}