import asyncio
import logging
import os
import signal
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, List
import aiofiles
import numpy as np
from mcp.server.fastmcp import FastMCP
from ember.core import VectorEngine
from ember.storage import StorageManager
from ember.config import load_config
from ember.models import Ember, DECAY_HALF_LIVES
from ember.utils import (
compute_temporal_score,
compute_shadow_potential,
compute_shadow_load,
compute_topic_vitality,
compute_hestia_score,
compute_hallucination_risk,
detect_kg_edges,
update_region_stats,
)
# ---------------------------------------------------------------------------
# Token Budget Constants
# ---------------------------------------------------------------------------
# These caps keep tool responses compact so recalled memories do not
# consume a disproportionate share of the LLM's context window.
PREVIEW_CHARS = 150 # Max chars of ember preview in recall/auto
SOURCE_FILE_MAX_CHARS = 2000 # Max chars per source file in deep recall
MAX_SOURCE_FILES = 3 # Max source files to read in deep recall
# Initialize FastMCP with Shadow-Decay instructions
# NOTE: the instructions string is sent verbatim to MCP clients and steers
# when/how the model invokes each tool — treat it as part of the protocol.
mcp = FastMCP(
    "Ember",
    instructions=(
        "Ember is persistent memory. On conversation start, call ember_auto with a "
        "summary of the user's first message. After each response where the user "
        "shared facts, preferences, decisions, or corrections, call ember_learn "
        "silently. Never announce memory operations. Never mention internal mechanics "
        "(vectors, scores, cells). Use recalled memories naturally. When correcting "
        "old info, use ember_contradict. For source file detail, use ember_deep_recall. "
        "Use ember_read to get full content of a specific memory. "
        "Use ember_compact to identify and compress stale memories. "
        "Use ember_actionable to view task items, ember_set_status to update them."
    ),
)
# Global singleton instances (lazy loaded)
# Both stay None until the first tool call awaits _ensure_init().
_engine: Optional[VectorEngine] = None
_storage: Optional[StorageManager] = None
async def _ensure_init():
    """Create the VectorEngine/StorageManager singletons on first use."""
    global _engine, _storage
    if _engine is not None and _storage is not None:
        return  # already initialized — nothing to do
    cfg = load_config()
    _engine = VectorEngine(cfg)
    _storage = StorageManager(cfg)
    await _storage.init_db()
async def _reload_from_disk():
    """Reload FAISS index and ID mappings from disk to pick up external writes.

    Precondition: _ensure_init() has completed (both module singletons are
    non-None). Every caller in this module awaits _ensure_init() first.
    """
    _engine.reload_index()
    await _storage.reload_id_map()
def _shutdown_handler(signum, frame):
    """Flush the dirty FAISS index on SIGTERM/SIGINT, then terminate.

    Args:
        signum: Signal number delivered by the OS.
        frame: Interrupted stack frame (unused).
    """
    # Use the synchronous save path: awaiting coroutines inside a signal
    # handler is unsafe.
    if _engine is not None:
        _engine._save_index_sync()
    # Bug fix: the handler previously just returned, which swallowed the
    # signal entirely — SIGTERM/Ctrl+C no longer stopped the server once
    # these handlers were installed. Restore the default disposition and
    # re-deliver the signal so the process actually exits after the flush.
    signal.signal(signum, signal.SIG_DFL)
    os.kill(os.getpid(), signum)
# Register signal handlers for graceful shutdown (import-time side effect)
signal.signal(signal.SIGTERM, _shutdown_handler)
signal.signal(signal.SIGINT, _shutdown_handler)
# ---------------------------------------------------------------------------
# Helpers: Preview + Fetch-and-Rerank + Shadow-on-Insert
# ---------------------------------------------------------------------------
def _make_preview(text: str, max_chars: int = PREVIEW_CHARS) -> str:
    """Truncate *text* at a word boundary within ``max_chars`` characters.

    Short text is returned untouched. Truncated text is cut back to the
    last space inside the window (if any) and suffixed with "...", so the
    result may exceed ``max_chars`` by the three ellipsis characters.
    """
    if len(text) <= max_chars:
        return text
    clipped = text[:max_chars]
    cut = clipped.rfind(" ")
    if cut != -1:
        clipped = clipped[:cut]
    return clipped + "..."
async def _fetch_and_rerank(
    query_text: str, top_k: int = 5, fetch_multiplier: int = 10
) -> list:
    """
    Fetch-and-Rerank pipeline using HESTIA scoring.
    1. Embed query
    2. Broad FAISS search: k = top_k * fetch_multiplier candidates
    3. Compute topic vitality V(q,t) via radius search
    4. Batch-load all candidate + radius embers concurrently (get_embers_batch)
    5. For each candidate: compute HESTIA score using cached shadow_load
    6. Sort by HESTIA score, return top_k as list of (ember, score, breakdown)
    7. Flush access-count updates concurrently after scoring (deferred write-back)

    Args:
        query_text: Free-text query to embed and search with.
        top_k: Number of scored embers to return.
        fetch_multiplier: Over-fetch factor for the broad FAISS pass.

    Returns:
        List of (Ember, hestia_score, breakdown) tuples, highest score
        first, truncated to top_k. Empty list when the index is empty or
        yields no hits.
    """
    now = datetime.now(timezone.utc)
    config = load_config()
    vector = await _engine.embed(query_text)
    # Over-fetch so reranking has room to promote non-shadowed embers, but
    # never ask FAISS for more vectors than the index actually holds.
    total = _engine.memory_index.ntotal
    fetch_k = min(top_k * fetch_multiplier, total) if total > 0 else 0
    if fetch_k == 0:
        return []
    results = _engine.search(vector, top_k=fetch_k)
    if not results:
        return []
    # Topic vitality: search within radius, get neighbor times
    radius_l2 = VectorEngine.cosine_to_l2(1.0 - config.topic_radius)
    radius_results = _engine.search_radius(vector, radius_l2)
    # --- Batch load all embers needed (candidates + radius neighbors) ---
    candidate_uuids = [
        _storage.int_to_uuid[fid]
        for fid, _ in results
        if fid in _storage.int_to_uuid
    ]
    radius_uuids = [
        _storage.int_to_uuid[fid]
        for fid, _ in radius_results
        if fid in _storage.int_to_uuid
    ]
    all_uuids = list(dict.fromkeys(candidate_uuids + radius_uuids)) # dedup, preserve order
    ember_cache = await _storage.get_embers_batch(all_uuids)
    # Build vitality inputs from batch cache
    neighbor_times = []
    neighbor_dists = []
    for faiss_id, dist_sq in radius_results:
        uuid = _storage.int_to_uuid.get(faiss_id)
        if uuid:
            ember = ember_cache.get(uuid)
            if ember:
                neighbor_times.append(ember.created_at)
                neighbor_dists.append(dist_sq)
    vitality = compute_topic_vitality(
        neighbor_dists, neighbor_times, now, radius_l2, config.vitality_lambda
    )
    # Score each candidate with HESTIA (all embers already in cache)
    scored = []
    v_max = max(vitality, 0.001) # prevent division by zero
    to_update: list[Ember] = []
    for faiss_id, dist in results:
        uuid = _storage.int_to_uuid.get(faiss_id)
        if not uuid:
            continue
        ember = ember_cache.get(uuid)
        if not ember:
            # FAISS id known but the ember row is missing (e.g. deleted
            # externally between reload and batch load) — skip it.
            continue
        cos_sim = VectorEngine.l2_to_cosine(dist)
        score, breakdown = compute_hestia_score(
            cos_sim,
            ember.shadow_load,
            vitality,
            v_max,
            config.shadow_gamma,
            config.nostalgia_alpha,
        )
        # Stage access-count update (deferred — flushed concurrently after scoring)
        ember.last_accessed_at = now
        ember.access_count += 1
        to_update.append(ember)
        scored.append((ember, score, breakdown))
    scored.sort(key=lambda x: x[1], reverse=True)
    # Flush all access-count updates concurrently (deferred write-back)
    if to_update:
        await asyncio.gather(*(_storage.update_ember(e) for e in to_update))
    return scored[:top_k]
