"""
SQLite database layer for Ember V3.
Single database file at ~/.ember-v3/ember.db holds all state:
- memories table (core storage)
- sessions table (session lifecycle)
- consolidation_log (tier promotion/discard history)
- memories_fts (FTS5 virtual table for keyword fallback)
- edges table (knowledge graph)
- region_stats (Voronoi cell statistics)
- metrics_log (metric samples for health trends)
- schema_version (migration tracking)
"""
from __future__ import annotations
import os
import sqlite3
import threading
from pathlib import Path
from contextlib import contextmanager
from typing import Optional
# Directory under the user's home that holds all Ember V3 on-disk state.
EMBER_DIR = Path.home().joinpath(".ember-v3")
# Default location of the single SQLite database file.
DB_PATH = EMBER_DIR.joinpath("ember.db")
# Schema version this module expects; it is the value written to the
# schema_version table when the schema is created or migrated.
SCHEMA_VERSION = 1
# Schema DDL — run as one script by Database.initialize() when it sets up a
# new database. Every statement uses IF NOT EXISTS, so re-running the script
# against an existing database does not fail on objects that are already there.
# The text inside the literal (including the `--` SQL comments) is sent to
# SQLite verbatim. memories_fts is an FTS5 external-content table: it indexes
# the `content` and `tags` columns of `memories` (content='memories') and is
# kept in sync by the triggers in _TRIGGER_SQL.
_SCHEMA_SQL = """
-- Schema version tracking
CREATE TABLE IF NOT EXISTS schema_version (
version INTEGER PRIMARY KEY,
applied_at REAL NOT NULL
);
-- Main memory store
CREATE TABLE IF NOT EXISTS memories (
id TEXT PRIMARY KEY,
content TEXT NOT NULL,
tier TEXT NOT NULL DEFAULT 'session',
importance REAL DEFAULT 0.5,
tags TEXT DEFAULT '',
source TEXT DEFAULT '',
status TEXT DEFAULT '',
embedding BLOB,
created_at REAL NOT NULL,
updated_at REAL NOT NULL,
accessed_at REAL,
access_count INTEGER DEFAULT 0,
shadow_load REAL DEFAULT 0.0,
is_shadowed INTEGER DEFAULT 0,
shadowed_by TEXT,
parent_id TEXT,
source_path TEXT DEFAULT '',
FOREIGN KEY (parent_id) REFERENCES memories(id)
);
-- Indexes for common query patterns
CREATE INDEX IF NOT EXISTS idx_memories_tier ON memories(tier);
CREATE INDEX IF NOT EXISTS idx_memories_status ON memories(status);
CREATE INDEX IF NOT EXISTS idx_memories_updated ON memories(updated_at);
CREATE INDEX IF NOT EXISTS idx_memories_importance ON memories(importance);
CREATE INDEX IF NOT EXISTS idx_memories_shadowed ON memories(is_shadowed);
CREATE INDEX IF NOT EXISTS idx_memories_created ON memories(created_at);
-- Session tracking
CREATE TABLE IF NOT EXISTS sessions (
id TEXT PRIMARY KEY,
started_at REAL NOT NULL,
ended_at REAL,
checkpoint_state TEXT,
summary TEXT DEFAULT ''
);
-- Consolidation log
CREATE TABLE IF NOT EXISTS consolidation_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_at REAL NOT NULL,
memories_processed INTEGER,
memories_promoted INTEGER,
memories_discarded INTEGER,
notes TEXT
);
-- Knowledge graph edges
CREATE TABLE IF NOT EXISTS edges (
source_id TEXT NOT NULL,
target_id TEXT NOT NULL,
edge_type TEXT NOT NULL,
created_at REAL NOT NULL,
PRIMARY KEY (source_id, target_id, edge_type),
FOREIGN KEY (source_id) REFERENCES memories(id) ON DELETE CASCADE,
FOREIGN KEY (target_id) REFERENCES memories(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_edges_source ON edges(source_id);
CREATE INDEX IF NOT EXISTS idx_edges_target ON edges(target_id);
-- Voronoi region statistics
CREATE TABLE IF NOT EXISTS region_stats (
cell_id INTEGER PRIMARY KEY,
ember_count INTEGER DEFAULT 0,
shadow_accumulator REAL DEFAULT 0.0,
conflict_density REAL DEFAULT 0.0,
mean_importance REAL DEFAULT 0.0,
last_updated REAL,
welford_count INTEGER DEFAULT 0,
welford_mean BLOB,
welford_m2 BLOB
);
-- Metrics log for health trends
CREATE TABLE IF NOT EXISTS metrics_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
metric_name TEXT NOT NULL,
value REAL NOT NULL,
recorded_at REAL NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_metrics_name ON metrics_log(metric_name);
CREATE INDEX IF NOT EXISTS idx_metrics_time ON metrics_log(recorded_at);
-- Full-text search (FTS5) for keyword fallback
CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts USING fts5(
id UNINDEXED,
content,
tags,
content='memories',
content_rowid='rowid'
);
"""
# FTS5 sync triggers — separate because they reference the virtual table.
# They mirror every change on `memories` into `memories_fts`:
#   memories_ai (AFTER INSERT): adds the new row's id/content/tags to the index.
#   memories_ad (AFTER DELETE): issues the FTS5 'delete' command with the old
#                               row's values to remove it from the index.
#   memories_au (AFTER UPDATE): 'delete' with the old values, then insert the
#                               new values. It has no column filter, so it
#                               fires on any UPDATE of `memories`.
# All three use IF NOT EXISTS, so the script can be run more than once.
_TRIGGER_SQL = """
CREATE TRIGGER IF NOT EXISTS memories_ai AFTER INSERT ON memories BEGIN
INSERT INTO memories_fts(rowid, id, content, tags)
VALUES (new.rowid, new.id, new.content, new.tags);
END;
CREATE TRIGGER IF NOT EXISTS memories_ad AFTER DELETE ON memories BEGIN
INSERT INTO memories_fts(memories_fts, rowid, id, content, tags)
VALUES ('delete', old.rowid, old.id, old.content, old.tags);
END;
CREATE TRIGGER IF NOT EXISTS memories_au AFTER UPDATE ON memories BEGIN
INSERT INTO memories_fts(memories_fts, rowid, id, content, tags)
VALUES ('delete', old.rowid, old.id, old.content, old.tags);
INSERT INTO memories_fts(rowid, id, content, tags)
VALUES (new.rowid, new.id, new.content, new.tags);
END;
"""
class Database:
    """Per-thread SQLite connection manager with WAL mode and foreign keys.

    Each thread that uses the instance gets its own ``sqlite3.Connection``
    (stored in ``threading.local``), opened lazily on first use.  Schema
    setup in ``initialize()`` is serialized with a lock and runs at most once
    per instance.
    """

    def __init__(self, db_path: Optional[Path] = None):
        """Record the database location and ensure its directory exists.

        Args:
            db_path: Database file to open.  Falls back to the module-level
                ``DB_PATH`` when omitted (or otherwise falsy).
        """
        self._db_path = db_path or DB_PATH
        self._db_path.parent.mkdir(parents=True, exist_ok=True)
        self._local = threading.local()
        self._initialized = False
        self._init_lock = threading.Lock()

    def _get_connection(self) -> sqlite3.Connection:
        """Return the calling thread's connection, opening it if necessary.

        A new connection uses ``sqlite3.Row`` as its row factory and has WAL
        journaling and foreign-key enforcement switched on.
        """
        conn = getattr(self._local, "conn", None)
        if conn is None:
            conn = sqlite3.connect(
                str(self._db_path),
                timeout=10.0,
                check_same_thread=False,
            )
            conn.row_factory = sqlite3.Row
            conn.execute("PRAGMA journal_mode=WAL")
            conn.execute("PRAGMA foreign_keys=ON")
            # NOTE(review): connect(timeout=10.0) above and this PRAGMA both
            # configure SQLite's busy wait; the later setting presumably wins
            # (5 s) — confirm which value is intended.
            conn.execute("PRAGMA busy_timeout=5000")
            self._local.conn = conn
        return conn

    def initialize(self) -> None:
        """Create or migrate the schema if needed.  Idempotent.

        * No recorded schema version (the ``schema_version`` table is missing
          or empty): run the full schema and trigger scripts and record
          ``SCHEMA_VERSION``.  An empty table indicates a first run that was
          interrupted before the version row was written; all DDL uses
          ``IF NOT EXISTS``, so re-running it fills in whatever is missing.
        * Recorded version older than ``SCHEMA_VERSION``: run ``_migrate``.
        * Otherwise: nothing to do.
        """
        with self._init_lock:
            if self._initialized:
                return
            conn = self._get_connection()
            current_version = self._read_schema_version(conn)
            if current_version == 0:
                self._create_schema(conn)
            elif current_version < SCHEMA_VERSION:
                self._migrate(conn, current_version)
            self._initialized = True

    def _read_schema_version(self, conn: sqlite3.Connection) -> int:
        """Return the highest recorded schema version, or 0 if none exists."""
        table = conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='schema_version'"
        ).fetchone()
        if table is None:
            return 0
        row = conn.execute("SELECT MAX(version) FROM schema_version").fetchone()
        # MAX() over an empty table yields NULL, which maps to version 0.
        return row[0] if row and row[0] else 0

    def _create_schema(self, conn: sqlite3.Connection) -> None:
        """Run the schema and trigger scripts, then record SCHEMA_VERSION."""
        import time

        conn.executescript(_SCHEMA_SQL)
        conn.executescript(_TRIGGER_SQL)
        conn.execute(
            "INSERT INTO schema_version (version, applied_at) VALUES (?, ?)",
            (SCHEMA_VERSION, time.time()),
        )
        conn.commit()

    def _migrate(self, conn: sqlite3.Connection, from_version: int):
        """Apply schema migrations incrementally and record SCHEMA_VERSION.

        Args:
            conn: Connection to run the migration statements on.
            from_version: Schema version currently recorded in the database.
        """
        import time

        # Future migrations go here as version increments
        # if from_version < 2:
        #     conn.execute("ALTER TABLE memories ADD COLUMN new_field TEXT")
        conn.execute(
            "INSERT INTO schema_version (version, applied_at) VALUES (?, ?)",
            (SCHEMA_VERSION, time.time()),
        )
        conn.commit()

    @contextmanager
    def transaction(self):
        """Context manager for atomic multi-statement transactions.

        Issues ``BEGIN IMMEDIATE`` on the calling thread's connection, yields
        that connection, commits when the body finishes normally and rolls
        back (then re-raises) when anything is raised.  Not re-entrant:
        SQLite rejects ``BEGIN`` while a transaction is already open on the
        connection.

        Yields:
            The calling thread's ``sqlite3.Connection``.
        """
        conn = self._get_connection()
        try:
            conn.execute("BEGIN IMMEDIATE")
            yield conn
            conn.commit()
        except BaseException:
            # BaseException, not Exception: KeyboardInterrupt / GeneratorExit
            # raised in the body must also end the transaction, otherwise it
            # stays open on this thread's connection and every later
            # BEGIN IMMEDIATE fails.
            conn.rollback()
            raise

    def execute(self, sql: str, params: tuple = ()) -> sqlite3.Cursor:
        """Run one statement on this thread's connection; does not commit."""
        return self._get_connection().execute(sql, params)

    def executemany(self, sql: str, params_list: list) -> sqlite3.Cursor:
        """Run ``sql`` once per entry of ``params_list``; does not commit."""
        return self._get_connection().executemany(sql, params_list)

    def commit(self):
        """Commit any open transaction on this thread's connection."""
        self._get_connection().commit()

    def fetchone(self, sql: str, params: tuple = ()) -> Optional[sqlite3.Row]:
        """Run a query and return its first row, or None if there is none."""
        return self.execute(sql, params).fetchone()

    def fetchall(self, sql: str, params: tuple = ()) -> list[sqlite3.Row]:
        """Run a query and return all of its rows as a list."""
        return self.execute(sql, params).fetchall()

    def close(self):
        """Close the calling thread's connection, if one is open.

        Connections opened by other threads are not touched.
        """
        conn = getattr(self._local, "conn", None)
        if conn:
            conn.close()
            self._local.conn = None
# Module-level singleton, created lazily by get_db().
_db: Database | None = None


def get_db(db_path: Optional[Path] = None) -> Database:
    """Get or create the module-level Database singleton.

    Args:
        db_path: Database file for the singleton.  Only used by the call that
            actually creates it; once a singleton exists the argument is
            ignored and the existing instance is returned.

    Returns:
        The shared, initialized ``Database`` instance.

    Raises:
        Whatever ``Database(...)`` or ``Database.initialize()`` raises.  In
        that case no singleton is stored, so a later call retries from
        scratch.
    """
    global _db
    if _db is None:
        # Initialize before publishing: if initialize() raises, _db stays None
        # instead of holding an instance whose schema was never set up.
        candidate = Database(db_path)
        candidate.initialize()
        _db = candidate
    return _db
def initialize_db(db_path: Optional[Path] = None) -> None:
    """Eagerly set up the shared database; intended to be called once at startup.

    Delegates to ``get_db``, which creates the module-level singleton and
    initializes it on first use.  The instance itself is not returned.

    Args:
        db_path: Optional database file location, forwarded to ``get_db``.
    """
    get_db(db_path=db_path)