"""
CORAL-style context checkpointing for Ember V3.
Inspired by CORAL (Cognitive Resource Self-Allocation):
- Agent detects cognitive overload
- Saves state before context loss
- Resumes cleanly from saved state
Three checkpoint triggers:
1. Agent-requested: Claude calls ember_checkpoint explicitly
2. Token threshold: Working memory > 8000 chars → soft warning
3. Task switch: Agent stores a memory tagged task_switch=true
"""
from __future__ import annotations
import json
import time
import uuid
import logging
from typing import Optional
from ember.memory.db import get_db
logger = logging.getLogger("ember.checkpoint")
WORKING_MEMORY_CHAR_THRESHOLD = 8000
def save_checkpoint(
session_id: str,
state_summary: str,
active_task: str = "",
clear_working: bool = False,
) -> dict:
"""
Save a checkpoint of current cognitive state.
If clear_working=True, all working-tier memories are deleted after
the checkpoint is saved (freeing context).
"""
db = get_db()
now = time.time()
checkpoint_id = str(uuid.uuid4())
# Snapshot working memory IDs
rows = db.fetchall(
"SELECT id FROM memories WHERE tier = 'working' AND is_shadowed = 0"
)
working_ids = [r["id"] for r in rows]
checkpoint_state = json.dumps({
"checkpoint_id": checkpoint_id,
"created_at": now,
"active_task": active_task,
"state_summary": state_summary,
"working_memory_snapshot": working_ids,
"session_id": session_id,
})
# Store checkpoint in sessions table
db.execute(
"""UPDATE sessions SET checkpoint_state = ? WHERE id = ?""",
(checkpoint_state, session_id),
)
# Also store as a session-tier memory for retrieval
from ember.memory.operations import store_memory
store_memory(
content=f"Checkpoint: {state_summary}" + (f" | Task: {active_task}" if active_task else ""),
tier="session",
importance=0.7,
tags="checkpoint",
source="checkpoint",
)
if clear_working:
db.execute("DELETE FROM memories WHERE tier = 'working'")
logger.info(f"Cleared {len(working_ids)} working memories after checkpoint")
db.commit()
return {
"checkpoint_id": checkpoint_id,
"saved_at": now,
"working_memories_saved": len(working_ids),
"cleared": clear_working,
}
def load_checkpoint(
session_id: Optional[str] = None,
checkpoint_id: Optional[str] = None,
) -> Optional[dict]:
"""
Load the most recent checkpoint.
If checkpoint_id is provided, loads that specific checkpoint.
Otherwise loads the most recent from the given session,
or the most recent across all sessions.
"""
db = get_db()
if checkpoint_id:
# Search all sessions for this checkpoint
rows = db.fetchall(
"SELECT checkpoint_state FROM sessions WHERE checkpoint_state IS NOT NULL"
)
for row in rows:
state = json.loads(row["checkpoint_state"])
if state.get("checkpoint_id") == checkpoint_id:
return state
return None
if session_id:
row = db.fetchone(
"SELECT checkpoint_state FROM sessions WHERE id = ? AND checkpoint_state IS NOT NULL",
(session_id,),
)
else:
row = db.fetchone(
"""SELECT checkpoint_state FROM sessions
WHERE checkpoint_state IS NOT NULL
ORDER BY started_at DESC LIMIT 1"""
)
if row and row["checkpoint_state"]:
return json.loads(row["checkpoint_state"])
return None
def check_overload() -> dict:
"""
Check if working memory is approaching cognitive overload.
Returns:
{ overloaded: bool, char_count: int, threshold: int, recommendation: str }
"""
db = get_db()
row = db.fetchone(
"""SELECT COALESCE(SUM(LENGTH(content)), 0) as total_chars,
COUNT(*) as count
FROM memories
WHERE tier = 'working' AND is_shadowed = 0"""
)
char_count = row["total_chars"] if row else 0
memory_count = row["count"] if row else 0
overloaded = char_count > WORKING_MEMORY_CHAR_THRESHOLD
result = {
"overloaded": overloaded,
"char_count": char_count,
"memory_count": memory_count,
"threshold": WORKING_MEMORY_CHAR_THRESHOLD,
}
if overloaded:
result["recommendation"] = (
"Working memory is cluttered. Consider calling ember_checkpoint "
"to save state and optionally clear working memory."
)
return result