"""
Memory consolidation engine for Ember V3.
Runs at session close and on-demand. Handles:
- Tier promotion (working→session→relational→glacier)
- Redundancy detection and merging
- TTL-based expiry
- Shadow cleanup
"""
from __future__ import annotations
import time
import logging
from typing import Optional
from ember.memory.db import get_db
from ember.memory.tiers import should_promote, should_expire, VALID_TIERS
from ember.memory.operations import (
list_memories,
update_memory,
shadow_memory,
delete_memory,
store_memory,
log_metric,
)
logger = logging.getLogger("ember.consolidation")
def run_consolidation(engine=None) -> dict:
    """
    Full consolidation pass across all tiers.

    1. Check for TTL-expired memories → delete
    2. Check for promotion candidates → promote
    3. Check for semantic duplicates → merge (if engine available)
    4. Log results

    Args:
        engine: Optional embedding engine. When provided and
            ``engine.is_semantic`` is truthy, semantic deduplication runs.

    Returns:
        Stats dict with keys "expired", "promoted", "merged", "processed".
    """
    now = time.time()
    stats = {
        "expired": 0,
        "promoted": 0,
        "merged": 0,
        "processed": 0,
    }
    # IDs promoted earlier in this pass. Tiers are scanned in promotion
    # order (working → session → relational), so a memory promoted out of
    # "working" would be re-fetched when "session" is listed; skipping it
    # prevents double-counting in stats["processed"] and multi-hop
    # promotion within a single run.
    promoted_ids: set = set()
    # Process each tier (skip glacier — never expires, never auto-promotes)
    for tier in ("working", "session", "relational"):
        memories = list_memories(
            tier_filter=[tier],
            include_shadowed=False,
            limit=10000,
        )
        for mem in memories:
            if mem["id"] in promoted_ids:
                continue
            stats["processed"] += 1
            # Check expiry
            if should_expire(tier, mem):
                delete_memory(mem["id"])
                stats["expired"] += 1
                logger.debug(f"Expired {mem['id'][:8]} from {tier}")
                continue
            # Check promotion
            target_tier = should_promote(tier, mem)
            if target_tier:
                update_memory(mem["id"], tier=target_tier)
                promoted_ids.add(mem["id"])
                stats["promoted"] += 1
                logger.debug(f"Promoted {mem['id'][:8]}: {tier} → {target_tier}")
    # Semantic deduplication (if embedding engine available)
    if engine and engine.is_semantic:
        stats["merged"] = _deduplicate_similar(engine)
    # Log consolidation run
    db = get_db()
    db.execute(
        """INSERT INTO consolidation_log
        (run_at, memories_processed, memories_promoted, memories_discarded, notes)
        VALUES (?, ?, ?, ?, ?)""",
        (now, stats["processed"], stats["promoted"], stats["expired"],
         f"Full consolidation: {stats}"),
    )
    db.commit()
    log_metric("consolidation_expired", stats["expired"])
    log_metric("consolidation_promoted", stats["promoted"])
    # Emit merged as a metric too, for parity with expired/promoted.
    log_metric("consolidation_merged", stats["merged"])
    return stats
def _deduplicate_similar(engine, similarity_threshold: float = 0.92) -> int:
    """
    Find and merge semantically duplicate memories within the same tier.

    Two memories are considered duplicates if cosine_similarity >= threshold
    AND they're in the same tier. The older one is shadowed, the newer one
    gets importance = max(both).

    Args:
        engine: Embedding engine exposing ``similarity(a, b)``.
        similarity_threshold: Minimum similarity for a pair to be merged.

    Returns:
        Number of merges performed across all processed tiers.
    """
    merged = 0
    db = get_db()  # hoisted: one handle serves every tier iteration
    for tier in ("session", "relational"):
        memories = list_memories(
            tier_filter=[tier],
            include_shadowed=False,
            limit=5000,
        )
        # Only process memories with embeddings
        with_embeddings = []
        for mem in memories:
            row = db.fetchone(
                "SELECT embedding FROM memories WHERE id = ?", (mem["id"],)
            )
            if row and row["embedding"]:
                mem["_embedding"] = row["embedding"]
                with_embeddings.append(mem)
        # O(n²) pairwise check — acceptable for < 5000 memories per tier
        shadowed_ids = set()
        for i, mem_a in enumerate(with_embeddings):
            if mem_a["id"] in shadowed_ids:
                continue
            for j in range(i + 1, len(with_embeddings)):
                mem_b = with_embeddings[j]
                if mem_b["id"] in shadowed_ids:
                    continue
                sim = engine.similarity(mem_a["_embedding"], mem_b["_embedding"])
                if sim < similarity_threshold:
                    continue
                # Shadow the older one, boost the newer one
                older, newer = (
                    (mem_a, mem_b)
                    if mem_a["created_at"] <= mem_b["created_at"]
                    else (mem_b, mem_a)
                )
                shadow_memory(older["id"], shadowed_by=newer["id"])
                shadowed_ids.add(older["id"])
                # Boost newer memory's importance
                new_importance = max(
                    older.get("importance", 0.5),
                    newer.get("importance", 0.5),
                )
                update_memory(newer["id"], importance=new_importance)
                merged += 1
                logger.debug(
                    f"Merged duplicate: {older['id'][:8]} → {newer['id'][:8]} "
                    f"(sim={sim:.3f})"
                )
                # BUG FIX: if mem_a was the older member it is now shadowed
                # and must stop serving as the comparison anchor — otherwise
                # it could be shadowed again and inflate the merge count.
                if older is mem_a:
                    break
    return merged
def consolidate_memories(
    memory_ids: list[str],
    merged_content: str,
    tier: str = "session",
    importance: float = 0.6,
) -> dict:
    """
    Manually merge multiple memories into one consolidated memory.

    Original memories are shadowed. A new memory is created with the
    merged content and parent references.

    Args:
        memory_ids: IDs of the memories to merge (at least 2 distinct).
        merged_content: Content of the new consolidated memory.
        tier: Target tier for the new memory; must be in VALID_TIERS.
        importance: Importance score for the new memory.

    Returns:
        {"new_id": ..., "shadowed_ids": [...]} on success, or
        {"error": ...} on validation failure.
    """
    from ember.memory.operations import get_memories_batch
    # Validate tier up front so we never create a memory in a bogus tier.
    if tier not in VALID_TIERS:
        return {"error": f"Invalid tier: {tier}"}
    # De-duplicate while preserving order; duplicate IDs would otherwise
    # trip the length check below with a misleading empty "missing" list.
    unique_ids = list(dict.fromkeys(memory_ids))
    if len(unique_ids) < 2:
        return {"error": "Need at least 2 memories to consolidate"}
    memories = get_memories_batch(unique_ids)
    if len(memories) != len(unique_ids):
        found_ids = {m["id"] for m in memories}
        missing = [mid for mid in unique_ids if mid not in found_ids]
        return {"error": f"Memories not found: {missing}"}
    # Create merged memory
    result = store_memory(
        content=merged_content,
        tier=tier,
        importance=importance,
        tags="consolidated",
        source="consolidation",
    )
    new_id = result["id"]
    # Shadow originals so only the consolidated memory surfaces in recall.
    shadowed_ids = []
    for mem in memories:
        shadow_memory(mem["id"], shadowed_by=new_id)
        shadowed_ids.append(mem["id"])
    return {
        "new_id": new_id,
        "shadowed_ids": shadowed_ids,
    }