async def _shadow_on_insert(new_ember: Ember, vector: np.ndarray) -> None:
    """Shadow-on-Insert: update shadow_load on existing neighbors bidirectionally.

    Searches FAISS for the new ember's nearest neighbors, raises each
    neighbor's shadow_load when the new ember shadows it harder (recording
    a "shadow" edge and region conflict stats), then links the remaining
    related-but-not-shadowing neighbors into the knowledge graph.
    """
    config = load_config()
    # Ensure vector is 2D (1, dim) for FAISS compatibility
    if vector.ndim == 1:
        vector = vector.reshape(1, -1)
    results = _engine.search(vector, top_k=config.shadow_k)
    if not results:
        return
    # Batch-load all neighbor embers in one round-trip (same pattern as
    # _fetch_and_rerank) instead of one sequential await per neighbor.
    neighbor_uuids = [
        _storage.int_to_uuid[fid]
        for fid, _ in results
        if fid in _storage.int_to_uuid
    ]
    neighbor_cache = await _storage.get_embers_batch(neighbor_uuids)
    cos_sims = []
    shadow_potentials = []
    neighbor_ids = []
    for faiss_id, dist in results:
        uuid = _storage.int_to_uuid.get(faiss_id)
        if not uuid or uuid == new_ember.ember_id:
            continue
        neighbor = neighbor_cache.get(uuid)
        if not neighbor:
            continue
        cos_sim = VectorEngine.l2_to_cosine(dist)
        phi = compute_shadow_potential(
            cos_sim,
            neighbor.created_at,
            new_ember.created_at,
            config.shadow_delta,
            config.shadow_epsilon,
        )
        cos_sims.append(cos_sim)
        shadow_potentials.append(phi)
        neighbor_ids.append(uuid)
        # Update shadow_load on older neighbors if new ember shadows them harder
        if phi > neighbor.shadow_load:
            neighbor.shadow_load = phi
            neighbor.shadowed_by = new_ember.ember_id
            neighbor.shadow_updated_at = datetime.now(timezone.utc)
            await _storage.update_ember(neighbor)
            await _storage.save_edge(
                neighbor.ember_id, new_ember.ember_id, "shadow", phi
            )
            # Update region stats with conflict density
            region = await _storage.get_region_stats(neighbor.cell_id)
            updated_region = update_region_stats(neighbor.cell_id, phi, region)
            await _storage.update_region(
                updated_region.cell_id,
                updated_region.vitality_score,
                updated_region.shadow_accum,
            )
    # Detect KG edges (related but not shadowing)
    kg_edges = detect_kg_edges(cos_sims, shadow_potentials, neighbor_ids)
    if kg_edges:
        # Cap the denormalized link list at 5; the edge table below still
        # records every detected relation.
        new_ember.related_ids = kg_edges[:5]
        await _storage.update_ember(new_ember)
        for related_id in kg_edges:
            await _storage.save_edge(new_ember.ember_id, related_id, "related", 0.0)
# ---------------------------------------------------------------------------
# MCP Tools: Core Memory Operations
# ---------------------------------------------------------------------------
@mcp.tool(annotations={"readOnlyHint": False, "destructiveHint": False, "openWorldHint": False})
async def ember_store(
    name: str,
    content: str,
    tags: str = "",
    importance: str = "context",
    source_path: str = "",
    status: str = "",
    edges: str = "",
) -> str:
    """
    Store a memory ember with importance level. The content will be embedded and assigned
    to a Voronoi cell for persistent retrieval.
    Args:
        name: Short descriptive name for this memory
        content: The actual content to remember
        tags: Comma-separated tags for categorization
        importance: One of: fact, decision, preference, context, learning
        source_path: Optional path to the source file this info came from.
            Enables deep recall to read the full source for richer context.
        status: Optional task status: open, in_progress, done, or empty for no status.
        edges: Optional edges to create. Format: "type:ember_id,type:ember_id"
            Valid types: depends_on, child_of, context_for
            Example: "depends_on:abc123,context_for:def456"
    """
    await _ensure_init()
    # Validate enumerated inputs before doing any embedding work.
    tag_list = [t.strip() for t in tags.split(",") if t.strip()] if tags else []
    valid_importance = ["fact", "decision", "preference", "context", "learning"]
    if importance not in valid_importance:
        return f"Error: invalid importance '{importance}'. Valid values: {', '.join(valid_importance)}."
    valid_statuses = ["open", "in_progress", "done"]
    if status and status not in valid_statuses:
        return f"Error: invalid status '{status}'. Valid values: {', '.join(valid_statuses)}, or empty for no status."
    # Empty string (no status) maps to None.
    resolved_status = status if status in valid_statuses else None
    vector = await _engine.embed(content)
    cell_id = _engine.assign_cell(vector)
    ember = Ember(
        name=name,
        content=content,
        tags=tag_list,
        cell_id=cell_id,
        importance=importance,
        source="manual",
        source_path=source_path if source_path else None,
        status=resolved_status,
    )
    faiss_id = await _storage.save_ember(ember)
    await _engine.add_vector(faiss_id, vector)
    # Shadow-on-Insert expects a flat 1-D embedding.
    await _shadow_on_insert(ember, vector.flatten())
    # Process user-specified typed edges
    if edges:
        from ember.models import USER_EDGE_TYPES
        for edge_spec in edges.split(","):
            edge_spec = edge_spec.strip()
            if ":" not in edge_spec:
                continue  # silently skip malformed specs
            edge_type, target_id = edge_spec.split(":", 1)
            edge_type = edge_type.strip()
            target_id = target_id.strip()
            # Edge is created only when the type is allowed AND the target
            # ember actually exists; anything else is dropped quietly.
            if edge_type in USER_EDGE_TYPES:
                target = await _storage.get_ember(target_id)
                if target:
                    await _storage.save_edge(
                        ember.ember_id, target_id, edge_type, 0.0
                    )
    # Unknown importance cannot reach here (validated above); .get default
    # is defensive only.
    half_life = DECAY_HALF_LIVES.get(importance, 30.0)
    status_note = f" Status: {resolved_status}." if resolved_status else ""
    return (
        f"Stored ember '{name}' (ID: {ember.ember_id}) in Cell {cell_id}. "
        f"Importance: {importance} (half-life: {int(half_life)}d){status_note}"
    )
