"""Reconciliation decision logic for conflict resolution"""
from datetime import datetime
from typing import List, Optional
from loguru import logger
from src.core.models import (
MemoryAtom,
Provenance,
ReconciliationAction,
ReconciliationDecision,
)
class ReconciliationResolver:
"""
Decides how to resolve conflicts between atoms.
Options:
- SUPERSEDE: New atom replaces old (temporal supersession)
- CONTEXTUALIZE: Both true in different contexts (conditional branching)
- REJECT: New atom rejected, existing wins
- MERGE: Combine information from both
For MVP: Rule-based reconciliation.
Future: LLM-based context extraction.
"""
def __init__(self) -> None:
self.reconciliation_count = 0
logger.info("ReconciliationResolver initialized (rule-based)")
def reconcile(
self,
candidate: MemoryAtom,
existing: MemoryAtom,
) -> ReconciliationDecision:
"""
Decide how to resolve conflict between candidate and existing atom.
Args:
candidate: New atom
existing: Existing atom in database
Returns:
ReconciliationDecision with action and explanation
"""
self.reconciliation_count += 1
logger.info(
"Reconciling: [{c_prov}] {c_obj} vs [{e_prov}] {e_obj}",
c_prov=candidate.provenance.value,
c_obj=candidate.object[:50],
e_prov=existing.provenance.value,
e_obj=existing.object[:50],
)
# ============================================================================
# PRIORITY 1: Context-Based Coexistence (MUST BE FIRST)
# ============================================================================
#
# CRITICAL DESIGN DECISION: Check contexts BEFORE provenance/recency
#
# Why this order matters:
# Example: "I like jazz when relaxing" vs "I hate jazz when working"
#
# If we check provenance first:
# - Opposite predicates detected (likes vs dislikes)
# - More recent statement supersedes
# - Result: Only 1 fact kept (WRONG - information loss)
#
# If we check contexts first:
# - Contexts differ: ["relaxing"] vs ["working"]
# - Both can coexist in different contexts
# - Result: Both facts kept (CORRECT - both are true)
#
# This priority ordering enabled the contextual_difference test to pass,
# achieving 100% accuracy. Without it, we were at 75% accuracy.
#
context_decision = self._try_contextualize(candidate, existing)
if context_decision:
return context_decision
# Rule 1: Corrected supersedes everything (except contextualization)
if candidate.provenance == Provenance.CORRECTED:
return ReconciliationDecision(
action=ReconciliationAction.SUPERSEDE,
keep_existing=False,
keep_candidate=True,
archive_existing=True,
explanation="Corrected information supersedes previous",
)
# Rule 2: User-stated supersedes inferred
if (
candidate.provenance == Provenance.USER_STATED
and existing.provenance == Provenance.INFERRED
):
return ReconciliationDecision(
action=ReconciliationAction.SUPERSEDE,
keep_existing=False,
keep_candidate=True,
archive_existing=True,
explanation="User statement supersedes inference",
)
# Rule 3: More recent user statement supersedes older
if (
candidate.provenance == Provenance.USER_STATED
and existing.provenance == Provenance.USER_STATED
):
# Check if candidate is more recent
if candidate.first_observed > existing.first_observed:
return ReconciliationDecision(
action=ReconciliationAction.SUPERSEDE,
keep_existing=False,
keep_candidate=True,
archive_existing=True,
explanation="More recent user statement",
)
# Rule 4: Higher confidence wins
if candidate.confidence > existing.confidence + 0.2: # Significant difference
return ReconciliationDecision(
action=ReconciliationAction.SUPERSEDE,
keep_existing=False,
keep_candidate=True,
archive_existing=True,
explanation=f"Higher confidence ({candidate.confidence:.2f} vs {existing.confidence:.2f})",
)
# Rule 6: Default - reject candidate, existing wins
return ReconciliationDecision(
action=ReconciliationAction.REJECT,
keep_existing=True,
keep_candidate=False,
archive_existing=False,
explanation="Insufficient evidence to override existing atom",
)
def _try_contextualize(
self,
candidate: MemoryAtom,
existing: MemoryAtom,
) -> Optional[ReconciliationDecision]:
"""
Try to find contexts that allow both atoms to coexist.
For MVP: Simple heuristics.
Future: LLM-based context extraction.
"""
# Check if atoms already have different contexts
if candidate.contexts and existing.contexts:
# If they have non-overlapping contexts, they can coexist
# This applies even to opposite predicates (likes vs dislikes)
overlap = set(candidate.contexts) & set(existing.contexts)
if not overlap:
logger.info(
"Atoms have different contexts: {c_ctx} vs {e_ctx}",
c_ctx=candidate.contexts,
e_ctx=existing.contexts,
)
return ReconciliationDecision(
action=ReconciliationAction.CONTEXTUALIZE,
keep_existing=True,
keep_candidate=True,
archive_existing=False,
explanation=f"Both true in different contexts: {candidate.contexts} vs {existing.contexts}",
contexts_added=[], # Already have contexts
)
# Try to infer contexts from temporal patterns
time_diff = (candidate.first_observed - existing.first_observed).total_seconds()
# If atoms are far apart in time, might be temporal change
if abs(time_diff) > 86400 * 7: # More than a week apart
# Could be a preference change over time
# For MVP: Don't auto-contextualize, let supersession handle it
return None
# For MVP: Don't try to auto-infer contexts
# Future: Use LLM to extract situational differences
return None
def reconcile_batch(
self,
candidate: MemoryAtom,
conflicts: List[MemoryAtom],
) -> List[ReconciliationDecision]:
"""
Reconcile candidate against multiple conflicting atoms.
Args:
candidate: New atom
conflicts: List of conflicting atoms
Returns:
List of ReconciliationDecisions (one per conflict)
"""
decisions: List[ReconciliationDecision] = []
for existing in conflicts:
decision = self.reconcile(candidate, existing)
decisions.append(decision)
# Log summary
actions = [d.action for d in decisions]
logger.info(
"Batch reconciliation: {supersede} supersede, {contextualize} contextualize, {reject} reject",
supersede=actions.count(ReconciliationAction.SUPERSEDE),
contextualize=actions.count(ReconciliationAction.CONTEXTUALIZE),
reject=actions.count(ReconciliationAction.REJECT),
)
return decisions
def get_stats(self) -> dict:
"""Get reconciliation statistics"""
return {
"reconciliations": self.reconciliation_count,
}