"""
MCP tool definitions for Ember V3.
All tools exposed to Claude are registered here.
Tools call into memory/operations.py — they never touch SQLite directly.
"""
from __future__ import annotations
import time
import logging
from pathlib import Path
from typing import Optional
logger = logging.getLogger("ember.tools")
PREVIEW_CHARS = 200
SOURCE_FILE_MAX_CHARS = 5000
MAX_SOURCE_FILES = 3
def _preview(text: str, max_chars: int = PREVIEW_CHARS) -> str:
"""Truncate text for preview display."""
if len(text) <= max_chars:
return text
return text[:max_chars].rsplit(" ", 1)[0] + "..."
def _resolve_id(ember_id: str) -> Optional[str]:
    """Resolve a full or partial ember ID to a full UUID.

    Tries an exact match first, then a prefix match (first row wins when
    the prefix is ambiguous). Returns None if nothing matches.

    FIX: LIKE metacharacters (%, _, \\) in the caller-supplied fragment
    are now escaped so they match literally instead of acting as
    wildcards in the prefix pattern.
    """
    from ember.memory.db import get_db
    db = get_db()
    # Try exact match first — cheapest and unambiguous.
    row = db.fetchone("SELECT id FROM memories WHERE id = ?", (ember_id,))
    if row:
        return row["id"]
    # Prefix match, with LIKE wildcards neutralized via ESCAPE.
    escaped = (
        ember_id.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    )
    row = db.fetchone(
        "SELECT id FROM memories WHERE id LIKE ? ESCAPE '\\'",
        (f"{escaped}%",),
    )
    return row["id"] if row else None
def _format_memory(mem: dict, include_score: bool = False) -> str:
    """Render one memory dict as a bracketed header plus a content preview.

    The header carries the short id and tier, then optional status,
    importance (only when >= 0.7), tags, and — with include_score —
    whichever retrieval scores are present on the dict.
    """
    header_parts = [f"[id: {mem['id'][:8]}]", f"[{mem.get('tier', '?')}]"]
    if mem.get("status"):
        header_parts.append(f"[{mem['status']}]")
    if mem.get("importance", 0) >= 0.7:
        header_parts.append(f"[imp: {mem['importance']:.1f}]")
    if mem.get("tags"):
        header_parts.append(f"[{mem['tags']}]")
    if include_score:
        if "final_score" in mem:
            header_parts.append(f"[score: {mem['final_score']:.3f}]")
        if "similarity_score" in mem:
            header_parts.append(f"[sim: {mem['similarity_score']:.3f}]")
    return f"{' '.join(header_parts)}: {_preview(mem.get('content', ''))}"
def register_all_tools(server, engine, session_id: str):
    """Register all Ember MCP tools with the server."""

# ---- Tool 1: ember_store ----
@server.tool()
async def ember_store(
    content: str,
    tier: str = "session",
    importance: float = 0.5,
    tags: str = "",
    source: str = "agent",
    status: str = "",
    source_path: str = "",
) -> str:
    """
    Store a new memory. Claude decides what to store, which tier,
    importance level, and any tags.
    Tiers: working (ephemeral), session (30d), relational (180d), glacier (permanent).
    For tasks/action items, set status='open'.
    """
    from ember.memory.operations import store_memory
    from ember.memory.checkpoint import check_overload

    # Embed only when a semantic engine is available.
    vector = engine.embed(content) if engine.is_semantic else None
    stored = store_memory(
        content=content,
        tier=tier,
        importance=importance,
        tags=tags,
        source=source,
        status=status,
        embedding=vector,
        source_path=source_path,
    )
    reply = f"Stored [{stored['id'][:8]}] in {tier} tier"
    # Working memory fills fastest — append a warning when overloaded.
    if tier == "working":
        overload = check_overload()
        if overload["overloaded"]:
            reply += f"\n\n{overload['recommendation']}"
    return reply
