"""
Core CRUD operations for Ember V3 memories.
All database interaction goes through this module.
MCP tools call these functions — they never touch SQLite directly.
"""
from __future__ import annotations
import time
import uuid
import logging
from typing import Optional, List
from ember.memory.db import get_db
from ember.memory.tiers import validate_tier, VALID_TIERS
logger = logging.getLogger("ember.operations")
def store_memory(
    content: str,
    tier: str = "session",
    importance: float = 0.5,
    tags: str = "",
    source: str = "agent",
    status: str = "",
    embedding: Optional[bytes] = None,
    source_path: str = "",
    parent_id: Optional[str] = None,
) -> dict:
    """Store a new memory row.

    Args:
        content: Memory text body.
        tier: Target tier; must pass validate_tier() (raises otherwise).
        importance: Clamped into [0.0, 1.0] before storage.
        tags: Tag string, stored verbatim.
        source: Origin label (e.g. "agent", "session").
        status: Free-form status string.
        embedding: Optional embedding vector serialized as bytes (BLOB).
        source_path: Optional file path the memory derives from.
        parent_id: Optional ID of a parent memory.

    Returns:
        {"id": new UUID, "tier": tier, "created_at": unix timestamp}
    """
    validate_tier(tier)
    now = time.time()
    memory_id = str(uuid.uuid4())
    db = get_db()
    db.execute(
        """INSERT INTO memories
           (id, content, tier, importance, tags, source, status,
            embedding, created_at, updated_at, accessed_at, source_path, parent_id)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        (
            memory_id, content, tier,
            max(0.0, min(1.0, importance)),  # clamp to [0, 1]
            tags, source, status,
            embedding, now, now, now, source_path, parent_id,
        ),
    )
    db.commit()
    # Lazy %-args: formatting is skipped entirely unless DEBUG is enabled
    # (the old f-string formatted on every call).
    logger.debug("Stored memory %s in tier=%s", memory_id[:8], tier)
    return {"id": memory_id, "tier": tier, "created_at": now}
def get_memory(memory_id: str) -> Optional[dict]:
    """Look up one memory row by ID.

    Returns the row as a plain dict, or None when no row matches.
    """
    row = get_db().fetchone("SELECT * FROM memories WHERE id = ?", (memory_id,))
    return None if row is None else dict(row)
def get_memories_batch(memory_ids: list[str]) -> list[dict]:
    """Fetch several memories by ID in as few queries as possible.

    Queries are chunked to stay under SQLite's default 999 bound-parameter
    limit. IDs that don't exist are simply absent from the result.
    """
    if not memory_ids:
        return []
    db = get_db()
    out: list[dict] = []
    step = 900  # safely below the 999-parameter ceiling
    for start in range(0, len(memory_ids), step):
        batch = memory_ids[start : start + step]
        marks = ",".join("?" * len(batch))
        fetched = db.fetchall(
            f"SELECT * FROM memories WHERE id IN ({marks})", tuple(batch)
        )
        out.extend(dict(record) for record in fetched)
    return out
def update_memory(
    memory_id: str,
    content: Optional[str] = None,
    tier: Optional[str] = None,
    importance: Optional[float] = None,
    tags: Optional[str] = None,
    status: Optional[str] = None,
    embedding: Optional[bytes] = None,
    source_path: Optional[str] = None,
) -> dict:
    """Patch selected columns of an existing memory.

    Only arguments that are not None are written. A tier change is
    validated via validate_tier(); importance is clamped into [0, 1].
    updated_at is always refreshed on a successful write.

    Returns:
        {"id", "updated"} plus an "error" key on failure
        ("MEMORY_NOT_FOUND" or "NO_FIELDS_TO_UPDATE").
    """
    db = get_db()
    if get_memory(memory_id) is None:
        return {"id": memory_id, "updated": False, "error": "MEMORY_NOT_FOUND"}
    if tier is not None:
        validate_tier(tier)
    if importance is not None:
        importance = max(0.0, min(1.0, importance))
    # Column -> candidate value, in fixed order; None means "leave alone".
    candidates = [
        ("content", content),
        ("tier", tier),
        ("importance", importance),
        ("tags", tags),
        ("status", status),
        ("embedding", embedding),
        ("source_path", source_path),
    ]
    assignments = [f"{col} = ?" for col, val in candidates if val is not None]
    values = [val for _, val in candidates if val is not None]
    if not assignments:
        return {"id": memory_id, "updated": False, "error": "NO_FIELDS_TO_UPDATE"}
    assignments.append("updated_at = ?")
    values.extend([time.time(), memory_id])
    db.execute(
        f"UPDATE memories SET {', '.join(assignments)} WHERE id = ?",
        tuple(values),
    )
    db.commit()
    return {"id": memory_id, "updated": True}
def shadow_memory(memory_id: str, shadowed_by: Optional[str] = None, reason: str = "") -> dict:
    """Mark a memory as shadowed (outdated/superseded).

    Sets is_shadowed = 1, shadow_load = 1.0, and records the superseding
    memory's ID when given.

    Args:
        memory_id: ID of the memory to shadow.
        shadowed_by: Optional ID of the memory that supersedes it.
        reason: Optional human-readable explanation. Recorded via the
            logger — the visible schema has no dedicated column for it.

    Returns:
        {"id", "shadowed"} plus an "error" key on failure.
    """
    db = get_db()
    if get_memory(memory_id) is None:
        return {"id": memory_id, "shadowed": False, "error": "MEMORY_NOT_FOUND"}
    now = time.time()
    db.execute(
        """UPDATE memories
           SET is_shadowed = 1, shadowed_by = ?, shadow_load = 1.0, updated_at = ?
           WHERE id = ?""",
        (shadowed_by, now, memory_id),
    )
    db.commit()
    if reason:
        # Fix: `reason` was previously accepted but silently discarded.
        logger.info("Shadowed memory %s (by=%s): %s", memory_id[:8], shadowed_by, reason)
    return {"id": memory_id, "shadowed": True}
def delete_memory(memory_id: str) -> dict:
    """Remove a memory row along with every edge that references it.

    Returns {"id", "deleted"} plus an "error" key when the ID is unknown.
    """
    db = get_db()
    if get_memory(memory_id) is None:
        return {"id": memory_id, "deleted": False, "error": "MEMORY_NOT_FOUND"}
    with db.transaction() as conn:
        # Edges are removed explicitly even though the schema's
        # ON DELETE CASCADE would also cover them — keeps intent obvious.
        conn.execute(
            "DELETE FROM edges WHERE source_id = ? OR target_id = ?",
            (memory_id, memory_id),
        )
        conn.execute("DELETE FROM memories WHERE id = ?", (memory_id,))
    return {"id": memory_id, "deleted": True}
def delete_memories_batch(memory_ids: list[str]) -> int:
    """Delete multiple memories (and their edges) in one transaction.

    Improvement: uses chunked ``IN (...)`` statements instead of one pair
    of DELETEs per ID, cutting 2N statements down to a handful while
    producing the same end state and count.

    Args:
        memory_ids: IDs to delete; unknown IDs are ignored.

    Returns:
        Number of memory rows actually deleted.
    """
    if not memory_ids:
        return 0
    db = get_db()
    deleted = 0
    # The edges DELETE binds every ID twice (source + target), so cap
    # chunks at 450 to stay under SQLite's default 999-parameter limit.
    chunk_size = 450
    with db.transaction() as conn:
        for i in range(0, len(memory_ids), chunk_size):
            chunk = memory_ids[i : i + chunk_size]
            placeholders = ",".join("?" * len(chunk))
            conn.execute(
                f"""DELETE FROM edges
                    WHERE source_id IN ({placeholders})
                       OR target_id IN ({placeholders})""",
                tuple(chunk) + tuple(chunk),
            )
            cursor = conn.execute(
                f"DELETE FROM memories WHERE id IN ({placeholders})",
                tuple(chunk),
            )
            deleted += cursor.rowcount
    return deleted
def touch_memory(memory_id: str):
    """Record an access: set accessed_at to now and bump access_count."""
    connection = get_db()
    touched_at = time.time()
    connection.execute(
        """UPDATE memories
           SET accessed_at = ?, access_count = access_count + 1
           WHERE id = ?""",
        (touched_at, memory_id),
    )
    connection.commit()
def touch_memories_batch(memory_ids: list[str]):
    """Record an access for every given ID with a single executemany call.

    All rows receive the same timestamp; a no-op on an empty list.
    """
    if not memory_ids:
        return
    stamp = time.time()
    connection = get_db()
    connection.executemany(
        "UPDATE memories SET accessed_at = ?, access_count = access_count + 1 WHERE id = ?",
        [(stamp, one_id) for one_id in memory_ids],
    )
    connection.commit()
def list_memories(
    tier_filter: Optional[list[str]] = None,
    status_filter: Optional[str] = None,
    include_shadowed: bool = False,
    limit: int = 50,
    offset: int = 0,
    min_importance: float = 0.0,
) -> list[dict]:
    """Return memories matching the filters, most recently updated first.

    Shadowed rows are excluded unless include_shadowed is True. limit and
    offset provide pagination; min_importance <= 0 disables that filter.
    """
    db = get_db()
    clauses: list[str] = []
    args: list = []
    if not include_shadowed:
        clauses.append("is_shadowed = 0")
    if tier_filter:
        marks = ",".join("?" * len(tier_filter))
        clauses.append(f"tier IN ({marks})")
        args += tier_filter
    if status_filter:
        clauses.append("status = ?")
        args.append(status_filter)
    if min_importance > 0:
        clauses.append("importance >= ?")
        args.append(min_importance)
    # An empty join is falsy, so fall back to a match-everything clause.
    where_sql = " AND ".join(clauses) or "1=1"
    args += [limit, offset]
    matched = db.fetchall(
        f"""SELECT * FROM memories
            WHERE {where_sql}
            ORDER BY updated_at DESC
            LIMIT ? OFFSET ?""",
        tuple(args),
    )
    return [dict(record) for record in matched]
def search_fts(query: str, limit: int = 10) -> list[dict]:
    """Full-text search fallback using FTS5 BM25 ranking.

    Args:
        query: An FTS5 MATCH expression (bare words, phrases, operators).
        limit: Maximum number of rows to return.

    Returns:
        Matching non-shadowed memories, best rank first, each dict
        including the FTS5 ``rank`` column.
    """
    db = get_db()
    rows = db.fetchall(
        # Fix: the FTS table is aliased as `fts`, so MATCH must reference
        # the alias — the previous `memories_fts MATCH ?` named a table
        # that is out of scope once aliased, and the query failed.
        """SELECT m.*, rank
           FROM memories_fts fts
           JOIN memories m ON m.id = fts.id
           WHERE fts MATCH ?
           AND m.is_shadowed = 0
           ORDER BY rank
           LIMIT ?""",
        (query, limit),
    )
    return [dict(r) for r in rows]
def semantic_search(
    query_embedding: bytes,
    engine,
    top_k: int = 5,
    tier_filter: Optional[list[str]] = None,
    include_shadowed: bool = False,
    min_importance: float = 0.0,
) -> list[dict]:
    """
    Semantic search: load embeddings from DB, compute similarity, rank.
    This is the in-process cosine similarity approach from the v2 spec.
    No separate vector DB — embeddings stored as BLOBs in SQLite.

    Args:
        query_embedding: Serialized query vector passed to engine.similarity.
        engine: Object exposing similarity(a: bytes, b: bytes) -> float.
        top_k: Number of top-scoring results to return.
        tier_filter: Optional list of tiers to restrict the search to.
        include_shadowed: Include shadowed memories when True.
        min_importance: Minimum importance (<= 0 disables the filter).

    Returns:
        Up to top_k memory dicts, best first, each carrying
        "similarity_score" and "final_score"; raw embedding removed.
    """
    db = get_db()
    conditions = ["embedding IS NOT NULL"]
    params: list = []
    if not include_shadowed:
        conditions.append("is_shadowed = 0")
    if tier_filter:
        placeholders = ",".join("?" * len(tier_filter))
        conditions.append(f"tier IN ({placeholders})")
        params.extend(tier_filter)
    if min_importance > 0:
        conditions.append("importance >= ?")
        params.append(min_importance)
    where = " AND ".join(conditions)
    rows = db.fetchall(
        f"""SELECT id, content, tier, importance, tags, status, source,
                   embedding, created_at, updated_at, accessed_at,
                   access_count, shadow_load, source_path
            FROM memories
            WHERE {where}
            ORDER BY updated_at DESC""",
        tuple(params),
    )
    if not rows:
        return []
    # Fix: these loop invariants were recomputed on every row.
    now = time.time()
    max_age = 90 * 86400  # recency decays from 1.0 to 0 over 90 days
    candidates = []
    for row in rows:
        row_dict = dict(row)
        emb = row_dict.get("embedding")
        if emb is None:
            continue  # defensive; the SQL already filters NULL embeddings
        cos_sim = engine.similarity(query_embedding, emb)
        # Recency based on last access, falling back to creation time.
        age_seconds = now - (row_dict.get("accessed_at") or row_dict["created_at"])
        recency = max(0.0, 1.0 - (age_seconds / max_age))
        importance = row_dict.get("importance", 0.5)
        shadow = row_dict.get("shadow_load", 0.0)
        # HESTIA-inspired scoring:
        # similarity × shadow_penalty × (importance_weight + recency_weight)
        shadow_penalty = (1.0 - shadow) ** 2.0
        final_score = cos_sim * shadow_penalty * (
            0.6 + 0.3 * importance + 0.1 * recency
        )
        row_dict["similarity_score"] = round(cos_sim, 4)
        row_dict["final_score"] = round(final_score, 4)
        # Remove raw embedding from results
        row_dict.pop("embedding", None)
        candidates.append(row_dict)
    # Sort by final score, return top_k
    candidates.sort(key=lambda x: x["final_score"], reverse=True)
    return candidates[:top_k]
# --- Edge / KG operations ---
def save_edge(source_id: str, target_id: str, edge_type: str):
    """Insert a knowledge-graph edge; duplicate edges are silently ignored.

    Edge creation is best-effort: any failure is logged as a warning
    rather than raised, so it never breaks the calling operation.
    """
    connection = get_db()
    try:
        connection.execute(
            """INSERT OR IGNORE INTO edges (source_id, target_id, edge_type, created_at)
               VALUES (?, ?, ?, ?)""",
            (source_id, target_id, edge_type, time.time()),
        )
        connection.commit()
    except Exception as e:
        logger.warning(f"Failed to save edge: {e}")
def get_edges(memory_id: str) -> list[dict]:
    """Return every edge in which the memory appears as source or target."""
    records = get_db().fetchall(
        """SELECT * FROM edges
           WHERE source_id = ? OR target_id = ?""",
        (memory_id, memory_id),
    )
    return [dict(record) for record in records]
def traverse_kg(
    start_ids: list[str],
    depth: int = 2,
    edge_types: Optional[list[str]] = None,
) -> list[str]:
    """
    BFS traversal of the knowledge graph.

    Args:
        start_ids: Seed memory IDs (always included in the result).
        depth: Maximum number of hops to expand from the seeds.
        edge_types: Optional whitelist of edge types to follow.

    Returns:
        All memory IDs discovered within `depth` hops, including the
        seeds. The `visited` set doubles as cycle detection.
    """
    db = get_db()
    visited = set(start_ids)
    frontier = list(start_ids)
    # Fix: each frontier ID is bound TWICE (source_id IN ... OR
    # target_id IN ...), plus the edge-type parameters. The old
    # chunk_size of 900 could therefore bind 1800+ parameters and blow
    # past SQLite's default 999-parameter limit. Halve the budget and
    # reserve room for the edge-type bindings.
    n_type_params = len(edge_types) if edge_types else 0
    chunk_size = max(1, (900 - n_type_params) // 2)
    for _ in range(depth):
        if not frontier:
            break
        next_frontier = []
        for i in range(0, len(frontier), chunk_size):
            chunk = frontier[i : i + chunk_size]
            placeholders = ",".join("?" * len(chunk))
            type_filter = ""
            params = list(chunk) + list(chunk)
            if edge_types:
                type_placeholders = ",".join("?" * len(edge_types))
                type_filter = f" AND edge_type IN ({type_placeholders})"
                params += edge_types
            rows = db.fetchall(
                f"""SELECT source_id, target_id FROM edges
                    WHERE (source_id IN ({placeholders})
                       OR target_id IN ({placeholders}))
                    {type_filter}""",
                tuple(params),
            )
            # Both endpoints of every matched edge are neighbors; the
            # visited check keeps already-seen IDs (and the node itself)
            # out of the next frontier.
            for row in rows:
                for neighbor_id in (row["source_id"], row["target_id"]):
                    if neighbor_id not in visited:
                        visited.add(neighbor_id)
                        next_frontier.append(neighbor_id)
        frontier = next_frontier
    return list(visited)
# --- Session operations ---
def start_session() -> str:
    """Insert a fresh session row and return its generated UUID."""
    new_id = str(uuid.uuid4())
    connection = get_db()
    connection.execute(
        "INSERT INTO sessions (id, started_at) VALUES (?, ?)",
        (new_id, time.time()),
    )
    connection.commit()
    return new_id
def close_session(session_id: str, summary: str = "", decisions: str = "", next_steps: str = "") -> dict:
    """
    Close a session:
    1. Evaluate working memories for promotion
    2. Store session summary as a session-tier memory
    3. Mark session ended

    Args:
        session_id: ID of the session row to mark as ended.
        summary: Optional narrative summary; written onto the session row
            and, when non-empty, also stored as a session-tier memory.
        decisions: Optional decisions text appended to the summary memory.
        next_steps: Optional next-steps text appended to the summary memory.

    Returns:
        {"session_id", "memories_promoted", "memories_discarded"}
    """
    # Local import — presumably avoids a circular import with
    # ember.memory.tiers at module load time; confirm before hoisting.
    from ember.memory.tiers import get_working_memories_to_promote
    db = get_db()
    now = time.time()
    # Get all working-tier memories
    # NOTE(review): capped at 1000 rows — a session with more working
    # memories than that leaves the remainder unevaluated.
    working = list_memories(tier_filter=["working"], include_shadowed=False, limit=1000)
    to_promote, to_discard = get_working_memories_to_promote(working)
    promoted = 0
    discarded = 0
    with db.transaction() as conn:
        # Promote worthy working memories to session tier
        for mem in to_promote:
            conn.execute(
                "UPDATE memories SET tier = 'session', updated_at = ? WHERE id = ?",
                (now, mem["id"]),
            )
            promoted += 1
        # Delete disposable working memories
        for mem in to_discard:
            conn.execute("DELETE FROM memories WHERE id = ?", (mem["id"],))
            discarded += 1
        # Mark session ended
        conn.execute(
            "UPDATE sessions SET ended_at = ?, summary = ? WHERE id = ?",
            (now, summary, session_id),
        )
    # Store session summary as a memory (outside transaction —
    # store_memory manages its own commit)
    if summary:
        full_summary = f"Session [{time.strftime('%Y-%m-%d')}]: {summary}"
        if decisions:
            full_summary += f" Decisions: {decisions}"
        if next_steps:
            full_summary += f" Next steps: {next_steps}"
        store_memory(
            content=full_summary,
            tier="session",
            importance=0.6,
            tags="session_summary",
            source="session",
        )
    # Log consolidation stats for later auditing/trend analysis
    db.execute(
        """INSERT INTO consolidation_log (run_at, memories_processed, memories_promoted, memories_discarded, notes)
           VALUES (?, ?, ?, ?, ?)""",
        (now, len(working), promoted, discarded, f"Session close: {session_id[:8]}"),
    )
    db.commit()
    return {
        "session_id": session_id,
        "memories_promoted": promoted,
        "memories_discarded": discarded,
    }
# --- Region stats ---
def update_region_stats(cell_id: int, **kwargs):
    """Upsert region stats for a Voronoi cell.

    Keyword argument names are used directly as column names, so they
    are validated as plain identifiers before being interpolated into
    the statement text — this closes an injection/typo hole where
    arbitrary kwarg keys reached the SQL unchecked.

    Args:
        cell_id: Voronoi cell ID (key of the region_stats row).
        **kwargs: column=value pairs to write; unknown columns still
            fail at execute time with a database error.

    Raises:
        ValueError: If any kwarg name is not a valid Python identifier.
    """
    for key in kwargs:
        if not key.isidentifier():
            raise ValueError(f"Invalid region_stats column name: {key!r}")
    db = get_db()
    now = time.time()
    # Upsert: create a bare row first if the cell has never been seen.
    existing = db.fetchone("SELECT * FROM region_stats WHERE cell_id = ?", (cell_id,))
    if existing is None:
        db.execute(
            """INSERT INTO region_stats (cell_id, last_updated) VALUES (?, ?)""",
            (cell_id, now),
        )
    updates = ["last_updated = ?"]
    params: list = [now]
    for key, value in kwargs.items():
        updates.append(f"{key} = ?")  # key validated above
        params.append(value)
    params.append(cell_id)
    db.execute(
        f"UPDATE region_stats SET {', '.join(updates)} WHERE cell_id = ?",
        tuple(params),
    )
    db.commit()
def get_region_stats(cell_id: int) -> Optional[dict]:
    """Fetch the stats row for one Voronoi cell, or None when absent."""
    record = get_db().fetchone(
        "SELECT * FROM region_stats WHERE cell_id = ?", (cell_id,)
    )
    if record is None:
        return None
    return dict(record)
# --- Metrics ---
def log_metric(metric_name: str, value: float):
    """Append one (name, value, timestamp) sample to the metrics log."""
    connection = get_db()
    connection.execute(
        "INSERT INTO metrics_log (metric_name, value, recorded_at) VALUES (?, ?, ?)",
        (metric_name, value, time.time()),
    )
    connection.commit()
def get_metric_history(metric_name: str, limit: int = 10) -> list[dict]:
    """Return the most recent samples of one metric, newest first."""
    connection = get_db()
    history = connection.fetchall(
        """SELECT * FROM metrics_log
           WHERE metric_name = ?
           ORDER BY recorded_at DESC
           LIMIT ?""",
        (metric_name, limit),
    )
    return [dict(sample) for sample in history]