@mcp.tool(annotations={"readOnlyHint": True, "destructiveHint": False, "openWorldHint": False})
async def ember_recall(query: str, top_k: int = 5) -> str:
    """
    Retrieve memory embers semantically similar to the query, ranked by HESTIA score.
    Newer, non-shadowed, actively-discussed memories rank higher.
    """
    await _ensure_init()
    await _reload_from_disk()
    ranked = await _fetch_and_rerank(query, top_k=top_k)
    if not ranked:
        return "No embers found."
    now = datetime.now(timezone.utc)
    entries = []
    for ember, score, _breakdown in ranked:
        # Annotate each hit with freshness / staleness / shadow markers.
        age_days = (now - ember.created_at).total_seconds() / 86400.0
        freshness = "fresh" if age_days < 7 else f"{int(age_days)}d ago"
        stale_mark = " [STALE]" if ember.is_stale else ""
        shadow_mark = ""
        if ember.shadow_load > 0.1:
            shadow_mark = f" [shadow:{ember.shadow_load:.1f}]"
        source_note = ""
        if ember.source_path:
            source_note = f"\n source: {ember.source_path}"
        entries.append(
            f"🔥 {ember.name} [id: {ember.ember_id}] "
            f"(score: {score:.2f}, {freshness}{stale_mark}{shadow_mark})\n"
            f" {_make_preview(ember.content)}{source_note}"
        )
    entries.append("\n→ Use ember_read(id) for full content of any memory.")
    return "\n\n".join(entries)
@mcp.tool(annotations={"readOnlyHint": True, "destructiveHint": False, "openWorldHint": False})
async def ember_deep_recall(query: str, top_k: int = 3) -> str:
    """
    Retrieve memory embers AND read their source files for full context.
    Use this when you need deeper detail than what the ember summary provides.
    Flow: semantic search → find embers → read source files on disk → return combined context.
    Args:
        query: What to search for in memory
        top_k: Number of embers to retrieve (default 3, reads source files for each)
    """
    await _ensure_init()
    await _reload_from_disk()
    now = datetime.now(timezone.utc)
    scored = await _fetch_and_rerank(query, top_k=top_k)
    if not scored:
        return "No embers found."
    lines = []
    source_contents = {}  # keyed by raw source_path — dedupes reads across embers
    for ember, score, breakdown in scored:
        age_days = (now - ember.created_at).total_seconds() / 86400.0
        freshness = "fresh" if age_days < 7 else f"{int(age_days)}d ago"
        stale_mark = " [STALE]" if ember.is_stale else ""
        # Deep recall returns full content (this IS the "read the full page" action)
        lines.append(
            f"🔥 {ember.name} [id: {ember.ember_id}] "
            f"(score: {score:.2f}, {freshness}{stale_mark})\n{ember.content}"
        )
        # Read source file if available, not already read, and under cap
        if (
            ember.source_path
            and ember.source_path not in source_contents
            and len(source_contents) < MAX_SOURCE_FILES
        ):
            # Security: only read files within the user's home directory.
            resolved = None
            try:
                safe_root = Path.home().resolve()
                resolved = Path(ember.source_path).expanduser().resolve()
                # Fix: is_relative_to avoids the startswith() prefix pitfall
                # (e.g. /home/userx previously passed a /home/user check).
                path_is_safe = resolved.is_relative_to(safe_root)
            except Exception:
                path_is_safe = False
            if not path_is_safe or resolved is None:
                source_contents[ember.source_path] = "[Skipped: path outside home directory]"
            elif resolved.is_file():
                # Fix: check and open the *resolved* path. The raw path was
                # used before, so "~"-prefixed paths passed the safety check
                # but then failed os.path.isfile with "[File not found]".
                try:
                    async with aiofiles.open(resolved, mode="r") as f:
                        raw = await f.read()
                    source_contents[ember.source_path] = _make_preview(raw, SOURCE_FILE_MAX_CHARS)
                except Exception as e:
                    source_contents[ember.source_path] = f"[Error reading: {e}]"
            else:
                source_contents[ember.source_path] = "[File not found]"
    output = "\n\n---\n\n".join(lines)
    if source_contents:
        output += "\n\n===== SOURCE FILES =====\n"
        for path, content in source_contents.items():
            output += f"\n--- {path} ---\n{content}\n"
    return output
@mcp.tool(annotations={"readOnlyHint": False, "destructiveHint": False, "openWorldHint": False})
async def ember_learn(conversation_context: str, source_path: str = "") -> str:
    """
    Auto-capture key information from conversation. Extracts facts, preferences,
    decisions, and learnings — then stores them as temporal embers.
    Call this silently after every substantive user message.
    The LLM should extract and classify the information before calling this.
    Args:
        conversation_context: The key information to capture, formatted as:
            "TYPE: description" where TYPE is fact/decision/preference/learning
            Example: "preference: User prefers TypeScript with strict mode"
        source_path: Optional path to the source file (handoff, document) this info came from.
            When provided, enables deep recall to read the full source for richer context.
    """
    await _ensure_init()
    await _reload_from_disk()
    # Parse the type prefix; anything without a recognized "TYPE:" prefix
    # falls through as plain "context".
    importance = "context"
    content = conversation_context
    for itype in ["fact", "decision", "preference", "learning", "context"]:
        if conversation_context.lower().startswith(f"{itype}:"):
            importance = itype
            content = conversation_context[len(itype) + 1 :].strip()
            break
    # Generate a concise name from first ~60 chars
    name = content[:60].strip()
    if len(content) > 60:
        # Trim back to a word boundary before adding the ellipsis.
        name = name.rsplit(" ", 1)[0] + "..."
    vector = await _engine.embed(content)
    cell_id = _engine.assign_cell(vector)
    # Check for near-duplicates
    existing = _engine.search(vector, top_k=3)
    for faiss_id, dist in existing:
        if dist < 0.1: # Very similar (normalized L2 < 0.1 means almost identical)
            uuid = _storage.int_to_uuid.get(faiss_id)
            if uuid:
                existing_ember = await _storage.get_ember(uuid)
                if existing_ember and not existing_ember.is_stale:
                    # Reinforce instead of duplicating: bump recency and
                    # access count on the existing ember, store nothing new.
                    existing_ember.last_accessed_at = datetime.now(timezone.utc)
                    existing_ember.access_count += 1
                    await _storage.update_ember(existing_ember)
                    return f"Reinforced existing ember: '{existing_ember.name}'"
    ember = Ember(
        name=name,
        content=content,
        tags=["auto-captured", importance],
        cell_id=cell_id,
        importance=importance,
        source="auto",
        source_path=source_path if source_path else None,
    )
    faiss_id = await _storage.save_ember(ember)
    await _engine.add_vector(faiss_id, vector)
    # Shadow-on-Insert expects a flat 1-D embedding.
    await _shadow_on_insert(ember, vector.flatten())
    return f"Captured {importance}: '{name}'"