# ---- Tool 2: ember_recall ----
@server.tool()
async def ember_recall(
    query: str,
    top_k: int = 5,
    tier_filter: str = "",
    status_filter: str = "",
    min_importance: float = 0.0,
) -> str:
    """
    Retrieve memories semantically similar to a query.
    Use at conversation start or when past context would help.
    tier_filter: comma-separated tiers (e.g., "session,relational")
    status_filter: 'open' | 'in_progress' | 'done' | '' (no filter)
    """
    from ember.memory.operations import semantic_search, search_fts, touch_memories_batch

    wanted_tiers = [t.strip() for t in tier_filter.split(",") if t.strip()] or None
    hits = []
    if engine.is_semantic:
        hits = semantic_search(
            query_embedding=engine.embed(query),
            engine=engine,
            top_k=top_k,
            tier_filter=wanted_tiers,
            min_importance=min_importance,
        )
    # Keyword (FTS5) fallback when semantic search is unavailable or empty.
    if not hits:
        hits = search_fts(query, limit=top_k)
    # Status is filtered after retrieval — neither backend filters on it.
    if status_filter:
        hits = [h for h in hits if h.get("status") == status_filter]
    if not hits:
        return "No memories found matching your query."
    # Record the access so retrieval stats stay fresh.
    touch_memories_batch([h["id"] for h in hits])
    body = "\n".join(_format_memory(h, include_score=True) for h in hits)
    return f"Found {len(hits)} memories:\n\n{body}"
# ---- Tool 3: ember_read ----
@server.tool()
async def ember_read(ember_id: str) -> str:
    """
    Read the full content of a specific memory by ID.
    Use after ember_recall returns previews and you need complete content.
    """
    from ember.memory.operations import get_memory, touch_memory
    full_id = _resolve_id(ember_id)
    if not full_id:
        return f"Memory not found: {ember_id}"
    mem = get_memory(full_id)
    # FIX: guard against the row disappearing between resolve and fetch —
    # previously a None result raised TypeError on mem["id"] below.
    if not mem:
        return f"Memory not found: {ember_id}"
    touch_memory(mem["id"])
    # Metadata header, then the full content on its own paragraph.
    parts = [
        f"ID: {mem['id']}",
        f"Tier: {mem.get('tier', '?')}",
        f"Importance: {mem.get('importance', 0.5):.2f}",
    ]
    if mem.get("tags"):
        parts.append(f"Tags: {mem['tags']}")
    if mem.get("status"):
        parts.append(f"Status: {mem['status']}")
    if mem.get("source_path"):
        parts.append(f"Source: {mem['source_path']}")
    if mem.get("shadow_load", 0) > 0:
        parts.append(f"Shadow load: {mem['shadow_load']:.2f}")
    created = time.strftime("%Y-%m-%d %H:%M", time.localtime(mem.get("created_at", 0)))
    parts.append(f"Created: {created}")
    parts.append(f"\n{mem.get('content', '')}")
    return "\n".join(parts)
# ---- Tool 4: ember_learn ----
@server.tool()
async def ember_learn(
    conversation_context: str,
    source_path: str = "",
) -> str:
    """
    Auto-capture key information from conversation. Extracts facts,
    preferences, decisions, and learnings — stores as memories.
    Call silently after every substantive user message.
    Format: "TYPE: description" where TYPE is fact/decision/preference/learning
    """
    from ember.memory.operations import store_memory

    # Per-type storage policy: type -> (importance, tier).
    # "context" is the default when no recognized prefix is present.
    policy = {
        "fact": (0.7, "relational"),
        "decision": (0.8, "session"),
        "preference": (0.75, "relational"),
        "learning": (0.6, "session"),
        "context": (0.4, "working"),
    }
    detected_type = "context"
    content = conversation_context
    lowered = conversation_context.lower()
    for candidate in policy:
        # Prefix detection is case-insensitive; the prefix is stripped
        # from the stored content.
        if lowered.startswith(f"{candidate}:"):
            detected_type = candidate
            content = conversation_context[len(candidate) + 1:].strip()
            break
    importance, tier = policy[detected_type]
    embedding = engine.embed(content) if engine.is_semantic else None
    result = store_memory(
        content=content,
        tier=tier,
        importance=importance,
        tags=detected_type,
        source="learned",
        embedding=embedding,
        source_path=source_path,
    )
    return f"Learned [{result['id'][:8]}] as {detected_type} in {tier} tier"
