Skip to main content
Glama
management.rs9.11 kB
use axum::{ Json, Router, extract::Path, http::StatusCode, response::{ IntoResponse, Response, }, routing::{ get, post, }, }; use chrono::{ Duration, Utc, }; use dal::{ ChangeSet, ChangeSetError, ChangeSetId, ComponentError, ComponentId, DalContext, FuncError, FuncId, SchemaVariantError, TransactionsError, WorkspacePk, WsEvent, WsEventError, diagram::{ DiagramError, view::{ View, ViewId, }, }, func::authoring::FuncAuthoringError, management::{ ManagementError, prototype::{ ManagementPrototypeError, ManagementPrototypeId, }, }, prop::PropError, schema::variant::authoring::VariantAuthoringError, }; use sdf_core::api_error::ApiError; use sdf_extract::{ PosthogEventTracker, change_set::ChangeSetDalContext, }; use serde::{ Deserialize, Serialize, }; use si_db::{ ManagementFuncExecutionError, ManagementFuncJobState, ManagementState, }; use si_layer_cache::LayerDbError; use telemetry::prelude::*; use thiserror::Error; use veritech_client::ManagementFuncStatus; use super::func::FuncAPIError; use crate::{ AppState, service::force_change_set_response::ForceChangeSetResponse, }; mod history; mod latest; mod state; pub type ManagementApiResult<T> = Result<T, ManagementApiError>; #[remain::sorted] #[derive(Debug, Error)] pub enum ManagementApiError { #[error("change set error: {0}")] ChangeSet(#[from] ChangeSetError), #[error("component error: {0}")] Component(#[from] ComponentError), #[error("diagram error: {0}")] Diagram(#[from] DiagramError), #[error("func error: {0}")] Func(#[from] FuncError), #[error("func api error: {0}")] FuncAPI(#[from] FuncAPIError), #[error("func authoring error: {0}")] FuncAuthoring(#[from] FuncAuthoringError), #[error("generated mgmt func {0} has no prototype")] FuncMissingPrototype(FuncId), #[error("hyper error: {0}")] Http(#[from] axum::http::Error), #[error("layer db error: {0}")] LayerDb(#[from] LayerDbError), #[error("management error: {0}")] Management(#[from] ManagementError), #[error("management func job status error: {0}")] ManagementFuncJobStatus(#[from] ManagementFuncExecutionError), #[error("management history missing a field - this is a bug!: {0}")] ManagementHistoryFieldMissing(String), #[error("management prototype error: {0}")] ManagementPrototype(#[from] ManagementPrototypeError), #[error("management prototype execution failure: {0}")] ManagementPrototypeExecutionFailure(ManagementPrototypeId), #[error("prop path connot be calculated: {0}")] PropError(#[from] PropError), #[error("schema variant error: {0}")] SchemaVariant(#[from] SchemaVariantError), #[error("serde json error: {0}")] SerdeJson(#[from] serde_json::Error), #[error("transactions error: {0}")] Transactions(#[from] TransactionsError), #[error("translating string to ulid: {0} is not a valid ulid")] UlidDecode(#[from] ulid::DecodeError), #[error("variant authoring error: {0}")] VariantAuthoring(#[from] VariantAuthoringError), #[error("ws event error: {0}")] WsEvent(#[from] WsEventError), } impl IntoResponse for ManagementApiError { fn into_response(self) -> Response { let (status_code, error_message) = match self { ManagementApiError::ManagementPrototype( dal::management::prototype::ManagementPrototypeError::FuncExecutionFailure(message), ) => (StatusCode::BAD_REQUEST, message), ManagementApiError::Component(dal::ComponentError::NotFound(_)) | ManagementApiError::Component(dal::ComponentError::SchemaVariantNotFound(_)) => { (StatusCode::NOT_FOUND, self.to_string()) } _ => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()), }; ApiError::new(status_code, error_message).into_response() } } #[derive(Debug, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] pub struct RunPrototypeRequest { request_ulid: Option<ulid::Ulid>, } #[derive(Debug, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] pub struct RunPrototypeResponse { status: ManagementFuncStatus, message: Option<String>, } const MAX_PENDING_DURATION: Duration = Duration::minutes(7); pub async fn run_prototype_inner( ctx: &mut DalContext, prototype_id: ManagementPrototypeId, component_id: ComponentId, view_id: ViewId, request: RunPrototypeRequest, tracker: PosthogEventTracker, ) -> ManagementApiResult<ForceChangeSetResponse<RunPrototypeResponse>> { let force_change_set_id = ChangeSet::force_new(ctx).await?; let state_ctx = ctx.clone(); // Is there already a pending run? if so fail it if it's too old, so that we can start a new one if let Some(pending) = ManagementFuncJobState::get_pending(&state_ctx, component_id, prototype_id).await? { if pending.timestamp().updated_at <= Utc::now() - MAX_PENDING_DURATION { ManagementFuncJobState::transition_state( &state_ctx, pending.id(), ManagementState::Failure, None, Some("hit max pending duration".to_string()), ) .await?; state_ctx.commit().await?; } } if let Err(err) = ManagementFuncJobState::new_pending(ctx, component_id, prototype_id).await { match err { ManagementFuncExecutionError::CreationFailed => { return Ok(ForceChangeSetResponse::new( force_change_set_id, RunPrototypeResponse { status: ManagementFuncStatus::Error, message: "This management func is already running".to_string().into(), }, )); } other => Err(other)?, } } // Enqueue the management func and if there is not a request ULID, we will provide one to the // event payload. This is so that the activity tracker has something to work with for tracking // the lifecycle of a management function in flight. let request_ulid = request.request_ulid.unwrap_or_default(); ctx.enqueue_management_func(prototype_id, component_id, view_id, request_ulid) .await?; WsEvent::management_operations_in_progress(ctx, request_ulid) .await? .publish_on_commit(ctx) .await?; tracker.track( ctx, "run_prototype", serde_json::json!({ "how": "/management/run_prototype", "view_id": view_id, "prototype_id": prototype_id, "component_id": component_id, }), ); ctx.commit().await?; Ok(ForceChangeSetResponse::new( force_change_set_id, RunPrototypeResponse { status: ManagementFuncStatus::Ok, message: "enqueued".to_string().into(), }, )) } pub async fn run_prototype( ChangeSetDalContext(ref mut ctx): ChangeSetDalContext, tracker: PosthogEventTracker, Path((_workspace_pk, _change_set_id, prototype_id, component_id, view_id)): Path<( WorkspacePk, ChangeSetId, ManagementPrototypeId, ComponentId, ViewId, )>, Json(request): Json<RunPrototypeRequest>, ) -> ManagementApiResult<ForceChangeSetResponse<RunPrototypeResponse>> { run_prototype_inner(ctx, prototype_id, component_id, view_id, request, tracker).await } pub async fn run_prototype_for_default_view( ChangeSetDalContext(ref mut ctx): ChangeSetDalContext, tracker: PosthogEventTracker, Path((_workspace_pk, _change_set_id, prototype_id, component_id)): Path<( WorkspacePk, ChangeSetId, ManagementPrototypeId, ComponentId, )>, Json(request): Json<RunPrototypeRequest>, ) -> ManagementApiResult<ForceChangeSetResponse<RunPrototypeResponse>> { run_prototype_inner( ctx, prototype_id, component_id, View::get_id_for_default(ctx).await?, request, tracker, ) .await } pub fn v2_routes() -> Router<AppState> { Router::new() // TODO(Victor): rewrite these to use a viewId resolver instead of two endpoints .route( "/prototype/:prototypeId/:componentId/DEFAULT", post(run_prototype_for_default_view), ) .route( "/prototype/:prototypeId/:componentId/:viewId", post(run_prototype), ) .route( "/prototype/:prototypeId/:componentId/latest", get(latest::latest), ) .route( "/component/:componentId/latest", get(latest::all_latest_for_component), ) .route("/history", get(history::history)) .route("/state/:funcRunId", get(state::state)) }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server