Skip to main content
Glama
serial_dvu_task.rs (6.37 kB)
use std::{ result, sync::Arc, }; use dal::ChangeSetError; use pinga_client::{ PingaClient, api_types::job_execution_response::JobExecutionResultVCurrent, }; use si_events::{ ChangeSetId, WorkspacePk, }; use telemetry::prelude::*; use telemetry_utils::metric; use thiserror::Error; use tokio::sync::Notify; use tokio_util::sync::CancellationToken; use crate::{ ServerMetadata, Shutdown, }; #[remain::sorted] #[derive(Debug, Error)] pub(crate) enum SerialDvuTaskError { #[error("change set error: {0}")] ChangeSet(#[from] ChangeSetError), /// Error when using a Pinga client #[error("pinga client error: {0}")] PingaClient(#[from] pinga_client::ClientError), /// When failing to do an operation using the [`WorkspaceSnapshot`] #[error("workspace snapshot error: {0}")] WorkspaceSnapshot(#[from] dal::WorkspaceSnapshotError), } type Result<T> = result::Result<T, SerialDvuTaskError>; pub(crate) struct SerialDvuTask { metadata: Arc<ServerMetadata>, workspace_id: WorkspacePk, change_set_id: ChangeSetId, pinga: PingaClient, run_notify: Arc<Notify>, quiesced_notify: Arc<Notify>, quiesced_token: CancellationToken, token: CancellationToken, } impl SerialDvuTask { const NAME: &'static str = "rebaser_server::serial_dvu_task"; #[allow(clippy::too_many_arguments)] pub(crate) fn create( metadata: Arc<ServerMetadata>, workspace_id: WorkspacePk, change_set_id: ChangeSetId, pinga: PingaClient, run_notify: Arc<Notify>, quiesced_notify: Arc<Notify>, quiesced_token: CancellationToken, token: CancellationToken, ) -> Self { Self { metadata, workspace_id, change_set_id, pinga, run_notify, quiesced_notify, quiesced_token, token, } } pub(crate) async fn try_run(self) -> Result<Shutdown> { metric!(counter.serial_dvu_task.change_set_in_progress = 1); let shutdown_cause = loop { tokio::select! 
{ biased; // Signal to run a DVU has fired _ = self.run_notify.notified() => { info!( task = Self::NAME, service.instance.id = self.metadata.instance_id(), si.workspace.id = %self.workspace_id, si.change_set.id = %self.change_set_id, "notified, preparing dvu run", ); self.run_dvu().await; } // Signal to shutdown from a quiet period has fired _ = self.quiesced_notify.notified() => { info!( task = Self::NAME, service.instance.id = self.metadata.instance_id(), si.workspace.id = %self.workspace_id, si.change_set.id = %self.change_set_id, "quiesced notified, starting to shut down", ); // Fire the quiesced_token so that the processing task immediately stops // processing additional requests self.quiesced_token.cancel(); break Shutdown::Quiesced; } // Cancellation token has fired, time to shut down _ = self.token.cancelled() => { info!( task = Self::NAME, service.instance.id = self.metadata.instance_id(), si.workspace.id = %self.workspace_id, si.change_set.id = %self.change_set_id, "received cancellation, shutting down", ); break Shutdown::Graceful; } } }; info!( task = Self::NAME, cause = ?shutdown_cause, service.instance.id = self.metadata.instance_id(), si.workspace.id = %self.workspace_id, si.change_set.id = %self.change_set_id, "shutdown complete", ); metric!(counter.serial_dvu_task.change_set_in_progress = -1); Ok(shutdown_cause) } #[instrument( name = "serial_dvu_task.run_dvu", level = "info", skip_all, fields( service.instance.id = self.metadata.instance_id(), si.change_set.id = %self.change_set_id, si.workspace.id = %self.workspace_id, ), )] async fn run_dvu(&self) { metric!(counter.serial_dvu_task.dvu_running = 1); if let Err(err) = self.try_run_dvu().await { error!( si.error.message = ?err, "error encountered when waiting on dvu job from pinga; continuing", ); } metric!(counter.serial_dvu_task.dvu_running = -1); } #[inline] async fn try_run_dvu(&self) -> Result<()> { let (_request_id, response_fut) = self .pinga 
.await_dependent_values_update_job(self.workspace_id, self.change_set_id, false) .await?; // TODO(fnichol): here's another spot to consider a timeout, otherwise this task will wait // indefinitely for the Pinga DVU job to complete. // // Future work may change this response future idea an instead check the job state from a // KV store which is being actively updated by a Pinga task. Something like that to avoid // the sender terminating and this receiver never getting a reply either way. match response_fut.await { Ok(response) => match &response.result { JobExecutionResultVCurrent::Ok => { trace!("pinga job reported a successful job execution"); } JobExecutionResultVCurrent::Err { message } => { warn!( job_execution_error_report = message, "pinga job reported an error during execution; continuing", ); } }, Err(client_error) => return Err(client_error.into()), } Ok(()) } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.