# ---- Tool 5: ember_update ----
@server.tool()
async def ember_update(
    ember_id: str,
    content: str = "",
    tier: str = "",
    importance: float = -1,
    tags: str = "",
    status: str = "",
) -> str:
    """
    Update an existing memory. Change content, tier, importance, tags,
    or status. Use for promoting memories, marking tasks done, or
    correcting information.
    """
    from ember.memory.operations import update_memory

    full_id = _resolve_id(ember_id)
    if not full_id:
        return f"Update failed: MEMORY_NOT_FOUND ({ember_id})"
    # Forward only fields the caller actually supplied;
    # importance uses -1 as its "not supplied" sentinel.
    changes = {}
    if content:
        changes["content"] = content
        if engine.is_semantic:
            # Content changed, so its embedding must be recomputed.
            changes["embedding"] = engine.embed(content)
    if tier:
        changes["tier"] = tier
    if importance >= 0:
        changes["importance"] = importance
    if tags:
        changes["tags"] = tags
    if status:
        changes["status"] = status
    outcome = update_memory(full_id, **changes)
    if outcome.get("updated"):
        return f"Updated [{full_id[:8]}]"
    return f"Update failed: {outcome.get('error', 'unknown')}"
# ---- Tool 6: ember_discard ----
@server.tool()
async def ember_discard(
    ember_id: str,
    reason: str = "",
    replacement_content: str = "",
) -> str:
    """
    Mark a memory as outdated/superseded. Does not delete — marks as
    shadowed so it no longer appears in retrieval.
    Optionally provide replacement_content to store a corrected version.
    Related memories in the knowledge graph will be flagged for review.
    """
    from ember.memory.operations import (
        shadow_memory, store_memory, get_edges, get_memory,
    )
    full_id = _resolve_id(ember_id)
    if not full_id:
        return f"Discard failed: MEMORY_NOT_FOUND ({ember_id})"
    result = shadow_memory(full_id, reason=reason)
    if not result.get("shadowed"):
        return f"Discard failed: {result.get('error', 'unknown')}"
    response = f"Shadowed [{full_id[:8]}]"
    # Store replacement if provided, inheriting the old memory's metadata.
    if replacement_content:
        old_mem = get_memory(full_id)
        embedding = engine.embed(replacement_content) if engine.is_semantic else None
        new_result = store_memory(
            content=replacement_content,
            tier=old_mem.get("tier", "session") if old_mem else "session",
            importance=old_mem.get("importance", 0.5) if old_mem else 0.5,
            tags=old_mem.get("tags", "") if old_mem else "",
            source="correction",
            embedding=embedding,
        )
        response += f" → Replaced with [{new_result['id'][:8]}]"
    # Flag connected memories for review.
    # BUG FIX: edges were previously looked up with the caller-supplied
    # (possibly partial) ember_id, so partial IDs never matched any edge
    # and neighbors were compared against the wrong value; use the
    # resolved full_id for both the lookup and the comparisons.
    edges = get_edges(full_id)
    if edges:
        flagged = set()
        for edge in edges:
            neighbor = edge["target_id"] if edge["source_id"] == full_id else edge["source_id"]
            if neighbor != full_id:
                flagged.add(neighbor)
        if flagged:
            response += f"\n{len(flagged)} connected memories flagged for review"
    return response