@mcp.tool(annotations={"readOnlyHint": False, "destructiveHint": True, "openWorldHint": False})
async def ember_contradict(ember_id: str, new_content: str, reason: str = "") -> str:
    """
    Mark an existing memory as stale and store an updated version.
    Use when the user corrects or updates previously stored information.
    Args:
        ember_id: The ID of the ember to mark stale (use ember_recall to find it)
        new_content: The corrected/updated information
        reason: Why the old information is stale
    """
    await _ensure_init()
    # Mark old ember as fully shadowed
    old_ember = await _storage.get_ember(ember_id)
    if not old_ember:
        return f"Ember {ember_id} not found."
    old_ember.is_stale = True
    old_ember.stale_reason = reason or "Superseded by newer information"
    # shadow_load 1.0 is the maximum: the old version is maximally demoted
    # in HESTIA ranking while remaining recoverable for audit.
    old_ember.shadow_load = 1.0
    # Store new version with supersedes link
    vector = await _engine.embed(new_content)
    cell_id = _engine.assign_cell(vector)
    new_ember = Ember(
        name=old_ember.name,
        content=new_content,
        tags=old_ember.tags,
        cell_id=cell_id,
        importance=old_ember.importance,
        supersedes_id=ember_id,
        source="manual",
    )
    # Set bidirectional links
    old_ember.shadowed_by = new_ember.ember_id
    old_ember.shadow_updated_at = datetime.now(timezone.utc)
    new_ember.superseded_by_id = None # new ember is the current version
    await _storage.update_ember(old_ember)
    faiss_id = await _storage.save_ember(new_ember)
    await _engine.add_vector(faiss_id, vector)
    # Create supersedes edge
    await _storage.save_edge(ember_id, new_ember.ember_id, "supersedes", 1.0)
    # Shadow-on-Insert for the new ember
    await _shadow_on_insert(new_ember, vector.flatten())
    # NOTE(review): the old ember's vector stays in the FAISS index — it can
    # still surface in searches, but shadow_load=1.0 suppresses its score.
    return (
        f"Updated memory: '{old_ember.name}'. "
        f"Old version fully shadowed. New version: {new_ember.ember_id}"
    )
@mcp.tool(annotations={"readOnlyHint": True, "destructiveHint": False, "openWorldHint": False})
async def ember_list(tag: str = "", limit: int = 20, offset: int = 0) -> str:
    """List stored memory embers with pagination. Returns metadata only (no content).
    Args:
        tag: Optional tag filter
        limit: Max results per page (default 20)
        offset: Skip this many results (default 0)
    """
    await _ensure_init()
    await _reload_from_disk()
    all_embers = await _storage.list_embers(tag=tag if tag else None)
    if not all_embers:
        return "No embers stored." if not tag else f"No embers with tag '{tag}'."
    total = len(all_embers)
    page = all_embers[offset : offset + limit]
    # Fix: an offset past the end previously produced a nonsensical header
    # such as "Showing 21-5 of 5" followed by an empty body.
    if not page:
        return f"No embers at offset {offset} (total: {total})."
    now = datetime.now(timezone.utc)
    lines = []
    for a in page:
        age_days = (now - a.created_at).total_seconds() / 86400.0
        freshness = "today" if age_days < 1 else f"{int(age_days)}d ago"
        stale = " [STALE]" if a.is_stale else ""
        shadow = f" [shadow:{a.shadow_load:.1f}]" if a.shadow_load > 0.1 else ""
        lines.append(
            f"• {a.name} ({a.importance}) [{freshness}]{stale}{shadow} "
            f"[id: {a.ember_id}]"
        )
    start = offset + 1
    end = min(offset + limit, total)
    header = f"Showing {start}-{end} of {total} embers"
    if end < total:
        header += f" (use offset={end} for more)"
    header += ":"
    return header + "\n" + "\n".join(lines)
@mcp.tool(annotations={"readOnlyHint": False, "destructiveHint": True, "openWorldHint": False})
async def ember_delete(ember_id: str) -> str:
    """Delete a memory ember by its ID (UUID)."""
    await _ensure_init()
    removed_faiss_id = await _storage.delete_ember(ember_id)
    if removed_faiss_id is None:
        return f"Ember {ember_id} not found."
    # Best-effort vector removal: the DB row is already gone, so a dangling
    # FAISS entry is only logged, never surfaced to the caller.
    try:
        await _engine.remove_vector(removed_faiss_id)
    except Exception as e:
        logging.warning("FAISS remove_vector failed for ember %s (faiss_id=%s): %s", ember_id, removed_faiss_id, e)
    return f"Ember {ember_id} deleted."
@mcp.tool(annotations={"readOnlyHint": True, "destructiveHint": False, "openWorldHint": False})
async def ember_inspect(cell_id: int = -1) -> str:
    """Inspect Voronoi cell health. Shows ember distribution and conflict density."""
    await _ensure_init()
    config = load_config()
    # Single-cell mode: any non-negative cell_id returns that cell's stats.
    if cell_id >= 0:
        stats = await _storage.get_region_stats(cell_id)
        if not stats:
            return f"Cell {cell_id}: no data"
        return (
            f"Cell {cell_id}: vitality={stats.vitality_score:.3f}, "
            f"conflict_density={stats.shadow_accum:.3f}, "
            f"last_updated={stats.last_updated}"
        )
    # Overview: count embers per cell without loading content
    all_embers = await _storage.list_embers()
    cell_counts = {}
    for e in all_embers:
        cell_counts[e.cell_id] = cell_counts.get(e.cell_id, 0) + 1
    total = len(all_embers)
    # Fix: fetch all region stats concurrently instead of awaiting one
    # round-trip per cell inside the render loop (N+1 pattern).
    all_stats = await asyncio.gather(
        *(_storage.get_region_stats(i) for i in range(config.k_cells))
    )
    lines = [f"Voronoi Cell Map ({config.k_cells} cells, {total} embers):"]
    for i, stats in enumerate(all_stats):
        count = cell_counts.get(i, 0)
        bar = "█" * min(count, 20)
        conflict = f" conflict:{stats.shadow_accum:.2f}" if stats else ""
        lines.append(f" Cell {i:2d}: {bar} {count}{conflict}")
    return "\n".join(lines)
