#!/usr/bin/env python3
"""
Migration script: Ember Phase 1 → Ember V3
Reads individual JSON files from ~/.ember/embers/ and imports them
into the unified SQLite database at ~/.ember-v3/ember.db.
Also migrates edges from ~/.ember/cells/stats.db.
V1 field mapping:
ember_id → id
importance → tier (string type like "fact" maps to tier) + importance (numeric)
is_stale → is_shadowed
last_accessed_at → accessed_at
No embedding in JSON — embeddings stored in FAISS index only
Usage:
python -m ember.migrate_v1_to_v3 [--dry-run]
"""
import json
import sys
import time
import sqlite3
import logging
from datetime import datetime
from pathlib import Path
# Bare-message log format: migration output should read like a report, not a log.
logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger("migrate")
# Phase 1 on-disk layout: one JSON file per ember plus a stats SQLite DB.
V1_DIR = Path.home() / ".ember"
V1_EMBERS_DIR = V1_DIR / "embers"
V1_STATS_DB = V1_DIR / "cells" / "stats.db"
# V1 importance type (string) → V3 tier mapping
# (V1 stored importance as a category name; V3 uses tier + numeric importance)
IMPORTANCE_TYPE_TO_TIER = {
"fact": "relational",
"decision": "session",
"preference": "relational",
"context": "session",
"learning": "session",
}
# V1 importance type → numeric importance value for V3
# (values chosen per category; unknown types fall back to 0.5 in importance_to_score)
IMPORTANCE_TYPE_TO_SCORE = {
"fact": 0.7,
"decision": 0.6,
"preference": 0.8,
"context": 0.4,
"learning": 0.5,
}
def parse_timestamp(val) -> float:
    """Convert a V1 timestamp (ISO-8601 string or epoch number) to an epoch float.

    Falls back to the current time for None, unparseable strings, or any
    unexpected type. A trailing "Z" is rewritten to "+00:00" so UTC strings
    parse; NOTE(review): naive ISO strings (no offset) are interpreted in
    local time by ``datetime.timestamp()``.
    """
    if isinstance(val, (int, float)):
        return float(val)
    if isinstance(val, str):
        try:
            parsed = datetime.fromisoformat(val.replace("Z", "+00:00"))
        except Exception:
            return time.time()
        return parsed.timestamp()
    # None or any other type: default to "now"
    return time.time()
def load_v1_embers() -> list[dict]:
    """Load every ember JSON file from the Phase 1 directory.

    Files that cannot be read or parsed are logged at WARNING level and
    skipped; a missing directory is logged as an error and yields [].
    """
    if not V1_EMBERS_DIR.exists():
        logger.error(f"V1 embers directory not found: {V1_EMBERS_DIR}")
        return []
    loaded: list[dict] = []
    for path in V1_EMBERS_DIR.glob("*.json"):
        try:
            loaded.append(json.loads(path.read_text()))
        except Exception as exc:
            logger.warning(f"Failed to read {path.name}: {exc}")
    logger.info(f"Loaded {len(loaded)} embers from Phase 1")
    return loaded
def load_v1_edges() -> list[dict]:
    """Read all rows of the ``edges`` table from the Phase 1 stats.db.

    Returns [] when the database file or the table is missing; the
    connection is always closed before returning.
    """
    if not V1_STATS_DB.exists():
        logger.info("No V1 stats.db found — skipping edge migration")
        return []
    conn = sqlite3.connect(str(V1_STATS_DB))
    conn.row_factory = sqlite3.Row  # rows become dict-convertible
    try:
        result = [dict(row) for row in conn.execute("SELECT * FROM edges")]
        logger.info(f"Loaded {len(result)} edges from Phase 1")
        return result
    except sqlite3.OperationalError:
        # Table never created in this V1 install
        logger.info("No edges table in V1 stats.db")
        return []
    finally:
        conn.close()
def determine_tier(ember: dict) -> str:
    """Map a V1 ember onto a V3 tier.

    Precedence: known string importance type → lookup table; numeric
    importance >= 0.8 → "relational"; older than 90 days → "relational";
    otherwise → "session".
    """
    imp = ember.get("importance", "")
    # V1 importance is a string type (fact/decision/preference/context/learning)
    if isinstance(imp, str):
        mapped = IMPORTANCE_TYPE_TO_TIER.get(imp)
        if mapped is not None:
            return mapped
    # Numeric importance shouldn't appear in V1 data, but handle it gracefully
    try:
        if float(imp) >= 0.8:
            return "relational"
    except (TypeError, ValueError):
        pass
    # Old and stable memories promote to the relational tier
    age_days = (time.time() - parse_timestamp(ember.get("created_at"))) / 86400
    return "relational" if age_days > 90 else "session"
