Skip to main content
Glama
debug_func.rs6.76 kB
use std::time::Duration; use async_trait::async_trait; use pinga_core::api_types::job_execution_request::JobArgsVCurrent; use si_id::{ ChangeSetId, DebugFuncJobStateId, FuncRunId, WorkspacePk, }; use telemetry::prelude::*; use crate::{ DalContext, Func, func::{ debug::{ DebugFuncError, DebugFuncJobState, DebugFuncJobStateRow, DebugFuncResult, prepare_debug_func_args, }, runner::{ FuncRunner, FuncRunnerValueChannel, }, }, job::consumer::{ DalJob, JobCompletionState, JobConsumer, JobConsumerResult, }, workspace_snapshot::DependentValueRoot, }; const WAIT_MS: u64 = 50; const MAX_WAITS: u64 = 10_000; #[derive(Clone, Debug)] pub struct DebugFuncJob { workspace_id: WorkspacePk, change_set_id: ChangeSetId, job_state_id: DebugFuncJobStateId, } impl DebugFuncJob { pub fn new( workspace_id: WorkspacePk, change_set_id: ChangeSetId, job_state_id: DebugFuncJobStateId, ) -> Box<Self> { Self { workspace_id, change_set_id, job_state_id, } .into() } async fn attempt_dispatch( &self, ctx: &DalContext, job_state_row: &DebugFuncJobStateRow, ) -> DebugFuncResult<(FuncRunId, FuncRunnerValueChannel)> { let args = prepare_debug_func_args( ctx, job_state_row.component_id, job_state_row.debug_input.as_ref(), ) .await?; let func = Func::new_debug( &job_state_row.func_name, &job_state_row.code, &job_state_row.handler, ); let (func_run_id, result_channel) = FuncRunner::run_debug(ctx, func, job_state_row.component_id, args) .await .map_err(Box::new)?; Ok((func_run_id, result_channel)) } async fn fail_run(&self, ctx: &DalContext, func_run_id: Option<FuncRunId>, error: String) { if let Err(err) = DebugFuncJobStateRow::set_failed(ctx, self.job_state_id, func_run_id, error).await { error!( error=%err, err=%err, "Failed to set job state to failed for func run job state id: {}", self.job_state_id ); } } pub async fn run_debug_func(&self, ctx: &DalContext) -> DebugFuncResult<JobCompletionState> { let job_state_row = DebugFuncJobStateRow::get_by_id(ctx, self.job_state_id).await?; if job_state_row.state != DebugFuncJobState::Pending { return Err(DebugFuncError::FuncAlreadyRunning(self.job_state_id)); } let mut ctx_clone = ctx.clone(); match self.spin_until_ready(&mut ctx_clone, MAX_WAITS).await { Ok(true) => {} Ok(false) => { self.fail_run( ctx, None, "Waited too long for dependent functions to finish".to_string(), ) .await; } Err(err) => { self.fail_run(ctx, None, err.to_string()).await; } } let ctx = &ctx_clone; let (func_run_id, result_channel) = match self.attempt_dispatch(ctx, &job_state_row).await { Ok((func_run_id, result_channel)) => (func_run_id, result_channel), Err(err) => { error!(error=%err, err=%err, "failed to dispatch debug function job state id: {}", self.job_state_id); self.fail_run(ctx, None, err.to_string()).await; return Err(err); } }; if let Err(err) = DebugFuncJobStateRow::set_running(ctx, self.job_state_id, func_run_id).await { error!(error=%err, err=%err, "failed to set debug function job state to running with id: {}", self.job_state_id); self.fail_run(ctx, Some(func_run_id), err.to_string()).await; return Err(err); } match result_channel.await { Ok(value_result) => match value_result { Ok(mut value) => { let result = value.take_value(); if let Err(err) = DebugFuncJobStateRow::set_success( ctx, self.job_state_id, func_run_id, result, ) .await { error!(error=%err, err=%err, "failed to set debug function job state to success with id: {}", self.job_state_id); self.fail_run(ctx, Some(func_run_id), err.to_string()).await; Err(err) } else { Ok(JobCompletionState::Done) } } Err(err) => { error!(error=%err, err=%err, "failed to execute debug function with id: {}", self.job_state_id); self.fail_run(ctx, Some(func_run_id), err.to_string()).await; Err(Box::new(err))? } }, Err(err) => { error!(error=%err, err=%err, "failed to receive result from debug function job state id: {}", self.job_state_id); self.fail_run(ctx, Some(func_run_id), err.to_string()).await; Err(err)? } } } async fn spin_until_ready(&self, ctx: &mut DalContext, max: u64) -> DebugFuncResult<bool> { let mut count = 0; loop { if count >= max { return Ok(false); } ctx.update_snapshot_to_visibility().await?; if !DependentValueRoot::roots_exist(ctx) .await .map_err(Box::new)? { break; } tokio::time::sleep(Duration::from_millis(WAIT_MS)).await; count += 1; } Ok(true) } } impl DalJob for DebugFuncJob { fn args(&self) -> JobArgsVCurrent { JobArgsVCurrent::DebugFunc { debug_func_job_state_id: self.job_state_id, } } fn workspace_id(&self) -> WorkspacePk { self.workspace_id } fn change_set_id(&self) -> ChangeSetId { self.change_set_id } } #[async_trait] impl JobConsumer for DebugFuncJob { #[instrument(name = "debug_func.run", level = "info", skip_all)] async fn run(&self, ctx: &mut DalContext) -> JobConsumerResult<JobCompletionState> { Ok(self.run_debug_func(ctx).await.map_err(Box::new)?) } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server