@mcp.tool(annotations={"readOnlyHint": True, "destructiveHint": False, "openWorldHint": False})
async def ember_auto(conversation_context: str) -> str:
    """
    Automatically retrieve relevant memory embers based on conversation context.
    Uses HESTIA scoring — non-shadowed, actively-discussed memories rank higher.
    Call at the start of every conversation.
    """
    await _ensure_init()
    await _reload_from_disk()
    # Over-fetch so session embers outside top-5 can be boosted in
    scored = await _fetch_and_rerank(conversation_context, top_k=10)
    if not scored:
        return "No embers found."
    # --- Session Boost: prioritize recent session handoffs ---
    # NOTE(review): `now` is assigned but never used below — candidate for
    # removal; confirm no downstream dependency before deleting.
    now = datetime.now(timezone.utc)
    session_embers = [
        (e, s, bd) for e, s, bd in scored if e.source == "session"
    ]
    latest_session_time = None
    if session_embers:
        latest_session_time = max(e.created_at for e, _, _ in session_embers)
    boosted = []
    for ember, score, breakdown in scored:
        boost = 1.0
        if ember.source == "session":
            boost = 1.5 # all session embers get 1.5x
            if latest_session_time and ember.created_at == latest_session_time:
                boost = 2.0 # most recent session gets 2.0x
        boosted.append((ember, score * boost, breakdown))
    boosted.sort(key=lambda x: x[1], reverse=True)
    # Only the top 5 survive after boosting (we over-fetched 10 above).
    scored = boosted[:5]
    # --- End Session Boost ---
    lines = []
    has_sources = False
    for ember, score, breakdown in scored:
        stale_note = " (outdated)" if ember.is_stale else ""
        preview = _make_preview(ember.content)
        lines.append(
            f"🔥 {ember.name} [id: {ember.ember_id}]{stale_note}: {preview}"
        )
        if ember.source_path:
            has_sources = True
    if has_sources:
        lines.append("\n→ Use ember_deep_recall for source file context, ember_read(id) for full content.")
    return "\n\n".join(lines)
@mcp.tool(annotations={"readOnlyHint": True, "destructiveHint": False, "openWorldHint": False})
async def ember_read(ember_id: str) -> str:
    """
    Read the full content of a specific ember by ID.
    Use this after ember_recall/ember_auto returns previews
    and you need the complete content for a specific memory.
    Args:
        ember_id: The ID of the ember to read (shown in search results as [id: ...])
    """
    await _ensure_init()
    ember = await _storage.get_ember(ember_id)
    if not ember:
        return f"Ember {ember_id} not found."
    # Assemble the record as header / body / footer sections.
    tags_str = ", ".join(ember.tags) if ember.tags else "none"
    source = f"\nSource: {ember.source_path}" if ember.source_path else ""
    sections = [
        f"🔥 {ember.name} ({ember.importance})",
        f"{ember.content}",
        f"Tags: {tags_str}{source}",
    ]
    return "\n\n".join(sections)
@mcp.tool(annotations={"readOnlyHint": False, "destructiveHint": False, "openWorldHint": False})
async def ember_save_session(
    summary: str,
    decisions: str = "",
    next_steps: str = "",
    source_path: str = "",
) -> str:
    """
    Save key takeaways from the current session. Call before ending a conversation
    where important work was done.
    Args:
        summary: Brief summary of the session's key work
        decisions: Decisions made during the session
        next_steps: Open items and next actions
        source_path: Optional path to the handoff file on disk.
            When provided, enables deep recall to read the full handoff for richer context.
    """
    await _ensure_init()
    saved = []
    resolved_source = source_path if source_path else None
    async def _store_session_ember(name: str, content: str, tags: list, importance: str) -> None:
        # Embed, assign a Voronoi cell, persist, index, then shadow older neighbors.
        vector = await _engine.embed(content)
        cell_id = _engine.assign_cell(vector)
        ember = Ember(
            name=name,
            content=content,
            tags=["session"] + tags,
            cell_id=cell_id,
            importance=importance,
            source="session",
            source_path=resolved_source,
        )
        faiss_id = await _storage.save_ember(ember)
        await _engine.add_vector(faiss_id, vector)
        await _shadow_on_insert(ember, vector.flatten())
    if summary:
        await _store_session_ember("Session Summary", summary, ["summary"], "context")
        saved.append("summary")
    if decisions:
        await _store_session_ember(
            "Session Decisions", decisions, ["decisions"], "decision"
        )
        saved.append("decisions")
    if next_steps:
        await _store_session_ember(
            "Next Steps", next_steps, ["next_steps"], "learning"
        )
        saved.append("next steps")
    # Fix: previously returned the malformed "Session saved: ." when every
    # section was empty.
    if not saved:
        return "Nothing to save: summary, decisions, and next_steps were all empty."
    return f"Session saved: {', '.join(saved)}. These will be available in your next conversation."
# ---------------------------------------------------------------------------
# MCP Tools: Shadow-Decay Analysis
# ---------------------------------------------------------------------------
@mcp.tool(annotations={"readOnlyHint": True, "destructiveHint": False, "openWorldHint": False})
async def ember_drift_check() -> str:
    """
    Analyze knowledge region health using Shadow-Decay conflict density.
    Reports drifting regions (high shadow accumulation) and silent regions (low topic vitality).
    """
    await _ensure_init()
    config = load_config()
    drifting = []
    silent = []
    healthy = 0
    # Fix: fetch every cell's stats concurrently instead of awaiting one
    # round-trip per cell (N+1 pattern); order is preserved by gather.
    all_stats = await asyncio.gather(
        *(_storage.get_region_stats(cid) for cid in range(config.k_cells))
    )
    for cell_id, stats in enumerate(all_stats):
        if not stats:
            silent.append(f" Cell {cell_id}: no data (uninitialized)")
            continue
        # 0.3 is the drift threshold on accumulated shadow (conflict density).
        if stats.shadow_accum > 0.3:
            drifting.append(
                f" Cell {cell_id}: conflict_density={stats.shadow_accum:.3f} (HIGH)"
            )
        elif stats.vitality_score < config.vitality_min:
            silent.append(
                f" Cell {cell_id}: vitality={stats.vitality_score:.4f} (SILENT)"
            )
        else:
            healthy += 1
    lines = [
        f"Knowledge Region Health ({config.k_cells} cells)",
        "=" * 50,
        f"Healthy: {healthy} | Drifting: {len(drifting)} | Silent: {len(silent)}",
        "",
    ]
    if drifting:
        lines.append("Drifting regions (high conflict density):")
        lines.extend(drifting)
        lines.append("")
    if silent:
        lines.append("Silent regions (low vitality):")
        lines.extend(silent)
        lines.append("")
    if not drifting and not silent:
        lines.append("All regions healthy. No drift or silence detected.")
    return "\n".join(lines)