# ---- Tool 7: ember_delete ----
@server.tool()
async def ember_delete(ember_id: str) -> str:
    """
    Permanently delete a memory and its knowledge graph edges.
    Use sparingly — prefer ember_discard for outdated information.
    """
    from ember.memory.operations import delete_memory

    target = _resolve_id(ember_id)
    if not target:
        return f"Delete failed: MEMORY_NOT_FOUND ({ember_id})"
    outcome = delete_memory(target)
    if not outcome.get("deleted"):
        return f"Delete failed: {outcome.get('error', 'unknown')}"
    return f"Deleted [{target[:8]}] and its edges"
# ---- Tool 8: ember_list ----
@server.tool()
async def ember_list(
    tier: str = "",
    status: str = "",
    limit: int = 20,
    offset: int = 0,
) -> str:
    """
    List stored memories with optional filtering and pagination.
    Returns metadata and previews, not full content.
    """
    from ember.memory.operations import list_memories

    selected_tiers = [part.strip() for part in tier.split(",") if part.strip()] or None
    page = list_memories(
        tier_filter=selected_tiers,
        status_filter=status or None,
        limit=limit,
        offset=offset,
    )
    if not page:
        return "No memories found."
    rendered = "\n".join(_format_memory(mem) for mem in page)
    return f"Showing {len(page)} memories (offset={offset}):\n\n{rendered}"
# ---- Tool 9: ember_auto ----
@server.tool()
async def ember_auto(conversation_context: str) -> str:
    """
    Automatically retrieve relevant memories for the current conversation.
    Call at the start of every new conversation to load context.
    """
    from ember.memory.operations import semantic_search, search_fts, touch_memories_batch
    results = []
    if engine.is_semantic:
        query_emb = engine.embed(conversation_context)
        results = semantic_search(
            query_embedding=query_emb,
            engine=engine,
            top_k=5,
        )
    # Fall back to FTS5 if semantic search found nothing
    if not results:
        results = search_fts(conversation_context, limit=5)
    if not results:
        return "No relevant memories found. Starting fresh."
    # Boost session-tier recent memories: fresh session summaries are the
    # most likely "where we left off" context, so their final_score gets a
    # recency multiplier (2x under 24h, 1.5x under 72h) before re-sorting.
    now = time.time()
    for r in results:
        if r.get("tier") == "session" and r.get("tags") == "session_summary":
            age_hours = (now - r.get("created_at", now)) / 3600
            if age_hours < 24:
                r["final_score"] = r.get("final_score", 0) * 2.0
            elif age_hours < 72:
                r["final_score"] = r.get("final_score", 0) * 1.5
    results.sort(key=lambda x: x.get("final_score", 0), reverse=True)
    # Record the access so retrieval stats stay fresh.
    touch_memories_batch([r["id"] for r in results])
    lines = []
    for mem in results:
        lines.append(f"- {_format_memory(mem)}")
    # Check for deep context availability (hint to call ember_deep_recall).
    source_paths = [r.get("source_path") for r in results if r.get("source_path")]
    if source_paths:
        lines.append(f"\nDeep context available from {len(source_paths)} source(s)")
    return "\n".join(lines)
