use std::{
collections::{
BTreeMap,
BTreeSet,
HashMap,
HashSet,
VecDeque,
},
sync::{
Arc,
atomic::{
AtomicBool,
Ordering,
},
},
time::Instant,
};
use dashmap::DashMap;
use itertools::Itertools;
use serde::Serialize;
use si_id::{
AttributeValueId,
ComponentId,
SchemaVariantId,
WorkspacePk,
};
use telemetry::prelude::*;
use thiserror::Error;
use crate::{
AttributeValue,
Component,
ComponentError,
DalContext,
Prop,
Schema,
SchemaError,
SchemaVariant,
SchemaVariantError,
WorkspaceSnapshotError,
attribute::{
attributes::{
AttributeValueIdent,
Source,
},
path::AttributePath,
value::default_subscription::PropSuggestion,
},
component::ComponentResult,
prop::PropError,
workspace_snapshot::node_weight::reason_node_weight::Reason,
};
#[remain::sorted]
#[derive(Debug, Error)]
pub enum PropSuggestionCacheError {
#[error("Component error: {0}")]
Component(#[from] Box<ComponentError>),
#[error("Prop error: {0}")]
Prop(#[from] Box<PropError>),
#[error("Schema error: {0}")]
Schema(#[from] Box<SchemaError>),
#[error("Schema Variant error: {0}")]
SchemaVariant(#[from] Box<SchemaVariantError>),
#[error("WorkspaceSnapshot error: {0}")]
WorkspaceSnapshot(#[from] Box<WorkspaceSnapshotError>),
}
pub type PropSuggestionCacheResult<T> = Result<T, PropSuggestionCacheError>;
impl From<ComponentError> for PropSuggestionCacheError {
fn from(value: ComponentError) -> Self {
Box::new(value).into()
}
}
impl From<PropError> for PropSuggestionCacheError {
fn from(value: PropError) -> Self {
Box::new(value).into()
}
}
impl From<SchemaVariantError> for PropSuggestionCacheError {
fn from(value: SchemaVariantError) -> Self {
Box::new(value).into()
}
}
impl From<WorkspaceSnapshotError> for PropSuggestionCacheError {
fn from(value: WorkspaceSnapshotError) -> Self {
Box::new(value).into()
}
}
/// Cache for Prop suggestions and Components by Schema Name
/// This is used for each DalCtx lifetime and is helpful particularly during
/// Discovery as we will autosubscribe multiple components in that case
/// When we need longer lived/more shared caching, we can move this elsewhere
#[derive(Debug, Clone, Default)]
pub struct PropSuggestionsCache {
populated: Arc<AtomicBool>,
/// Map of schema variant ID to its suggestion patterns
schema_suggestions: DashMap<SchemaVariantId, SchemaSuggestionMap>,
/// Map of schema name to components for efficient lookup
schema_to_components: DashMap<String, BTreeSet<ComponentId>>,
}
/// Cache of prop suggestion patterns for a specific schema variant
#[derive(Debug, Clone, Default)]
pub struct SchemaSuggestionMap {
/// Map of prop path -> list of prop suggestions for that path (suggestSources)
suggest_sources_map: BTreeMap<String, Vec<PropSuggestion>>,
}
impl PropSuggestionsCache {
/// Has the cache been populated?
pub fn populated(&self) -> bool {
self.populated.load(Ordering::Relaxed)
}
/// Suggestions by schema id
pub fn schema_suggestions(&self) -> &DashMap<SchemaVariantId, SchemaSuggestionMap> {
&self.schema_suggestions
}
/// A map of schema names to components
pub fn schema_to_components(&self) -> &DashMap<String, BTreeSet<ComponentId>> {
&self.schema_to_components
}
/// Clear the suggestions cache
pub fn clear(&self) {
self.populated.store(false, Ordering::Relaxed);
self.schema_suggestions().clear();
self.schema_to_components().clear()
}
/// Populate this cache instance with data from the database
/// Not sure how noisy this will be but I'd like to get the timings in the short term
#[instrument(level = "info", name = "dal.prop_suggestion_cache.populate", skip_all,
fields(
si.change_set.id = Empty,
si.workspace.id = Empty,
))]
pub async fn populate(&self, ctx: &DalContext) -> PropSuggestionCacheResult<()> {
let span = Span::current();
span.record("si.change_set.id", ctx.change_set_id().to_string());
span.record(
"si.workspace.id",
ctx.tenancy()
.workspace_pk_opt()
.unwrap_or(WorkspacePk::NONE)
.to_string(),
);
let mut processed_schema_variants = HashSet::new();
// Build schema suggestions map for each unique schema variant and
// schema_to_components map
for component_id in Component::list_ids(ctx).await? {
let comp_schema = Component::schema_for_component_id(ctx, component_id).await?;
let schema_name = comp_schema.name().to_string();
// Update or insert the component list for this schema
self.schema_to_components
.entry(schema_name.clone())
.or_default()
.insert(component_id);
let schema_variant_id = Component::schema_variant_id(ctx, component_id).await?;
if processed_schema_variants.contains(&schema_variant_id) {
continue; // Already processed this schema variant
}
processed_schema_variants.insert(schema_variant_id);
// Build suggestion map for this schema variant
let mut suggest_sources_map = BTreeMap::new();
let all_props = SchemaVariant::all_props(ctx, schema_variant_id).await?;
for prop in all_props {
let prop_path = prop.path(ctx).await?.as_prop_suggestion_path();
// Store suggestSources patterns
let suggest_sources = prop.suggested_sources_for()?;
if !suggest_sources.is_empty() {
suggest_sources_map.insert(prop_path.clone(), suggest_sources);
}
// suggestSources says, for this prop path, this variant and
// path is a possible sub source. suggestAsSourceFor says,
// *this* prop path and schema is a possible source for these
// following schemas and paths. let's map the suggestAsSourceFor
// for this prop into suggest sources
let suggest_as_source_for = prop.suggested_as_source_for()?;
for suggestion in suggest_as_source_for {
let Some(suggested_schema) = Schema::get_by_name_opt(ctx, suggestion.schema)
.await
.map_err(Box::new)?
else {
continue;
};
for suggested_variant_id in
Schema::list_schema_variant_ids(ctx, suggested_schema.id())
.await
.map_err(Box::new)?
{
self.schema_suggestions
.entry(suggested_variant_id)
.or_default()
.suggest_sources_map
.entry(suggestion.prop.clone())
.or_default()
.push(PropSuggestion {
schema: schema_name.clone(),
prop: prop_path.clone(),
});
}
}
}
// Store in cache
for (k, v) in suggest_sources_map.into_iter() {
self.schema_suggestions
.entry(schema_variant_id)
.or_default()
.suggest_sources_map
.entry(k)
.or_default()
.extend(v);
}
}
self.populated.store(true, Ordering::Relaxed);
Ok(())
}
pub fn remove_component(&self, component_id: ComponentId) -> PropSuggestionCacheResult<()> {
for mut component_set in self.schema_to_components.iter_mut() {
component_set.value_mut().remove(&component_id);
}
Ok(())
}
pub async fn add_component(
&self,
ctx: &DalContext,
component_id: ComponentId,
) -> PropSuggestionCacheResult<()> {
let comp_schema = Component::schema_for_component_id(ctx, component_id).await?;
let schema_name = comp_schema.name().to_string();
self.schema_to_components
.entry(schema_name)
.or_default()
.insert(component_id);
Ok(())
}
}
/// Result of an autosubscribe operation with improved structure and context
#[derive(Debug, Clone, Default)]
pub struct AutosubscribeResult {
pub successful: Vec<SuccessfulSubscription>,
pub conflicts: Vec<ConflictedSubscription>,
pub errors: Vec<SubscriptionError>,
}
/// A successfully created subscription with context
#[derive(Debug, Clone, Serialize)]
pub struct SuccessfulSubscription {
pub target_path: AttributePath,
pub subscription_source: Source,
pub matched_value: serde_json::Value,
}
/// A subscription that couldn't be created due to conflicts
#[derive(Debug, Clone, Serialize)]
pub struct ConflictedSubscription {
pub target_path: AttributePath,
pub matches: Vec<SubscriptionMatch>,
}
/// A potential subscription match
#[derive(Debug, Clone, Serialize)]
pub struct SubscriptionMatch {
pub component_id: ComponentId,
pub source_path: AttributePath,
pub value: serde_json::Value,
}
/// An error that occurred during subscription creation
#[derive(Debug, Clone)]
pub struct SubscriptionError {
pub target_path: AttributePath,
pub error: String,
pub attempted_source: Option<(ComponentId, AttributePath)>,
}
/// Context object to hold shared data during autosubscribe operations
#[derive(Debug)]
struct AutosubscribeContext {
component_id: ComponentId,
schema_name: String,
schema_variant_id: SchemaVariantId,
attribute_values: Vec<AttributeValueId>,
}
impl AutosubscribeResult {
/// Number of successful subscriptions created
pub fn success_count(&self) -> usize {
self.successful.len()
}
/// Number of conflicted subscriptions that need user intervention
pub fn conflict_count(&self) -> usize {
self.conflicts.len()
}
/// Number of errors that occurred
pub fn error_count(&self) -> usize {
self.errors.len()
}
/// Whether there are any issues that need attention
pub fn has_issues(&self) -> bool {
!self.conflicts.is_empty() || !self.errors.is_empty()
}
/// Human-readable summary of the operation
pub fn summary(&self) -> String {
format!(
"Created {} subscriptions, {} conflicts, {} errors",
self.success_count(),
self.conflict_count(),
self.error_count()
)
}
/// Get all successful subscriptions for a specific target path
pub fn successful_for_path(&self, path: &AttributePath) -> Vec<&SuccessfulSubscription> {
self.successful
.iter()
.filter(|s| &s.target_path == path)
.collect()
}
/// Get all conflicts for a specific target path
pub fn conflicts_for_path(&self, path: &AttributePath) -> Vec<&ConflictedSubscription> {
self.conflicts
.iter()
.filter(|c| &c.target_path == path)
.collect()
}
}
impl AutosubscribeContext {
/// Create a new context for autosubscribe operations
async fn new(ctx: &DalContext, component_id: ComponentId) -> ComponentResult<Self> {
let component = Component::get_by_id(ctx, component_id).await?;
let schema = component.schema(ctx).await?;
let schema_name = schema.name().to_string();
let schema_variant_id = Component::schema_variant_id(ctx, component_id).await?;
// Get all attribute values for this component
let attribute_value_tree = AttributeValue::tree_for_component(ctx, component_id).await?;
let mut flattened = Vec::with_capacity(attribute_value_tree.keys().len());
let mut queue = VecDeque::from_iter(attribute_value_tree.keys().copied());
while let Some(av_id) = queue.pop_front() {
flattened.push(av_id);
if let Some(children) = attribute_value_tree.get(&av_id) {
flattened.reserve(children.len());
queue.reserve(children.len());
queue.extend(children);
}
}
Ok(Self {
component_id,
schema_name,
schema_variant_id,
attribute_values: flattened,
})
}
/// Find potential subscription matches based on prop suggestions within this component
async fn find_suggestion_matches(
&self,
ctx: &DalContext,
cache: &PropSuggestionsCache,
potential_matches: &mut HashMap<AttributeValueId, HashSet<PotentialSource>>,
potential_match_values: &mut HashMap<AttributeValueId, (AttributePath, serde_json::Value)>,
) -> ComponentResult<()> {
// Get cached schema suggestions for this component's schema variant
let Some(component_suggestion_map) = cache.schema_suggestions.get(&self.schema_variant_id)
else {
return Ok(());
};
// Check each attribute value in this component for incoming subscriptions
for &attribute_value_id in &self.attribute_values {
// Skip if this attribute value doesn't have a prop or a manually set value
let Some(prop_id) = AttributeValue::prop_id_opt(ctx, attribute_value_id).await? else {
continue;
};
// Skip if this av is set by a dependent function (which means it's not been set manually)
if AttributeValue::is_set_by_dependent_function(ctx, attribute_value_id).await? {
continue;
}
let Some(current_value) = AttributeValue::view(ctx, attribute_value_id).await? else {
continue; // Skip if no value is set
};
let prop = Prop::get_by_id(ctx, prop_id).await?;
let prop_path = prop.path(ctx).await?.as_prop_suggestion_path();
let prop_suggestion = PropSuggestion {
schema: self.schema_name.clone(),
prop: prop_path.clone(),
};
let target_path = AttributePath::JsonPointer(prop_path);
// Check for explicit suggestions from the cache
let Some(suggestions) = component_suggestion_map
.suggest_sources_map
.get(&prop_suggestion.prop)
else {
continue;
};
// This prop has explicit suggested sources, check them
for suggestion in suggestions {
let Some(source_components) = cache.schema_to_components.get(&suggestion.schema)
else {
continue;
};
for &source_component_id in source_components.iter() {
if source_component_id == self.component_id {
continue; // Skip self
}
// Find the attribute value at the suggested prop path.
// Suggestions just use strings for the path, so could be
// wrong, if a suggestion doesn't resolve, ignore it.
let Some(source_av_id) = AttributeValueIdent::new(&suggestion.prop)
.resolve(ctx, source_component_id)
.await?
else {
continue;
};
let source_value = AttributeValue::view(ctx, source_av_id).await?;
// If values match, this is a potential subscription
if source_value.as_ref() == Some(¤t_value) {
let source = PotentialSource {
component_id: source_component_id,
attribute_value_id: source_av_id,
path: AttributePath::JsonPointer(suggestion.prop.clone()),
};
potential_matches
.entry(attribute_value_id)
.or_default()
.insert(source);
potential_match_values.insert(
attribute_value_id,
(target_path.clone(), current_value.clone()),
);
}
}
}
}
Ok(())
}
async fn process_matches(
&self,
ctx: &DalContext,
potential_matches: HashMap<AttributeValueId, HashSet<PotentialSource>>,
potential_match_values: HashMap<AttributeValueId, (AttributePath, serde_json::Value)>,
result: &mut AutosubscribeResult,
) -> ComponentResult<()> {
// Process each potential match
for (dest_av_id, matches) in potential_matches {
let (target_path, stored_value) = potential_match_values
.get(&dest_av_id)
.cloned()
.unwrap_or_else(|| {
(
AttributePath::JsonPointer("".to_string()),
serde_json::Value::Null,
)
});
if let Ok(source) = matches.iter().exactly_one() {
// Unambiguous match - create the subscription
let subscription =
AttributeValue::as_subscription(ctx, source.attribute_value_id).await?;
// Make sure the subscribed-to path is valid
match subscription.validate(ctx).await {
Ok(_) => {
// Prevent autosubscribe from creating cycles and
let guard = ctx.workspace_snapshot()?.enable_cycle_check().await;
// Create the subscription
match AttributeValue::set_to_subscription(
ctx,
dest_av_id,
subscription.clone(),
None,
Reason::Autosubscription,
)
.await
{
Ok(_) => {
result.successful.push(SuccessfulSubscription {
target_path,
subscription_source: Source::Subscription {
component: source.component_id.into(),
path: subscription.path.to_string(),
func: None,
_keep_existing_subscriptions: None,
},
matched_value: stored_value,
});
}
Err(err) => {
// If we found an error while attempting to create the subscription, we should roll
// back to a manually set value. This is because there are multiple steps in creating
// a subscription, and there's a chance the graph ends up in a half-state
// even if the subscription isn't ultimately created
// For example, if attempting to create this subscription causes a graph cycle,
// we create a new AttributePrototypeArgument before creating the subscription edge,
// and as the edge would cause a cycle, it's never created but the newly created
// AttributePrototypeArgument still exists and becomes orphaned causing various shenanigans,
// most noticeably, failing to build the AttributeTreeMV for this component
error!(si.error.message = ?err, si.attribute_value.id = ?dest_av_id, si.attribute_value.path=?target_path, "Error autosubscribing, rolling back to manually set value");
AttributeValue::update(ctx, dest_av_id, Some(stored_value)).await?;
result.errors.push(SubscriptionError {
target_path,
error: format!("Failed to create subscription: {err}"),
attempted_source: Some((
source.component_id,
source.path.clone(),
)),
});
}
}
drop(guard);
}
Err(err) => {
result.errors.push(SubscriptionError {
target_path,
error: format!("Invalid subscription path: {err}"),
attempted_source: Some((source.component_id, source.path.clone())),
});
}
}
} else {
// Multiple matches - record as conflict
let subscription_matches: Vec<SubscriptionMatch> = matches
.iter()
.map(|source| SubscriptionMatch {
component_id: source.component_id,
source_path: source.path.clone(),
value: stored_value.clone(),
})
.collect();
if !subscription_matches.is_empty() {
result.conflicts.push(ConflictedSubscription {
target_path,
matches: subscription_matches,
});
}
}
}
Ok(())
}
}
/// A potential source for a subscription
#[derive(PartialEq, Eq, Hash, Debug, Clone)]
struct PotentialSource {
component_id: ComponentId,
attribute_value_id: AttributeValueId,
path: AttributePath,
}
impl Component {
/// Automatically creates prop subscriptions based on prop suggestions and matching values.
///
/// This function mimics the prior behavior of `autoconnect`, but instead of dealing with sockets,
/// it works with prop subscriptions.
///
/// 1. For each attribute value in the component, if it has a value that's set manually,
/// and has a prop suggestion, look for components that match the prop suggestion and
/// check if the attribute value's value matches the one we're trying to find subscriptions for.
/// 2. Find components with prop suggestions (suggest_source_as) that go the other way, and see if those match.
/// 3. If there's a single match, replace the manually set value with a subscription
/// but if there are multiple eligible matches, return them as ambiguous for user decision.
#[instrument(level = "info", name = "dal.component.autosubscribe_1", skip_all,
fields(
si.change_set.id = Empty,
si.workspace.id = Empty,
si.dal.autosubscribe.fetched_cache_ms = Empty,
si.dal.autosubscribe.suggest_ms = Empty,
si.dal.autosubscribe.suggest_as_source_for_ms = Empty,
si.dal.autosubscribe.created_subcriptions = Empty,
si.dal.autosubscribe.conflicted_subscriptions = Empty,
))]
pub async fn autosubscribe(
ctx: &DalContext,
component_id: ComponentId,
) -> ComponentResult<AutosubscribeResult> {
let span = Span::current();
let start = Instant::now();
span.record("si.change_set.id", ctx.change_set_id().to_string());
span.record(
"si.workspace.id",
ctx.tenancy()
.workspace_pk_opt()
.unwrap_or(WorkspacePk::NONE)
.to_string(),
);
let mut result = AutosubscribeResult::default();
// Use the cached prop suggestions from workspace snapshot
let workspace_snapshot = ctx.workspace_snapshot()?;
let cache = workspace_snapshot.prop_suggestions_cache(ctx).await?;
span.record(
"si.dal.autosubscribe.fetched_cache_ms",
start.elapsed().as_millis(),
);
// Create context for this operation
let context = AutosubscribeContext::new(ctx, component_id).await?;
// Build a map of potential matches by destination attribute value
let mut potential_matches: HashMap<AttributeValueId, HashSet<PotentialSource>> =
HashMap::new();
let mut potential_match_values: HashMap<
AttributeValueId,
(AttributePath, serde_json::Value),
> = HashMap::new();
// 1. Find matches based on explicit prop suggestions
context
.find_suggestion_matches(
ctx,
cache,
&mut potential_matches,
&mut potential_match_values,
)
.await?;
span.record(
"si.dal.autosubscribe.suggest_ms",
start.elapsed().as_millis(),
);
if potential_matches.is_empty() {
return Ok(result);
}
// 3. Process all potential matches into final results
context
.process_matches(ctx, potential_matches, potential_match_values, &mut result)
.await?;
span.record(
"si.dal.autosubscribe.created_subcriptions",
result.success_count(),
);
span.record(
"si.dal.autosubscribe.conflicted_subscriptions",
result.conflict_count(),
);
Ok(result)
}
}