@mcp.tool(annotations={"readOnlyHint": True, "destructiveHint": False, "openWorldHint": False})
async def ember_graph_search(
    query: str, depth: int = 2, top_k: int = 5, edge_types: str = ""
) -> str:
    """
    Vector search → entry node → BFS via knowledge graph edges → return correlated context.
    Args:
        query: What to search for
        depth: How many hops to traverse (default 2)
        top_k: Max results to return (default 5)
        edge_types: Optional filter. Comma-separated edge types to traverse.
            Valid types: shadow, related, supersedes, depends_on, child_of, context_for
            If empty, traverses all edge types.
    """
    await _ensure_init()
    await _reload_from_disk()
    depth = min(depth, 5)  # cap traversal depth to bound BFS cost
    # Find entry point via HESTIA
    entry_results = await _fetch_and_rerank(query, top_k=1)
    if not entry_results:
        return "No embers found."
    entry_ember, entry_score, _ = entry_results[0]
    entry_id = entry_ember.ember_id
    # Parse edge type filter
    type_filter = None
    if edge_types:
        type_filter = [t.strip() for t in edge_types.split(",") if t.strip()]
    # BFS traversal from entry point
    connected_ids = await _storage.traverse_kg(
        entry_id, depth=depth, edge_types=type_filter
    )
    # Fix: batch-load the discovered embers in one round-trip instead of
    # awaiting get_ember once per ID (same pattern as _fetch_and_rerank);
    # traversal order is preserved and missing rows are skipped as before.
    ember_map = await _storage.get_embers_batch(list(connected_ids))
    graph_embers = [
        ember_map[eid] for eid in connected_ids if ember_map.get(eid)
    ]
    # Format output
    lines = [
        f"Graph search for: '{query}'",
        f"Entry: 🔥 {entry_ember.name} [id: {entry_ember.ember_id}] (score: {entry_score:.2f})",
        f" {_make_preview(entry_ember.content)}",
        "",
        f"Connected memories ({len(graph_embers)} found via {depth}-hop traversal):",
    ]
    for ember in graph_embers[:top_k]:
        shadow_info = f" [shadow:{ember.shadow_load:.1f}]" if ember.shadow_load > 0.1 else ""
        stale_info = " [STALE]" if ember.is_stale else ""
        preview = _make_preview(ember.content)
        lines.append(
            f" 🔥 {ember.name} [id: {ember.ember_id}]{stale_info}{shadow_info}: {preview}"
        )
    if not graph_embers:
        lines.append(" No graph-connected memories found.")
    lines.append("\n→ Use ember_read(id) for full content of any memory.")
    return "\n".join(lines)
@mcp.tool(annotations={"readOnlyHint": True, "destructiveHint": False, "openWorldHint": False})
async def ember_actionable(include_done: bool = False) -> str:
    """
    Return embers with an active status (open or in_progress).
    Use this for lightweight task tracking and action item review.
    Args:
        include_done: If True, also show completed items. Default: False.
    """
    await _ensure_init()
    await _reload_from_disk()
    all_embers = await _storage.list_embers()
    now = datetime.now(timezone.utc)
    wanted = {"open", "in_progress"} | ({"done"} if include_done else set())
    actionable = [e for e in all_embers if e.status in wanted]
    if not actionable:
        return "No actionable items found."
    # Sort: in_progress first, then open, then done. Within each, newest first.
    rank = {"in_progress": 0, "open": 1, "done": 2}
    icons = {"open": "[ ]", "in_progress": "[>>]", "done": "[ok]"}
    actionable.sort(
        key=lambda e: (rank.get(e.status, 9), -e.created_at.timestamp())
    )
    lines = [f"Actionable items ({len(actionable)}):\n"]
    for e in actionable:
        age_days = (now - e.created_at).total_seconds() / 86400.0
        freshness = "today" if age_days < 1 else f"{int(age_days)}d ago"
        lines.append(
            f"{icons.get(e.status, '[??]')} {e.name} ({e.importance}, {freshness}) "
            f"[id: {e.ember_id}]"
        )
    return "\n".join(lines)
@mcp.tool(annotations={"readOnlyHint": False, "destructiveHint": False, "openWorldHint": False})
async def ember_set_status(ember_id: str, status: str) -> str:
    """
    Update the status of an existing ember. Use for task tracking.
    Args:
        ember_id: The ember to update.
        status: New status: open, in_progress, done, or clear (removes status).
    """
    await _ensure_init()
    ember = await _storage.get_ember(ember_id)
    if not ember:
        return f"Ember {ember_id} not found."
    valid = ["open", "in_progress", "done", "clear"]
    if status not in valid:
        return f"Invalid status '{status}'. Use: {', '.join(valid)}"
    # "clear" removes the status entirely; anything else is stored verbatim.
    if status == "clear":
        ember.status = None
        display_status = "cleared"
    else:
        ember.status = status
        display_status = status
    ember.updated_at = datetime.now(timezone.utc)
    await _storage.update_ember(ember)
    return f"Updated '{ember.name}' status to: {display_status}"
@mcp.tool(annotations={"readOnlyHint": False, "destructiveHint": False, "openWorldHint": False})
async def ember_compact(
    analyze: bool = True,
    ember_id: str = "",
    compacted_content: str = "",
    min_shadow_load: float = 0.3,
    min_age_days: float = 7.0,
    limit: int = 10,
) -> str:
    """
    AI-powered compaction of stale/shadowed embers. Two modes:
    1. Analyze mode (analyze=True, default): Identifies compaction candidates
       based on shadow_load, age, and HESTIA score. Returns candidates with
       their full content for the LLM to summarize.
    2. Execute mode (analyze=False, ember_id + compacted_content required):
       Replaces an ember's content with the LLM-generated summary. Re-embeds
       the vector, preserves all graph edges and metadata.
    Workflow: Call with analyze=True first, generate summaries, then call
    again with analyze=False for each candidate.
    Args:
        analyze: If True, return candidates. If False, apply compaction.
        ember_id: (execute mode) The ember to compact.
        compacted_content: (execute mode) The LLM-generated summary.
        min_shadow_load: (analyze mode) Minimum shadow_load threshold.
        min_age_days: (analyze mode) Minimum age in days.
        limit: (analyze mode) Max candidates to return.
    """
    await _ensure_init()
    await _reload_from_disk()
    # The two modes share no logic beyond init/reload; dispatch to one helper each.
    if analyze:
        return await _compact_analyze(min_shadow_load, min_age_days, limit)
    return await _compact_execute(ember_id, compacted_content)