# ---- Tool 10: ember_deep_recall ----
@server.tool()
async def ember_deep_recall(query: str, top_k: int = 3) -> str:
    """
    Retrieve memories AND read their source files for full context.
    Use when ember_recall previews aren't enough and you need
    deeper detail from the original source.
    """
    from ember.memory.operations import semantic_search, search_fts, touch_memories_batch
    results = []
    if engine.is_semantic:
        query_emb = engine.embed(query)
        results = semantic_search(
            query_embedding=query_emb,
            engine=engine,
            top_k=top_k,
        )
    # Fall back to FTS5 if semantic search found nothing
    if not results:
        results = search_fts(query, limit=top_k)
    if not results:
        return "No memories found."
    touch_memories_batch([r["id"] for r in results])
    # Resolve home once; comparing resolved-to-resolved keeps legit reads
    # working even when the home directory is itself behind a symlink.
    home = Path.home().resolve()
    lines = []
    files_read = 0
    for mem in results:
        lines.append(f"\n{'='*40}")
        lines.append(_format_memory(mem, include_score=True))
        lines.append(mem.get("content", ""))
        # Read source file if available (at most MAX_SOURCE_FILES per call)
        source_path = mem.get("source_path", "")
        if source_path and files_read < MAX_SOURCE_FILES:
            # SECURITY FIX: resolve symlinks and ".." BEFORE the
            # containment check — the unresolved path could previously
            # point outside the user's home via a symlink or "..".
            path = Path(source_path).expanduser().resolve()
            if path.exists() and path.is_relative_to(home):
                try:
                    text = path.read_text(errors="replace")[:SOURCE_FILE_MAX_CHARS]
                    lines.append(f"\n--- Source: {source_path} ---")
                    lines.append(text)
                    files_read += 1
                except Exception as e:
                    lines.append(f"(Could not read source: {e})")
    return "\n".join(lines)
# ---- Tool 11: ember_actionable ----
@server.tool()
async def ember_actionable(include_done: bool = False) -> str:
    """
    List all memories with active status (open or in_progress).
    Surfaces pending tasks, follow-ups, and action items.
    """
    from ember.memory.operations import list_memories

    # Collect active items, plus completed ones when requested.
    collected = []
    collected.extend(list_memories(status_filter="open", limit=100))
    collected.extend(list_memories(status_filter="in_progress", limit=100))
    if include_done:
        collected.extend(list_memories(status_filter="done", limit=50))
    if not collected:
        return "No actionable items found."
    # Bucket by status for grouped display.
    grouped = {}
    for mem in collected:
        grouped.setdefault(mem.get("status", "unknown"), []).append(mem)
    lines = []
    # Fixed display order: active work first, completed last.
    for status_key in ("in_progress", "open", "done"):
        bucket = grouped.get(status_key, [])
        if not bucket:
            continue
        lines.append(f"\n### {status_key.upper()} ({len(bucket)})")
        lines.extend(f"  {_format_memory(mem)}" for mem in bucket)
    return "\n".join(lines)
# ---- Tool 12: ember_checkpoint ----
@server.tool()
async def ember_checkpoint(
    state_summary: str,
    active_task: str = "",
    clear_working: bool = False,
) -> str:
    """
    Save current task state and optionally clear working memory.
    Use when context is cluttered, before switching tasks, or
    when resuming after a break.
    """
    from ember.memory.checkpoint import save_checkpoint

    outcome = save_checkpoint(
        session_id=session_id,
        state_summary=state_summary,
        active_task=active_task,
        clear_working=clear_working,
    )
    saved_clock = time.strftime("%H:%M:%S", time.localtime(outcome["saved_at"]))
    pieces = [
        f"Checkpoint saved at {saved_clock}",
        f" ({outcome['working_memories_saved']} working memories captured)",
    ]
    if outcome["cleared"]:
        pieces.append(" — working memory cleared")
    return "".join(pieces)
# ---- Tool 13: ember_resume ----
@server.tool()
async def ember_resume(checkpoint_id: str = "") -> str:
    """
    Load the most recent checkpoint to resume where you left off.
    Call at session start if the user wants to continue previous work.
    """
    from ember.memory.checkpoint import load_checkpoint

    # Empty string means "most recent checkpoint".
    state = load_checkpoint(checkpoint_id=checkpoint_id or None)
    if not state:
        return "No checkpoint found to resume from."
    saved_at = time.strftime(
        "%Y-%m-%d %H:%M",
        time.localtime(state.get("created_at", 0)),
    )
    report = [
        f"Resuming from checkpoint ({saved_at}):",
        f"State: {state.get('state_summary', 'N/A')}",
    ]
    if state.get("active_task"):
        report.append(f"Active task: {state['active_task']}")
    snapshot = state.get("working_memory_snapshot", [])
    if snapshot:
        report.append(f"Working memories at checkpoint: {len(snapshot)}")
    return "\n".join(report)
