"""
OMEGA Migration: JSON graphs + JSONL sidecar → SQLite + sqlite-vec.
Reads the existing graph JSON files (semantic.json, temporal.json, causal.json,
entity.json) and JSONL store, generates embeddings, and inserts everything into
the new SQLite database.
Usage:
omega migrate-db # Interactive migration
omega migrate-db --force # Overwrite existing database
"""
import hashlib
import logging
import os
from pathlib import Path
from typing import Any, Dict, List, Set
from omega import json_compat as json
logger = logging.getLogger("omega.migrate")
OMEGA_DIR = Path(os.environ.get("OMEGA_HOME", str(Path.home() / ".omega")))
GRAPHS_DIR = OMEGA_DIR / "graphs"
DB_PATH = OMEGA_DIR / "omega.db"
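# Expected layout before migration (paths relative to OMEGA_DIR):
#   graphs/semantic.json, graphs/temporal.json, graphs/causal.json, graphs/entity.json
#   graphs/ auxiliary indexes (stats.json, type_index.json, ...)
#   store.jsonl  -- JSONL sidecar store
# After migration the data lives in omega.db; the old files are renamed
# (*.bak, store.jsonl.pre-sqlite).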
def _read_json(path: Path) -> Dict:
"""Read a JSON file with orjson."""
return json.loads(path.read_bytes())
def _load_graph_nodes(graph_name: str) -> List[Dict[str, Any]]:
"""Load nodes from a graph JSON file."""
path = GRAPHS_DIR / f"{graph_name}.json"
if not path.exists():
return []
try:
data = _read_json(path)
nodes = data.get("nodes", [])
logger.info(f" {graph_name}.json: {len(nodes)} nodes")
return nodes
except Exception as e:
logger.warning(f" Failed to read {graph_name}.json: {e}")
return []
def _load_graph_edges(graph_name: str) -> List[Dict[str, Any]]:
"""Load edges from a graph JSON file."""
path = GRAPHS_DIR / f"{graph_name}.json"
if not path.exists():
return []
try:
data = _read_json(path)
edges = data.get("edges", [])
logger.info(f" {graph_name}.json: {len(edges)} edges")
return edges
except Exception as e:
logger.warning(f" Failed to read {graph_name}.json edges: {e}")
return []
def _load_jsonl_entries(store_path: Path) -> List[Dict[str, Any]]:
"""Load entries from the JSONL sidecar store."""
if not store_path.exists():
return []
entries = []
errors = 0
    for line in store_path.read_text(encoding="utf-8", errors="replace").splitlines():
line = line.strip()
if not line:
continue
try:
            entry = json.loads(line.encode())
entries.append(entry)
except Exception:
errors += 1
if errors:
logger.warning(f" {errors} malformed JSONL lines skipped")
return entries
def _normalize_metadata(node_data: Dict[str, Any]) -> Dict[str, Any]:
"""Normalize metadata from graph node format."""
meta = dict(node_data.get("metadata", {}) or {})
# Unify "type" -> "event_type"
if "type" in meta and "event_type" not in meta:
meta["event_type"] = meta["type"]
return meta
def migrate(force: bool = False, batch_size: int = 50) -> Dict[str, Any]:
"""Run the full migration from JSON graphs to SQLite.
Returns a report dict with counts.
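
    Example (a minimal sketch; assumes JSON graphs already exist under OMEGA_HOME):

        from omega.migrate import migrate

        report = migrate(force=False)
        print(report["nodes_migrated"], "memories migrated")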
"""
report = {
"nodes_found": 0,
"nodes_migrated": 0,
"edges_migrated": 0,
"duplicates_skipped": 0,
"jsonl_only": 0,
"errors": 0,
"warnings": [],
}
# Check for existing database
if DB_PATH.exists() and not force:
# Check if it has data
import sqlite3
conn = sqlite3.connect(str(DB_PATH))
try:
count = conn.execute("SELECT COUNT(*) FROM memories").fetchone()[0]
if count > 0:
report["warnings"].append(f"Database already exists with {count} memories. Use --force to overwrite.")
return report
except Exception:
pass # Table doesn't exist yet, safe to proceed
finally:
conn.close()
if DB_PATH.exists() and force:
# Back up before overwriting
backup_path = DB_PATH.with_suffix(".db.bak")
import shutil
shutil.copy2(DB_PATH, backup_path)
DB_PATH.unlink()
logger.info(f" Backed up existing database to {backup_path}")
# ---- Phase 1: Load all nodes from JSON graphs ----
print("\n[1/4] Loading graph data...")
semantic_nodes = _load_graph_nodes("semantic")
temporal_nodes = _load_graph_nodes("temporal")
causal_nodes = _load_graph_nodes("causal")
entity_data = {}
entity_path = GRAPHS_DIR / "entity.json"
if entity_path.exists():
try:
entity_data = _read_json(entity_path)
print(" entity.json: loaded")
except Exception as e:
logger.warning(f" Failed to read entity.json: {e}")
# Deduplicate nodes across graphs (semantic is the canonical source)
all_nodes: Dict[str, Dict[str, Any]] = {}
for node in semantic_nodes:
nid = node.get("id")
if nid:
all_nodes[nid] = node
# Add any nodes from temporal/causal that aren't in semantic
for nodes in [temporal_nodes, causal_nodes]:
for node in nodes:
nid = node.get("id")
if nid and nid not in all_nodes:
all_nodes[nid] = node
report["nodes_found"] = len(all_nodes)
print(f" Total unique nodes: {len(all_nodes)}")
# ---- Phase 2: Load JSONL entries not in graphs ----
store_path = OMEGA_DIR / "store.jsonl"
jsonl_entries = _load_jsonl_entries(store_path)
print(f" JSONL entries: {len(jsonl_entries)}")
# Find entries in JSONL but not in graphs
graph_content_hashes: Set[str] = set()
for node in all_nodes.values():
content = node.get("content", "")
if content:
graph_content_hashes.add(hashlib.sha256(content.encode()).hexdigest())
jsonl_only_entries = []
for entry in jsonl_entries:
content = entry.get("content", "")
if content:
h = hashlib.sha256(content.encode()).hexdigest()
if h not in graph_content_hashes:
jsonl_only_entries.append(entry)
graph_content_hashes.add(h) # Prevent JSONL-internal dupes
if jsonl_only_entries:
print(f" JSONL-only entries (not in graphs): {len(jsonl_only_entries)}")
report["jsonl_only"] = len(jsonl_only_entries)
# ---- Phase 3: Generate embeddings and insert into SQLite ----
print(f"\n[2/4] Generating embeddings for {len(all_nodes) + len(jsonl_only_entries)} entries...")
from omega.sqlite_store import SQLiteStore
store = SQLiteStore(db_path=DB_PATH)
try:
return _migrate_into_store(store, all_nodes, jsonl_only_entries, entity_data, report)
finally:
store.close()
def _migrate_into_store(
    store,
    all_nodes: Dict[str, Dict[str, Any]],
    jsonl_only_entries: List[Dict[str, Any]],
    entity_data: Dict[str, Any],
    report: Dict[str, Any],
) -> Dict[str, Any]:
"""Insert nodes, edges, and entities into the store. Called by migrate()."""
# Prepare all items for batch insertion
items_to_store: List[Dict[str, Any]] = []
for node_data in all_nodes.values():
meta = _normalize_metadata(node_data)
content = node_data.get("content", "")
if not content.strip():
continue
items_to_store.append(
{
"node_id": node_data.get("id"),
"content": content,
"metadata": meta,
"event_type": meta.get("event_type", ""),
"session_id": meta.get("session_id", ""),
"project": meta.get("project", ""),
"created_at": node_data.get("created_at"),
"access_count": node_data.get("access_count", 0),
"last_accessed": node_data.get("last_accessed"),
"ttl_seconds": node_data.get("ttl_seconds"),
"embedding": node_data.get("embedding"), # Usually None (not in JSON)
}
)
for entry in jsonl_only_entries:
content = entry.get("content", "")
if not content.strip():
continue
meta = {}
for key in ["event_type", "session_id", "project", "type"]:
if key in entry:
meta[key] = entry[key]
if "type" in meta and "event_type" not in meta:
meta["event_type"] = meta.pop("type")
items_to_store.append(
{
"node_id": entry.get("id"),
"content": content,
"metadata": meta,
"event_type": meta.get("event_type", ""),
"session_id": meta.get("session_id", entry.get("session_id", "")),
"project": meta.get("project", entry.get("project", "")),
"created_at": entry.get("timestamp") or entry.get("created_at"),
"access_count": 0,
"last_accessed": None,
"ttl_seconds": None,
"embedding": None,
}
)
    # Generate embeddings, one item at a time, for entries that don't already have one
items_needing_embedding = [i for i, item in enumerate(items_to_store) if item.get("embedding") is None]
if items_needing_embedding:
print(f" Generating embeddings for {len(items_needing_embedding)} nodes...")
try:
from omega.graphs import generate_embedding
done = 0
for idx in items_needing_embedding:
content = items_to_store[idx]["content"]
try:
emb = generate_embedding(content)
items_to_store[idx]["embedding"] = emb
except Exception:
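                    # Embedding generation failed for this item; leave it as None so
                    # the memory is still inserted below, just without a vector.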
pass
done += 1
if done % 100 == 0:
print(f" {done}/{len(items_needing_embedding)} embeddings generated...")
print(f" {done}/{len(items_needing_embedding)} embeddings generated.")
except ImportError:
print(" WARNING: Could not import embedding model. Nodes will be stored without embeddings.")
report["warnings"].append("Embedding model not available; stored without vectors")
# Insert into SQLite
print(f"\n[3/4] Inserting {len(items_to_store)} memories into SQLite...")
migrated = 0
dupes = 0
errors = 0
for i, item in enumerate(items_to_store):
try:
# Build metadata for store()
meta = dict(item.get("metadata", {}))
if item.get("event_type"):
meta["event_type"] = item["event_type"]
if item.get("session_id"):
meta["session_id"] = item["session_id"]
if item.get("project"):
meta["project"] = item["project"]
if item.get("created_at"):
meta["_original_created_at"] = item["created_at"]
if item.get("access_count"):
meta["access_count"] = item["access_count"]
if item.get("last_accessed"):
meta["last_accessed"] = item["last_accessed"]
node_id = store.store(
content=item["content"],
session_id=item.get("session_id"),
metadata=meta,
embedding=item.get("embedding"),
ttl_seconds=item.get("ttl_seconds"),
                skip_inference=True,  # Embeddings (where available) were generated above
)
if node_id:
migrated += 1
else:
dupes += 1
except Exception as e:
errors += 1
if errors <= 5:
logger.warning(f" Error migrating node: {e}")
if (i + 1) % 200 == 0:
print(f" {i + 1}/{len(items_to_store)} inserted...")
report["nodes_migrated"] = migrated
report["duplicates_skipped"] = dupes
report["errors"] = errors
# ---- Phase 4: Migrate causal edges ----
# Note: _conn direct access is intentional — migration is single-threaded
# and offline, so the store's _lock is not needed.
print("\n[4/4] Migrating edges...")
causal_edges = _load_graph_edges("causal")
edges_migrated = 0
for edge_data in causal_edges:
try:
source = edge_data.get("source")
target = edge_data.get("target")
if source and target:
store._conn.execute(
"INSERT OR IGNORE INTO edges (source_id, target_id, edge_type, weight, metadata) VALUES (?, ?, ?, ?, ?)",
(
source,
target,
edge_data.get("edge_type", "causal"),
edge_data.get("weight", 1.0),
json.dumps(edge_data.get("metadata", {})),
),
)
edges_migrated += 1
except Exception as e:
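            # Warn only while no edge has succeeded yet, so a systemic failure is
            # surfaced without flooding the log with per-edge errors.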
if edges_migrated == 0:
logger.warning(f" Edge migration error: {e}")
store._conn.commit()
report["edges_migrated"] = edges_migrated
# Migrate entity index
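    # entity_data["entity_index"] maps entity_id -> list of node ids.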
entity_index = entity_data.get("entity_index", {})
if entity_index:
for entity_id, node_ids in entity_index.items():
for nid in node_ids:
try:
store._conn.execute(
"INSERT OR IGNORE INTO entity_index (entity_id, node_id) VALUES (?, ?)",
(entity_id, nid),
)
except Exception:
pass
store._conn.commit()
print(f" Entity index: {len(entity_index)} entities migrated")
# ---- Rename old files ----
store_path = OMEGA_DIR / "store.jsonl"
backed_up = []
for name in ["semantic.json", "temporal.json", "causal.json", "entity.json"]:
old = GRAPHS_DIR / name
if old.exists():
bak = old.with_suffix(".json.bak")
old.rename(bak)
backed_up.append(name)
# Also back up the JSONL sidecar
if store_path.exists():
store_path.rename(store_path.with_suffix(".jsonl.pre-sqlite"))
# Back up auxiliary index files
for name in ["stats.json", "type_index.json", "session_index.json", "project_index.json", "feedback_index.json"]:
old = GRAPHS_DIR / name
if old.exists():
old.rename(old.with_suffix(".json.bak"))
# Clean up lock files and WAL
for lock in GRAPHS_DIR.glob("*.lock"):
lock.unlink()
wal_path = GRAPHS_DIR / "wal.jsonl"
if wal_path.exists():
wal_path.rename(wal_path.with_suffix(".jsonl.bak"))
if backed_up:
print(f" Backed up: {', '.join(backed_up)}")
print(f"\n{'=' * 50}")
print("Migration complete!")
print(f" Migrated: {migrated} memories")
print(f" Duplicates: {dupes} skipped")
print(f" Edges: {edges_migrated}")
print(f" Errors: {errors}")
print(f" Database: {DB_PATH} ({DB_PATH.stat().st_size / 1024:.0f} KB)")
if backed_up:
print(f" Old files renamed to .bak in {GRAPHS_DIR}")
return report
def auto_migrate_if_needed() -> bool:
"""Auto-migrate on first run if JSON graphs exist but SQLite doesn't.
Called by bridge._get_store() to handle transparent migration.
Returns True if migration was performed.
"""
# Already have a database with data — no migration needed
if DB_PATH.exists():
import sqlite3
conn = sqlite3.connect(str(DB_PATH))
try:
count = conn.execute("SELECT COUNT(*) FROM memories").fetchone()[0]
if count > 0:
return False
except Exception:
pass # Table doesn't exist yet, proceed with migration
finally:
conn.close()
# Check if there are JSON graphs to migrate
has_graphs = any((GRAPHS_DIR / f"{name}.json").exists() for name in ["semantic", "temporal", "causal"])
has_jsonl = (OMEGA_DIR / "store.jsonl").exists()
if not has_graphs and not has_jsonl:
return False # Fresh install, no migration needed
print("OMEGA: Auto-migrating to SQLite backend...")
try:
report = migrate(force=False)
if report.get("nodes_migrated", 0) > 0:
print(f"OMEGA: Migrated {report['nodes_migrated']} memories to SQLite.")
return True
return False
except Exception as e:
logger.error(f"Auto-migration failed: {e}", exc_info=True)
print(f"OMEGA: Auto-migration failed: {e}")
print(" Run 'omega migrate-db --force' to retry manually.")
return False