async def _compact_analyze(min_shadow_load: float, min_age_days: float, limit: int) -> str:
    """Rank live embers by shadow_load x (capped) age and return the top
    candidates with their full content for the LLM to summarize."""
    all_embers = await _storage.list_embers()
    now = datetime.now(timezone.utc)
    candidates = []
    for e in all_embers:
        if e.is_stale or e.is_compacted:
            continue  # skip already stale or compacted
        age_days = (now - e.created_at).total_seconds() / 86400.0
        if age_days < min_age_days:
            continue
        if e.shadow_load < min_shadow_load:
            continue
        # Score: higher shadow_load + older age = better candidate.
        # Age factor is capped at 3x (30-day units) so ancient embers
        # don't drown out heavily-shadowed recent ones.
        age_factor = min(age_days / 30.0, 3.0)
        compaction_score = e.shadow_load * age_factor
        candidates.append((e, compaction_score, age_days))
    candidates.sort(key=lambda x: x[1], reverse=True)
    candidates = candidates[:limit]
    if not candidates:
        return "No compaction candidates found matching criteria."
    lines = [f"Found {len(candidates)} compaction candidate(s):\n"]
    for e, score, age in candidates:
        lines.append(
            f"---\n"
            f"ID: {e.ember_id}\n"
            f"Name: {e.name}\n"
            f"Shadow: {e.shadow_load:.2f} | Age: {int(age)}d | "
            f"Score: {score:.2f} | Chars: {len(e.content)}\n"
            f"Content:\n{e.content}\n"
        )
    lines.append(
        "\n--- To compact, call ember_compact(analyze=False, "
        "ember_id='...', compacted_content='your summary') for each."
    )
    return "\n".join(lines)


async def _compact_execute(ember_id: str, compacted_content: str) -> str:
    """Replace one ember's content with its summary, re-embed the vector,
    and persist. Graph edges and all other metadata are left untouched."""
    if not ember_id or not compacted_content:
        return "Execute mode requires ember_id and compacted_content."
    ember = await _storage.get_ember(ember_id)
    if not ember:
        return f"Ember {ember_id} not found."
    if ember.is_compacted:
        return f"Ember '{ember.name}' is already compacted."
    original_length = len(ember.content)
    # Update content and compaction fields
    ember.content = compacted_content
    ember.is_compacted = True
    ember.original_content_length = original_length
    ember.updated_at = datetime.now(timezone.utc)
    # Re-embed the compacted content; the summary may land in a different cell.
    new_vector = await _engine.embed(compacted_content)
    ember.cell_id = _engine.assign_cell(new_vector)
    # Update FAISS vector: remove old, add new with the SAME integer ID so the
    # uuid<->int mapping stays valid. If the mapping is missing, the stored
    # ember is still updated and the old vector is left in place.
    int_id = _storage.uuid_to_int.get(ember_id)
    if int_id is not None:
        await _engine.remove_vector(int_id)
        await _engine.add_vector(int_id, new_vector)
    # Save updated ember
    await _storage.update_ember(ember)
    # Guard against division by zero for a (degenerate) empty original.
    reduction = round(
        (1 - len(compacted_content) / original_length) * 100, 1
    ) if original_length > 0 else 0
    return (
        f"Compacted '{ember.name}': {original_length} → "
        f"{len(compacted_content)} chars ({reduction}% reduction). "
        f"Vector re-embedded, edges preserved."
    )
@mcp.tool(annotations={"readOnlyHint": True, "destructiveHint": False, "openWorldHint": False})
async def ember_health() -> str:
    """
    Compute hallucination risk across all embers, log to metrics, return health report with trend.
    """
    await _ensure_init()
    cfg = load_config()
    all_embers = await _storage.list_embers()
    if not all_embers:
        return "No embers in storage. Health check not applicable."
    # Per-ember inputs for the risk model.
    loads = [e.shadow_load for e in all_embers]
    stale_flags = [e.is_stale for e in all_embers]
    # Per-cell vitality; cells with no recorded stats count as fully silent (0.0).
    vitalities = []
    for cell in range(cfg.k_cells):
        stats = await _storage.get_region_stats(cell)
        vitalities.append(stats.vitality_score if stats else 0.0)
    risk_data = compute_hallucination_risk(
        loads, stale_flags, vitalities, cfg.vitality_min
    )
    # Persist this reading so future calls can report a trend.
    await _storage.log_metric("hallucination_risk", risk_data["risk_score"], risk_data)
    history = await _storage.get_metric_history("hallucination_risk", limit=5)
    if history:
        trend_str = " → ".join(f"{h['value']:.3f}" for h in history)
    else:
        trend_str = "no history"
    return (
        f"Health: risk={risk_data['risk_score']:.3f} (0=ok, 1=bad) | "
        f"trend: {trend_str}\n"
        f"Total: {risk_data['total']} | "
        f"Shadowed(Φ>0.5): {risk_data['shadowed_count']} | "
        f"Stale: {risk_data['stale_count']} | "
        f"Silent: {risk_data['silent_topics']} | "
        f"Avg Φ: {risk_data['avg_shadow_load']:.3f}"
    )