# ---- Tool 14: ember_consolidate ----
@server.tool()
async def ember_consolidate(
    ember_ids: str,
    merged_content: str,
    tier: str = "session",
    importance: float = 0.6,
) -> str:
    """
    Merge two or more related memories into a single consolidated
    memory. Original memories are shadowed. Use when you detect
    redundancy or multiple fragments that should be unified.
    ember_ids: comma-separated memory IDs (minimum 2)
    """
    from ember.utils.consolidation import consolidate_memories

    id_list = [piece.strip() for piece in ember_ids.split(",") if piece.strip()]
    if len(id_list) < 2:
        return "Need at least 2 memory IDs to consolidate."
    outcome = consolidate_memories(
        memory_ids=id_list,
        merged_content=merged_content,
        tier=tier,
        importance=importance,
    )
    if "error" in outcome:
        return f"Consolidation failed: {outcome['error']}"
    return (
        f"Consolidated {len(outcome['shadowed_ids'])} memories → "
        f"[{outcome['new_id'][:8]}]"
    )
# ---- Tool 15: ember_session_close ----
@server.tool()
async def ember_session_close(
    session_summary: str,
    key_decisions: str = "",
    next_steps: str = "",
) -> str:
    """
    Close the current session. Triggers consolidation:
    - Working memories evaluated for promotion
    - Session summary stored
    - Expired memories cleaned up
    """
    from ember.memory.operations import close_session

    stats = close_session(
        session_id=session_id,
        summary=session_summary,
        decisions=key_decisions,
        next_steps=next_steps,
    )
    return (
        f"Session closed. "
        f"Promoted: {stats['memories_promoted']}, "
        f"Discarded: {stats['memories_discarded']}"
    )
# ---- Tool 16: ember_graph_search ----
@server.tool()
async def ember_graph_search(
    query: str,
    depth: int = 2,
    top_k: int = 5,
) -> str:
    """
    Vector search → entry node → BFS via knowledge graph → return
    correlated context. Traverses memory connections to find
    related information.
    """
    from ember.memory.operations import (
        semantic_search, search_fts, traverse_kg,
        get_memories_batch, touch_memories_batch,
    )
    # Find entry points via semantic search, fall back to FTS5
    entry_results = []
    if engine.is_semantic:
        query_emb = engine.embed(query)
        entry_results = semantic_search(
            query_embedding=query_emb,
            engine=engine,
            top_k=3,
        )
    if not entry_results:
        entry_results = search_fts(query, limit=3)
    if not entry_results:
        return "No entry points found for graph search."
    entry_ids = [r["id"] for r in entry_results]
    # BFS traversal — depth is clamped to 5 to bound the walk
    all_ids = traverse_kg(entry_ids, depth=min(depth, 5))
    # Fetch all discovered memories
    memories = get_memories_batch(all_ids)
    # Filter out shadowed (superseded) memories
    memories = [m for m in memories if not m.get("is_shadowed")]
    # Sort by importance and keep the strongest top_k
    memories.sort(key=lambda m: m.get("importance", 0), reverse=True)
    memories = memories[:top_k]
    if not memories:
        return "No connected memories found."
    # Record the access so retrieval stats stay fresh
    touch_memories_batch([m["id"] for m in memories])
    lines = [f"Graph search found {len(memories)} connected memories:\n"]
    for mem in memories:
        lines.append(_format_memory(mem))
    return "\n".join(lines)
