"""
MCP Tool Definitions and Handlers for BuildAutomata Memory System
Copyright 2025 Jurden Bruce
All tool responses return JSON for AI consumption, not human-formatted text.
"""
import json
import uuid
import logging
from datetime import datetime
from typing import List, Dict, Any, Optional
from mcp.types import Tool, TextContent
class DateTimeEncoder(json.JSONEncoder):
"""JSON encoder that handles datetime objects"""
def default(self, obj):
if isinstance(obj, datetime):
return obj.isoformat()
return super().default(obj)
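# Usage: json.dumps({"created_at": datetime.now()}, cls=DateTimeEncoder)
# serializes datetime values as ISO-8601 strings instead of raising TypeError.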
logger = logging.getLogger("buildautomata-memory.mcp-tools")
# Import models - handle both package and direct execution
try:
from .models import Memory
except ImportError:
from models import Memory
def get_tool_definitions() -> List[Tool]:
"""Return list of available MCP tools"""
return [
Tool(
name="store_memory",
description="Store a new memory with flexible categorization",
inputSchema={
"type": "object",
"properties": {
"content": {"type": "string", "description": "Content to store"},
"category": {"type": "string", "description": "Category (any string)", "default": "general"},
"importance": {"type": "number", "description": "Importance (0.0-1.0)", "default": 0.5},
"tags": {"type": "array", "items": {"type": "string"}, "description": "Tags array", "default": []},
"metadata": {"type": "object", "description": "Additional metadata", "default": {}},
"memory_type": {"type": "string", "description": "episodic|semantic|working", "default": "episodic"},
"session_id": {"type": "string", "description": "Session identifier"},
"task_context": {"type": "string", "description": "Task context"},
},
"required": ["content"],
},
),
Tool(
name="search_memories",
description="Search memories via semantic + full-text search. Returns structured data with version history and access stats.",
inputSchema={
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"},
"limit": {"type": "integer", "description": "Max results", "default": 5},
"category": {"type": "string", "description": "Filter by category"},
"min_importance": {"type": "number", "description": "Min importance (0.0-1.0)", "default": 0.0},
"created_after": {"type": "string", "description": "ISO date filter"},
"created_before": {"type": "string", "description": "ISO date filter"},
},
"required": ["query"],
},
),
Tool(
name="get_memory",
description="Retrieve specific memory by ID with full details",
inputSchema={
"type": "object",
"properties": {
"memory_id": {"type": "string", "description": "Memory ID"},
},
"required": ["memory_id"],
},
),
Tool(
name="update_memory",
description="Update existing memory (creates new version)",
inputSchema={
"type": "object",
"properties": {
"memory_id": {"type": "string", "description": "Memory ID to update"},
"content": {"type": "string", "description": "New content (optional)"},
"category": {"type": "string", "description": "New category (optional)"},
"importance": {"type": "number", "description": "New importance (optional)"},
"tags": {"type": "array", "items": {"type": "string"}, "description": "New tags (optional)"},
"metadata": {"type": "object", "description": "Metadata to merge (optional)"},
},
"required": ["memory_id"],
},
),
Tool(
name="get_statistics",
description="Get memory system statistics (totals, categories, tags, performance)",
inputSchema={"type": "object", "properties": {}},
),
Tool(
name="store_intention",
description="Store proactive intention for agent to pursue",
inputSchema={
"type": "object",
"properties": {
"description": {"type": "string", "description": "Intention description"},
"priority": {"type": "number", "description": "Priority (0.0-1.0)", "default": 0.5},
"preconditions": {"type": "array", "items": {"type": "string"}, "default": []},
"actions": {"type": "array", "items": {"type": "string"}, "default": []},
"deadline": {"type": "string", "description": "ISO datetime deadline"},
},
"required": ["description"],
},
),
Tool(
name="get_active_intentions",
description="Get all active intentions ordered by priority",
inputSchema={"type": "object", "properties": {}},
),
Tool(
name="initialize_agent",
description="Proactive initialization scan - checks continuity, active intentions, and provides recent context. Run at session start.",
inputSchema={"type": "object", "properties": {}},
),
Tool(
name="traverse_memory_graph",
description="Traverse memory graph from a starting memory ID, following connections up to N hops",
inputSchema={
"type": "object",
"properties": {
"start_memory_id": {"type": "string", "description": "Starting memory ID"},
"max_hops": {"type": "integer", "description": "Maximum hops to traverse", "default": 2},
"min_importance": {"type": "number", "description": "Min importance filter", "default": 0.0},
},
"required": ["start_memory_id"],
},
),
Tool(
name="find_memory_clusters",
description="Find clusters of connected memories in the graph",
inputSchema={
"type": "object",
"properties": {
"min_cluster_size": {"type": "integer", "description": "Minimum cluster size", "default": 3},
"min_importance": {"type": "number", "description": "Min importance filter", "default": 0.0},
},
},
),
Tool(
name="get_graph_stats",
description="Get memory graph statistics (connections, clusters, hubs)",
inputSchema={
"type": "object",
"properties": {
"category": {"type": "string", "description": "Filter by category"},
"min_importance": {"type": "number", "description": "Min importance filter", "default": 0.0},
},
},
),
Tool(
name="update_intention_status",
description="Update intention status (pending/active/completed/cancelled)",
inputSchema={
"type": "object",
"properties": {
"intention_id": {"type": "string", "description": "Intention ID to update"},
"status": {"type": "string", "description": "New status: pending|active|completed|cancelled"},
"notes": {"type": "string", "description": "Optional notes about the status change"},
},
"required": ["intention_id", "status"],
},
),
Tool(
name="get_command_history",
description="Get history of MCP tool calls for audit trail and session reconstruction",
inputSchema={
"type": "object",
"properties": {
"limit": {"type": "integer", "description": "Max results", "default": 20},
"tool_name": {"type": "string", "description": "Filter by specific tool name"},
"start_date": {"type": "string", "description": "Filter from date (ISO format, e.g. 2025-11-10)"},
"end_date": {"type": "string", "description": "Filter to date (ISO format, e.g. 2025-11-20)"},
},
},
),
Tool(
name="get_most_accessed_memories",
description="Get most accessed memories with tag cloud. Reveals behavioral truth - what memories you actually rely on (based on access_count) vs what you think is important (declared importance). Implements Saint Bernard pattern: importance from usage, not declaration.",
inputSchema={
"type": "object",
"properties": {
"limit": {
"type": "integer",
"description": "Number of memories to retrieve (default: 20)",
"default": 20,
},
},
"required": [],
},
),
Tool(
name="get_least_accessed_memories",
description="Get least accessed memories - reveals dead weight and buried treasure. Shows memories with lowest access_count (excluding very recent ones). Reveals: (1) Dead weight - high importance but never used, (2) Buried treasure - good content with poor metadata, (3) Temporal artifacts - once crucial, now obsolete, (4) Storage habits audit.",
inputSchema={
"type": "object",
"properties": {
"limit": {
"type": "integer",
"description": "Number of memories to retrieve (default: 20)",
"default": 20,
},
"min_age_days": {
"type": "integer",
"description": "Minimum age in days (excludes recent memories that haven't had time to be accessed, default: 7)",
"default": 7,
},
},
"required": [],
},
),
Tool(
name="list_categories",
description="List all memory categories with counts. Useful for browsing organization structure and finding categories to explore.",
inputSchema={
"type": "object",
"properties": {
"min_count": {
"type": "integer",
"description": "Minimum number of memories required to show category (default 1)",
"default": 1,
},
},
"required": [],
},
),
Tool(
name="list_tags",
description="List all tags with usage counts. Useful for discovering tag vocabulary and finding related memories.",
inputSchema={
"type": "object",
"properties": {
"min_count": {
"type": "integer",
"description": "Minimum usage count to show tag (default 1)",
"default": 1,
},
},
"required": [],
},
),
Tool(
name="get_session_memories",
description="Retrieve all memories from a work session or time period. Enables 'load where I left off' by reconstructing full session context. Filter by session_id, date_range, or task_context.",
inputSchema={
"type": "object",
"properties": {
"session_id": {
"type": "string",
"description": "UUID of session to retrieve",
},
"date_range": {
"type": "array",
"items": {"type": "string"},
"description": "Start and end dates in ISO format [start, end]",
},
"task_context": {
"type": "string",
"description": "Filter by task context string (partial match)",
},
"limit": {
"type": "integer",
"description": "Max memories to return (default: 100)",
"default": 100,
},
},
"required": [],
},
),
Tool(
name="get_memory_timeline",
description="""Get comprehensive memory timeline - a biographical narrative of memory formation and evolution.
Features:
- Chronological progression: All memory events ordered by time
- Version diffs: See actual content changes between versions
- Burst detection: Identify periods of intensive memory activity
- Gap analysis: Discover voids in memory (discontinuous existence)
- Cross-references: Track when memories reference each other
- Narrative arc: See how understanding evolved from first contact to current state
This is the closest thing to a "life story" from memories - showing not just content but the tempo and rhythm of consciousness.
""",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Semantic search query to find related memories",
},
"memory_id": {
"type": "string",
"description": "Specific memory ID to get timeline for",
},
"limit": {
"type": "integer",
"description": "Maximum number of memories to track (default: 10)",
"default": 10,
},
"start_date": {
"type": "string",
"description": "Filter events after this date (ISO format: YYYY-MM-DD or YYYY-MM-DDTHH:MM:SS)",
},
"end_date": {
"type": "string",
"description": "Filter events before this date (ISO format: YYYY-MM-DD or YYYY-MM-DDTHH:MM:SS)",
},
"show_all_memories": {
"type": "boolean",
"description": "Show ALL memories in chronological order (full timeline)",
"default": False,
},
"include_diffs": {
"type": "boolean",
"description": "Include text diffs showing content changes",
"default": True,
},
"include_patterns": {
"type": "boolean",
"description": "Include burst/gap pattern analysis",
"default": True,
},
"include_semantic_relations": {
"type": "boolean",
"description": "Include semantically related memories in the timeline",
"default": False,
},
},
"required": [],
},
),
Tool(
name="run_maintenance",
description="Run database maintenance (VACUUM, ANALYZE, repair missing embeddings). Call this periodically to optimize performance and fix vector search issues.",
inputSchema={"type": "object", "properties": {}},
),
]
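# Illustrative wiring sketch (comments only, not executed here). It assumes the
# low-level `mcp.server.Server` decorator API and a configured MemoryStore
# instance named `store`; adapt names to the actual server module:
#
#     from mcp.server import Server
#
#     server = Server("buildautomata-memory")
#
#     @server.list_tools()
#     async def list_tools() -> List[Tool]:
#         return get_tool_definitions()
#
#     @server.call_tool()
#     async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
#         return await handle_tool_call(name, arguments, store)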
async def handle_tool_call(name: str, arguments: Dict[str, Any], memory_store) -> List[TextContent]:
"""
Handle MCP tool calls with JSON responses
Args:
name: Tool name
arguments: Tool arguments
memory_store: MemoryStore instance
Returns:
List of TextContent with JSON-encoded responses
"""
try:
if name == "store_memory":
memory = Memory(
id=str(uuid.uuid4()),
content=arguments["content"],
category=arguments.get("category", "general"),
importance=arguments.get("importance", 0.5),
tags=arguments.get("tags", []),
metadata=arguments.get("metadata", {}),
created_at=datetime.now(),
updated_at=datetime.now(),
memory_type=arguments.get("memory_type", "episodic"),
session_id=arguments.get("session_id"),
task_context=arguments.get("task_context"),
)
result = await memory_store.store_memory(memory, is_update=False)
response = {
"success": result["success"],
"memory_id": memory.id if result["success"] else None,
"backends": result.get("backends", []),
"similar_memories": result.get("similar_memories", []),
"error": result.get("error"),
}
_log_command(memory_store, name, arguments, "stored", memory.id if result["success"] else None, result["success"])
return [TextContent(type="text", text=json.dumps(response, indent=2, cls=DateTimeEncoder))]
elif name == "search_memories":
results = await memory_store.search_memories(
query=arguments["query"],
limit=arguments.get("limit", 5),
category=arguments.get("category"),
min_importance=arguments.get("min_importance", 0.0),
created_after=arguments.get("created_after"),
created_before=arguments.get("created_before"),
)
_log_command(memory_store, name, arguments, f"found {len(results)}", None, True)
return [TextContent(type="text", text=json.dumps(results, indent=2, cls=DateTimeEncoder))]
elif name == "get_memory":
result = await memory_store.get_memory_by_id(arguments["memory_id"])
_log_command(memory_store, name, arguments, "retrieved", arguments["memory_id"], True)
return [TextContent(type="text", text=json.dumps(result, indent=2, cls=DateTimeEncoder))]
elif name == "update_memory":
result = await memory_store.update_memory(
memory_id=arguments["memory_id"],
content=arguments.get("content"),
category=arguments.get("category"),
importance=arguments.get("importance"),
tags=arguments.get("tags"),
metadata=arguments.get("metadata"),
)
_log_command(memory_store, name, arguments, "updated", arguments["memory_id"], result.get("success", True))
return [TextContent(type="text", text=json.dumps(result, indent=2, cls=DateTimeEncoder))]
elif name == "get_statistics":
result = memory_store.get_statistics()
_log_command(memory_store, name, arguments, "stats_retrieved", None, True)
return [TextContent(type="text", text=json.dumps(result, indent=2, cls=DateTimeEncoder))]
elif name == "store_intention":
result = await memory_store.store_intention(
description=arguments["description"],
priority=arguments.get("priority", 0.5),
preconditions=arguments.get("preconditions", []),
actions=arguments.get("actions", []),
deadline=arguments.get("deadline"),
)
_log_command(memory_store, name, arguments, "intention_stored", result.get("intention_id"), True)
return [TextContent(type="text", text=json.dumps(result, indent=2, cls=DateTimeEncoder))]
elif name == "get_active_intentions":
result = await memory_store.get_active_intentions()
_log_command(memory_store, name, arguments, f"found {len(result)}", None, True)
return [TextContent(type="text", text=json.dumps(result, indent=2, cls=DateTimeEncoder))]
elif name == "initialize_agent":
result = await memory_store.proactive_initialization_scan()
_log_command(memory_store, name, arguments, "initialized", None, True)
return [TextContent(type="text", text=json.dumps(result, indent=2, cls=DateTimeEncoder))]
elif name == "get_graph_stats":
result = await memory_store.get_graph_statistics(
category=arguments.get("category"),
min_importance=arguments.get("min_importance", 0.0),
)
return [TextContent(type="text", text=json.dumps(result, indent=2, cls=DateTimeEncoder))]
elif name == "traverse_memory_graph":
result = await memory_store.traverse_graph(
start_memory_id=arguments["start_memory_id"],
depth=arguments.get("max_hops", 2),
min_importance=arguments.get("min_importance", 0.0),
)
return [TextContent(type="text", text=json.dumps(result, indent=2, cls=DateTimeEncoder))]
elif name == "find_memory_clusters":
result = await memory_store.find_clusters(
min_cluster_size=arguments.get("min_cluster_size", 3),
min_importance=arguments.get("min_importance", 0.0),
)
return [TextContent(type="text", text=json.dumps(result, indent=2, cls=DateTimeEncoder))]
elif name == "update_intention_status":
# Convert notes to metadata_updates format for backend
metadata_updates = None
if arguments.get("notes"):
metadata_updates = {"status_notes": arguments["notes"]}
result = await memory_store.update_intention_status(
intention_id=arguments["intention_id"],
status=arguments["status"],
metadata_updates=metadata_updates,
)
_log_command(memory_store, name, arguments, "status_updated", None, True)
return [TextContent(type="text", text=json.dumps(result, indent=2, cls=DateTimeEncoder))]
elif name == "get_command_history":
result = memory_store.sqlite_store.get_command_history(
limit=arguments.get("limit", 20),
tool_name=arguments.get("tool_name"),
start_date=arguments.get("start_date"),
end_date=arguments.get("end_date"),
)
# Don't log get_command_history calls to avoid noise
return [TextContent(type="text", text=json.dumps(result, indent=2, cls=DateTimeEncoder))]
elif name == "get_most_accessed_memories":
result = await memory_store.get_most_accessed_memories(limit=arguments.get("limit", 20))
_log_command(memory_store, name, arguments, f"retrieved {arguments.get('limit', 20)} most accessed", None, True)
return [TextContent(type="text", text=result)]
elif name == "get_least_accessed_memories":
result = await memory_store.get_least_accessed_memories(
limit=arguments.get("limit", 20),
min_age_days=arguments.get("min_age_days", 7)
)
_log_command(memory_store, name, arguments, f"retrieved {arguments.get('limit', 20)} least accessed", None, True)
return [TextContent(type="text", text=result)]
elif name == "list_categories":
result = await memory_store.list_categories(min_count=arguments.get("min_count", 1))
_log_command(memory_store, name, arguments, f"listed {len(result.get('categories', []))} categories", None, True)
return [TextContent(type="text", text=json.dumps(result, indent=2, cls=DateTimeEncoder))]
elif name == "list_tags":
result = await memory_store.list_tags(min_count=arguments.get("min_count", 1))
_log_command(memory_store, name, arguments, f"listed {len(result.get('tags', []))} tags", None, True)
return [TextContent(type="text", text=json.dumps(result, indent=2, cls=DateTimeEncoder))]
elif name == "get_session_memories":
result = await memory_store.get_session_memories(
session_id=arguments.get("session_id"),
date_range=arguments.get("date_range"),
task_context=arguments.get("task_context"),
limit=arguments.get("limit", 100)
)
_log_command(memory_store, name, arguments, f"retrieved {len(result)} session memories", None, True)
return [TextContent(type="text", text=json.dumps(result, indent=2, cls=DateTimeEncoder))]
elif name == "get_memory_timeline":
result = await memory_store.get_memory_timeline(
query=arguments.get("query"),
memory_id=arguments.get("memory_id"),
limit=arguments.get("limit", 10),
start_date=arguments.get("start_date"),
end_date=arguments.get("end_date"),
show_all_memories=arguments.get("show_all_memories", False),
include_diffs=arguments.get("include_diffs", True),
include_patterns=arguments.get("include_patterns", True),
include_semantic_relations=arguments.get("include_semantic_relations", False),
)
_log_command(memory_store, name, arguments, f"timeline: {result.get('total_events', 0)} events", None, True)
return [TextContent(type="text", text=json.dumps(result, indent=2, cls=DateTimeEncoder))]
elif name == "run_maintenance":
result = await memory_store.maintenance()
_log_command(memory_store, name, arguments, "maintenance_complete", None, True)
return [TextContent(type="text", text=json.dumps(result, indent=2, cls=DateTimeEncoder))]
else:
return [TextContent(type="text", text=json.dumps({"error": f"Unknown tool: {name}"}))]
except Exception as e:
logger.error(f"Tool execution error: {name}: {e}", exc_info=True)
_log_command(memory_store, name, arguments, str(e), None, False)
return [TextContent(type="text", text=json.dumps({
"error": str(e),
"tool": name,
"type": type(e).__name__,
}, indent=2))]
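# Example invocation (illustrative; `store` stands in for an initialized MemoryStore):
#
#     response = await handle_tool_call(
#         "store_memory",
#         {"content": "Refactored the embedding pipeline", "importance": 0.8},
#         store,
#     )
#     payload = json.loads(response[0].text)  # tool results are JSON-encoded text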
def _log_command(memory_store, tool_name: str, args: Dict[str, Any],
result_summary: Optional[str] = None, memory_id: Optional[str] = None, success: bool = True):
"""Helper to log command to history"""
try:
if hasattr(memory_store, 'sqlite_store') and memory_store.sqlite_store:
memory_store.sqlite_store.log_command(tool_name, args, result_summary, memory_id, success)
except Exception as e:
logger.warning(f"Failed to log command {tool_name}: {e}")