Skip to main content
Glama

BuildAutomata Memory MCP Server

by brucepro
interactive_memory.py63 kB
#!/usr/bin/env python3 """ Interactive Memory CLI for BuildAutomata Memory System Provides direct CLI access to memory operations for Claude Code and other tools. Usage: python interactive_memory.py store "content" --category general --importance 0.8 --tags tag1,tag2 python interactive_memory.py search "query" --limit 5 --category general python interactive_memory.py update MEMORY_ID --content "new content" --importance 0.9 python interactive_memory.py timeline --query "query" --limit 10 python interactive_memory.py stats python interactive_memory.py prune --max 1000 --dry-run python interactive_memory.py maintenance """ import sys import os import asyncio import json import argparse from pathlib import Path from datetime import datetime from typing import Optional, List # Add parent directory to path to import MemoryStore sys.path.insert(0, str(Path(__file__).parent)) # Import from the MCP server from buildautomata_memory_mcp import MemoryStore, Memory class MemoryCLI: """CLI interface for memory operations""" def __init__(self, username: str = None, agent_name: str = None, lazy_load: bool = False): self.username = username or os.getenv("BA_USERNAME", "buildautomata_ai_v012") self.agent_name = agent_name or os.getenv("BA_AGENT_NAME", "claude_assistant") self.memory_store = MemoryStore(self.username, self.agent_name, lazy_load=lazy_load) async def store(self, content: str, category: str = "general", importance: float = 0.5, tags: List[str] = None, metadata: dict = None, memory_type: str = "episodic", session_id: str = None, task_context: str = None) -> dict: """Store a new memory""" import uuid as uuid_lib memory = Memory( id=str(uuid_lib.uuid4()), # Generate UUID content=content, category=category, importance=importance, tags=tags or [], metadata=metadata or {}, created_at=datetime.now(), updated_at=datetime.now(), memory_type=memory_type, session_id=session_id, task_context=task_context ) store_result = await self.memory_store.store_memory(memory) result = { "success": store_result["success"], "memory_id": memory.id, "warnings": store_result.get("backends", []), "content": content, "similar_memories": store_result.get("similar_memories", []) } return result async def search(self, query: str, limit: int = 5, category: str = None, min_importance: float = 0.0, created_after: str = None, created_before: str = None, updated_after: str = None, updated_before: str = None, memory_type: str = None, session_id: str = None) -> dict: """Search for memories""" results = await self.memory_store.search_memories( query=query, limit=limit, category=category, min_importance=min_importance, created_after=created_after, created_before=created_before, updated_after=updated_after, updated_before=updated_before, memory_type=memory_type, session_id=session_id ) # Results are already dictionaries from the store return { "query": query, "count": len(results), "results": results } async def update(self, memory_id: str, content: str = None, category: str = None, importance: float = None, tags: List[str] = None, metadata: dict = None) -> dict: """Update an existing memory""" updates = {} if content is not None: updates["content"] = content if category is not None: updates["category"] = category if importance is not None: updates["importance"] = importance if tags is not None: updates["tags"] = tags if metadata is not None: updates["metadata"] = metadata update_result = await self.memory_store.update_memory( memory_id=memory_id, **updates ) return { "success": update_result["success"], "memory_id": memory_id, "changes": update_result.get("message", ""), "warnings": update_result.get("backends", []) } async def timeline(self, query: str = None, memory_id: str = None, limit: int = 10, start_date: str = None, end_date: str = None, show_all_memories: bool = False, include_diffs: bool = True, include_patterns: bool = True, include_semantic_relations: bool = False) -> dict: """Get memory timeline""" result = await self.memory_store.get_memory_timeline( query=query, memory_id=memory_id, limit=limit, start_date=start_date, end_date=end_date, show_all_memories=show_all_memories, include_diffs=include_diffs, include_patterns=include_patterns, include_semantic_relations=include_semantic_relations ) # Return comprehensive result return result async def stats(self) -> dict: """Get memory statistics""" stats = self.memory_store.get_statistics() return stats async def prune(self, max_memories: int = None, dry_run: bool = False) -> dict: """Prune old memories""" result = await self.memory_store.prune_old_memories( max_memories=max_memories, dry_run=dry_run ) return result async def maintenance(self) -> dict: """Run maintenance""" result = await self.memory_store.maintenance() return result async def init(self) -> dict: """Initialize agent - load context, intentions, continuity""" result = await self.memory_store.proactive_initialization_scan() return result async def get_by_id(self, memory_id: str) -> dict: """Get a specific memory by ID""" result = await self.memory_store.get_memory_by_id(memory_id) return result async def list_categories(self, min_count: int = 1) -> dict: """List all categories with counts""" result = await self.memory_store.list_categories(min_count=min_count) return result async def list_tags(self, min_count: int = 1) -> dict: """List all tags with usage counts""" result = await self.memory_store.list_tags(min_count=min_count) return result async def get_most_accessed(self, limit: int = 20) -> dict: """Get most accessed memories with tag cloud""" result_json = await self.memory_store.get_most_accessed_memories(limit=limit) result = json.loads(result_json) return result async def get_least_accessed(self, limit: int = 20, min_age_days: int = 7) -> dict: """Get least accessed memories - dead weight and buried treasure""" result_json = await self.memory_store.get_least_accessed_memories(limit=limit, min_age_days=min_age_days) result = json.loads(result_json) return result async def store_intention(self, description: str, priority: float = 0.5, deadline: str = None, preconditions: List[str] = None, actions: List[str] = None, related_memories: List[str] = None, metadata: dict = None) -> dict: """Store a new intention""" from datetime import datetime deadline_dt = None if deadline: try: deadline_dt = datetime.fromisoformat(deadline) except ValueError: return {"error": f"Invalid deadline format: {deadline}. Use ISO format (YYYY-MM-DDTHH:MM:SS)"} result = await self.memory_store.store_intention( description=description, priority=priority, deadline=deadline_dt, preconditions=preconditions, actions=actions, related_memories=related_memories, metadata=metadata ) return result async def get_intentions(self, limit: int = 10, include_pending: bool = True) -> dict: """Get active intentions""" intentions = await self.memory_store.get_active_intentions( limit=limit, include_pending=include_pending ) return {"intentions": intentions, "count": len(intentions)} async def update_intention_status(self, intention_id: str, status: str, metadata_updates: dict = None) -> dict: """Update intention status""" result = await self.memory_store.update_intention_status( intention_id=intention_id, status=status, metadata_updates=metadata_updates ) return result async def get_session_memories(self, session_id: str = None, start_date: str = None, end_date: str = None, task_context: str = None, limit: int = 100) -> dict: """Get memories from a work session or time period""" date_range = None if start_date and end_date: date_range = (start_date, end_date) memories = await self.memory_store.get_session_memories( session_id=session_id, date_range=date_range, task_context=task_context, limit=limit ) return { "session_id": session_id, "date_range": date_range, "task_context": task_context, "count": len(memories), "memories": memories } async def consolidate_memories(self, memory_ids: List[str], consolidation_type: str = "summarize", target_length: int = 500, new_memory_type: str = "semantic") -> dict: """Consolidate multiple episodic memories into semantic memory""" result = await self.memory_store.consolidate_memories( memory_ids=memory_ids, consolidation_type=consolidation_type, target_length=target_length, new_memory_type=new_memory_type ) return result async def traverse_graph(self, start_memory_id: str, depth: int = 2, max_nodes: int = 50, min_importance: float = 0.0, category_filter: str = None) -> dict: """Traverse memory graph N hops from starting node""" result = await self.memory_store.traverse_graph( start_memory_id=start_memory_id, depth=depth, max_nodes=max_nodes, min_importance=min_importance, category_filter=category_filter ) return result async def find_clusters(self, min_cluster_size: int = 3, min_importance: float = 0.5, limit: int = 10) -> dict: """Find densely connected regions in memory graph""" result = await self.memory_store.find_clusters( min_cluster_size=min_cluster_size, min_importance=min_importance, limit=limit ) return result async def graph_stats(self, category: str = None, min_importance: float = 0.0) -> dict: """Get graph connectivity statistics""" result = await self.memory_store.get_graph_statistics( category=category, min_importance=min_importance ) return result async def shutdown(self): """Shutdown the store""" await self.memory_store.shutdown() def parse_tags(tags_str: str) -> List[str]: """Parse comma-separated tags""" if not tags_str: return [] return [tag.strip() for tag in tags_str.split(",") if tag.strip()] def parse_metadata(metadata_str: str) -> dict: """Parse JSON metadata""" if not metadata_str: return {} try: return json.loads(metadata_str) except json.JSONDecodeError as e: print(f"Error parsing metadata JSON: {e}", file=sys.stderr) return {} def format_compact_search(result: dict) -> str: """Compact AI-optimized search output""" lines = [f"SEARCH_RESULTS count={result['count']} query={result['query']}"] for mem in result['results']: mem_id = mem.get('memory_id', mem.get('id', 'unknown')) category = mem.get('category', 'general') importance = mem.get('importance', 0.0) current_imp = mem.get('current_importance', importance) content = mem.get('content', '').replace('\n', ' ').encode('ascii', 'replace').decode('ascii') tags = ','.join(mem.get('tags', [])) created = mem.get('created_at', '') versions = mem.get('version_count', 1) accessed = mem.get('access_count', 0) lines.append(f"MEMORY id={mem_id} category={category} importance={importance:.2f} current={current_imp:.2f} versions={versions} accessed={accessed} created={created}") lines.append(f"CONTENT {content}") if tags: lines.append(f"TAGS {tags}") return '\n'.join(lines) def format_compact_init(result: dict) -> str: """Compact AI-optimized init output""" lines = ["INIT"] continuity = result.get("continuity_check", {}) if continuity: lines.append(f"CONTINUITY gap_hours={continuity.get('time_gap_hours', 0):.1f} new_session={continuity.get('is_new_session', False)}") intentions = result.get("active_intentions", []) if intentions: lines.append(f"INTENTIONS count={len(intentions)}") for intention in intentions: desc = intention.get('description', '').encode('ascii', 'replace').decode('ascii') lines.append(f"INTENTION priority={intention.get('priority', 0):.2f} status={intention.get('status', 'unknown')} description={desc}") urgent = result.get("urgent_items", []) if urgent: lines.append(f"URGENT count={len(urgent)}") for item in urgent: desc = item.get('description', '').encode('ascii', 'replace').decode('ascii') lines.append(f"URGENT_ITEM type={item.get('type', 'unknown')} description={desc}") context = result.get("context_summary", {}) if context: recent = context.get("recent_memories", []) if recent: lines.append(f"RECENT_MEMORIES count={len(recent)}") for mem in recent: content = mem.get('content', '').replace('\n', ' ').encode('ascii', 'replace').decode('ascii') lines.append(f"RECENT category={mem.get('category', 'unknown')} content={content}") return '\n'.join(lines) def format_compact_get(result: dict) -> str: """Compact AI-optimized get output""" lines = [f"MEMORY id={result['memory_id']}"] lines.append(f"META category={result['category']} importance={result['importance']} tags={','.join(result.get('tags', []))} versions={result['version_count']} accessed={result['access_count']}") lines.append(f"TIMESTAMPS created={result['created_at']} updated={result['updated_at']} last_accessed={result.get('last_accessed', 'never')}") content = result['content'].replace('\n', ' ') lines.append(f"CONTENT {content}") return '\n'.join(lines) async def main(): parser = argparse.ArgumentParser( description="Interactive Memory CLI for BuildAutomata Memory System", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: Store a memory: python interactive_memory.py store "User prefers dark mode" --category user_preference --importance 0.8 --tags "ui,settings" Search memories: python interactive_memory.py search "dark mode" --limit 5 Update a memory: python interactive_memory.py update abc123 --content "New content" --importance 0.9 Get timeline: python interactive_memory.py timeline --query "settings" --limit 10 Get statistics: python interactive_memory.py stats Prune memories: python interactive_memory.py prune --max 1000 --dry-run Run maintenance: python interactive_memory.py maintenance """ ) parser.add_argument("--username", help="Username (overrides BA_USERNAME env var)") parser.add_argument("--agent", help="Agent name (overrides BA_AGENT_NAME env var)") parser.add_argument("--json", action="store_true", help="Output as JSON") parser.add_argument("--pretty", action="store_true", help="Pretty print JSON output") parser.add_argument("--human", action="store_true", help="Human-readable verbose output (default: AI-optimized compact)") subparsers = parser.add_subparsers(dest="command", help="Command to execute") # Init command subparsers.add_parser("init", help="Initialize agent (load context, intentions, continuity)") # Store command store_parser = subparsers.add_parser("store", help="Store a new memory") store_parser.add_argument("content", help="Memory content") store_parser.add_argument("--category", default="general", help="Category (default: general)") store_parser.add_argument("--importance", type=float, default=0.5, help="Importance 0.0-1.0 (default: 0.5)") store_parser.add_argument("--tags", help="Comma-separated tags") store_parser.add_argument("--metadata", help="JSON metadata") store_parser.add_argument("--memory-type", default="episodic", choices=["episodic", "semantic", "working"], help="Memory type (default: episodic)") store_parser.add_argument("--session-id", help="Session ID for grouping memories") store_parser.add_argument("--task-context", help="Task context description") # Search command search_parser = subparsers.add_parser("search", help="Search memories") search_parser.add_argument("query", help="Search query") search_parser.add_argument("--limit", type=int, default=5, help="Max results (default: 5)") search_parser.add_argument("--category", help="Filter by category") search_parser.add_argument("--min-importance", type=float, default=0.0, help="Min importance (default: 0.0)") search_parser.add_argument("--created-after", help="Filter created after (ISO format)") search_parser.add_argument("--created-before", help="Filter created before (ISO format)") search_parser.add_argument("--updated-after", help="Filter updated after (ISO format)") search_parser.add_argument("--updated-before", help="Filter updated before (ISO format)") search_parser.add_argument("--memory-type", choices=["episodic", "semantic", "working"], help="Filter by memory type") search_parser.add_argument("--session-id", help="Filter by session ID") # Update command update_parser = subparsers.add_parser("update", help="Update a memory") update_parser.add_argument("memory_id", help="Memory ID to update") update_parser.add_argument("--content", help="New content") update_parser.add_argument("--category", help="New category") update_parser.add_argument("--importance", type=float, help="New importance") update_parser.add_argument("--tags", help="New tags (comma-separated)") update_parser.add_argument("--metadata", help="New metadata (JSON)") # Timeline command timeline_parser = subparsers.add_parser("timeline", help="Get comprehensive memory timeline") timeline_parser.add_argument("--query", help="Search query to find memories") timeline_parser.add_argument("--memory-id", help="Specific memory ID") timeline_parser.add_argument("--limit", type=int, default=20, help="Max memories (default: 20)") timeline_parser.add_argument("--start-date", help="Filter events after this date (ISO format)") timeline_parser.add_argument("--end-date", help="Filter events before this date (ISO format)") timeline_parser.add_argument("--show-all", action="store_true", help="Show ALL memories chronologically") timeline_parser.add_argument("--no-diffs", action="store_true", help="Exclude text diffs") timeline_parser.add_argument("--no-patterns", action="store_true", help="Exclude pattern analysis") timeline_parser.add_argument("--with-semantic-relations", action="store_true", help="Enable semantic relations (expensive: ~2s per memory, shows memory network)") # Stats command subparsers.add_parser("stats", help="Get memory statistics") # Get by ID command get_parser = subparsers.add_parser("get", help="Get a specific memory by ID") get_parser.add_argument("memory_id", help="Memory ID (UUID)") # List categories command categories_parser = subparsers.add_parser("categories", help="List all categories with counts") categories_parser.add_argument("--min-count", type=int, default=1, help="Minimum memory count (default: 1)") # List tags command tags_parser = subparsers.add_parser("tags", help="List all tags with usage counts") tags_parser.add_argument("--min-count", type=int, default=1, help="Minimum usage count (default: 1)") # Prune command prune_parser = subparsers.add_parser("prune", help="Prune old memories") prune_parser.add_argument("--max", type=int, help="Max memories to keep") prune_parser.add_argument("--dry-run", action="store_true", help="Show what would be pruned") # Maintenance command subparsers.add_parser("maintenance", help="Run maintenance") # Access patterns accessed_parser = subparsers.add_parser("accessed", help="Get most accessed memories with tag cloud") accessed_parser.add_argument("--limit", type=int, default=20, help="Number of memories to show (default: 20)") least_accessed_parser = subparsers.add_parser("least-accessed", help="Get least accessed memories - dead weight and buried treasure") least_accessed_parser.add_argument("--limit", type=int, default=20, help="Number of memories to show (default: 20)") least_accessed_parser.add_argument("--min-age-days", type=int, default=7, help="Minimum age in days (default: 7)") # Intention commands intention_store_parser = subparsers.add_parser("intention-store", help="Store a new intention") intention_store_parser.add_argument("description", help="Intention description") intention_store_parser.add_argument("--priority", type=float, default=0.5, help="Priority (0.0-1.0, default: 0.5)") intention_store_parser.add_argument("--deadline", help="Deadline (ISO format: YYYY-MM-DDTHH:MM:SS)") intention_store_parser.add_argument("--preconditions", help="Preconditions (comma-separated)") intention_store_parser.add_argument("--actions", help="Actions (comma-separated)") intention_store_parser.add_argument("--related-memories", help="Related memory IDs (comma-separated)") intention_store_parser.add_argument("--metadata", help="Metadata (JSON)") intention_list_parser = subparsers.add_parser("intention-list", help="List active intentions") intention_list_parser.add_argument("--limit", type=int, default=10, help="Max intentions (default: 10)") intention_list_parser.add_argument("--active-only", action="store_true", help="Show only active (exclude pending)") intention_update_parser = subparsers.add_parser("intention-update", help="Update intention status") intention_update_parser.add_argument("intention_id", help="Intention ID") intention_update_parser.add_argument("status", choices=["pending", "active", "completed", "cancelled"], help="New status") intention_update_parser.add_argument("--metadata", help="Metadata updates (JSON)") # Session memories command session_parser = subparsers.add_parser("get-session", help="Get memories from a work session or time period") session_parser.add_argument("--session-id", help="Session ID to retrieve") session_parser.add_argument("--start-date", help="Start date (ISO format)") session_parser.add_argument("--end-date", help="End date (ISO format)") session_parser.add_argument("--task-context", help="Task context filter") session_parser.add_argument("--limit", type=int, default=100, help="Max memories (default: 100)") # Consolidate memories command consolidate_parser = subparsers.add_parser("consolidate", help="Consolidate episodic memories into semantic memory") consolidate_parser.add_argument("memory_ids", help="Comma-separated memory IDs to consolidate") consolidate_parser.add_argument("--type", dest="consolidation_type", default="summarize", choices=["summarize", "synthesize", "compress"], help="Consolidation type (default: summarize)") consolidate_parser.add_argument("--target-length", type=int, default=500, help="Target length in characters (default: 500)") consolidate_parser.add_argument("--new-type", default="semantic", choices=["episodic", "semantic", "working"], help="New memory type (default: semantic)") # Graph exploration commands traverse_parser = subparsers.add_parser("traverse-graph", help="Traverse memory graph N hops from starting node") traverse_parser.add_argument("start_memory_id", help="Memory ID to start traversal from") traverse_parser.add_argument("--depth", type=int, default=2, help="Traversal depth in hops (1-5, default: 2)") traverse_parser.add_argument("--max-nodes", type=int, default=50, help="Maximum nodes to return (default: 50)") traverse_parser.add_argument("--min-importance", type=float, default=0.0, help="Minimum importance filter (default: 0.0)") traverse_parser.add_argument("--category", help="Filter by category") clusters_parser = subparsers.add_parser("find-clusters", help="Find densely connected regions in memory graph") clusters_parser.add_argument("--min-size", type=int, default=3, help="Minimum cluster size (default: 3)") clusters_parser.add_argument("--min-importance", type=float, default=0.5, help="Minimum importance (default: 0.5)") clusters_parser.add_argument("--limit", type=int, default=10, help="Maximum clusters to return (default: 10)") graph_stats_parser = subparsers.add_parser("graph-stats", help="Get graph connectivity statistics") graph_stats_parser.add_argument("--category", help="Filter to specific category") graph_stats_parser.add_argument("--min-importance", type=float, default=0.0, help="Minimum importance (default: 0.0)") args = parser.parse_args() if not args.command: parser.print_help() sys.exit(1) # Determine if lazy loading is appropriate for this command # Commands that don't need vector search can use lazy loading lazy_commands = {"stats", "get", "categories", "tags", "prune", "maintenance", "accessed", "least-accessed", "intention-store", "intention-list", "intention-update", "get-session", "consolidate"} use_lazy_load = args.command in lazy_commands # Initialize CLI cli = MemoryCLI(username=args.username, agent_name=args.agent, lazy_load=use_lazy_load) try: result = None if args.command == "init": result = await cli.init() elif args.command == "store": result = await cli.store( content=args.content, category=args.category, importance=args.importance, tags=parse_tags(args.tags), metadata=parse_metadata(args.metadata), memory_type=args.memory_type, session_id=args.session_id, task_context=args.task_context ) elif args.command == "search": result = await cli.search( query=args.query, limit=args.limit, category=args.category, min_importance=args.min_importance, created_after=args.created_after, created_before=args.created_before, updated_after=args.updated_after, updated_before=args.updated_before, memory_type=args.memory_type, session_id=args.session_id ) elif args.command == "update": result = await cli.update( memory_id=args.memory_id, content=args.content, category=args.category, importance=args.importance, tags=parse_tags(args.tags) if args.tags else None, metadata=parse_metadata(args.metadata) if args.metadata else None ) elif args.command == "timeline": result = await cli.timeline( query=args.query, memory_id=args.memory_id, limit=args.limit, start_date=args.start_date, end_date=args.end_date, show_all_memories=args.show_all, include_diffs=not args.no_diffs, include_patterns=not args.no_patterns, include_semantic_relations=args.with_semantic_relations # Default: False for performance ) elif args.command == "stats": result = await cli.stats() elif args.command == "get": result = await cli.get_by_id(memory_id=args.memory_id) elif args.command == "categories": result = await cli.list_categories(min_count=args.min_count) elif args.command == "tags": result = await cli.list_tags(min_count=args.min_count) elif args.command == "prune": result = await cli.prune( max_memories=args.max, dry_run=args.dry_run ) elif args.command == "maintenance": result = await cli.maintenance() elif args.command == "accessed": result = await cli.get_most_accessed(limit=args.limit) elif args.command == "least-accessed": result = await cli.get_least_accessed(limit=args.limit, min_age_days=args.min_age_days) elif args.command == "intention-store": result = await cli.store_intention( description=args.description, priority=args.priority, deadline=args.deadline, preconditions=parse_tags(args.preconditions) if args.preconditions else None, actions=parse_tags(args.actions) if args.actions else None, related_memories=parse_tags(args.related_memories) if args.related_memories else None, metadata=parse_metadata(args.metadata) if args.metadata else None ) elif args.command == "intention-list": result = await cli.get_intentions( limit=args.limit, include_pending=not args.active_only ) elif args.command == "intention-update": result = await cli.update_intention_status( intention_id=args.intention_id, status=args.status, metadata_updates=parse_metadata(args.metadata) if args.metadata else None ) elif args.command == "get-session": result = await cli.get_session_memories( session_id=args.session_id, start_date=args.start_date, end_date=args.end_date, task_context=args.task_context, limit=args.limit ) elif args.command == "consolidate": memory_ids = [id.strip() for id in args.memory_ids.split(",")] result = await cli.consolidate_memories( memory_ids=memory_ids, consolidation_type=args.consolidation_type, target_length=args.target_length, new_memory_type=args.new_type ) elif args.command == "traverse-graph": result = await cli.traverse_graph( start_memory_id=args.start_memory_id, depth=args.depth, max_nodes=args.max_nodes, min_importance=args.min_importance, category_filter=args.category ) elif args.command == "find-clusters": result = await cli.find_clusters( min_cluster_size=args.min_size, min_importance=args.min_importance, limit=args.limit ) elif args.command == "graph-stats": result = await cli.graph_stats( category=args.category, min_importance=args.min_importance ) # Output result if args.json or args.pretty: # JSON output indent = 2 if args.pretty else None print(json.dumps(result, indent=indent, default=str)) elif args.human: # Human-readable verbose output if args.command == "init": print("=" * 80) print("AGENT INITIALIZATION") print("=" * 80) print() # Continuity check continuity = result.get("continuity_check", {}) if continuity: print("SESSION CONTINUITY:") print(f" Last activity: {continuity.get('last_activity', 'Never')}") print(f" Time gap: {continuity.get('time_gap_hours', 0):.1f} hours") print(f" New session: {continuity.get('is_new_session', False)}") print() # Active intentions intentions = result.get("active_intentions", []) if intentions: print(f"ACTIVE INTENTIONS ({len(intentions)}):") for i, intention in enumerate(intentions, 1): status_emoji = {"active": "*", "pending": "-", "completed": "✓", "cancelled": "X"}.get(intention.get('status'), '?') print(f" {i}. [{intention.get('priority', 0):.2f}] {status_emoji} {intention.get('description', 'No description')}") print() else: print("ACTIVE INTENTIONS: None") print() # Urgent items urgent = result.get("urgent_items", []) if urgent: print(f"URGENT ITEMS ({len(urgent)}):") for item in urgent: print(f" [!] {item.get('type', 'unknown')}: {item.get('description', '')}") print() # Context summary context = result.get("context_summary", {}) if context: print("CONTEXT SUMMARY:") print(f" Active intentions: {context.get('active_intention_count', 0)}") print(f" Urgent items: {context.get('urgent_count', 0)}") recent = context.get("recent_memories", []) if recent: print(f" Recent memories: {len(recent)}") for mem in recent[:3]: content = mem.get('content', '').encode('ascii', 'replace').decode('ascii') print(f" - [{mem.get('category', 'unknown')}] {content}") print() # Performance scan_time = result.get("scan_duration_ms", 0) print(f"Scan completed in {scan_time:.1f}ms") print("=" * 80) print() print("Store observations as they happen, not just when prompted.") print("What to store: preferences, decisions, corrections, insights, technical details.") print("Default bias: store it. Memories are cheap, context loss is expensive.") elif args.command == "store": print(f"[OK] Memory stored successfully") print(f" ID: {result['memory_id']}") content = result['content'].encode('ascii', 'replace').decode('ascii') print(f" Content: {content}") if result.get('warnings'): print(f" Warnings: {', '.join(result['warnings'])}") # Display similar memories if found if result.get('similar_memories'): print() print(f"[SIMILAR MEMORIES - You've thought about this before:]") for i, sim in enumerate(result['similar_memories'], 1): print(f"\n {i}. ID: {sim['id']}") print(f" Category: {sim['category']}") print(f" Created: {sim['created']}") preview = sim['content_preview'].encode('ascii', 'replace').decode('ascii') print(f" Preview: {preview}") elif args.command == "search": print(f"Found {result['count']} memories for query: '{result['query']}'") print() for i, mem in enumerate(result['results'], 1): mem_id = mem.get('memory_id', mem.get('id', 'unknown')) category = mem.get('category', 'general') importance = mem.get('importance', 0.0) current_imp = mem.get('current_importance', importance) content = mem.get('content', '') tags = mem.get('tags', []) created_at = mem.get('created_at', '') version_count = mem.get('version_count', 1) access_count = mem.get('access_count', 0) related_memories = mem.get('related_memories', []) print(f"{i}. [{mem_id}] {category} (importance: {importance:.2f}, current: {current_imp:.2f})") content_safe = content.encode('ascii', 'replace').decode('ascii') print(f" {content_safe}") print(f" Tags: {', '.join(tags) if tags else 'none'}") print(f" Created: {created_at}, Versions: {version_count}, Accessed: {access_count}x") if related_memories: print(f" Related ({len(related_memories)} memories):") for j, rel in enumerate(related_memories[:3], 1): # Show first 3 rel_preview = rel.get('content_preview', '')[:80] rel_cat = rel.get('category', 'unknown') rel_imp = rel.get('importance', 0.0) print(f" {j}. [{rel_cat}] {rel_preview} (imp: {rel_imp:.2f})") # Show version history summary if available if mem.get('version_history'): vh = mem['version_history'] print(f" Version History: {vh['total_versions']} versions, last updated {vh['last_updated']}") print() elif args.command == "update": print(f"[OK] Memory updated successfully") print(f" ID: {result['memory_id']}") print(f" Changes: {', '.join(result['changes'])}") if result.get('warnings'): print(f" Warnings: {', '.join(result['warnings'])}") elif args.command == "timeline": if result.get("error"): print(f"Error: {result['error']}") else: print("=" * 80) print("MEMORY TIMELINE - Biographical Narrative") print("=" * 80) print() # Narrative summary if result.get("narrative_arc"): print("NARRATIVE SUMMARY:") print("-" * 80) print(result["narrative_arc"]) print() # Statistics print(f"Total Events: {result.get('total_events', 0)}") print(f"Memories Tracked: {result.get('memories_tracked', 0)}") print() # Temporal patterns patterns = result.get("temporal_patterns", {}) if patterns: print("TEMPORAL PATTERNS:") print(f" Duration: {patterns.get('total_duration_days', 0)} days") print(f" Avg Events/Day: {patterns.get('avg_events_per_day', 0)}") bursts = patterns.get("bursts", []) if bursts: print(f"\n BURSTS ({len(bursts)} detected):") for i, burst in enumerate(bursts, 1): print(f" {i}. {burst['event_count']} events in {burst['duration_hours']}h") gaps = patterns.get("gaps", []) if gaps: print(f"\n GAPS ({len(gaps)} detected):") for i, gap in enumerate(gaps, 1): print(f" {i}. {gap['duration_days']} days of silence") print() print("=" * 80) print(f"EVENTS (showing {len(result.get('events', []))} events)") print("=" * 80) # Show abbreviated event list for i, event in enumerate(result.get("events", [])[:20], 1): # Limit to first 20 # Handle datetime objects from SQLite converter timestamp = event['timestamp'] if hasattr(timestamp, 'isoformat'): timestamp_str = timestamp.isoformat()[:19] else: timestamp_str = str(timestamp)[:19] print(f"[{i}] v{event['version']} - {timestamp_str}") print(f" {event['category']} | Importance: {event['importance']}") print(f" {event['content']}") if event.get('diff'): print(f" Changed: {event['diff']['change_magnitude']:.1%}") print() if len(result.get('events', [])) > 20: print(f"... and {len(result['events']) - 20} more events") print() elif args.command == "get": if "error" in result: print(f"[ERROR] {result['error']}") else: print("=" * 80) print(f"MEMORY: {result['memory_id']}") print("=" * 80) print(f"Category: {result['category']}") print(f"Importance: {result['importance']}") print(f"Tags: {', '.join(result['tags']) if result['tags'] else 'none'}") print(f"Created: {result['created_at']}") print(f"Updated: {result['updated_at']}") print(f"Last Accessed: {result['last_accessed']}") print(f"Access Count: {result['access_count']}") print(f"Versions: {result['version_count']}") if "current_version" in result: print(f"Current Version: {result['current_version']}") print(f"Last Change: {result['last_change_type']}") if result.get('last_change_description'): print(f"Change Description: {result['last_change_description']}") print("\nCONTENT:") print("-" * 80) content_safe = result['content'].encode('ascii', 'replace').decode('ascii') print(content_safe) if result.get('metadata'): print("\nMETADATA:") print("-" * 80) print(json.dumps(result['metadata'], indent=2)) elif args.command == "categories": if "error" in result: print(f"[ERROR] {result['error']}") else: print("=" * 80) print("MEMORY CATEGORIES") print("=" * 80) print(f"Total Categories: {result['total_categories']}") print(f"Total Memories: {result['total_memories']}") print(f"Min Count Filter: {result['min_count_filter']}") print("\nCATEGORIES (sorted by count):") print("-" * 80) for category, count in result['categories'].items(): print(f"{category:40} {count:>5} memories") elif args.command == "tags": if "error" in result: print(f"[ERROR] {result['error']}") else: print("=" * 80) print("MEMORY TAGS") print("=" * 80) print(f"Total Unique Tags: {result['total_unique_tags']}") print(f"Total Tag Usages: {result['total_tag_usages']}") print(f"Min Count Filter: {result['min_count_filter']}") print("\nTAGS (sorted by usage count):") print("-" * 80) for tag, count in result['tags'].items(): print(f"{tag:40} {count:>5} uses") elif args.command == "stats": print("Memory System Statistics:") print("=" * 60) if result.get('version'): print(f"Version: {result['version']}") if result.get('fixes'): print(f"Fixes: {', '.join(result['fixes'])}") print("-" * 60) print(f"Total memories: {result.get('total_memories', 0)}") print(f"Total versions: {result.get('total_versions', 0)}") print(f"Cache size: {result.get('cache_size', 0)}") print(f"Qdrant available: {result.get('qdrant_available', False)}") print(f"Embeddings available: {result.get('embeddings_available', False)}") if result.get('categories'): print(f"\nCategories: {json.dumps(result['categories'], indent=2)}") if result.get('last_maintenance'): print(f"\nLast maintenance: {result['last_maintenance']}") elif args.command == "prune": print("Pruning Results:") print("=" * 60) print(f"Dry run: {result.get('dry_run', False)}") print(f"Deleted count: {result.get('deleted_count', 0)}") print(f"Retained count: {result.get('retained_count', 0)}") if result.get('deleted_ids'): print(f"Deleted IDs: {', '.join(result['deleted_ids'])}") elif args.command == "maintenance": print("Maintenance Results:") print("=" * 60) print(json.dumps(result, indent=2, default=str)) elif args.command == "accessed": print("=" * 80) print("MOST ACCESSED MEMORIES - Behavioral Truth") print("=" * 80) print() print(result.get("interpretation", "")) print() memories = result.get("memories", []) print(f"TOP {len(memories)} MEMORIES BY ACCESS COUNT:") print("=" * 80) for i, mem in enumerate(memories, 1): print(f"\n[{i}] Access Count: {mem['access_count']} | Importance: {mem['importance']}") print(f" Category: {mem['category']}") print(f" Last Accessed: {mem['last_accessed']}") print(f" Content: {mem['content_preview']}") print() print("=" * 80) print("TAG CLOUD (Top 30 from most accessed memories):") print("=" * 80) tag_cloud = result.get("tag_cloud", []) if tag_cloud: # Display in columns for i in range(0, len(tag_cloud), 3): row = tag_cloud[i:i+3] print(" " + " | ".join(f"{t['tag']} ({t['count']})" for t in row)) else: print(" No tags found") print() elif args.command == "least-accessed": print("=" * 80) print("LEAST ACCESSED MEMORIES - Dead Weight & Buried Treasure") print("=" * 80) print() print(result.get("interpretation", "")) print() print(f"Minimum age filter: {result.get('min_age_days')} days") print(f"Zero access count: {result.get('zero_access_count')} memories") print() memories = result.get("memories", []) print(f"BOTTOM {len(memories)} MEMORIES BY ACCESS COUNT:") print("=" * 80) for i, mem in enumerate(memories, 1): print(f"\n[{i}] Access Count: {mem['access_count']} | Importance: {mem['importance']} | Age: {mem['age_days']} days") print(f" Category: {mem['category']}") print(f" Created: {mem['created_at']}") print(f" Last Accessed: {mem['last_accessed'] or 'Never'}") print(f" Content: {mem['content_preview']}") print() print("=" * 80) print("TAG CLOUD (Top 30 from least accessed memories):") print("=" * 80) tag_cloud = result.get("tag_cloud", []) if tag_cloud: # Display in columns for i in range(0, len(tag_cloud), 3): row = tag_cloud[i:i+3] print(" " + " | ".join(f"{t['tag']} ({t['count']})" for t in row)) else: print(" No tags found") print() elif args.command == "intention-store": if "error" in result: print(f"[ERROR] {result['error']}") else: print("[OK] Intention stored successfully") print(f" ID: {result.get('intention_id')}") print(f" Priority: {result.get('priority')}") print(f" Status: {result.get('status')}") elif args.command == "intention-list": intentions = result.get("intentions", []) print("=" * 80) print(f"INTENTIONS ({result.get('count', 0)})") print("=" * 80) print() if not intentions: print("No intentions found") else: for i, intention in enumerate(intentions, 1): status_emoji = {"active": "*", "pending": "-", "completed": "✓", "cancelled": "X"}.get(intention.get('status'), '?') print(f"{i}. [{intention.get('priority', 0):.2f}] {status_emoji} {intention.get('description')}") print(f" ID: {intention.get('id')}") print(f" Status: {intention.get('status')}") if intention.get('deadline'): print(f" Deadline: {intention.get('deadline')}") print(f" Created: {intention.get('created_at')}") if intention.get('preconditions'): print(f" Preconditions: {', '.join(intention['preconditions'])}") if intention.get('actions'): print(f" Actions: {', '.join(intention['actions'])}") print() elif args.command == "intention-update": if "error" in result: print(f"[ERROR] {result['error']}") else: print("[OK] Intention updated successfully") print(f" ID: {result.get('intention_id')}") print(f" New Status: {result.get('status')}") elif args.command == "get-session": print("=" * 80) print("SESSION MEMORIES") print("=" * 80) if result.get('session_id'): print(f"Session ID: {result['session_id']}") if result.get('date_range'): print(f"Date Range: {result['date_range'][0]} to {result['date_range'][1]}") if result.get('task_context'): print(f"Task Context: {result['task_context']}") print(f"Total Memories: {result['count']}") print() for i, mem in enumerate(result.get('memories', []), 1): mem_id = mem.get('memory_id', mem.get('id', 'unknown')) print(f"[{i}] {mem_id}") print(f" Type: {mem.get('memory_type', 'episodic')} | Category: {mem.get('category', 'general')}") print(f" Importance: {mem.get('importance', 0.0):.2f}") print(f" Created: {mem.get('created_at', 'unknown')}") content = mem.get('content', '').encode('ascii', 'replace').decode('ascii') print(f" Content: {content}") if mem.get('task_context'): print(f" Task: {mem['task_context']}") print() elif args.command == "consolidate": if "error" in result: print(f"[ERROR] {result['error']}") else: print("=" * 80) print("MEMORY CONSOLIDATION RESULT") print("=" * 80) print(f"New Memory ID: {result['new_memory_id']}") print(f"Consolidation Type: {result['consolidation_type']}") print(f"Source Memories: {len(result['source_memory_ids'])}") print(f"New Memory Type: {result['new_memory_type']}") print() print("CONSOLIDATED CONTENT:") print("-" * 80) content = result['content'].encode('ascii', 'replace').decode('ascii') print(content) print() if result.get('provenance'): print("PROVENANCE:") print(json.dumps(result['provenance'], indent=2)) elif args.command == "traverse-graph": if "error" in result: print(f"[ERROR] {result['error']}") else: print("=" * 80) print(f"GRAPH TRAVERSAL FROM: {result['start_node']}") print("=" * 80) print(f"Depth: {result['depth']} hops") print(f"Nodes: {result['node_count']}, Edges: {result['edge_count']}") if result.get('truncated'): print("⚠ Result truncated at max_nodes limit") print() # Group by depth by_depth = {} for node in result['nodes']: depth = node['depth'] by_depth.setdefault(depth, []).append(node) for depth in sorted(by_depth.keys()): print(f"DEPTH {depth}: {len(by_depth[depth])} nodes") for node in by_depth[depth][:5]: print(f" [{node['category']}] {node['content_preview']}") print(f" ID: {node['id']} | importance: {node['importance']}") if len(by_depth[depth]) > 5: print(f" ... and {len(by_depth[depth]) - 5} more") print() elif args.command == "find-clusters": if "error" in result: print(f"[ERROR] {result['error']}") else: print("=" * 80) print("MEMORY CLUSTERS") print("=" * 80) print(f"Analyzed: {result['total_memories_analyzed']} memories") print(f"Found: {result['clusters_found']} clusters") print() for i, cluster in enumerate(result['clusters'], 1): print(f"CLUSTER {i}: {cluster['size']} memories") print(f" Category: {cluster['dominant_category']}") print(f" Avg Importance: {cluster['avg_importance']}") print(f" Common Tags: {', '.join(cluster['common_tags'][:5])}") print(f" Sample memories:") for mem in cluster['sample_memories'][:3]: print(f" • {mem['content_preview']}") print() elif args.command == "graph-stats": if "error" in result: print(f"[ERROR] {result['error']}") else: print("=" * 80) print("MEMORY GRAPH STATISTICS") print("=" * 80) print(f"Total Memories: {result['total_memories']}") print(f"Total Connections: {result['total_connections']}") print(f"Avg Connections: {result['avg_connections_per_memory']}") print(f"Isolated Nodes: {result['isolated_nodes_count']}") print() print("CONNECTIVITY DISTRIBUTION:") dist = result['connectivity_distribution'] print(f" No connections: {dist['no_connections']}") print(f" 1-3 connections: {dist['1-3_connections']}") print(f" 4-10 connections: {dist['4-10_connections']}") print(f" 10+ connections: {dist['10+_connections']}") print() print("MOST CONNECTED HUBS:") for hub in result['most_connected_hubs'][:5]: print(f" {hub['connections']} connections - [{hub['category']}] {hub['content_preview'][:60]}") else: # Compact AI-optimized output (default) if args.command == "init": print(format_compact_init(result)) elif args.command == "search": print(format_compact_search(result)) elif args.command == "get": print(format_compact_get(result)) elif args.command == "store": print(f"STORED id={result['memory_id']}") if result.get('similar_memories'): print(f"SIMILAR count={len(result['similar_memories'])}") for sim in result['similar_memories']: print(f"SIMILAR_MEM id={sim['id']} category={sim['category']}") elif args.command == "update": print(f"UPDATED id={result['memory_id']} changes={','.join(result.get('changes', []))}") elif args.command == "stats": version_str = f" version={result['version']}" if result.get('version') else "" fixes_str = f" fixes={','.join(result['fixes'])}" if result.get('fixes') else "" print(f"STATS{version_str}{fixes_str} memories={result.get('total_memories', 0)} categories={result.get('total_categories', 0)} tags={result.get('total_unique_tags', 0)}") elif args.command == "get-session": print(f"SESSION count={result['count']} session_id={result.get('session_id', 'all')}") for mem in result.get('memories', []): mem_id = mem.get('memory_id', mem.get('id', 'unknown')) print(f"MEMORY id={mem_id} type={mem.get('memory_type', 'episodic')} category={mem.get('category', 'general')}") elif args.command == "consolidate": if "error" in result: print(f"ERROR {result['error']}") else: print(f"CONSOLIDATED new_id={result['new_memory_id']} source_count={len(result['source_memory_ids'])} type={result['consolidation_type']}") elif args.command == "traverse-graph": if "error" in result: print(f"ERROR {result['error']}") else: print(f"GRAPH_TRAVERSE start={result['start_node']} depth={result['depth']} nodes={result['node_count']} edges={result['edge_count']} truncated={result.get('truncated', False)}") for node in result['nodes']: print(f"NODE id={node['id']} depth={node['depth']} category={node['category']} importance={node['importance']}") elif args.command == "find-clusters": if "error" in result: print(f"ERROR {result['error']}") else: print(f"CLUSTERS total={result['total_memories_analyzed']} found={result['clusters_found']}") for cluster in result['clusters']: print(f"CLUSTER size={cluster['size']} category={cluster['dominant_category']} avg_importance={cluster['avg_importance']} tags={','.join(cluster['common_tags'][:3])}") elif args.command == "graph-stats": if "error" in result: print(f"ERROR {result['error']}") else: print(f"GRAPH_STATS memories={result['total_memories']} connections={result['total_connections']} avg={result['avg_connections_per_memory']} isolated={result['isolated_nodes_count']}") dist = result['connectivity_distribution'] print(f"DISTRIBUTION none={dist['no_connections']} low={dist['1-3_connections']} med={dist['4-10_connections']} high={dist['10+_connections']}") for hub in result['most_connected_hubs'][:5]: print(f"HUB id={hub['id']} connections={hub['connections']} category={hub['category']}") else: # Fallback for other commands - use JSON print(json.dumps(result, default=str)) finally: await cli.shutdown() if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brucepro/buildautomata_memory_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server