def importance_to_score(ember: dict) -> float:
    """Translate a V1 importance value into a V3 numeric score.

    Known string types use the lookup table; numeric values pass through
    as floats; anything else (None, unknown strings) defaults to 0.5.
    """
    raw = ember.get("importance", "")
    if isinstance(raw, str):
        score = IMPORTANCE_TYPE_TO_SCORE.get(raw)
        if score is not None:
            return score
    # Already numeric, or a numeric string not in the table
    try:
        return float(raw)
    except (TypeError, ValueError):
        return 0.5
def migrate(dry_run: bool = False):
    """Run the full Phase 1 → V3 migration.

    Loads embers and edges from the Phase 1 store, then inserts them into
    the V3 SQLite database. With ``dry_run=True`` only a summary of what
    would be migrated is printed and nothing is written.
    """
    embers = load_v1_embers()
    edges = load_v1_edges()
    if not embers:
        logger.info("Nothing to migrate.")
        return
    if dry_run:
        _report_dry_run(embers, edges)
        return
    # Initialize V3 database (project-local import kept inside the function,
    # as in the original, so --dry-run works without touching the V3 package)
    from ember.memory.db import initialize_db, get_db
    initialize_db()
    db = get_db()
    migrated, skipped, migrated_ids = _migrate_memories(db, embers)
    db.commit()
    logger.info(f"\nMemories: {migrated} migrated, {skipped} skipped")
    # Migrate edges — only where both source and target exist
    edge_count, edge_skipped = _migrate_edges(db, edges, migrated_ids)
    db.commit()
    logger.info(f"Edges: {edge_count} migrated, {edge_skipped} skipped (missing refs)")
    # NOTE(review): _db_path is a private attribute of the db wrapper — confirm
    # a public accessor exists before relying on it elsewhere.
    logger.info(f"Database: {db._db_path}")


def _report_dry_run(embers: list[dict], edges: list[dict]) -> None:
    """Log per-tier memory counts and the edge count without writing anything."""
    logger.info("\n--- DRY RUN ---")
    tier_counts: dict[str, int] = {}
    for e in embers:
        tier = determine_tier(e)
        tier_counts[tier] = tier_counts.get(tier, 0) + 1
    for tier, count in sorted(tier_counts.items()):
        logger.info(f" {tier}: {count} memories")
    logger.info(f" edges: {len(edges)}")
    logger.info("Run without --dry-run to execute migration.")