# ---- Tool 17: ember_health ----
@server.tool()
async def ember_health() -> str:
    """
    Compute memory health metrics. Shows total counts per tier,
    shadow load distribution, and overall health score.
    """
    from ember.memory.db import get_db
    from ember.memory.operations import log_metric
    db = get_db()
    # Active (non-shadowed) counts per tier
    tier_counts = {}
    for tier in ("working", "session", "relational", "glacier"):
        row = db.fetchone(
            "SELECT COUNT(*) as cnt FROM memories WHERE tier = ? AND is_shadowed = 0",
            (tier,),
        )
        tier_counts[tier] = row["cnt"] if row else 0
    total = sum(tier_counts.values())
    # Shadowed count
    row = db.fetchone("SELECT COUNT(*) as cnt FROM memories WHERE is_shadowed = 1")
    shadowed = row["cnt"] if row else 0
    # Average shadow load on active memories (NULL on empty table → 0.0)
    row = db.fetchone(
        "SELECT AVG(shadow_load) as avg_sl FROM memories WHERE is_shadowed = 0"
    )
    avg_shadow = row["avg_sl"] if row and row["avg_sl"] else 0.0
    # High shadow load count
    row = db.fetchone(
        "SELECT COUNT(*) as cnt FROM memories WHERE shadow_load > 0.5 AND is_shadowed = 0"
    )
    high_shadow = row["cnt"] if row else 0
    # Actionable items
    row = db.fetchone(
        "SELECT COUNT(*) as cnt FROM memories WHERE status IN ('open', 'in_progress')"
    )
    actionable = row["cnt"] if row else 0
    # Health score is derived solely from average shadow load, clamped to
    # [0, 1] (tier balance is reported below but does not affect the score)
    shadow_risk = min(1.0, avg_shadow * 2)
    health_score = max(0.0, 1.0 - shadow_risk)
    log_metric("health_score", health_score)
    log_metric("total_memories", total)
    lines = [
        "Ember Health Report",
        "=" * 30,
        f"Total active memories: {total}",
        f"  working: {tier_counts['working']}",
        f"  session: {tier_counts['session']}",
        f"  relational: {tier_counts['relational']}",
        f"  glacier: {tier_counts['glacier']}",
        f"Shadowed (inactive): {shadowed}",
        f"Avg shadow load: {avg_shadow:.3f}",
        f"High shadow (>0.5): {high_shadow}",
        f"Actionable items: {actionable}",
        f"Health score: {health_score:.2f}",
    ]
    return "\n".join(lines)
# ---- Tool 18: ember_save_session ----
@server.tool()
async def ember_save_session(
    summary: str,
    decisions: str = "",
    next_steps: str = "",
    source_path: str = "",
) -> str:
    """
    Save key takeaways from the current session. Call before ending
    a conversation where important work was done.
    """
    from ember.memory.operations import store_memory

    # Assemble the summary body, one labelled section per line.
    sections = [f"Session summary: {summary}"]
    if decisions:
        sections.append(f"Decisions: {decisions}")
    if next_steps:
        sections.append(f"Next steps: {next_steps}")
    content = "\n".join(sections)
    embedding = engine.embed(content) if engine.is_semantic else None
    # The "session_summary" tag is what ember_auto's recency boost keys on.
    stored = store_memory(
        content=content,
        tier="session",
        importance=0.7,
        tags="session_summary",
        source="session",
        embedding=embedding,
        source_path=source_path,
    )
    return f"Session saved [{stored['id'][:8]}]"
# ---- Tool 19: ember_set_status ----
@server.tool()
async def ember_set_status(ember_id: str, status: str) -> str:
    """
    Update the status of a memory. Use for task tracking.
    Status: 'open' | 'in_progress' | 'done' | '' (clear status)
    """
    from ember.memory.operations import update_memory

    valid_statuses = ("open", "in_progress", "done", "")
    if status not in valid_statuses:
        return f"Invalid status. Must be one of: {valid_statuses}"
    target = _resolve_id(ember_id)
    if target is None:
        return f"Failed: MEMORY_NOT_FOUND ({ember_id})"
    outcome = update_memory(target, status=status)
    if outcome.get("updated"):
        return f"Status updated: [{target[:8]}] → {status or 'cleared'}"
    return f"Failed: {outcome.get('error', 'unknown')}"