"""
OMEGA MCP Handlers -- Maps tool names to async handler functions.
Each handler delegates to omega.bridge for actual operations and returns
MCP-compatible response dicts.
"""
import logging
from pathlib import Path
from typing import Any, Dict
logger = logging.getLogger("omega.server.handlers")
def _clamp_int(value, default: int, min_val: int = 1, max_val: int = 10000) -> int:
"""Clamp a numeric argument to safe bounds."""
try:
v = int(value)
return max(min_val, min(v, max_val))
except (TypeError, ValueError):
return default
# Safe directory for export/import operations
_SAFE_EXPORT_DIR = Path.home() / ".omega"
# ============================================================================
# Response Helpers
# ============================================================================
def mcp_response(text: str) -> dict:
"""Build a successful MCP response."""
return {"content": [{"type": "text", "text": str(text)}]}
def mcp_error(text: str) -> dict:
"""Build an error MCP response."""
return {"content": [{"type": "text", "text": f"Error: {text}"}], "isError": True}
# ============================================================================
# Handler: omega_store (also handles omega_remember as alias)
# ============================================================================
async def handle_omega_store(arguments: dict) -> dict:
"""Store a memory with optional type and metadata.
Accepts 'text' as alias for 'content' for backward compat with omega_remember.
Defaults event_type to 'memory' when not provided.
"""
content = arguments.get("content", "").strip()
# Support 'text' as alias for 'content' (backward compat with omega_remember)
if not content:
content = arguments.get("text", "").strip()
if not content:
return mcp_error("content (or text) is required")
event_type = arguments.get("event_type", "memory")
metadata = arguments.get("metadata", {})
session_id = arguments.get("session_id")
project = arguments.get("project") or (metadata or {}).get("project")
entity_id = arguments.get("entity_id")
agent_type = arguments.get("agent_type")
# Wire through priority if provided
priority = arguments.get("priority")
if priority is not None:
try:
priority = max(1, min(5, int(priority)))
metadata = dict(metadata or {})
metadata["priority"] = priority
except (TypeError, ValueError):
pass
try:
from omega.bridge import store
result = store(
content=content,
event_type=event_type,
metadata=metadata,
session_id=session_id,
project=project,
entity_id=entity_id,
agent_type=agent_type,
)
return mcp_response(result)
except Exception as e:
logger.error("omega_store failed: %s", e, exc_info=True)
import traceback
tb = traceback.format_exc()
logger.error("omega_store traceback: %s", tb)
return mcp_error(f"Failed to store memory: {e}")
# ============================================================================
# Handler: omega_query
# ============================================================================
async def handle_omega_query(arguments: dict) -> dict:
"""Search memories — semantic, phrase, or timeline mode."""
mode = arguments.get("mode", "semantic")
# Timeline mode — delegate to timeline handler
if mode == "timeline":
return await handle_omega_timeline(arguments)
query_text = arguments.get("query", "").strip()
if not query_text:
return mcp_error("query is required")
# Phrase mode — delegate to bridge.phrase_search
if mode == "phrase":
limit = _clamp_int(arguments.get("limit", 10), default=10, max_val=1000)
event_type = arguments.get("event_type")
project = arguments.get("project")
case_sensitive = arguments.get("case_sensitive", False)
try:
from omega.bridge import phrase_search
result = phrase_search(
phrase=query_text,
limit=limit,
event_type=event_type,
project=project,
case_sensitive=case_sensitive,
)
return mcp_response(result)
except Exception as e:
logger.error("omega_query (phrase) failed: %s", e, exc_info=True)
return mcp_error("Phrase search failed")
# Semantic mode (default)
limit = _clamp_int(arguments.get("limit", 10), default=10, max_val=1000)
event_type = arguments.get("event_type")
project = arguments.get("project")
session_id = arguments.get("session_id")
context_file = arguments.get("context_file")
context_tags = arguments.get("context_tags")
filter_tags = arguments.get("filter_tags")
raw_temporal = arguments.get("temporal_range")
temporal_range = tuple(raw_temporal) if raw_temporal and len(raw_temporal) == 2 else None
entity_id = arguments.get("entity_id")
agent_type = arguments.get("agent_type")
try:
from omega.bridge import query
result = query(
query_text=query_text,
limit=limit,
event_type=event_type,
project=project,
session_id=session_id,
context_file=context_file,
context_tags=context_tags,
filter_tags=filter_tags,
temporal_range=temporal_range,
entity_id=entity_id,
agent_type=agent_type,
)
return mcp_response(result)
except Exception as e:
logger.error("omega_query failed: %s", e, exc_info=True)
return mcp_error("Query failed")
# ============================================================================
# Handler: omega_welcome
# ============================================================================
async def handle_omega_welcome(arguments: dict) -> dict:
"""Get a session welcome briefing with recent relevant memories."""
session_id = arguments.get("session_id")
project = arguments.get("project")
try:
from omega.bridge import welcome
from omega import json_compat as json
briefing = welcome(session_id=session_id, project=project)
return mcp_response(json.dumps(briefing, indent=2))
except Exception as e:
logger.error("omega_welcome failed: %s", e, exc_info=True)
return mcp_error("Welcome briefing failed")
# ============================================================================
# Handler: omega_profile
# ============================================================================
async def handle_omega_profile(arguments: dict) -> dict:
"""Read or update the user profile, or list preferences.
If 'update' dict is provided, merges those fields and saves.
Otherwise, returns the current profile.
Also handles legacy omega_save_profile calls via 'profile' param.
"""
# Action-based routing for composite tool
action = arguments.get("action", "").strip()
if action == "list_preferences":
return await handle_omega_list_preferences(arguments)
# Support legacy omega_save_profile param name
update_data = arguments.get("update") or arguments.get("profile")
if update_data:
# Write mode
try:
from omega.bridge import get_profile, save_profile
existing = get_profile()
existing.pop("preferences_from_memory", None)
existing.update(update_data)
success = save_profile(existing)
if success:
return mcp_response(f"Profile updated with {len(update_data)} field(s).")
else:
return mcp_error("Failed to save profile to disk.")
except Exception as e:
logger.error("omega_profile (save) failed: %s", e, exc_info=True)
return mcp_error("Save profile failed")
else:
# Read mode
try:
from omega.bridge import get_profile
from omega import json_compat as json
profile = get_profile()
if not profile:
return mcp_response("No profile found. Preferences will build your profile over time.")
return mcp_response(json.dumps(profile, indent=2))
except Exception as e:
logger.error("omega_profile failed: %s", e, exc_info=True)
return mcp_error("Profile failed")
# ============================================================================
# Handler: omega_delete_memory
# ============================================================================
async def handle_omega_delete_memory(arguments: dict) -> dict:
"""Delete a specific memory by its ID."""
memory_id = arguments.get("memory_id", "").strip()
if not memory_id:
return mcp_error("memory_id is required")
try:
from omega.bridge import delete_memory
result = delete_memory(memory_id=memory_id)
if result.get("success"):
return mcp_response(f"Deleted memory `{memory_id[:16]}`")
else:
return mcp_error(result.get("error", f"Memory {memory_id} not found"))
except Exception as e:
logger.error("omega_delete_memory failed: %s", e, exc_info=True)
return mcp_error("Delete failed")
# ============================================================================
# Handler: omega_edit_memory
# ============================================================================
async def handle_omega_edit_memory(arguments: dict) -> dict:
"""Edit the content of a specific memory."""
memory_id = arguments.get("memory_id", "").strip()
new_content = arguments.get("new_content", "").strip()
if not memory_id:
return mcp_error("memory_id is required")
if not new_content:
return mcp_error("new_content is required")
try:
from omega.bridge import edit_memory
result = edit_memory(memory_id=memory_id, new_content=new_content)
if result.get("success"):
return mcp_response(f"Updated memory `{memory_id[:16]}`\nNew content: {new_content[:200]}")
else:
return mcp_error(result.get("error", f"Memory {memory_id} not found"))
except Exception as e:
logger.error("omega_edit_memory failed: %s", e, exc_info=True)
return mcp_error("Edit failed")
# ============================================================================
# Handler: omega_list_preferences
# ============================================================================
async def handle_omega_list_preferences(arguments: dict) -> dict:
"""List all stored user preferences."""
try:
from omega.bridge import list_preferences
prefs = list_preferences()
if not prefs:
return mcp_response("No preferences stored yet.")
lines = [f"## User Preferences ({len(prefs)} total)\n"]
for pref in prefs:
content = pref.get("content", "")[:200]
created = pref.get("created_at", "")[:16]
pref_id = pref.get("id", "")[:12]
lines.append(f"- {content}")
lines.append(f" _Created: {created} | id: {pref_id}_")
lines.append("")
return mcp_response("\n".join(lines))
except Exception as e:
logger.error("omega_list_preferences failed: %s", e, exc_info=True)
return mcp_error("List preferences failed")
# ============================================================================
# Handler: omega_health (includes former omega_status stats)
# ============================================================================
async def handle_omega_health(arguments: dict) -> dict:
"""Detailed health check with memory usage, warnings, and recommendations."""
try:
from omega.bridge import check_health, status
warn_mb = _clamp_int(arguments.get("warn_mb", 350), default=350, max_val=10000)
critical_mb = _clamp_int(arguments.get("critical_mb", 800), default=800, max_val=10000)
max_nodes = _clamp_int(arguments.get("max_nodes", 10000), default=10000, max_val=100000)
result = check_health(warn_mb=warn_mb, critical_mb=critical_mb, max_nodes=max_nodes)
# Append basic stats (formerly omega_status)
try:
st = status()
result += f"**Backend:** {st.get('backend', 'sqlite')}\n"
result += f"**Store:** {st.get('store_path', '~/.omega')}\n"
result += f"**Vec enabled:** {st.get('vec_enabled', False)}\n"
except Exception:
pass
return mcp_response(result)
except Exception as e:
logger.error("omega_health failed: %s", e, exc_info=True)
return mcp_error("Health check failed")
# ============================================================================
# Handler: omega_backup (merged export + import)
# ============================================================================
async def handle_omega_backup(arguments: dict) -> dict:
"""Export or import memories (backup/restore)."""
mode = arguments.get("mode", "export").strip()
filepath = arguments.get("filepath", "").strip()
if not filepath:
return mcp_error("filepath is required")
# Path validation: restrict to ~/.omega/ to prevent sensitive file access
resolved = Path(filepath).expanduser().resolve()
safe_dir = _SAFE_EXPORT_DIR.resolve()
if not str(resolved).startswith(str(safe_dir) + "/") and resolved.parent != safe_dir:
return mcp_error(f"Path must be under {_SAFE_EXPORT_DIR}")
if mode == "import":
if not resolved.exists():
return mcp_error("File not found")
clear_existing = arguments.get("clear_existing", True)
try:
from omega.bridge import import_memories
result = import_memories(filepath=str(resolved), clear_existing=clear_existing)
return mcp_response(result)
except Exception as e:
logger.error("omega_backup import failed: %s", e, exc_info=True)
return mcp_error("Import failed (internal error)")
else:
try:
from omega.bridge import export_memories
result = export_memories(filepath=str(resolved))
# Warn if encryption is enabled — export is plaintext
from omega.crypto import is_enabled as crypto_enabled
if crypto_enabled():
result["warning"] = (
"OMEGA_ENCRYPT is enabled but exports are plaintext. "
"The export file contains unencrypted memory content. "
"Store it securely or delete after use."
)
return mcp_response(result)
except Exception as e:
logger.error("omega_backup export failed: %s", e, exc_info=True)
return mcp_error("Export failed (internal error)")
# ============================================================================
# Handler: omega_lessons (merged with omega_cross_project_lessons)
# ============================================================================
async def handle_omega_lessons(arguments: dict) -> dict:
"""Retrieve cross-session or cross-project lessons learned."""
try:
cross_project = arguments.get("cross_project", False)
task = arguments.get("task")
limit = _clamp_int(arguments.get("limit", 5), default=5, max_val=100)
agent_type = arguments.get("agent_type")
if cross_project:
from omega.bridge import get_cross_project_lessons
exclude_project = arguments.get("exclude_project")
exclude_session = arguments.get("exclude_session")
lessons = get_cross_project_lessons(
task=task,
exclude_project=exclude_project,
exclude_session=exclude_session,
limit=limit,
agent_type=agent_type,
)
if not lessons:
return mcp_response("No cross-project lessons found.")
output = f"# Cross-Project Lessons ({len(lessons)})\n\n"
for i, lesson in enumerate(lessons, 1):
proj = lesson.get("source_project", "?")
xp = " **[CROSS-PROJECT]**" if lesson.get("cross_project") else ""
output += f"## {i}. {lesson['content'][:120]}\n"
output += f"*Source: {proj} | Projects seen: {lesson.get('projects_seen', 1)}{xp}*\n"
output += f"*Accessed: {lesson.get('access_count', 0)} times | Created: {lesson.get('created_at', '?')[:16]}*\n\n"
return mcp_response(output)
else:
from omega.bridge import get_cross_session_lessons
project_path = arguments.get("project_path")
lessons = get_cross_session_lessons(
task=task,
project_path=project_path,
limit=limit,
agent_type=agent_type,
)
if not lessons:
return mcp_response("No cross-session lessons found yet.")
output = f"# Cross-Session Lessons ({len(lessons)})\n\n"
for i, lesson in enumerate(lessons, 1):
verified = " [verified]" if lesson.get("verified") else ""
access = lesson.get("access_count", 0)
output += f"## {i}. {lesson.get('content', '')[:200]}{verified}\n"
output += f"*Access count: {access} | Session: {lesson.get('session_id', 'unknown')[:16]}*\n\n"
return mcp_response(output)
except Exception as e:
logger.error("omega_lessons failed: %s", e, exc_info=True)
return mcp_error("Lessons failed")
# ============================================================================
# Handler: omega_feedback
# ============================================================================
async def handle_omega_feedback(arguments: dict) -> dict:
"""Record feedback on a surfaced memory."""
memory_id = arguments.get("memory_id", "").strip()
rating = arguments.get("rating", "").strip()
reason = arguments.get("reason")
if not memory_id:
return mcp_error("memory_id is required")
if rating not in ("helpful", "unhelpful", "outdated"):
return mcp_error("rating must be one of: helpful, unhelpful, outdated")
try:
from omega.bridge import record_feedback
result = record_feedback(memory_id=memory_id, rating=rating, reason=reason)
if "error" in result:
return mcp_error(result["error"])
return mcp_response(
f"Feedback recorded: {rating} for `{memory_id[:16]}`\n"
f"New score: {result.get('new_score', 0)} "
f"({result.get('total_signals', 0)} total signals)"
)
except Exception as e:
logger.error("omega_feedback failed: %s", e, exc_info=True)
return mcp_error("Feedback failed")
# ============================================================================
# Handler: omega_clear_session
# ============================================================================
async def handle_omega_clear_session(arguments: dict) -> dict:
"""Clear all memories for a session."""
session_id = arguments.get("session_id", "").strip()
if not session_id:
return mcp_error("session_id is required")
try:
from omega.bridge import clear_session
result = clear_session(session_id=session_id)
return mcp_response(f"Cleared session `{session_id[:16]}`: {result.get('removed', 0)} memories removed.")
except Exception as e:
logger.error("omega_clear_session failed: %s", e, exc_info=True)
return mcp_error("Clear session failed")
# ============================================================================
# Handler: omega_consolidate
# ============================================================================
async def handle_omega_consolidate(arguments: dict) -> dict:
"""Run memory consolidation: prune stale entries, cap summaries, clean edges."""
prune_days = _clamp_int(arguments.get("prune_days", 30), default=30, max_val=365)
max_summaries = _clamp_int(arguments.get("max_summaries", 50), default=50, max_val=1000)
try:
from omega.bridge import consolidate
result = consolidate(prune_days=prune_days, max_summaries=max_summaries)
return mcp_response(result)
except Exception as e:
logger.error("omega_consolidate failed: %s", e, exc_info=True)
return mcp_error("Consolidation failed")
# ============================================================================
# Handler: omega_similar
# ============================================================================
async def handle_omega_similar(arguments: dict) -> dict:
"""Find memories similar to a given memory."""
memory_id = arguments.get("memory_id", "").strip()
if not memory_id:
return mcp_error("memory_id is required")
limit = _clamp_int(arguments.get("limit", 5), default=5, max_val=100)
try:
from omega.bridge import find_similar_memories
result = find_similar_memories(memory_id=memory_id, limit=limit)
return mcp_response(result)
except Exception as e:
logger.error("omega_similar failed: %s", e, exc_info=True)
return mcp_error("Similar search failed")
# ============================================================================
# Handler: omega_timeline
# ============================================================================
async def handle_omega_timeline(arguments: dict) -> dict:
"""Show memory timeline grouped by day."""
days = _clamp_int(arguments.get("days", 7), default=7, min_val=0, max_val=365)
limit_per_day = _clamp_int(arguments.get("limit_per_day", 10), default=10, max_val=100)
try:
from omega.bridge import timeline
result = timeline(days=days, limit_per_day=limit_per_day)
return mcp_response(result)
except Exception as e:
logger.error("omega_timeline failed: %s", e, exc_info=True)
return mcp_error("Timeline failed")
# ============================================================================
# Handler: omega_traverse
# ============================================================================
async def handle_omega_traverse(arguments: dict) -> dict:
"""Traverse the memory relationship graph from a starting memory."""
memory_id = arguments.get("memory_id", "").strip()
if not memory_id:
return mcp_error("memory_id is required")
max_hops = arguments.get("max_hops", 2)
min_weight = arguments.get("min_weight", 0.0)
try:
from omega.bridge import traverse
result = traverse(
memory_id=memory_id,
max_hops=max_hops,
min_weight=min_weight,
)
return mcp_response(result)
except Exception as e:
logger.error("omega_traverse failed: %s", e, exc_info=True)
return mcp_error("Traverse failed")
# ============================================================================
# Handler: omega_compact
# ============================================================================
async def handle_omega_compact(arguments: dict) -> dict:
"""Compact related memories into consolidated knowledge nodes."""
event_type = arguments.get("event_type", "lesson_learned")
similarity_threshold = arguments.get("similarity_threshold", 0.6)
min_cluster_size = _clamp_int(arguments.get("min_cluster_size", 3), default=3, min_val=2, max_val=100)
dry_run = arguments.get("dry_run", False)
try:
from omega.bridge import compact
result = compact(
event_type=event_type,
similarity_threshold=similarity_threshold,
min_cluster_size=min_cluster_size,
dry_run=dry_run,
)
return mcp_response(result)
except Exception as e:
logger.error("omega_compact failed: %s", e, exc_info=True)
return mcp_error("Compact failed")
# ============================================================================
# Handler: omega_type_stats
# ============================================================================
async def handle_omega_type_stats(arguments: dict) -> dict:
"""Get memory counts grouped by event type."""
try:
from omega.bridge import type_stats
stats = type_stats()
if not stats:
return mcp_response("No memories stored yet.")
total = sum(stats.values())
lines = [f"# Memory Type Stats ({total} total)\n"]
for etype, count in sorted(stats.items(), key=lambda x: x[1], reverse=True):
pct = (count / total * 100) if total > 0 else 0
lines.append(f"- **{etype}**: {count} ({pct:.1f}%)")
return mcp_response("\n".join(lines))
except Exception as e:
logger.error("omega_type_stats failed: %s", e, exc_info=True)
return mcp_error("Type stats failed")
# ============================================================================
# Handler: omega_session_stats
# ============================================================================
async def handle_omega_session_stats(arguments: dict) -> dict:
"""Get memory counts grouped by session ID."""
try:
from omega.bridge import session_stats
stats = session_stats()
if not stats:
return mcp_response("No session data found.")
# Sort by count descending, show top 20
sorted_sessions = sorted(stats.items(), key=lambda x: x[1], reverse=True)[:20]
total = sum(stats.values())
lines = [f"# Session Stats (top {len(sorted_sessions)} of {len(stats)} sessions, {total} total memories)\n"]
for sid, count in sorted_sessions:
truncated = sid[:16] + "..." if len(sid) > 16 else sid
lines.append(f"- `{truncated}`: {count} memories")
return mcp_response("\n".join(lines))
except Exception as e:
logger.error("omega_session_stats failed: %s", e, exc_info=True)
return mcp_error("Session stats failed")
# ============================================================================
# Handler: omega_weekly_digest
# ============================================================================
async def handle_omega_weekly_digest(arguments: dict) -> dict:
"""Generate a weekly knowledge digest with stats, trends, and highlights."""
try:
from omega.bridge import get_weekly_digest
days = arguments.get("days", 7)
digest = get_weekly_digest(days=days)
lines = [f"# Your Week in Review ({digest['period_days']}d)\n"]
# Summary line
lines.append(
f"**{digest['period_new']} new memories** across "
f"{digest['session_count']} sessions "
f"({digest['total_memories']} total)"
)
# Growth
if digest["prev_period_count"] > 0:
direction = "up" if digest["growth_pct"] > 0 else "down"
lines.append(
f"**Growth:** {direction} {abs(digest['growth_pct'])}% vs previous {days}d "
f"({digest['prev_period_count']} -> {digest['period_new']})"
)
# Type breakdown
if digest["type_breakdown"]:
lines.append("\n**Breakdown:**")
for etype, count in sorted(digest["type_breakdown"].items(), key=lambda x: x[1], reverse=True):
if count > 0 and etype != "session_summary":
lines.append(f" - {etype}: {count}")
# Top topics
if digest["top_topics"]:
lines.append(f"\n**Top topics:** {', '.join(digest['top_topics'][:6])}")
return mcp_response("\n".join(lines))
except Exception as e:
logger.error("omega_weekly_digest failed: %s", e, exc_info=True)
return mcp_error("Weekly digest failed")
# ============================================================================
# Handler: omega_checkpoint
# ============================================================================
async def handle_omega_checkpoint(arguments: dict) -> dict:
"""Save a task checkpoint for session continuity."""
task_title = arguments.get("task_title", "").strip()
progress = arguments.get("progress", "").strip()
if not task_title or not progress:
return mcp_error("task_title and progress are required")
# Build structured checkpoint content
checkpoint = {
"version": 1,
"task_title": task_title,
"plan": arguments.get("plan", ""),
"progress": progress,
"files_touched": arguments.get("files_touched", {}),
"decisions": arguments.get("decisions", []),
"key_context": arguments.get("key_context", ""),
"next_steps": arguments.get("next_steps", ""),
}
# Format as searchable text content
content_lines = [f"## Checkpoint: {task_title}"]
if checkpoint["plan"]:
content_lines.append(f"\n### Plan\n{checkpoint['plan']}")
content_lines.append(f"\n### Progress\n{checkpoint['progress']}")
if checkpoint["files_touched"]:
content_lines.append("\n### Files Changed")
for fp, summary in checkpoint["files_touched"].items():
content_lines.append(f"- `{fp}`: {summary}")
if checkpoint["decisions"]:
content_lines.append("\n### Decisions")
for d in checkpoint["decisions"]:
content_lines.append(f"- {d}")
if checkpoint["key_context"]:
content_lines.append(f"\n### Key Context\n{checkpoint['key_context']}")
if checkpoint["next_steps"]:
content_lines.append(f"\n### Next Steps\n{checkpoint['next_steps']}")
content = "\n".join(content_lines)
# Determine checkpoint number for this task
session_id = arguments.get("session_id")
project = arguments.get("project")
checkpoint_num = 1
try:
from omega.bridge import query_structured
existing = query_structured(
query_text=f"checkpoint {task_title}",
limit=10,
event_type="checkpoint",
)
if project:
existing = [e for e in existing if (e.get("metadata") or {}).get("project") == project]
checkpoint_num = len(existing) + 1
except Exception:
pass
metadata = {
"checkpoint_number": checkpoint_num,
"checkpoint_data": checkpoint,
}
try:
from omega.bridge import auto_capture
result = auto_capture(
content=content,
event_type="checkpoint",
metadata=metadata,
session_id=session_id,
project=project,
)
return mcp_response(f"{result}\n\nCheckpoint #{checkpoint_num} saved for: {task_title}")
except Exception as e:
logger.error("omega_checkpoint failed: %s", e, exc_info=True)
return mcp_error(f"Checkpoint failed: {e}")
# ============================================================================
# Handler: omega_resume_task
# ============================================================================
async def handle_omega_resume_task(arguments: dict) -> dict:
"""Resume a checkpointed task with full context."""
task_title = arguments.get("task_title", "").strip()
project = arguments.get("project")
verbosity = arguments.get("verbosity", "full")
limit = _clamp_int(arguments.get("limit"), 1, 1, 5)
# Build search query
query_text = f"checkpoint {task_title}" if task_title else "checkpoint"
try:
from omega.bridge import query_structured
results = query_structured(
query_text=query_text,
limit=limit * 3, # Over-fetch for filtering
event_type="checkpoint",
)
if not results:
return mcp_response("No checkpoints found. Start fresh or provide a different task title.")
# Post-filter by project if specified (metadata match, not query dilution)
if project:
filtered = [r for r in results if (r.get("metadata") or {}).get("project") == project]
if filtered:
results = filtered
# Take the most recent checkpoints (by created_at)
results = sorted(results, key=lambda r: r.get("created_at", ""), reverse=True)[:limit]
lines = [f"# Task Resume — {len(results)} checkpoint(s) found\n"]
for r in results:
meta = r.get("metadata", {})
checkpoint_data = meta.get("checkpoint_data", {})
cp_num = meta.get("checkpoint_number", "?")
created = r.get("created_at", "unknown")[:16]
if verbosity == "minimal":
next_steps = checkpoint_data.get("next_steps", "No next steps recorded")
lines.append(f"## Checkpoint #{cp_num} ({created})")
lines.append(f"**Task**: {checkpoint_data.get('task_title', 'Unknown')}")
lines.append(f"**Next Steps**: {next_steps}\n")
elif verbosity == "summary":
lines.append(f"## Checkpoint #{cp_num} ({created})")
lines.append(f"**Task**: {checkpoint_data.get('task_title', 'Unknown')}")
if checkpoint_data.get("plan"):
lines.append(f"**Plan**: {checkpoint_data['plan']}")
lines.append(f"**Progress**: {checkpoint_data.get('progress', 'Unknown')}")
lines.append(f"**Next Steps**: {checkpoint_data.get('next_steps', 'None')}\n")
else: # full
lines.append(r.get("content", "No content"))
if checkpoint_data.get("files_touched") and "Files Changed" not in r.get("content", ""):
lines.append("\n### Files Changed")
for fp, summary in checkpoint_data["files_touched"].items():
lines.append(f"- `{fp}`: {summary}")
lines.append("")
return mcp_response("\n".join(lines))
except Exception as e:
logger.error("omega_resume_task failed: %s", e, exc_info=True)
return mcp_error(f"Resume failed: {e}")
# ============================================================================
# Handler: omega_remind
# ============================================================================
async def handle_omega_remind(arguments: dict) -> dict:
"""Create a time-based reminder."""
text = arguments.get("text", "").strip()
duration = arguments.get("duration", "").strip()
if not text:
return mcp_error("text is required")
if not duration:
return mcp_error("duration is required (e.g. '1h', '30m', '2d')")
context = arguments.get("context")
session_id = arguments.get("session_id")
project = arguments.get("project")
try:
from omega.bridge import create_reminder
result = create_reminder(
text=text,
duration=duration,
context=context,
session_id=session_id,
project=project,
)
lines = [
f"Reminder set: {result['text']}",
f"Due at: {result['remind_at_local']}",
f"ID: {result['reminder_id']}",
]
return mcp_response("\n".join(lines))
except ValueError as e:
return mcp_error(str(e))
except Exception as e:
logger.error("omega_remind failed: %s", e, exc_info=True)
return mcp_error(f"Failed to create reminder: {e}")
# ============================================================================
# Handler: omega_remind_list
# ============================================================================
async def handle_omega_remind_list(arguments: dict) -> dict:
"""List reminders with status and due times."""
status = arguments.get("status")
try:
from omega.bridge import list_reminders
include_dismissed = status in ("dismissed", "all")
reminders = list_reminders(status=status, include_dismissed=include_dismissed)
if not reminders:
return mcp_response("No reminders found.")
lines = [f"**Reminders** ({len(reminders)} found)\n"]
status_icons = {"pending": "⏳", "fired": "🔔", "dismissed": "✓"}
for r in reminders:
icon = status_icons.get(r["status"], "?")
overdue = " **[OVERDUE]**" if r.get("is_overdue") else ""
lines.append(f"- {icon} {r['text']}{overdue}")
lines.append(f" Due: {r['remind_at_local']} | Status: {r['status']} | Time: {r['time_until']}")
if r.get("context"):
lines.append(f" Context: {r['context'][:120]}")
lines.append(f" ID: {r['id']}")
return mcp_response("\n".join(lines))
except Exception as e:
logger.error("omega_remind_list failed: %s", e, exc_info=True)
return mcp_error(f"Failed to list reminders: {e}")
# ============================================================================
# Handler: omega_remind_dismiss
# ============================================================================
async def handle_omega_remind_dismiss(arguments: dict) -> dict:
"""Dismiss a reminder by ID."""
reminder_id = arguments.get("reminder_id", "").strip()
if not reminder_id:
return mcp_error("reminder_id is required")
try:
from omega.bridge import dismiss_reminder
result = dismiss_reminder(reminder_id)
if result.get("success"):
return mcp_response(f"Dismissed reminder: {result.get('text', reminder_id)}")
return mcp_error(result.get("error", "Failed to dismiss reminder"))
except Exception as e:
logger.error("omega_remind_dismiss failed: %s", e, exc_info=True)
return mcp_error(f"Failed to dismiss reminder: {e}")
# ============================================================================
# Handler: omega_protocol
# ============================================================================
async def handle_omega_protocol(arguments: dict) -> dict:
"""Serve the coordination playbook dynamically based on context."""
section = arguments.get("section")
project = arguments.get("project")
# Detect peer count for auto-mode selection
peer_count = 0
try:
from omega.coordination import get_manager
mgr = get_manager()
sessions = mgr.list_sessions(auto_clean=True)
peer_count = max(0, len(sessions) - 1)
except Exception:
pass # Solo mode if coordination unavailable
try:
from omega.protocol import get_protocol
result = get_protocol(
section=section,
project=project,
include_lessons=True,
peer_count=peer_count,
)
return mcp_response(result)
except Exception as e:
logger.error("omega_protocol failed: %s", e, exc_info=True)
return mcp_error(f"Protocol failed: {e}")
# ============================================================================
# Composite Routing Handlers (action-discriminated)
# ============================================================================
async def handle_omega_memory(arguments: dict) -> dict:
"""Route omega_memory composite to individual handlers by action."""
action = arguments.get("action", "").strip()
if action == "edit":
return await handle_omega_edit_memory(arguments)
elif action == "delete":
return await handle_omega_delete_memory(arguments)
elif action == "feedback":
return await handle_omega_feedback(arguments)
elif action == "similar":
return await handle_omega_similar(arguments)
elif action == "traverse":
return await handle_omega_traverse(arguments)
else:
return mcp_error(f"Unknown omega_memory action: {action!r}. Use: edit, delete, feedback, similar, traverse")
async def handle_omega_remind_composite(arguments: dict) -> dict:
"""Route omega_remind composite to individual handlers by action."""
action = arguments.get("action", "set").strip()
if action == "set":
return await handle_omega_remind(arguments)
elif action == "list":
return await handle_omega_remind_list(arguments)
elif action == "dismiss":
return await handle_omega_remind_dismiss(arguments)
else:
return mcp_error(f"Unknown omega_remind action: {action!r}. Use: set, list, dismiss")
async def handle_omega_maintain(arguments: dict) -> dict:
"""Route omega_maintain composite to individual handlers by action."""
action = arguments.get("action", "").strip()
if action == "health":
return await handle_omega_health(arguments)
elif action == "consolidate":
return await handle_omega_consolidate(arguments)
elif action == "compact":
return await handle_omega_compact(arguments)
elif action == "backup":
return await handle_omega_backup({**arguments, "mode": "export"})
elif action == "restore":
return await handle_omega_backup({**arguments, "mode": "import"})
elif action == "clear_session":
return await handle_omega_clear_session(arguments)
else:
return mcp_error(f"Unknown omega_maintain action: {action!r}. Use: health, consolidate, compact, backup, restore, clear_session")
async def handle_omega_stats(arguments: dict) -> dict:
"""Route omega_stats composite to individual handlers by action."""
action = arguments.get("action", "").strip()
if action == "types":
return await handle_omega_type_stats(arguments)
elif action == "sessions":
return await handle_omega_session_stats(arguments)
elif action == "digest":
return await handle_omega_weekly_digest(arguments)
elif action == "access_rate":
return await handle_omega_access_rate(arguments)
else:
return mcp_error(f"Unknown omega_stats action: {action!r}. Use: types, sessions, digest, access_rate")
async def handle_omega_access_rate(arguments: dict) -> dict:
"""Return access rate breakdown for memories."""
try:
from omega.bridge import access_rate_stats
stats = access_rate_stats()
output = "# Memory Access Rate\n\n"
output += f"- **Total memories:** {stats['total_memories']}\n"
output += f"- **Never accessed:** {stats['zero_access_count']} ({stats['never_accessed_pct']}%)\n"
output += f"- **Average access count:** {stats['avg_access_count']}\n\n"
output += "## By Event Type\n"
output += "| Type | Count | Avg Access | Never Accessed |\n"
output += "|------|-------|------------|----------------|\n"
for t in stats["by_type"]:
output += f"| {t['event_type']} | {t['count']} | {t['avg_access_count']} | {t['zero_access_count']} ({t['zero_access_pct']}%) |\n"
if stats["top_accessed"]:
output += "\n## Top 10 Most Accessed\n"
for m in stats["top_accessed"]:
output += f"- **{m['access_count']}x** [{m['event_type']}] {m['content']}\n"
return mcp_response(output)
except Exception as e:
logger.error("omega_access_rate failed: %s", e, exc_info=True)
return mcp_error(f"Access rate query failed: {e}")
# ============================================================================
# Handler Registry
# ============================================================================
HANDLERS: Dict[str, Any] = {
# === 12 consolidated tools ===
"omega_store": handle_omega_store,
"omega_query": handle_omega_query,
"omega_welcome": handle_omega_welcome,
"omega_protocol": handle_omega_protocol,
"omega_lessons": handle_omega_lessons,
"omega_checkpoint": handle_omega_checkpoint,
"omega_resume_task": handle_omega_resume_task,
"omega_memory": handle_omega_memory,
"omega_profile": handle_omega_profile,
"omega_remind": handle_omega_remind_composite,
"omega_maintain": handle_omega_maintain,
"omega_stats": handle_omega_stats,
# === Backward compatibility aliases (old tool names -> handlers) ===
"omega_remember": lambda args: handle_omega_store(
{**args, "event_type": args.get("event_type", "user_preference")}
),
"omega_delete_memory": handle_omega_delete_memory,
"omega_edit_memory": handle_omega_edit_memory,
"omega_list_preferences": handle_omega_list_preferences,
"omega_health": handle_omega_health,
"omega_backup": handle_omega_backup,
"omega_save_profile": handle_omega_profile,
"omega_feedback": handle_omega_feedback,
"omega_clear_session": handle_omega_clear_session,
"omega_similar": handle_omega_similar,
"omega_timeline": handle_omega_timeline,
"omega_consolidate": handle_omega_consolidate,
"omega_traverse": handle_omega_traverse,
"omega_compact": handle_omega_compact,
"omega_phrase_search": lambda args: handle_omega_query(
{**args, "query": args.get("phrase", args.get("query", "")), "mode": "phrase"}
),
"omega_type_stats": handle_omega_type_stats,
"omega_session_stats": handle_omega_session_stats,
"omega_weekly_digest": handle_omega_weekly_digest,
"omega_remind_list": handle_omega_remind_list,
"omega_remind_dismiss": handle_omega_remind_dismiss,
}