def _migrate_memories(db, embers: list[dict]) -> tuple[int, int, set]:
    """Insert V1 embers into the V3 ``memories`` table.

    Returns (migrated_count, skipped_count, ids_present_in_v3). The ID set
    also includes rows that already existed, so edge migration can keep
    edges that reference previously-migrated memories.
    """
    migrated = 0
    skipped = 0
    migrated_ids: set = set()
    for ember in embers:
        try:
            # V1 uses ember_id, not id
            ember_id = ember.get("ember_id") or ember.get("id")
            if not ember_id:
                logger.debug("Skipping ember with no ID")
                skipped += 1
                continue
            # Idempotency: skip rows already in V3, but still record the ID
            if db.fetchone("SELECT id FROM memories WHERE id = ?", (ember_id,)):
                skipped += 1
                migrated_ids.add(ember_id)
                continue
            content = ember.get("content", "")
            if not content:
                # str() guards against non-string IDs, which are unsliceable
                logger.debug(f"Skipping {str(ember_id)[:8]} — no content")
                skipped += 1
                continue
            db.execute(
                """INSERT INTO memories
                (id, content, tier, importance, tags, source, status,
                embedding, created_at, updated_at, accessed_at,
                access_count, shadow_load, is_shadowed, shadowed_by,
                source_path)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                _memory_row(ember, ember_id, content),
            )
            migrated += 1
            migrated_ids.add(ember_id)
        except Exception as e:
            eid = ember.get("ember_id") or ember.get("id") or "?"
            # str() so a non-string ID can't raise a second error while logging
            logger.warning(f"Failed to migrate {str(eid)[:8]}: {e}")
            skipped += 1
    return migrated, skipped, migrated_ids


def _memory_row(ember: dict, ember_id, content) -> tuple:
    """Build the V3 INSERT parameter tuple from one V1 ember dict.

    Applies the V1→V3 field mapping: importance (string type) → tier +
    numeric importance, is_stale → is_shadowed, last_accessed_at → accessed_at.
    """
    # Parse timestamps, falling back through older V1 field names
    created_at = parse_timestamp(ember.get("created_at"))
    updated_at = parse_timestamp(ember.get("updated_at") or ember.get("created_at"))
    accessed_at = parse_timestamp(
        ember.get("last_accessed_at") or ember.get("accessed_at") or ember.get("created_at")
    )
    # V1 tags may be a list; V3 stores a comma-joined string
    tags = ember.get("tags", "") or ""
    if isinstance(tags, list):
        tags = ",".join(tags)
    return (
        ember_id,
        content,
        determine_tier(ember),
        importance_to_score(ember),
        tags,
        ember.get("source") or "migrated_v1",  # default marks V1 provenance
        ember.get("status") or "",  # None becomes empty string
        # No embeddings in V1 JSON — they're in the FAISS index; re-embed after
        # migration via backfill_embeddings()
        None,
        created_at,
        updated_at,
        accessed_at,
        ember.get("access_count", 0) or 0,
        ember.get("shadow_load", 0.0) or 0.0,
        1 if ember.get("is_stale") else 0,  # is_shadowed: V1 uses is_stale
        ember.get("shadowed_by"),
        ember.get("source_path") or "",  # None becomes empty string
    )


def _migrate_edges(db, edges: list[dict], migrated_ids: set) -> tuple[int, int]:
    """Insert V1 edges whose endpoints both exist in V3.

    Returns (inserted_count, skipped_count); failures are logged per edge.
    """
    edge_count = 0
    edge_skipped = 0
    for edge in edges:
        try:
            source_id = edge.get("source_id")
            target_id = edge.get("target_id")
            # Only insert edge if both memories were migrated
            if source_id not in migrated_ids or target_id not in migrated_ids:
                edge_skipped += 1
                continue
            db.execute(
                """INSERT OR IGNORE INTO edges (source_id, target_id, edge_type, created_at)
                VALUES (?, ?, ?, ?)""",
                (
                    source_id,
                    target_id,
                    edge.get("edge_type", "related"),
                    # V1 stores edge timestamps as ISO strings
                    parse_timestamp(edge.get("created_at")),
                ),
            )
            edge_count += 1
        except Exception as e:
            logger.warning(f"Failed to migrate edge: {e}")
    return edge_count, edge_skipped
def backfill_embeddings(batch_size: int = 50):
    """
    Generate embeddings for every memory row that lacks one.

    Run after migration to enable semantic search. Commits after each
    batch so progress survives an interruption.
    Usage:
        python -m ember.migrate_v1_to_v3 --embed
    """
    from ember.memory.db import initialize_db, get_db
    from ember.memory.embeddings import EmbeddingEngine
    initialize_db()
    db = get_db()
    engine = EmbeddingEngine()
    if not engine.is_semantic:
        logger.error("Sentence-transformers not available — cannot generate embeddings")
        return
    pending = db.fetchall(
        "SELECT id, content FROM memories WHERE embedding IS NULL"
    )
    total = len(pending)
    logger.info(f"Backfilling embeddings for {total} memories...")
    embedded = 0
    start = 0
    while start < total:
        chunk = pending[start:start + batch_size]
        vectors = engine.embed_batch([row["content"] for row in chunk])
        for row, vec in zip(chunk, vectors):
            db.execute(
                "UPDATE memories SET embedding = ? WHERE id = ?",
                (vec, row["id"]),
            )
            embedded += 1
        db.commit()
        start += batch_size
        logger.info(f" Embedded {min(start, total)}/{total}")
    logger.info(f"Done — {embedded} memories now have embeddings")
if __name__ == "__main__":
    # CLI: --embed runs the embedding backfill; otherwise migrate
    # (optionally as a --dry-run preview).
    cli_args = sys.argv[1:]
    if "--embed" in cli_args:
        backfill_embeddings()
    else:
        migrate(dry_run="--dry-run" in cli_args)