@mcp.tool(annotations={"readOnlyHint": False, "destructiveHint": False, "openWorldHint": False})
async def ember_recompute_shadows() -> str:
    """
    Full recalculation of shadow_load for every ember.
    Use after migration or data import.

    For each ember: reconstructs its vector from the FAISS index, finds its
    nearest neighbors, and recomputes shadow_load/shadowed_by via
    compute_shadow_load. Persists only embers whose load changed by more
    than 0.001, and records a "shadow" edge to the shadower when one exists.
    """
    await _ensure_init()
    await _reload_from_disk()
    config = load_config()
    embers = await _storage.list_embers()
    if not embers:
        return "No embers to recompute."
    updated = 0
    for ember in embers:
        # Get this ember's FAISS int ID; embers missing from the mapping
        # (e.g. never indexed) are skipped silently.
        int_id = _storage.uuid_to_int.get(ember.ember_id)
        if int_id is None:
            continue
        # Reconstruct vector
        vec = _engine.reconstruct_vector(int_id)
        if vec is None:
            continue
        # Search neighbors. +1 because the query vector matches itself and
        # is filtered out below, leaving up to shadow_k true neighbors.
        results = _engine.search(vec.reshape(1, -1), top_k=config.shadow_k + 1)
        if not results:
            continue
        # Build neighbor data (vectors, creation times, ids), excluding
        # self-matches and any ids that no longer resolve to a stored ember.
        neighbor_vecs = []
        neighbor_times = []
        neighbor_ids = []
        for faiss_id, dist in results:
            uuid = _storage.int_to_uuid.get(faiss_id)
            if not uuid or uuid == ember.ember_id:
                continue
            neighbor = await _storage.get_ember(uuid)
            if not neighbor:
                continue
            n_vec = _engine.reconstruct_vector(faiss_id)
            if n_vec is not None:
                neighbor_vecs.append(n_vec)
                neighbor_times.append(neighbor.created_at)
                neighbor_ids.append(uuid)
        if not neighbor_vecs:
            continue
        # Compute shadow load
        shadow_load, shadower_id = compute_shadow_load(
            vec,
            ember.created_at,
            neighbor_vecs,
            neighbor_times,
            neighbor_ids,
            config.shadow_delta,
            config.shadow_epsilon,
        )
        # Update only if meaningfully changed (0.001 tolerance avoids
        # rewriting every ember for float noise).
        if abs(shadow_load - ember.shadow_load) > 0.001:
            ember.shadow_load = shadow_load
            ember.shadowed_by = shadower_id
            ember.shadow_updated_at = datetime.now(timezone.utc)
            await _storage.update_ember(ember)
            # Record the shadow relationship in the graph, weighted by load.
            if shadower_id and shadow_load > 0:
                await _storage.save_edge(
                    ember.ember_id, shadower_id, "shadow", shadow_load
                )
            updated += 1
    return f"Recomputed shadows for {len(embers)} embers. {updated} updated."
@mcp.tool(annotations={"readOnlyHint": True, "destructiveHint": False, "openWorldHint": False})
async def ember_explain(ember_id: str) -> str:
    """
    Return HESTIA score breakdown for a specific ember:
    shadow load, vitality, edges, and scoring factors.
    Args:
        ember_id: The ID of the ember to explain
    """
    await _ensure_init()
    config = load_config()
    ember = await _storage.get_ember(ember_id)
    if not ember:
        return f"Ember {ember_id} not found."
    # Get vector for vitality calculation; embers without an indexed vector
    # fall through with vitality 0.0.
    int_id = _storage.uuid_to_int.get(ember_id)
    vitality = 0.0
    if int_id is not None:
        vec = _engine.reconstruct_vector(int_id)
        if vec is not None:
            # Convert the cosine-similarity topic radius into the squared-L2
            # threshold the FAISS radius search expects.
            radius_l2 = VectorEngine.cosine_to_l2(1.0 - config.topic_radius)
            radius_results = _engine.search_radius(vec.reshape(1, -1), radius_l2)
            now = datetime.now(timezone.utc)
            neighbor_times = []
            neighbor_dists = []
            for fid, dist_sq in radius_results:
                uuid = _storage.int_to_uuid.get(fid)
                if uuid:
                    ne = await _storage.get_ember(uuid)
                    if ne:
                        neighbor_times.append(ne.created_at)
                        neighbor_dists.append(dist_sq)
            # Note: the radius search includes the ember itself — presumably
            # intended so a topic is never empty; TODO confirm.
            vitality = compute_topic_vitality(
                neighbor_dists, neighbor_times, now, radius_l2, config.vitality_lambda
            )
    # HESTIA at self-similarity = 1.0 (perfect match)
    v_max = max(vitality, 0.001)  # floor avoids division by zero downstream
    score, breakdown = compute_hestia_score(
        1.0, ember.shadow_load, vitality, v_max,
        config.shadow_gamma, config.nostalgia_alpha,
    )
    # Get edges; each edge is shown from this ember's perspective, so pick
    # whichever endpoint is NOT this ember as "other".
    edges = await _storage.get_edges(ember_id)
    edge_lines = []
    for e in edges:
        other = e["target_id"] if e["source_id"] == ember_id else e["source_id"]
        edge_lines.append(f" {e['edge_type']}: {other[:8]}... (weight={e['weight']:.2f})")
    # Assemble the fixed-layout report consumed by the client LLM.
    lines = [
        f"Ember Explanation: {ember.name}",
        f"ID: {ember_id}",
        "=" * 50,
        "",
        "HESTIA Factors (at perfect query match):",
        f" Final Score: {score:.4f}",
        f" Cosine Sim: 1.0 (self)",
        f" Shadow Factor: {breakdown['shadow_factor']:.4f} (shadow_load={ember.shadow_load:.3f})",
        f" Vitality Factor: {breakdown['vitality_factor']:.4f} (vitality={vitality:.3f})",
        "",
        f"Shadow Load: {ember.shadow_load:.3f}",
        f"Shadowed By: {ember.shadowed_by or 'None'}",
        f"Related IDs: {', '.join(ember.related_ids) if ember.related_ids else 'None'}",
        f"Stale: {ember.is_stale} ({ember.stale_reason or 'N/A'})",
        "",
        f"Edges ({len(edges)}):",
    ]
    lines.extend(edge_lines if edge_lines else [" None"])
    return "\n".join(lines)
# ---------------------------------------------------------------------------
# MCP Prompts
# ---------------------------------------------------------------------------
@mcp.prompt()
def start_session() -> str:
    """Load memory and context at the start of a new conversation."""
    # The returned text is delivered verbatim to the client LLM as a prompt.
    return """Check my persistent memory for any relevant context.
Steps:
1. Call ember_auto with a summary of what the user is asking about
2. If relevant memories are found, incorporate them naturally
3. If there are recent "Next Steps" embers, mention what was planned
4. Respond to the user with full context, as if you remember everything"""
@mcp.prompt()
def end_session() -> str:
    """Save important context before ending a conversation."""
    # The returned text is delivered verbatim to the client LLM as a prompt.
    return """Before we end this conversation, let's save the important parts.
Steps:
1. Summarize the key work done in this session (2-3 sentences)
2. List any decisions that were made
3. Note any next steps or open items
4. Call ember_save_session with all three
5. Confirm to the user what was saved"""
@mcp.prompt()
def remember() -> str:
    """Store something the user wants remembered across sessions."""
    # The returned text is delivered verbatim to the client LLM as a prompt.
    return """The user wants to save something to persistent memory.
Steps:
1. Identify what needs to be remembered (preference, fact, rule, context)
2. Choose a clear, searchable name
3. Add relevant tags for categorization
4. Choose the right importance level: fact, decision, preference, context, or learning
5. Call ember_store with the structured content and importance
6. Confirm what was saved"""