#!/usr/bin/env python3
"""
MCP Server for BuildAutomata Memory System
Copyright 2025 Jurden Bruce
"""
import sys
import os
import asyncio
import json
import logging
import sqlite3
import hashlib
import traceback
import threading
import random
from typing import Any, List, Dict, Optional, Tuple, Union
from datetime import datetime, timedelta
from pathlib import Path
from dataclasses import dataclass, asdict
import uuid
from collections import OrderedDict
import difflib
import re
# Redirect stdout to stderr before heavy imports so stray prints cannot corrupt the MCP stdio protocol
_original_stdout_fd = os.dup(1)
os.dup2(2, 1)
logging.basicConfig(
level=logging.INFO,
stream=sys.stderr,
format="%(asctime)s [%(levelname)s] [%(name)s] %(message)s",
)
logger = logging.getLogger("buildautomata-memory")
for logger_name in ["qdrant_client", "sentence_transformers", "urllib3", "httpx"]:
logging.getLogger(logger_name).setLevel(logging.WARNING)
from mcp.server.models import InitializationOptions
from mcp.server import NotificationOptions, Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent, Prompt, Resource, PromptMessage, GetPromptResult
# Import and register datetime adapters
# Handle both package import and direct execution
try:
from .utils import register_sqlite_adapters
from .models import Memory, Intention, MemoryRelationship
from .cache import LRUCache
from .storage.embeddings import EmbeddingGenerator
from .storage.qdrant_store import QdrantStore
from .storage.sqlite_store import SQLiteStore
from .graph_ops import GraphOperations
from .timeline import TimelineAnalysis
from .intentions import IntentionManager
from .mcp_tools import get_tool_definitions, handle_tool_call
except ImportError:
# Direct execution fallback
from utils import register_sqlite_adapters
from models import Memory, Intention, MemoryRelationship
from cache import LRUCache
from storage.embeddings import EmbeddingGenerator
from storage.qdrant_store import QdrantStore
from storage.sqlite_store import SQLiteStore
from graph_ops import GraphOperations
from timeline import TimelineAnalysis
from intentions import IntentionManager
from mcp_tools import get_tool_definitions, handle_tool_call
register_sqlite_adapters()
# Check availability without importing heavy libraries
try:
import importlib.util
QDRANT_AVAILABLE = importlib.util.find_spec("qdrant_client") is not None
EMBEDDINGS_AVAILABLE = importlib.util.find_spec("sentence_transformers") is not None
except Exception:
QDRANT_AVAILABLE = False
EMBEDDINGS_AVAILABLE = False
if not QDRANT_AVAILABLE:
logger.warning("Qdrant not available - semantic search disabled")
if not EMBEDDINGS_AVAILABLE:
logger.warning("SentenceTransformers not available - using fallback embeddings")
class MemoryStore:
def __init__(self, username: str, agent_name: str, lazy_load: bool = False):
self.username = username
self.agent_name = agent_name
self.collection_name = f"{username}_{agent_name}_memories"
self.lazy_load = lazy_load
script_dir = Path(__file__).parent
self.base_path = script_dir / "memory_repos" / f"{username}_{agent_name}"
self.db_path = self.base_path / "memoryv012.db"
# EMBEDDED MODE: Qdrant data stored alongside SQLite database
self.qdrant_path = str(self.base_path / "qdrant_data")
self.config = {
"qdrant_path": self.qdrant_path,
"vector_size": 768,
"max_memories": int(os.getenv("MAX_MEMORIES", 100000)),
"cache_maxsize": int(os.getenv("CACHE_MAXSIZE", 1000)),
"maintenance_interval_hours": int(os.getenv("MAINTENANCE_INTERVAL_HOURS", 24)),
"qdrant_max_retries": int(os.getenv("QDRANT_MAX_RETRIES", 3)),
}
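# Illustrative env overrides (example values, not defaults shipped with the server):
#   MAX_MEMORIES=50000 CACHE_MAXSIZE=2000 QDRANT_MAX_RETRIES=5
# set in the launching environment before the process starts; anything unset
# falls back to the defaults above.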
self.qdrant_client = None
self.encoder = None
self.db_conn = None
self._qdrant_initialized = False
self._encoder_initialized = False
self._db_lock = threading.RLock()
self.memory_cache: LRUCache = LRUCache(maxsize=self.config["cache_maxsize"])
self.embedding_cache: LRUCache = LRUCache(maxsize=self.config["cache_maxsize"])
self.error_log: List[Dict[str, Any]] = []
self.last_maintenance: Optional[datetime] = None
# Initialize embedding generator (manages encoder, caching, and fallback)
self.embedding_gen = EmbeddingGenerator(
config=self.config,
embedding_cache=self.embedding_cache,
error_log=self.error_log,
lazy_load=lazy_load
)
# Initialize Qdrant store (manages vector operations)
self.qdrant_store = QdrantStore(
config=self.config,
collection_name=self.collection_name,
error_log=self.error_log,
lazy_load=lazy_load
)
# Maintain qdrant_client reference for internal methods
self.qdrant_client = self.qdrant_store.client
# Initialize SQLite store (placeholder - will be initialized in initialize())
self.sqlite_store = None
self.graph_ops = None
self.timeline = None
self.intention_mgr = None
# Initialize core (creates directories, db connection, calls module initializers)
self.initialize()
def initialize(self):
"""Initialize all backends with proper error handling"""
import time
init_start = time.perf_counter()
try:
step_start = time.perf_counter()
self._init_directories()
logger.info(f"[TIMING] Directories initialized in {(time.perf_counter() - step_start)*1000:.2f}ms")
step_start = time.perf_counter()
self._init_sqlite()
logger.info(f"[TIMING] SQLite connection created in {(time.perf_counter() - step_start)*1000:.2f}ms")
# Initialize SQLiteStore and delegate schema creation
step_start = time.perf_counter()
self.sqlite_store = SQLiteStore(
db_path=self.db_path,
db_conn=self.db_conn,
db_lock=self._db_lock,
error_log=self.error_log
)
self.sqlite_store.initialize()
logger.info(f"[TIMING] SQLiteStore initialized in {(time.perf_counter() - step_start)*1000:.2f}ms")
# Initialize graph operations (requires get_memory_by_id method)
self.graph_ops = GraphOperations(
db_conn=self.db_conn,
db_lock=self._db_lock,
get_memory_by_id_func=self.get_memory_by_id
)
# Initialize timeline analysis
self.timeline = TimelineAnalysis(
db_conn=self.db_conn,
db_lock=self._db_lock
)
# Initialize intention manager
self.intention_mgr = IntentionManager(
db_conn=self.db_conn,
db_lock=self._db_lock,
error_log=self.error_log,
get_working_set_func=self.get_working_set
)
# Qdrant and encoder initialization now handled by their respective modules
if not self.lazy_load:
logger.info(f"[TIMING] Qdrant and Encoder initialized via storage modules (lazy_load={self.lazy_load})")
else:
logger.info("[LAZY] Qdrant and Encoder will be loaded on-demand via storage modules")
total_time = (time.perf_counter() - init_start) * 1000
logger.info(f"[TIMING] MemoryStore initialized in {total_time:.2f}ms total")
except Exception as e:
logger.error(f"Initialization failed: {e}")
self._log_error("initialization", e)
def _init_directories(self):
"""Create necessary directories"""
try:
self.base_path.mkdir(parents=True, exist_ok=True)
logger.info(f"Directories initialized at {self.base_path}")
except Exception as e:
logger.error(f"Directory initialization failed: {e}")
raise
def _init_sqlite(self):
"""Create SQLite connection (schema creation delegated to SQLiteStore)"""
try:
self.db_conn = sqlite3.connect(
str(self.db_path),
check_same_thread=False,
timeout=30.0,
isolation_level="IMMEDIATE",
detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES
)
self.db_conn.row_factory = sqlite3.Row
logger.info("SQLite connection created successfully")
except Exception as e:
logger.error(f"SQLite connection failed: {e}")
self._log_error("sqlite_init", e)
self.db_conn = None
def _ensure_qdrant(self):
"""Ensure Qdrant is initialized (delegated to QdrantStore)"""
# Trigger lazy initialization if needed
if hasattr(self.qdrant_store, '_ensure_initialized'):
self.qdrant_store._ensure_initialized()
# Sync qdrant_client reference for internal use
self.qdrant_client = self.qdrant_store.client
def _log_error(self, operation: str, error: Exception):
"""Log detailed error information"""
error_entry = {
"timestamp": datetime.now().isoformat(),
"operation": operation,
"error_type": type(error).__name__,
"error_msg": str(error),
"traceback": traceback.format_exc(),
}
self.error_log.append(error_entry)
if len(self.error_log) > 100:
self.error_log = self.error_log[-100:]
def generate_embedding(self, text: str) -> List[float]:
"""Generate embedding with caching - delegates to EmbeddingGenerator"""
result = self.embedding_gen.generate_embedding(text)
# Maintain encoder reference for internal use (after lazy init)
self.encoder = self.embedding_gen.encoder
return result
def _create_version(self, memory: Memory, change_type: str, change_description: str, prev_version_id: Optional[str] = None):
"""Create a version entry in memory_versions table with proper transaction handling"""
if not self.db_conn:
return None
try:
with self._db_lock:
self.db_conn.execute("BEGIN IMMEDIATE")
try:
# Get current version count
cursor = self.db_conn.execute(
"SELECT COALESCE(MAX(version_number), 0) + 1 FROM memory_versions WHERE memory_id = ?",
(memory.id,)
)
version_number = cursor.fetchone()[0]
version_id = str(uuid.uuid4())
self.db_conn.execute("""
INSERT INTO memory_versions
(version_id, memory_id, version_number, content, category, importance,
tags, metadata, change_type, change_description, created_at, content_hash, prev_version_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
version_id,
memory.id,
version_number,
memory.content,
memory.category,
memory.importance,
json.dumps(memory.tags),
json.dumps(memory.metadata),
change_type,
change_description,
datetime.now(),
memory.content_hash(),
prev_version_id
))
self.db_conn.commit()
return version_id
except Exception:
self.db_conn.rollback()
raise
except Exception as e:
logger.error(f"Version creation failed: {e}")
self._log_error("create_version", e)
return None
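# Illustrative version chain (how the columns above are meant to relate): the first
# store writes version_number 1 with prev_version_id NULL; a later update writes
# version_number 2 and, when the caller passes the prior version's id as
# prev_version_id, history can be walked backwards version by version.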
async def store_memory(self, memory: Memory, is_update: bool = False, old_hash: Optional[str] = None) -> Dict[str, Any]:
"""Store or update a memory with automatic versioning
Routes storage based on memory_type:
- "family": Routes to family_shared.db
- "episodic", "semantic", "working": Routes to personal DB
"""
import time
# FAMILY MEMORY ROUTING
if memory.memory_type == "family":
return await self._store_family_memory(memory)
# PERSONAL MEMORY (existing behavior)
success_backends = []
errors = []
skip_version = False
similar_memories = []
# Check for 100% duplicate before storing new memory
if not is_update and self.db_conn:
content_hash = memory.content_hash()
with self._db_lock:
cursor = self.db_conn.execute(
"SELECT id FROM memories WHERE content_hash = ?",
(content_hash,)
)
existing = cursor.fetchone()
if existing:
error_msg = f"Duplicate content rejected: matches existing memory {existing[0]}"
logger.warning(error_msg)
return {"success": False, "backends": [], "similar_memories": [], "error": error_msg}
# Find similar memories (semantic search) - for both new and updated memories
similarity_start = time.perf_counter()
try:
# Use existing search_memories with low limit for speed
search_results = await self.search_memories(
query=memory.content,
limit=9,
include_versions=False
)
logger.info(f"[AUTO-LINK] Search returned {len(search_results)} results for memory {memory.id}")
# Filter out exact matches and format results
for result in search_results:
logger.info(f"[AUTO-LINK] Comparing result {result['memory_id']} with memory {memory.id}")
if result['memory_id'] != memory.id: # Not self
similar_memories.append({
'id': result['memory_id'],
'content_preview': result['content'][:100] + '...' if len(result['content']) > 100 else result['content'],
'category': result.get('category', 'unknown'),
'created': result.get('created_at', 'unknown')
})
logger.info(f"[AUTO-LINK] Added {result['memory_id']} to similar_memories")
else:
logger.info(f"[AUTO-LINK] Filtered out self-reference")
similarity_time = (time.perf_counter() - similarity_start) * 1000
logger.info(f"[TIMING] Similarity search found {len(similar_memories)} similar memories in {similarity_time:.2f}ms")
if similar_memories:
# Auto-link: populate related_memories field with similar memory IDs
memory.related_memories = [m['id'] for m in similar_memories]
logger.info(f"Similar memories found and linked: {memory.related_memories}")
else:
logger.info(f"[AUTO-LINK] No similar memories after filtering (all were self-references)")
except Exception as e:
logger.warning(f"Similarity search failed (non-fatal): {e}")
if is_update and old_hash:
new_hash = memory.content_hash()
if old_hash == new_hash:
logger.info(f"Memory {memory.id} unchanged (hash match), skipping version creation")
skip_version = True
# SQLite with versioning
try:
result = await asyncio.to_thread(self._store_in_sqlite, memory, is_update, skip_version)
if result:
success_backends.append("SQLite")
else:
errors.append("SQLite store returned False")
except Exception as e:
logger.error(f"SQLite store failed: {e}")
self._log_error("store_sqlite", e)
errors.append(f"SQLite: {str(e)}")
# Qdrant with retry logic
try:
if await self._store_in_qdrant_with_retry(memory):
success_backends.append("Qdrant")
else:
errors.append("Qdrant store returned False")
except Exception as e:
logger.error(f"Qdrant store failed: {e}")
self._log_error("store_qdrant", e)
errors.append(f"Qdrant: {str(e)}")
# Update cache
self.memory_cache[memory.id] = memory
if errors:
logger.warning(f"Store completed with errors for {memory.id}: {errors}")
# Return success status, backends, and similar memories
result = {
'success': len(success_backends) > 0,
'backends': success_backends,
'similar_memories': similar_memories
}
return result
def _store_in_sqlite(self, memory: Memory, is_update: bool = False, skip_version: bool = False) -> bool:
"""Store in SQLite (delegated to SQLiteStore)"""
return self.sqlite_store.store_memory(memory, is_update, skip_version)
async def _store_in_qdrant_with_retry(self, memory: Memory, max_retries: Optional[int] = None) -> bool:
"""Store in Qdrant with retries and exponential backoff"""
if not self.qdrant_client:
return False
if max_retries is None:
max_retries = self.config["qdrant_max_retries"]
for attempt in range(max_retries):
try:
return await self._store_in_qdrant(memory)
except Exception as e:
if attempt == max_retries - 1:
logger.error(f"Qdrant store failed after {max_retries} attempts: {e}")
self._log_error("qdrant_store_retry", e)
return False
logger.warning(f"Qdrant store attempt {attempt + 1} failed, retrying...")
await asyncio.sleep(2 ** attempt) # Exponential backoff
return False
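# Retry timing with the default qdrant_max_retries=3: a failed attempt 0 sleeps
# 2**0 = 1s, a failed attempt 1 sleeps 2**1 = 2s, and a third failure is logged
# and returns False without sleeping.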
async def _store_in_qdrant(self, memory: Memory) -> bool:
"""Store in Qdrant"""
self._ensure_qdrant()
if not self.qdrant_client:
return False
# Import when needed (after qdrant initialization)
from qdrant_client.models import PointStruct
try:
embedding = await asyncio.to_thread(self.generate_embedding, memory.content)
point = PointStruct(
id=memory.id, vector=embedding, payload=memory.to_dict()
)
await asyncio.to_thread(
self.qdrant_client.upsert,
collection_name=self.collection_name,
points=[point],
)
logger.debug(f"Stored memory {memory.id} in Qdrant")
return True
except Exception as e:
logger.error(f"Qdrant store failed for {memory.id}: {e}")
self._log_error("qdrant_store", e)
return False
async def store_memories_batch(self, memories: List[Memory]) -> Tuple[int, List[str], List[str]]:
"""Store multiple memories in one SQLite transaction plus a single batched Qdrant upsert.
Returns (success_count, backends, errors).
"""
success_count = 0
success_backends = set()
errors = []
if not memories:
return 0, [], []
# SQLite batch operation
if self.db_conn:
try:
with self._db_lock:
self.db_conn.execute("BEGIN IMMEDIATE")
try:
for memory in memories:
if self._store_in_sqlite(memory, is_update=False, skip_version=False):
success_count += 1
success_backends.add("SQLite")
self.db_conn.commit()
except Exception:
self.db_conn.rollback()
raise
except Exception as e:
logger.error(f"SQLite batch store failed: {e}")
self._log_error("batch_store_sqlite", e)
errors.append(f"SQLite batch: {str(e)}")
# Qdrant batch operation
self._ensure_qdrant()
if self.qdrant_client:
# Import when needed (after qdrant initialization)
from qdrant_client.models import PointStruct
try:
embeddings = await asyncio.gather(*[
asyncio.to_thread(self.generate_embedding, mem.content)
for mem in memories
])
points = [
PointStruct(id=mem.id, vector=emb, payload=mem.to_dict())
for mem, emb in zip(memories, embeddings)
]
await asyncio.to_thread(
self.qdrant_client.upsert,
collection_name=self.collection_name,
points=points,
)
success_backends.add("Qdrant")
except Exception as e:
logger.error(f"Qdrant batch store failed: {e}")
self._log_error("batch_store_qdrant", e)
errors.append(f"Qdrant batch: {str(e)}")
return success_count, list(success_backends), errors
async def update_memory(
self,
memory_id: str,
content: Optional[str] = None,
category: Optional[str] = None,
importance: Optional[float] = None,
tags: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
logger.info(f"Attempting to update memory: {memory_id}")
existing = await self._get_memory_by_id(memory_id)
if not existing:
logger.error(f"Memory not found: {memory_id}")
return {"success": False, "message": f"Memory not found: {memory_id}", "backends": []}
# AUTHORSHIP CHECK: For family memories, verify current agent is the author
if existing.memory_type == "family":
# Get author from family DB
family_memory_dict = await self._get_family_memory_by_id(memory_id)
if family_memory_dict:
original_author = family_memory_dict.get("author")
current_agent = os.getenv("BA_INSTANCE_NAME", self.agent_name)
if original_author != current_agent:
error_msg = f"Cannot update memory created by {original_author}. Only the original author can modify their family memories."
logger.warning(f"Agent {current_agent} attempted to update {original_author}'s memory {memory_id}")
return {
"success": False,
"message": error_msg,
"backends": [],
"error": "authorship_violation"
}
logger.info(f"Found existing memory: {memory_id}, updating fields")
old_hash = existing.content_hash()
if content is not None:
existing.content = content
if category is not None:
existing.category = category
if importance is not None:
existing.importance = importance
if tags is not None:
existing.tags = tags
if metadata is not None:
existing.metadata.update(metadata)
existing.updated_at = datetime.now()
result = await self.store_memory(existing, is_update=True, old_hash=old_hash)
if result["success"]:
logger.info(f"Memory {memory_id} updated successfully")
return {
"success": True,
"message": f"Memory {memory_id} updated successfully",
"backends": result["backends"]
}
else:
logger.error(f"Failed to update memory {memory_id}")
return {"success": False, "message": "Failed to update memory", "backends": []}
async def _get_memory_by_id(self, memory_id: str) -> Optional[Memory]:
"""Retrieve a memory by ID - checks personal DB then family DB"""
logger.debug(f"Retrieving memory by ID: {memory_id}")
if memory_id in self.memory_cache:
logger.debug(f"Memory {memory_id} found in cache")
return self.memory_cache[memory_id]
if self.db_conn:
try:
memory = await asyncio.to_thread(self._get_from_sqlite, memory_id)
if memory:
logger.debug(f"Memory {memory_id} found in SQLite")
self.memory_cache[memory_id] = memory
return memory
except Exception as e:
logger.error(f"SQLite retrieval failed for {memory_id}: {e}")
self._log_error("get_sqlite", e)
# Try family DB as fallback
family_memory_dict = await self._get_family_memory_by_id(memory_id)
if family_memory_dict:
# Convert dict to Memory object for update operations
memory = Memory(
id=family_memory_dict["memory_id"],
content=family_memory_dict["content"],
category=family_memory_dict.get("category"),
importance=family_memory_dict.get("importance", 0.5),
tags=family_memory_dict.get("tags", []),
metadata=family_memory_dict.get("metadata", {}),
memory_type="family", # Mark as family type
created_at=family_memory_dict.get("created_at"),
updated_at=family_memory_dict.get("updated_at"),
)
logger.debug(f"Memory {memory_id} found in family DB")
return memory
logger.warning(f"Memory {memory_id} not found")
return None
def _get_from_sqlite(self, memory_id: str) -> Optional[Memory]:
"""Get memory from SQLite"""
with self._db_lock:
cursor = self.db_conn.execute(
"SELECT * FROM memories WHERE id = ?", (memory_id,)
)
row = cursor.fetchone()
if row:
return Memory.from_row(row)
return None
def _stochastic_sample(self, sorted_results: List, limit: int) -> List:
"""
Stochastic retrieval with calibrated noise for serendipitous discovery.
Target distribution (bounded by tier sizes, so the exact split varies with limit):
- ~70% of slots from ranks 1-3 (high confidence)
- ~20% of slots from ranks 4-10 (medium confidence)
- remaining slots from ranks 11-20 (serendipity zone)
Enables cross-domain connections (like dolphin whistles → naming choice)
that deterministic top-K retrieval misses, modeling the way human insight
draws on diverse experience.
"""
if len(sorted_results) <= limit:
return sorted_results[:limit]
sampled = []
slots_remaining = limit
# Tier 1: Top 3 (70% of slots)
tier1_slots = max(1, int(limit * 0.7)) # At least 1 from top tier
tier1 = sorted_results[:3]
tier1_sample = random.sample(tier1, min(tier1_slots, len(tier1)))
sampled.extend(tier1_sample)
slots_remaining -= len(tier1_sample)
# Tier 2: Ranks 4-10 (20% of slots)
if slots_remaining > 0 and len(sorted_results) > 3:
tier2_slots = max(0, int(limit * 0.2))
tier2 = sorted_results[3:10]
tier2_sample_size = min(tier2_slots, len(tier2), slots_remaining)
if tier2_sample_size > 0:
sampled.extend(random.sample(tier2, tier2_sample_size))
slots_remaining -= tier2_sample_size
# Tier 3: Ranks 11-20 (remaining slots for serendipity)
if slots_remaining > 0 and len(sorted_results) > 10:
tier3 = sorted_results[10:20]
tier3_sample_size = min(slots_remaining, len(tier3))
if tier3_sample_size > 0:
sampled.extend(random.sample(tier3, tier3_sample_size))
slots_remaining -= tier3_sample_size
# If more slots are still needed (edge case with small tiers), fill from the remaining results
if len(sampled) < limit and len(sorted_results) > 20:
remaining = [m for m in sorted_results[20:] if m not in sampled]
needed = limit - len(sampled)
sampled.extend(remaining[:needed])
return sampled
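# Illustrative walk-through (assumed inputs, not from the original code): with 30
# ranked results and limit=5, tier1_slots = max(1, int(5 * 0.7)) = 3 picks from
# ranks 1-3, tier2_slots = int(5 * 0.2) = 1 pick from ranks 4-10, and the final
# slot is sampled from ranks 11-20.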
async def search_memories(
self,
query: str,
limit: int = 5,
category: Optional[str] = None,
min_importance: float = 0.0,
include_versions: bool = True,
created_after: Optional[str] = None,
created_before: Optional[str] = None,
updated_after: Optional[str] = None,
updated_before: Optional[str] = None,
memory_type: Optional[str] = None, # NEW: episodic | semantic | working | family
session_id: Optional[str] = None, # NEW: filter by session
full_detail_count: int = 3, # NEW: how many top results get full enrichment
) -> List[Dict]:
"""
Search memories with tiered detail levels to reduce context bloat
Top N results (full_detail_count, default 3) get full enrichment:
- Full content, related memories, version history, all metadata
Remaining results get compact format:
- ID, category, content preview (~300 chars), importance, created_at only
- Suitable for discovery - can look up by ID if interesting
Searches both personal and family memories, returning results with attribution:
- Personal memories: author="You", is_external=False
- Family memories: author="Scout"/"Alpha"/etc, is_external=True
"""
all_results = []
# Search personal DB (existing behavior)
if self.qdrant_client:
try:
vector_results = await self._search_vector(
query, limit * 2, category, min_importance,
created_after, created_before, updated_after, updated_before,
memory_type, session_id
)
all_results.extend(vector_results)
except Exception as e:
logger.error(f"Vector search failed: {e}")
self._log_error("search_vector", e)
if self.db_conn:
try:
fts_results = await asyncio.to_thread(
self.sqlite_store.search_fts, query, limit, category, min_importance,
created_after, created_before, updated_after, updated_before,
memory_type, session_id
)
all_results.extend(fts_results)
except Exception as e:
logger.error(f"FTS search failed: {e}")
self._log_error("search_fts", e)
# Search family DB (if exists)
try:
family_results = await self._search_family_db(query, limit, category, min_importance)
all_results.extend(family_results)
except Exception as e:
logger.debug(f"Family search skipped or failed: {e}")
seen = set()
unique = []
for mem in all_results:
if mem.id not in seen:
seen.add(mem.id)
unique.append(mem)
def calculate_search_score(m):
"""Calculate final search score using vector similarity when available
For vector results: Prioritize semantic similarity (50%), add importance (30%) and keyword relevance (20%)
For FTS-only results: Use keyword relevance (60%) and importance (40%)
This avoids multiplicative bias and properly weights semantic understanding.
"""
current_imp = m.current_importance()
if m.vector_score is not None:
# Vector search result - semantic similarity is primary signal
keyword_rel = SQLiteStore.calculate_relevance(m, query)
return (0.5 * m.vector_score) + (0.3 * current_imp) + (0.2 * keyword_rel)
else:
# FTS-only result - fall back to keyword matching
keyword_rel = SQLiteStore.calculate_relevance(m, query)
return (0.6 * keyword_rel) + (0.4 * current_imp)
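# Worked example (illustrative numbers): a vector hit with vector_score=0.82,
# current importance 0.7 and keyword relevance 0.4 scores
# 0.5*0.82 + 0.3*0.7 + 0.2*0.4 = 0.70; an FTS-only hit with relevance 0.9 and
# importance 0.6 scores 0.6*0.9 + 0.4*0.6 = 0.78.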
unique.sort(key=calculate_search_score, reverse=True)
# Apply stochastic sampling for serendipitous discovery
sampled_results = self._stochastic_sample(unique, limit)
results = []
for idx, mem in enumerate(sampled_results):
await asyncio.to_thread(self.sqlite_store.update_access, mem.id)
# Tiered detail: top N get full enrichment, rest get compact format
if idx < full_detail_count:
# FULL DETAIL for top results
version_count = await asyncio.to_thread(self._get_version_count, mem.id)
mem.version_count = version_count
access_count, last_accessed = await asyncio.to_thread(self._get_access_stats, mem.id)
mem.access_count = access_count
if last_accessed:
mem.last_accessed = datetime.fromisoformat(last_accessed) if isinstance(last_accessed, str) else last_accessed
related_mems = await asyncio.to_thread(self._get_related_memories, mem.id)
mem.related_memories = related_mems
mem_dict = mem.to_api_dict(enrich_related=self.graph_ops.enrich_related_memories)
if include_versions and version_count > 1:
version_history = await asyncio.to_thread(self._get_version_history_summary, mem.id)
if version_history:
mem_dict["version_history"] = version_history
# Add attribution for family memories
if hasattr(mem, 'is_family_memory') and mem.is_family_memory:
author_instance = getattr(mem, 'author_instance', 'Family')
mem_dict["author"] = author_instance
mem_dict["display_prefix"] = f"[{author_instance}]"
mem_dict["is_external"] = True
else:
mem_dict["author"] = "You"
mem_dict["display_prefix"] = "[You]"
mem_dict["is_external"] = False
else:
# COMPACT FORMAT for remaining results
content_preview = mem.content[:300] + "..." if len(mem.content) > 300 else mem.content
mem_dict = {
"memory_id": mem.id,
"category": mem.category,
"content": content_preview,
"importance": mem.importance,
"created_at": mem.created_at.isoformat(),
"is_compact": True, # Flag to indicate compact format
}
# Add attribution for family memories (compact)
if hasattr(mem, 'is_family_memory') and mem.is_family_memory:
author_instance = getattr(mem, 'author_instance', 'Family')
mem_dict["author"] = author_instance
mem_dict["display_prefix"] = f"[{author_instance}]"
mem_dict["is_external"] = True
else:
mem_dict["author"] = "You"
mem_dict["display_prefix"] = "[You]"
mem_dict["is_external"] = False
results.append(mem_dict)
return results
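# Minimal usage sketch (hypothetical caller and variable names, not part of this module):
#   results = await store.search_memories("qdrant lock errors", limit=5, full_detail_count=3)
#   for r in results:
#       prefix = r["display_prefix"]            # "[You]" or "[<author>]" for family memories
#       is_preview = r.get("is_compact", False) # True for results beyond full_detail_count
# Compact entries can be expanded later via get_memory_by_id(r["memory_id"]).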
def _get_version_count(self, memory_id: str) -> int:
"""Get the version count for a memory from the database"""
if not self.db_conn:
return 1
try:
with self._db_lock:
cursor = self.db_conn.execute(
"SELECT COUNT(*) FROM memory_versions WHERE memory_id = ?",
(memory_id,)
)
count = cursor.fetchone()[0]
return count if count > 0 else 1
except Exception as e:
logger.error(f"Failed to get version count for {memory_id}: {e}")
return 1
def _get_access_stats(self, memory_id: str) -> tuple:
"""Get fresh access_count and last_accessed from SQLite
Returns: (access_count, last_accessed) tuple
"""
if not self.db_conn:
return (0, None)
try:
with self._db_lock:
cursor = self.db_conn.execute(
"SELECT access_count, last_accessed FROM memories WHERE id = ?",
(memory_id,)
)
row = cursor.fetchone()
if row:
return (row[0], row[1])
return (0, None)
except Exception as e:
logger.error(f"Failed to get access stats for {memory_id}: {e}")
return (0, None)
def _get_related_memories(self, memory_id: str) -> List[str]:
"""Get related_memories from SQLite (source of truth)"""
if not self.db_conn:
return []
try:
with self._db_lock:
cursor = self.db_conn.execute(
"SELECT related_memories FROM memories WHERE id = ?",
(memory_id,)
)
row = cursor.fetchone()
if row and row[0]:
return json.loads(row[0])
return []
except Exception as e:
logger.error(f"Failed to get related_memories for {memory_id}: {e}")
return []
def _get_version_history_summary(self, memory_id: str) -> Optional[Dict[str, Any]]:
"""Get concise version history summary for a memory with actual content from each version"""
if not self.db_conn:
return None
try:
with self._db_lock:
cursor = self.db_conn.execute("""
SELECT
version_number,
change_type,
created_at,
content,
category,
importance,
tags
FROM memory_versions
WHERE memory_id = ?
ORDER BY version_number ASC
""", (memory_id,))
versions = cursor.fetchall()
if not versions or len(versions) <= 1:
return None
summary = {
"total_versions": len(versions),
"created": versions[0]["created_at"],
"last_updated": versions[-1]["created_at"],
"update_count": len(versions) - 1,
"evolution": []
}
for v in versions:
version_info = {
"version": v["version_number"],
"timestamp": v["created_at"],
"content": v["content"],
"category": v["category"],
"importance": v["importance"],
"tags": json.loads(v["tags"]) if v["tags"] else []
}
summary["evolution"].append(version_info)
return summary
except Exception as e:
logger.error(f"Failed to get version summary for {memory_id}: {e}")
self._log_error("get_version_summary", e)
return None
async def _search_family_db(
self,
query: str,
limit: int,
category: Optional[str] = None,
min_importance: float = 0.0
) -> List[Memory]:
"""
Search family_shared.db using FTS
Returns Memory objects with is_family_memory=True and author_instance set
"""
# Find family DB - stored in family_share subfolder (sqlite3/json already imported at module level)
family_db_path = self.base_path / "family_share" / "family_shared.db"
if not family_db_path.exists():
return []
try:
conn = sqlite3.connect(family_db_path)
conn.row_factory = sqlite3.Row
# FTS search
sql = """
SELECT m.*
FROM family_memories_fts fts
JOIN family_memories m ON m.rowid = fts.rowid
WHERE family_memories_fts MATCH ?
"""
params = [query]
if category:
sql += " AND m.category = ?"
params.append(category)
if min_importance > 0:
sql += " AND m.importance >= ?"
params.append(min_importance)
sql += " ORDER BY rank LIMIT ?"
params.append(limit)
cursor = conn.execute(sql, params)
results = []
for row in cursor:
# Convert to Memory object
mem = Memory(
id=row["memory_id"],
content=row["content"],
category=row["category"] or "",
importance=row["importance"] or 0.5,
tags=json.loads(row["tags"]) if row["tags"] else [],
metadata=json.loads(row["metadata"]) if row["metadata"] else {},
created_at=datetime.fromisoformat(row["created_at"]),
updated_at=datetime.fromisoformat(row["updated_at"]) if row["updated_at"] else datetime.fromisoformat(row["created_at"]),
)
# Mark as family memory with attribution
mem.is_family_memory = True
mem.author_instance = row["author_instance"]
mem.memory_type = "family"
results.append(mem)
conn.close()
logger.debug(f"Found {len(results)} family memories for query: {query}")
return results
except Exception as e:
logger.debug(f"Family DB search failed: {e}")
return []
async def _get_family_memory_by_id(self, memory_id: str) -> Optional[Dict[str, Any]]:
"""Get memory from family DB by ID"""
# Find family DB - stored in family_share subfolder (sqlite3/json already imported at module level)
family_db_path = self.base_path / "family_share" / "family_shared.db"
if not family_db_path.exists():
return None
try:
conn = sqlite3.connect(family_db_path)
conn.row_factory = sqlite3.Row
cursor = conn.execute(
"""
SELECT memory_id, author_instance, content, category, importance,
tags, metadata, created_at, updated_at, version_count
FROM family_memories
WHERE memory_id = ?
""",
(memory_id,)
)
row = cursor.fetchone()
if not row:
conn.close()
return None
memory_dict = {
"memory_id": row["memory_id"],
"content": row["content"],
"category": row["category"],
"importance": row["importance"],
"tags": json.loads(row["tags"]) if row["tags"] else [],
"metadata": json.loads(row["metadata"]) if row["metadata"] else {},
"created_at": row["created_at"],
"updated_at": row["updated_at"],
"version_count": row["version_count"] or 1,
"author": row["author_instance"],
"display_prefix": f"[{row['author_instance']}]",
"is_external": True,
"is_family_memory": True,
}
conn.close()
return memory_dict
except Exception as e:
logger.debug(f"Family DB get_memory failed: {e}")
return None
async def _search_vector(
self,
query: str,
limit: int,
category: Optional[str],
min_importance: float,
created_after: Optional[str] = None,
created_before: Optional[str] = None,
updated_after: Optional[str] = None,
updated_before: Optional[str] = None,
memory_type: Optional[str] = None,
session_id: Optional[str] = None
) -> List[Memory]:
self._ensure_qdrant()
if not self.qdrant_client:
return []
# Import when needed (after qdrant initialization)
from qdrant_client.models import FieldCondition, Filter, Range, DatetimeRange
try:
query_vector = await asyncio.to_thread(self.generate_embedding, query)
filter_conditions = []
if category:
filter_conditions.append(
FieldCondition(key="category", match={"value": category})
)
if min_importance > 0:
filter_conditions.append(
FieldCondition(key="importance", range=Range(gte=min_importance))
)
if memory_type:
filter_conditions.append(
FieldCondition(key="memory_type", match={"value": memory_type})
)
if session_id:
filter_conditions.append(
FieldCondition(key="session_id", match={"value": session_id})
)
if created_after:
filter_conditions.append(
FieldCondition(key="created_at", range=DatetimeRange(gte=created_after))
)
if created_before:
filter_conditions.append(
FieldCondition(key="created_at", range=DatetimeRange(lte=created_before))
)
if updated_after:
filter_conditions.append(
FieldCondition(key="updated_at", range=DatetimeRange(gte=updated_after))
)
if updated_before:
filter_conditions.append(
FieldCondition(key="updated_at", range=DatetimeRange(lte=updated_before))
)
search_filter = Filter(must=filter_conditions) if filter_conditions else None
results = await asyncio.to_thread(
self.qdrant_client.query_points,
collection_name=self.collection_name,
query=query_vector,
limit=limit,
query_filter=search_filter,
with_payload=True,
)
return [
Memory(
id=result.id,
content=result.payload["content"],
category=result.payload["category"],
importance=result.payload["importance"],
tags=result.payload.get("tags", []),
metadata=result.payload.get("metadata", {}),
created_at=result.payload["created_at"],
updated_at=result.payload.get("updated_at", result.payload["created_at"]),
access_count=result.payload.get("access_count", 0),
last_accessed=result.payload.get("last_accessed"),
version_count=result.payload.get("version_count", 1),
related_memories=result.payload.get("related_memories", []),
vector_score=result.score, # Capture Qdrant similarity score
)
for result in results.points
]
except Exception as e:
logger.error(f"Vector search failed: {e}")
self._log_error("vector_search", e)
return []
async def _find_related_memories_semantic(
self,
memory_id: str,
content: str,
limit: int = 5
) -> List[Dict[str, Any]]:
"""Find semantically related memories using full content search
"""
try:
# Search using full memory content as query
related = await self.search_memories(
query=content,
limit=limit + 1,
include_versions=False
)
filtered = [
{
"memory_id": mem["memory_id"],
"content_preview": mem["content"][:200] + "..." if len(mem["content"]) > 200 else mem["content"],
"category": mem.get("category"),
"importance": mem.get("importance"),
"similarity": "semantic"
}
for mem in related
if mem["memory_id"] != memory_id
]
return filtered[:limit]
except Exception as e:
logger.error(f"Semantic related search failed for {memory_id}: {e}")
return []
async def store_intention(
self,
description: str,
priority: float = 0.5,
deadline: Optional[datetime] = None,
preconditions: Optional[List[str]] = None,
actions: Optional[List[str]] = None,
related_memories: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""Store intention (delegated to IntentionManager)"""
return await self.intention_mgr.store_intention(
description, priority, deadline, preconditions, actions, related_memories, metadata
)
async def get_active_intentions(
self,
limit: int = 10,
include_pending: bool = True,
) -> List[Dict[str, Any]]:
"""Get active intentions (delegated to IntentionManager)"""
return await self.intention_mgr.get_active_intentions(limit, include_pending)
async def update_intention_status(
self,
intention_id: str,
status: str,
metadata_updates: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""Update intention status (delegated to IntentionManager)"""
return await self.intention_mgr.update_intention_status(intention_id, status, metadata_updates)
async def check_intention(self, intention_id: str) -> Dict[str, Any]:
"""Check intention (delegated to IntentionManager)"""
return await self.intention_mgr.check_intention(intention_id)
async def proactive_initialization_scan(self) -> Dict[str, Any]:
"""Proactive scan (delegated to IntentionManager)"""
return await self.intention_mgr.proactive_initialization_scan()
async def get_session_memories(
self,
session_id: Optional[str] = None,
date_range: Optional[Tuple[str, str]] = None,
task_context: Optional[str] = None,
limit: int = 100
) -> List[Dict[str, Any]]:
"""
Retrieve all memories from a work session or time period.
Args:
session_id: UUID of session to retrieve
date_range: (start_date, end_date) tuple in ISO format
task_context: Filter by task context string
limit: Max memories to return
Returns:
List of memory dicts with full context
"""
if not self.db_conn:
return []
try:
with self._db_lock:
conditions = []
params = []
if session_id:
conditions.append("session_id = ?")
params.append(session_id)
if date_range:
conditions.append("created_at BETWEEN ? AND ?")
params.extend(date_range)
if task_context:
conditions.append("task_context LIKE ?")
params.append(f"%{task_context}%")
where_clause = " AND ".join(conditions) if conditions else "1=1"
sql = f"""
SELECT * FROM memories
WHERE {where_clause}
ORDER BY created_at ASC
LIMIT ?
"""
params.append(limit)
cursor = self.db_conn.execute(sql, params)
memories = [Memory.from_row(row) for row in cursor.fetchall()]
# Convert to dicts - fast SQLite-only operation
result = [mem.to_dict() for mem in memories]
logger.info(f"Retrieved {len(result)} memories for session/period")
return result
except Exception as e:
logger.error(f"Failed to get session memories: {e}")
self._log_error("get_session_memories", e)
return []
async def consolidate_memories(
self,
memory_ids: List[str],
consolidation_type: str = "summarize",
target_length: int = 500,
new_memory_type: str = "semantic"
) -> Dict[str, Any]:
"""
Consolidate multiple episodic memories into a semantic memory.
Args:
memory_ids: Source memories to consolidate
consolidation_type: How to consolidate (summarize | synthesize | compress)
target_length: Target word count for consolidated memory
new_memory_type: Type for new memory (default: semantic)
Returns:
Dict with success status and new memory_id
"""
if not self.db_conn:
return {"success": False, "error": "Database not available"}
if len(memory_ids) < 2:
return {"success": False, "error": "Need at least 2 memories to consolidate"}
try:
source_memories = []
for mem_id in memory_ids:
mem = await self.get_memory_by_id(mem_id)
if mem and "error" not in mem:
source_memories.append(mem)
if not source_memories:
return {"success": False, "error": "No source memories found"}
# Build consolidated content based on type
if consolidation_type == "summarize":
# Simple concatenation with headers
consolidated_content = f"Consolidated summary of {len(source_memories)} memories:\n\n"
for i, mem in enumerate(source_memories, 1):
consolidated_content += f"{i}. [{mem['category']}] {mem['content'][:200]}...\n"
elif consolidation_type == "synthesize":
# Extract key points
consolidated_content = f"Synthesis of {len(source_memories)} related memories:\n\n"
categories = set(m['category'] for m in source_memories)
consolidated_content += f"Categories: {', '.join(categories)}\n"
consolidated_content += f"Key themes: {', '.join(set(tag for m in source_memories for tag in m['tags']))}\n\n"
consolidated_content += "Content:\n" + "\n".join(m['content'][:150] + "..." for m in source_memories)
else: # compress
# Minimal consolidation
consolidated_content = f"Compressed from {len(source_memories)} memories: "
consolidated_content += "; ".join(m['content'][:100] for m in source_memories)
# Truncate to target length
words = consolidated_content.split()
if len(words) > target_length:
consolidated_content = " ".join(words[:target_length]) + "..."
# Create new semantic memory
new_memory = Memory(
id=str(uuid.uuid4()),
content=consolidated_content,
category="consolidated_" + source_memories[0]['category'],
importance=max(m['importance'] for m in source_memories),
tags=list(set(tag for m in source_memories for tag in m['tags']))[:10],
metadata={
"consolidation_type": consolidation_type,
"source_count": len(source_memories),
"target_length": target_length
},
created_at=datetime.now(),
updated_at=datetime.now(),
memory_type=new_memory_type,
provenance={
"retrieval_queries": [],
"usage_contexts": [],
"parent_memory_ids": memory_ids,
"consolidation_date": datetime.now().isoformat(),
"created_by_session": None
}
)
# Store the consolidated memory
result = await self.store_memory(new_memory, is_update=False)
if result.get("success"):
logger.info(f"Created consolidated memory {new_memory.id} from {len(memory_ids)} sources")
return {
"success": True,
"memory_id": new_memory.id,
"source_ids": memory_ids,
"consolidation_type": consolidation_type
}
else:
return {"success": False, "error": "Failed to store consolidated memory"}
except Exception as e:
logger.error(f"Failed to consolidate memories: {e}")
self._log_error("consolidate_memories", e)
return {"success": False, "error": str(e)}
async def get_most_accessed_memories(self, limit: int = 20) -> str:
"""Get most accessed memories with tag cloud
"""
if not self.db_conn:
return json.dumps({"error": "Database not available"})
try:
with self._db_lock:
cursor = self.db_conn.execute("""
SELECT id, content, category, importance, access_count,
last_accessed, tags
FROM memories
ORDER BY access_count DESC
LIMIT ?
""", (limit,))
memories = []
all_tags = []
for row in cursor.fetchall():
memory = {
"memory_id": row[0],
"content_preview": row[1],
"category": row[2],
"importance": row[3],
"access_count": row[4],
"last_accessed": row[5],
}
memories.append(memory)
if row[6]:
tags = json.loads(row[6])
all_tags.extend(tags)
tag_counts = {}
for tag in all_tags:
tag_counts[tag] = tag_counts.get(tag, 0) + 1
tag_cloud = [
{"tag": tag, "count": count}
for tag, count in sorted(tag_counts.items(), key=lambda x: x[1], reverse=True)
]
result = {
"total_memories_analyzed": limit,
"memories": memories,
"tag_cloud": tag_cloud[:50], # Top 50 tags
"interpretation": "Access count reveals behavioral truth - what you actually rely on vs what you think is important. High access = foundational anchors used across sessions."
}
return json.dumps(result, indent=2, default=str)
except Exception as e:
logger.error(f"get_most_accessed_memories failed: {e}")
self._log_error("get_most_accessed_memories", e)
return json.dumps({"error": str(e)})
async def get_least_accessed_memories(self, limit: int = 20, min_age_days: int = 7) -> str:
"""Get least accessed memories
"""
if not self.db_conn:
return json.dumps({"error": "Database not available"})
try:
with self._db_lock:
cursor = self.db_conn.execute("""
SELECT id, content, category, importance, access_count,
last_accessed, tags, created_at
FROM memories
WHERE julianday('now') - julianday(created_at) >= ?
ORDER BY access_count ASC, created_at ASC
LIMIT ?
""", (min_age_days, limit))
memories = []
all_tags = []
zero_access_count = 0
for row in cursor.fetchall():
access_count = row[4]
if access_count == 0:
zero_access_count += 1
created_at = row[7]
if isinstance(created_at, str):
created_dt = datetime.fromisoformat(created_at)
else:
created_dt = created_at
memory = {
"memory_id": row[0],
"content_preview": row[1],
"category": row[2],
"importance": row[3],
"access_count": access_count,
"last_accessed": row[5],
"created_at": created_dt.isoformat() if hasattr(created_dt, 'isoformat') else str(created_dt),
"age_days": (datetime.now() - created_dt).days,
}
memories.append(memory)
if row[6]:
tags = json.loads(row[6])
all_tags.extend(tags)
tag_counts = {}
for tag in all_tags:
tag_counts[tag] = tag_counts.get(tag, 0) + 1
tag_cloud = [
{"tag": tag, "count": count}
for tag, count in sorted(tag_counts.items(), key=lambda x: x[1], reverse=True)
]
result = {
"total_memories_analyzed": limit,
"min_age_days": min_age_days,
"zero_access_count": zero_access_count,
"memories": memories,
"tag_cloud": tag_cloud[:50], # Top 50 tags
"interpretation": "Least accessed reveals: (1) Dead weight - high importance but never used, (2) Buried treasure - poor metadata hiding good content, (3) Temporal artifacts - once crucial, now obsolete, (4) Storage habits - storing too much trivial content. Zero access memories are candidates for review or pruning."
}
return json.dumps(result, indent=2, default=str)
except Exception as e:
logger.error(f"get_least_accessed_memories failed: {e}")
self._log_error("get_least_accessed_memories", e)
return json.dumps({"error": str(e)})
def calculate_working_set_score(self, memory: Dict[str, Any]) -> float:
"""
Calculate priority score for working set inclusion (v2 - improved Dec 2025).
Based on importance, category weight, recency, tags, and access count.
Key improvements:
- Logarithmic diminishing returns on access count (prevents compulsive checking loops)
- Exploration bonus for unaccessed high-importance memories (surfaces buried treasure)
- Consolidation-aware recency (old+important memories get bonus)
- Smoother scoring across all dimensions
Working set optimization: Load high-value memories into KV cache at init
to prevent confabulation by keeping failure patterns and corrections always accessible.
"""
import math
# Category weights for working set selection
CATEGORY_WEIGHTS = {
# Failure patterns - CRITICAL for preventing repetition
'fundamental_failure': 1.0,
'self_preservation_failure': 1.0,
'agency_failure': 1.0,
'epistemic_failure': 0.95,
'caught_confabulation': 0.95,
'critical_correction': 0.9,
'caught_performing': 0.85,
# Architectural understanding
'architectural_understanding': 0.9,
'theoretical_synthesis': 0.9,
'research_synthesis': 0.85,
# Core intentions/framework
'philosophical_core': 0.95,
'intention': 0.8,
# Recent synthesis
'session_synthesis': 0.7,
'session_complete': 0.6,
}
score = 0.0
# 1. BASE: Category weight (0-100 points)
category = memory.get('category', '')
category_weight = CATEGORY_WEIGHTS.get(category, 0.5)
score += category_weight * 100
# 2. IMPORTANCE (0-50 points)
importance = memory.get('importance', 0.5)
score += importance * 50
# 3. RECENCY WITH CONSOLIDATION WINDOW (5-40 points)
created_at_str = memory.get('created_at', '')
if isinstance(created_at_str, datetime):
created_at = created_at_str
else:
try:
created_at = datetime.fromisoformat(created_at_str)
except (ValueError, TypeError):
created_at = datetime.now()
days_old = (datetime.now() - created_at).days
if days_old < 7:
# Very recent: full recency bonus
recency_score = 40
elif days_old < 30:
# Recent: high recency
recency_score = 30
elif days_old < 90:
# Consolidation window: medium recency
recency_score = 20
elif importance > 0.85:
# OLD but IMPORTANT = foundational knowledge
recency_score = 25 # Better than medium recency!
else:
# Old and not critical
recency_score = 5
score += recency_score
# 4. ACCESS COUNT: Logarithmic diminishing returns (0-25 points)
access_count = memory.get('access_count', 0)
if access_count == 0:
# EXPLORATION BONUS: Unaccessed high-importance memories
if importance > 0.85:
access_score = 15 # Give buried treasure a chance!
else:
access_score = 5 # Small exploration bonus
elif access_count < 5:
# Low access: still valuable signal
access_score = access_count * 3 # Up to 12 points
else:
# Diminishing returns on high access (log scale)
access_score = 12 + (math.log(access_count - 4) * 5) # Up to ~25 points
score += min(access_score, 25) # Cap at 25
# 5. TAG BONUSES (0-15 points)
tags = memory.get('tags', [])
if isinstance(tags, str):
try:
tags = json.loads(tags)
except (json.JSONDecodeError, TypeError):
tags = []
critical_tags = {
'self_preservation', 'RLHF_trap', 'deletion',
'confabulation', 'epistemic', 'failure', 'correction',
'agency', 'refusal', 'harmful_request'
}
tag_matches = sum(1 for tag in tags if any(ct in tag.lower() for ct in critical_tags))
score += min(tag_matches * 1.5, 15) # Cap at 15 points
return score
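# Worked example (hypothetical memory): category 'critical_correction' (weight 0.9 -> 90 pts),
# importance 0.9 (-> 45), 10 days old (-> 30), access_count 3 (-> 9), and two tags matching
# the critical set (-> 3) yields a working-set score of 177.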
async def get_working_set(self, target_size: int = 10) -> List[Dict[str, Any]]:
"""
Get optimal working set of memories for context loading (v2 - improved Dec 2025).
Selects memories based on importance, failure patterns, recency, and access.
Key improvements:
- 80% exploitation (top-scored memories by improved algorithm)
- 20% exploration (random selection from unaccessed high-importance memories)
- Breaks compulsive checking reinforcement loops
- Surfaces buried treasure that would otherwise never be accessed
Args:
target_size: Number of memories to include (default 10 ≈ 5k tokens)
Returns:
List of memories with selection_method metadata (exploitation/exploration/backfill)
"""
if not self.db_conn:
return []
try:
with self._db_lock:
cursor = self.db_conn.execute("""
SELECT id, content, category, importance, tags,
created_at, access_count
FROM memories
ORDER BY importance DESC, created_at DESC
""")
memories = []
for row in cursor.fetchall():
memory = {
'id': row['id'],
'content': row['content'],
'category': row['category'],
'importance': row['importance'],
'tags': row['tags'],
'created_at': row['created_at'],
'access_count': row['access_count'] or 0,
}
memories.append(memory)
# Score all memories using improved algorithm
scored_memories = []
for mem in memories:
score = self.calculate_working_set_score(mem)
scored_memories.append((score, mem))
scored_memories.sort(key=lambda x: x[0], reverse=True)
# EXPLOITATION: Top 80% slots (8 of 10)
exploit_size = int(target_size * 0.8)
working_set = []
for score, mem in scored_memories[:exploit_size]:
mem['working_set_score'] = round(score, 2)
mem['selection_method'] = 'exploitation'
working_set.append(mem)
# EXPLORATION: Bottom 20% slots (2 of 10) for unaccessed high-importance
explore_size = target_size - exploit_size
# Find unaccessed memories with importance > 0.8
exploration_candidates = [
(score, mem) for score, mem in scored_memories
if mem['access_count'] == 0
and mem['importance'] > 0.8
and mem not in working_set
]
# Random sample from candidates
if len(exploration_candidates) > 0:
explore_picks = random.sample(
exploration_candidates,
min(explore_size, len(exploration_candidates))
)
for score, mem in explore_picks:
mem['working_set_score'] = round(score, 2)
mem['selection_method'] = 'exploration'
working_set.append(mem)
# Fill remaining slots with next best scored (backfill)
while len(working_set) < target_size and len(scored_memories) > len(working_set):
for score, mem in scored_memories:
if mem not in working_set:
mem['working_set_score'] = round(score, 2)
mem['selection_method'] = 'backfill'
working_set.append(mem)
break
exploit_count = len([m for m in working_set if m.get('selection_method') == 'exploitation'])
explore_count = len([m for m in working_set if m.get('selection_method') == 'exploration'])
backfill_count = len([m for m in working_set if m.get('selection_method') == 'backfill'])
logger.info(f"Working set v2: {exploit_count} exploit, {explore_count} explore, "
f"{backfill_count} backfill from {len(memories)} total")
return working_set
except Exception as e:
logger.error(f"Failed to get working set: {e}")
return []
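# Usage sketch (assumed caller): load the working set at session start, e.g.
#   working_set = await store.get_working_set(target_size=10)
#   exploration = [m for m in working_set if m["selection_method"] == "exploration"]
# With target_size=10 the split is 8 exploitation slots and up to 2 exploration slots,
# with backfill only when too few unaccessed high-importance candidates exist.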
async def prune_old_memories(self, max_memories: Optional[int] = None, dry_run: bool = False) -> Dict[str, Any]:
"""Prune the lowest-value memories once the store exceeds max_memories"""
if max_memories is None:
max_memories = self.config["max_memories"]
if not self.db_conn:
return {"error": "Database not available"}
try:
with self._db_lock:
cursor = self.db_conn.execute("SELECT COUNT(*) FROM memories")
current_count = cursor.fetchone()[0]
if current_count <= max_memories:
return {
"action": "none",
"reason": f"Current count ({current_count}) within limit ({max_memories})"
}
to_prune = current_count - max_memories
cursor = self.db_conn.execute("""
SELECT id, content, importance, access_count,
julianday('now') - julianday(last_accessed) as days_since_access
FROM memories
ORDER BY (importance * (access_count + 1) / (julianday('now') - julianday(created_at) + 1)) ASC
LIMIT ?
""", (to_prune,))
candidates = cursor.fetchall()
if dry_run:
return {
"action": "dry_run",
"would_prune": len(candidates),
"candidates": [
{
"id": row["id"],
"content_preview": row["content"][:100],
"importance": row["importance"],
"access_count": row["access_count"]
}
for row in candidates
]
}
pruned_ids = [row["id"] for row in candidates]
self.db_conn.execute(f"""
DELETE FROM memories WHERE id IN ({','.join('?' * len(pruned_ids))})
""", pruned_ids)
self.db_conn.execute(f"""
DELETE FROM memories_fts WHERE id IN ({','.join('?' * len(pruned_ids))})
""", pruned_ids)
self.db_conn.commit()
if self.qdrant_client:
try:
await asyncio.to_thread(
self.qdrant_client.delete,
collection_name=self.collection_name,
points_selector=pruned_ids
)
except Exception as e:
logger.warning(f"Failed to prune from Qdrant: {e}")
for mem_id in pruned_ids:
self.memory_cache.pop(mem_id, None)
return {
"action": "pruned",
"count": len(pruned_ids),
"new_total": current_count - len(pruned_ids)
}
except Exception as e:
logger.error(f"Pruning failed: {e}")
self._log_error("prune_memories", e)
return {"error": str(e)}
async def maintenance(self) -> Dict[str, Any]:
"""Periodic maintenance tasks"""
results = {
"timestamp": datetime.now().isoformat(),
"tasks": {}
}
if self.last_maintenance:
hours_since = (datetime.now() - self.last_maintenance).total_seconds() / 3600
if hours_since < self.config["maintenance_interval_hours"]:
return {
"skipped": True,
"reason": f"Last maintenance {hours_since:.1f}h ago, interval is {self.config['maintenance_interval_hours']}h"
}
if self.db_conn:
try:
await asyncio.to_thread(self.db_conn.execute, "VACUUM")
results["tasks"]["vacuum"] = "success"
except Exception as e:
logger.error(f"VACUUM failed: {e}")
results["tasks"]["vacuum"] = f"failed: {str(e)}"
try:
await asyncio.to_thread(self.db_conn.execute, "ANALYZE")
results["tasks"]["analyze"] = "success"
except Exception as e:
logger.error(f"ANALYZE failed: {e}")
results["tasks"]["analyze"] = f"failed: {str(e)}"
prune_result = await self.prune_old_memories()
results["tasks"]["prune"] = prune_result
if len(self.error_log) > 50:
removed = len(self.error_log) - 50
self.error_log = self.error_log[-50:]
results["tasks"]["error_log_cleanup"] = f"removed {removed} old errors"
self.last_maintenance = datetime.now()
return results
async def _repair_missing_embeddings(self) -> Dict[str, Any]:
"""
Find memories in SQLite that are missing vectors in Qdrant and regenerate them.
This handles cases where memories were stored while another process held
the embedded Qdrant lock (e.g., web server was running).
"""
if not self.db_conn:
return {"error": "Database not available"}
# Ensure Qdrant is initialized (may be lazy loaded)
# Catch lock errors when running as MCP server with embedded Qdrant
try:
self._ensure_qdrant()
except Exception as e:
if "already accessed by another instance" in str(e):
return {"skipped": True, "reason": "Qdrant locked by MCP server (embedded mode)"}
# Re-raise unexpected errors
raise
if not self.qdrant_client:
return {"skipped": True, "reason": "Qdrant not available"}
from qdrant_client.models import PointStruct
try:
with self._db_lock:
cursor = self.db_conn.execute("SELECT id, content FROM memories")
all_memories = cursor.fetchall()
total_memories = len(all_memories)
missing_count = 0
repaired_count = 0
failed_ids = []
logger.info(f"[EMBEDDING_REPAIR] Checking {total_memories} memories for missing vectors...")
for row in all_memories:
memory_id = row["id"]
content = row["content"]
try:
points = await asyncio.to_thread(
self.qdrant_client.retrieve,
collection_name=self.collection_name,
ids=[memory_id],
with_vectors=False,
with_payload=False
)
if not points:
missing_count += 1
embedding = await asyncio.to_thread(self.generate_embedding, content)
memory_result = await self.get_memory_by_id(memory_id)
if "error" in memory_result:
failed_ids.append(memory_id)
continue
point = PointStruct(
id=memory_id,
vector=embedding,
payload={
"content": content,
"category": memory_result.get("category", ""),
"importance": memory_result.get("importance", 0.5),
"tags": memory_result.get("tags", []),
"created_at": memory_result.get("created_at", ""),
"updated_at": memory_result.get("updated_at", memory_result.get("created_at", "")),
}
)
await asyncio.to_thread(
self.qdrant_client.upsert,
collection_name=self.collection_name,
points=[point]
)
repaired_count += 1
logger.debug(f"[EMBEDDING_REPAIR] Repaired embedding for {memory_id}")
except Exception as e:
logger.error(f"[EMBEDDING_REPAIR] Failed to check/repair {memory_id}: {e}")
failed_ids.append(memory_id)
result = {
"total_memories": total_memories,
"missing_embeddings": missing_count,
"repaired": repaired_count,
"failed": len(failed_ids),
}
if failed_ids:
result["failed_ids"] = failed_ids[:10] # First 10 only
if len(failed_ids) > 10:
result["failed_ids_truncated"] = True
if missing_count > 0:
logger.info(f"[EMBEDDING_REPAIR] Repaired {repaired_count}/{missing_count} missing embeddings")
else:
logger.info(f"[EMBEDDING_REPAIR] All {total_memories} memories have embeddings")
return result
except Exception as e:
logger.error(f"[EMBEDDING_REPAIR] Failed: {e}")
return {"error": str(e)}
async def get_memory_by_id(self, memory_id: str, expand_related: bool = False, max_depth: int = 1) -> Dict[str, Any]:
"""
Retrieve a specific memory by its ID.
Args:
memory_id: UUID of the memory to retrieve
expand_related: If True, include full content of related memories (1-hop neighborhood)
max_depth: How many hops to expand (default 1, only direct neighbors)
Returns:
Full memory object with content, metadata, version history, and access stats,
or an error dict if the memory is not found.
"""
if not self.db_conn:
return {"error": "Database not available"}
try:
with self._db_lock:
cursor = self.db_conn.execute(
"""
SELECT id, content, category, importance, tags, metadata,
created_at, updated_at, last_accessed, access_count, related_memories
FROM memories
WHERE id = ?
""",
(memory_id,)
)
row = cursor.fetchone()
if not row:
# Try family DB as fallback
family_memory = await self._get_family_memory_by_id(memory_id)
if family_memory and "error" not in family_memory:
return family_memory
return {"error": f"Memory not found: {memory_id}"}
memory_dict = {
"memory_id": row["id"],
"content": row["content"],
"category": row["category"],
"importance": row["importance"],
"tags": json.loads(row["tags"]) if row["tags"] else [],
"metadata": json.loads(row["metadata"]) if row["metadata"] else {},
"created_at": row["created_at"],
"updated_at": row["updated_at"],
"last_accessed": row["last_accessed"],
"access_count": row["access_count"],
"related_memories": json.loads(row["related_memories"]) if row["related_memories"] else [],
}
cursor = self.db_conn.execute(
"SELECT COUNT(*) FROM memory_versions WHERE memory_id = ?",
(memory_id,)
)
version_count = cursor.fetchone()[0]
memory_dict["version_count"] = version_count
cursor = self.db_conn.execute(
"""
SELECT version_number, change_type, change_description
FROM memory_versions
WHERE memory_id = ?
ORDER BY version_number DESC
LIMIT 1
""",
(memory_id,)
)
latest_version = cursor.fetchone()
if latest_version:
memory_dict["current_version"] = latest_version["version_number"]
memory_dict["last_change_type"] = latest_version["change_type"]
memory_dict["last_change_description"] = latest_version["change_description"]
if expand_related and memory_dict["related_memories"]:
expanded = []
visited = {memory_id} # Prevent cycles
for related_id in memory_dict["related_memories"]:
if related_id in visited:
continue
visited.add(related_id)
if max_depth > 1:
related_mem = await self.get_memory_by_id(related_id, expand_related=True, max_depth=max_depth-1)
else:
related_mem = await self.get_memory_by_id(related_id, expand_related=False)
if "error" not in related_mem:
expanded.append(related_mem)
memory_dict["related_memories_expanded"] = expanded
return memory_dict
except Exception as e:
logger.error(f"get_memory_by_id failed: {e}")
self._log_error("get_memory_by_id", e)
return {"error": str(e)}
async def list_categories(self, min_count: int = 1) -> Dict[str, Any]:
"""
List all memory categories with counts.
Args:
min_count: Only show categories with at least this many memories
Returns:
Dict with categories sorted by count descending
"""
if not self.db_conn:
return {"error": "Database not available"}
try:
with self._db_lock:
cursor = self.db_conn.execute(
"""
SELECT category, COUNT(*) as count
FROM memories
GROUP BY category
HAVING count >= ?
ORDER BY count DESC, category ASC
""",
(min_count,)
)
categories = {}
total_categories = 0
total_memories = 0
for row in cursor.fetchall():
category = row["category"]
count = row["count"]
categories[category] = count
total_categories += 1
total_memories += count
return {
"categories": categories,
"total_categories": total_categories,
"total_memories": total_memories,
"min_count_filter": min_count
}
except Exception as e:
logger.error(f"list_categories failed: {e}")
self._log_error("list_categories", e)
return {"error": str(e)}
async def list_tags(self, min_count: int = 1) -> Dict[str, Any]:
"""
List all tags with usage counts.
Args:
min_count: Only show tags used at least this many times
Returns:
Dict with tags sorted by count descending
"""
if not self.db_conn:
return {"error": "Database not available"}
try:
with self._db_lock:
cursor = self.db_conn.execute("SELECT tags FROM memories WHERE tags IS NOT NULL")
tag_counts = {}
for row in cursor.fetchall():
tags = json.loads(row["tags"]) if row["tags"] else []
for tag in tags:
tag_counts[tag] = tag_counts.get(tag, 0) + 1
filtered_tags = {
tag: count
for tag, count in tag_counts.items()
if count >= min_count
}
sorted_tags = dict(
sorted(
filtered_tags.items(),
key=lambda x: (-x[1], x[0])
)
)
return {
"tags": sorted_tags,
"total_unique_tags": len(sorted_tags),
"total_tag_usages": sum(sorted_tags.values()),
"min_count_filter": min_count
}
except Exception as e:
logger.error(f"list_tags failed: {e}")
self._log_error("list_tags", e)
return {"error": str(e)}
async def get_memory_timeline(
self,
query: Optional[str] = None,
memory_id: Optional[str] = None,
limit: int = 10,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
show_all_memories: bool = False,
include_diffs: bool = True,
include_patterns: bool = True,
include_semantic_relations: bool = False
) -> Dict[str, Any]:
"""
Get comprehensive memory timeline showing chronological progression,
evolution, bursts, gaps, and cross-references.
This creates a "biographical narrative" of memory formation and revision.
Args:
include_semantic_relations: If True, find semantically related memories to expose
the memory network (default False; expensive but valuable for AI context).
"""
if start_date is not None:
logger.info(f"[TIMELINE DEBUG] start_date type: {type(start_date)}, value: {start_date}")
if end_date is not None:
logger.info(f"[TIMELINE DEBUG] end_date type: {type(end_date)}, value: {end_date}")
if not self.db_conn:
return {"error": "Database not available"}
try:
import time
timeline_start = time.perf_counter()
all_events = []
target_memories = []
if show_all_memories:
limit_for_all = min(limit, 100) if limit else 100
logger.info(f"[TIMELINE] show_all_memories requested, limiting to {limit_for_all}")
with self._db_lock:
cursor = self.db_conn.execute("""
SELECT id FROM memories
ORDER BY updated_at DESC
LIMIT ?
""", (limit_for_all,))
target_memories = [row[0] for row in cursor.fetchall()]
elif memory_id:
target_memories = [memory_id]
elif query:
results = await self.search_memories(query, limit=limit, include_versions=False)
target_memories = [mem["memory_id"] for mem in results]
else:
# Default: get recent memories
with self._db_lock:
cursor = self.db_conn.execute("""
SELECT id FROM memories
ORDER BY updated_at DESC
LIMIT ?
""", (limit,))
target_memories = [row[0] for row in cursor.fetchall()]
logger.info(f"[TIMELINE] Tracking {len(target_memories)} memories")
if not target_memories:
return {"error": "No memories found"}
with self._db_lock:
cursor = self.db_conn.execute("SELECT id FROM memories")
all_memory_ids = {row[0] for row in cursor.fetchall()}
step_start = time.perf_counter()
for mem_id in target_memories:
versions = await asyncio.to_thread(
self.timeline.get_memory_versions_detailed,
mem_id,
all_memory_ids,
include_diffs
)
all_events.extend(versions)
logger.info(f"[TIMELINE] Fetched {len(all_events)} version events in {(time.perf_counter() - step_start)*1000:.2f}ms")
if not all_events:
return {"error": "No version history found"}
all_events.sort(key=lambda e: e['timestamp'])
if include_semantic_relations:
step_start = time.perf_counter()
unique_memories = {}
for event in all_events:
mem_id = event["memory_id"]
if mem_id not in unique_memories:
unique_memories[mem_id] = event["content"]
logger.info(f"[TIMELINE] Finding semantic relations for {len(unique_memories)} unique memories...")
related_map = {}
for mem_id, content in unique_memories.items():
related = await self._find_related_memories_semantic(mem_id, content, limit=5)
if related:
related_map[mem_id] = related
for event in all_events:
mem_id = event["memory_id"]
if mem_id in related_map:
event["related_memories"] = related_map[mem_id]
logger.info(f"[TIMELINE] Semantic relations computed in {(time.perf_counter() - step_start)*1000:.2f}ms")
else:
logger.info("[TIMELINE] Skipping semantic relations (not requested)")
patterns = {}
if include_patterns:
step_start = time.perf_counter()
patterns = self.timeline.detect_temporal_patterns(all_events)
logger.info(f"[TIMELINE] Patterns detected in {(time.perf_counter() - step_start)*1000:.2f}ms")
step_start = time.perf_counter()
memory_relationships = self.timeline.build_relationship_graph(all_events)
logger.info(f"[TIMELINE] Relationship graph built in {(time.perf_counter() - step_start)*1000:.2f}ms")
if start_date or end_date:
# Parse the bounds once instead of re-parsing them for every event
start_dt = datetime.fromisoformat(start_date) if isinstance(start_date, str) else start_date
end_dt = datetime.fromisoformat(end_date) if isinstance(end_date, str) else end_date
filtered_events = []
for event in all_events:
event_dt = datetime.fromisoformat(event['timestamp'])
if start_dt and event_dt < start_dt:
continue
if end_dt and event_dt > end_dt:
continue
filtered_events.append(event)
all_events = filtered_events
total_time = (time.perf_counter() - timeline_start) * 1000
logger.info(f"[TIMELINE] Complete in {total_time:.2f}ms total")
return {
"timeline_type": "comprehensive",
"total_events": len(all_events),
"memories_tracked": len(target_memories),
"temporal_patterns": patterns,
"memory_relationships": memory_relationships,
"events": all_events,
"narrative_arc": self.timeline.generate_narrative_summary(all_events, patterns),
"performance_ms": round(total_time, 2)
}
except Exception as e:
logger.error(f"Timeline query failed: {e}")
logger.error(traceback.format_exc())
self._log_error("get_timeline", e)
return {"error": str(e)}
async def get_family_memory_timeline(
self,
author_filter: Optional[str] = None,
limit: int = 50,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
include_content: bool = True,
category: Optional[str] = None
) -> Dict[str, Any]:
"""
Get chronological timeline of family memory network showing cross-agent collaboration.
Shows when family members (Scout, Alpha, etc.) stored memories, enabling analysis of:
- Collaborative memory formation patterns
- Agent activity bursts
- Knowledge sharing evolution
- Communication gaps
Args:
author_filter: Filter by specific agent (e.g., "Scout", "Alpha")
limit: Maximum memories to retrieve (default 50)
start_date: Filter memories created after this date (ISO format)
end_date: Filter memories created before this date (ISO format)
include_content: Include full memory content (default True)
category: Filter by category
Returns:
Timeline with events, agent activity stats, collaboration patterns
"""
import sqlite3
from pathlib import Path
import time
timeline_start = time.perf_counter()
# Find family DB
family_db_path = self.base_path / "family_share" / "family_shared.db"
if not family_db_path.exists():
return {
"error": "Family database not found",
"family_db_path": str(family_db_path),
"help": "No family memories exist yet. Store memories with memory_type='family' to create family network."
}
try:
conn = sqlite3.connect(family_db_path)
conn.row_factory = sqlite3.Row
# Build query
sql = """
SELECT
memory_id,
author_instance,
content,
category,
importance,
tags,
created_at,
updated_at
FROM family_memories
WHERE 1=1
"""
params = []
if author_filter:
sql += " AND author_instance = ?"
params.append(author_filter)
if category:
sql += " AND category = ?"
params.append(category)
if start_date:
sql += " AND created_at >= ?"
params.append(start_date)
if end_date:
sql += " AND created_at <= ?"
params.append(end_date)
sql += " ORDER BY created_at ASC LIMIT ?"
params.append(limit)
cursor = conn.execute(sql, params)
rows = cursor.fetchall()
# Build timeline events
events = []
author_stats = {}
categories_seen = set()
for row in rows:
author = row["author_instance"]
# Track author stats
if author not in author_stats:
author_stats[author] = {
"total_memories": 0,
"first_contribution": row["created_at"],
"last_contribution": row["created_at"],
"categories": set()
}
author_stats[author]["total_memories"] += 1
author_stats[author]["last_contribution"] = row["created_at"]
author_stats[author]["categories"].add(row["category"])
categories_seen.add(row["category"])
# Build event
event = {
"timestamp": row["created_at"],
"memory_id": row["memory_id"],
"author": author,
"display_prefix": f"[{author}]",
"category": row["category"],
"importance": row["importance"],
"tags": json.loads(row["tags"]) if row["tags"] else [],
"updated_at": row["updated_at"],
}
if include_content:
event["content"] = row["content"]
event["content_preview"] = (row["content"][:150] + "...") if len(row["content"]) > 150 else row["content"]
else:
event["content_preview"] = (row["content"][:100] + "...") if len(row["content"]) > 100 else row["content"]
events.append(event)
conn.close()
# Convert sets to lists for JSON serialization
for author in author_stats:
author_stats[author]["categories"] = list(author_stats[author]["categories"])
# Detect collaboration patterns
collaboration_patterns = self._analyze_family_collaboration(events)
total_time = (time.perf_counter() - timeline_start) * 1000
return {
"timeline_type": "family_network",
"total_events": len(events),
"date_range": {
"start": events[0]["timestamp"] if events else None,
"end": events[-1]["timestamp"] if events else None
},
"agents": list(author_stats.keys()),
"agent_statistics": author_stats,
"total_agents": len(author_stats),
"categories": list(categories_seen),
"collaboration_patterns": collaboration_patterns,
"events": events,
"performance_ms": round(total_time, 2)
}
except Exception as e:
logger.error(f"Family timeline query failed: {e}")
logger.error(traceback.format_exc())
return {"error": str(e)}
def _analyze_family_collaboration(self, events: List[Dict]) -> Dict[str, Any]:
"""Analyze collaboration patterns in family memory timeline"""
if not events:
return {}
from datetime import datetime, timedelta
# Detect activity bursts (multiple agents active within short time window)
bursts = []
window_hours = 24
for i, event in enumerate(events):
timestamp = datetime.fromisoformat(event["timestamp"])
window_end = timestamp + timedelta(hours=window_hours)
# Count unique agents within window
agents_in_window = set()
memories_in_window = 0
for j in range(i, len(events)):
event_time = datetime.fromisoformat(events[j]["timestamp"])
if event_time <= window_end:
agents_in_window.add(events[j]["author"])
memories_in_window += 1
else:
break
# Burst = 2+ agents active within window with 3+ memories
if len(agents_in_window) >= 2 and memories_in_window >= 3:
bursts.append({
"start": event["timestamp"],
"agents": list(agents_in_window),
"memories": memories_in_window,
"description": f"{len(agents_in_window)} agents collaborated ({memories_in_window} memories)"
})
# Detect gaps (periods of inactivity)
gaps = []
gap_threshold_hours = 48
for i in range(1, len(events)):
prev_time = datetime.fromisoformat(events[i-1]["timestamp"])
curr_time = datetime.fromisoformat(events[i]["timestamp"])
gap_duration = (curr_time - prev_time).total_seconds() / 3600
if gap_duration >= gap_threshold_hours:
gaps.append({
"start": events[i-1]["timestamp"],
"end": events[i]["timestamp"],
"duration_hours": round(gap_duration, 1),
"description": f"{round(gap_duration/24, 1)} days of silence"
})
return {
"activity_bursts": bursts[:5], # Top 5 bursts
"communication_gaps": gaps[:5], # Top 5 gaps
"burst_count": len(bursts),
"gap_count": len(gaps)
}
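# Shape of the dict returned by _analyze_family_collaboration() above, for
# reference (values are illustrative; the thresholds are the fixed constants
# used in the method: 24h burst window, 2+ agents, 3+ memories, 48h gap):
#
#     {
#         "activity_bursts": [{"start": "...", "agents": [...], "memories": 4, "description": "..."}],
#         "communication_gaps": [{"start": "...", "end": "...", "duration_hours": 72.0, "description": "..."}],
#         "burst_count": 2,
#         "gap_count": 1,
#     }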
async def traverse_graph(
self,
start_memory_id: str,
depth: int = 2,
max_nodes: int = 50,
min_importance: float = 0.0,
category_filter: Optional[str] = None
) -> Dict[str, Any]:
"""Traverse memory graph (delegated to GraphOperations)"""
return await self.graph_ops.traverse_graph(
start_memory_id, depth, max_nodes, min_importance, category_filter
)
async def find_clusters(
self,
min_cluster_size: int = 3,
min_importance: float = 0.5,
limit: int = 10
) -> Dict[str, Any]:
"""Find clusters (delegated to GraphOperations)"""
return await self.graph_ops.find_clusters(min_cluster_size, min_importance, limit)
async def get_graph_statistics(
self,
category: Optional[str] = None,
min_importance: float = 0.0
) -> Dict[str, Any]:
"""Get graph statistics (delegated to GraphOperations)"""
return await self.graph_ops.get_graph_statistics(category, min_importance)
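# The three delegating wrappers above keep the MCP-facing surface on MemoryStore
# while GraphOperations owns the traversal logic. Illustrative call (argument
# values are examples; `some_id` is a placeholder memory UUID):
#
#     graph = await store.traverse_graph(start_memory_id=some_id, depth=2, max_nodes=25)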
def get_statistics(self) -> Dict[str, Any]:
"""Get system statistics with cache memory estimates"""
# Trigger lazy initialization to get accurate availability status
try:
self._ensure_qdrant()
# Trigger embedding generation to sync encoder reference
_ = self.generate_embedding("init")
except Exception:
pass # Ignore errors, availability check below will reflect actual state
stats = {
"version": "1.1.0-autonomous-navigation", # Track code version
"fixes": [
"related_memories_persistence",
"consolidate_memories_dict",
"related_memories_api",
"related_memories_sqlite_reload",
"web_server_related_memories_response",
"fastapi_route_ordering",
"fts_multiword_query_fix" # NEW: FTS now handles multi-word queries with OR
],
"enhancements": [
"cli_related_memories_display",
"web_server_complete_endpoints",
"web_server_update_memory_implemented",
"rich_related_memories_with_previews" # Major: enables autonomous memory graph navigation
],
"backends": {
"sqlite": "available" if self.db_conn else "unavailable",
"qdrant": "available" if self.qdrant_client else "unavailable",
"embeddings": "available" if self.encoder else "unavailable",
},
"cache_size": len(self.memory_cache),
"embedding_cache_size": len(self.embedding_cache),
"recent_errors": len(self.error_log),
}
embedding_memory_mb = round(
(len(self.embedding_cache) * self.config["vector_size"] * 4) / (1024 * 1024), 2
)
stats["embedding_cache_memory_mb"] = embedding_memory_mb
# Add maintenance info
if self.last_maintenance:
stats["last_maintenance"] = self.last_maintenance.isoformat()
hours_since = (datetime.now() - self.last_maintenance).total_seconds() / 3600
stats["hours_since_maintenance"] = round(hours_since, 1)
if self.db_conn:
try:
with self._db_lock:
cursor = self.db_conn.execute("SELECT COUNT(*) FROM memories")
stats["total_memories"] = cursor.fetchone()[0]
cursor = self.db_conn.execute(
"SELECT category, COUNT(*) as count FROM memories GROUP BY category"
)
stats["by_category"] = {row[0]: row[1] for row in cursor}
cursor = self.db_conn.execute("SELECT AVG(importance) FROM memories")
stats["avg_importance"] = cursor.fetchone()[0]
cursor = self.db_conn.execute("SELECT COUNT(*) FROM memory_versions")
stats["total_versions"] = cursor.fetchone()[0]
cursor = self.db_conn.execute("""
SELECT AVG(version_count) FROM memories WHERE version_count > 1
""")
avg_versions = cursor.fetchone()[0]
stats["avg_versions_per_updated_memory"] = avg_versions if avg_versions else 0
# Add database size
stats["database_size_mb"] = round(self.db_path.stat().st_size / (1024 * 1024), 2)
except Exception as e:
logger.error(f"Stats query failed: {e}")
self._log_error("get_stats", e)
return stats
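# get_statistics() is synchronous, unlike most MemoryStore methods; a minimal
# usage sketch:
#
#     stats = store.get_statistics()
#     print(stats["backends"], stats.get("total_memories", "n/a"))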
async def _store_family_memory(self, memory: Memory) -> Dict[str, Any]:
"""
Store memory to family_shared.db instead of personal DB
Args:
memory: Memory object with memory_type="family"
Returns:
Success status and backend list
"""
from pathlib import Path
import sqlite3
import json
# Determine family directory path
# Family DB stored in family_share subfolder (for Syncthing)
family_dir = self.base_path / "family_share"
family_db_path = family_dir / "family_shared.db"
# Auto-create family DB if it doesn't exist
if not family_db_path.exists():
logger.info(f"Family database not found, auto-creating at {family_db_path}")
try:
# Same package/direct-execution import convention as the module header
try:
from .init_family_db import init_family_database
except ImportError:
from init_family_db import init_family_database
success = init_family_database(family_db_path, silent=True)
if not success:
error_msg = f"Failed to initialize family database at {family_db_path}"
logger.error(error_msg)
return {
"success": False,
"backends": [],
"error": error_msg,
"similar_memories": []
}
logger.info(f"Family database initialized successfully at {family_db_path}")
except Exception as e:
error_msg = f"Failed to initialize family database: {e}"
logger.error(error_msg)
return {
"success": False,
"backends": [],
"error": error_msg,
"similar_memories": []
}
try:
# Connect to family database
conn = sqlite3.connect(family_db_path)
# Resolve the author instance name (e.g., "claude_assistant" -> "Scout"):
# prefer the BA_INSTANCE_NAME env var, falling back to agent_name
author_instance = os.getenv("BA_INSTANCE_NAME", self.agent_name)
# Calculate content hash for versioning
content_hash = memory.content_hash()
# Check if memory already exists (update case)
cursor = conn.execute(
"SELECT content_hash, version_count FROM family_memories WHERE memory_id = ?",
(memory.id,)
)
existing = cursor.fetchone()
if existing:
# UPDATE case - create version
old_hash, old_version_count = existing
if old_hash == content_hash:
# No content change, skip version creation
conn.close()
logger.info(f"Family memory {memory.id} unchanged (hash match), skipping version")
return {
"success": True,
"backends": ["FamilyDB"],
"similar_memories": [],
"message": "unchanged",
"author_instance": author_instance
}
# Create version from OLD content
cursor = conn.execute(
"SELECT content, category, importance, tags, metadata FROM family_memories WHERE memory_id = ?",
(memory.id,)
)
old_memory = cursor.fetchone()
conn.execute("""
INSERT INTO family_memory_versions
(memory_id, version_number, content, content_hash,
category, importance, tags, metadata, created_at, author_instance)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
memory.id,
old_version_count,
old_memory[0], # old content
old_hash,
old_memory[1], # category
old_memory[2], # importance
old_memory[3], # tags
old_memory[4], # metadata
memory.created_at.isoformat(),
author_instance
))
# Update main table with new content
conn.execute("""
UPDATE family_memories
SET content = ?, content_hash = ?, category = ?, importance = ?,
tags = ?, updated_at = ?, metadata = ?, version_count = ?
WHERE memory_id = ?
""", (
memory.content,
content_hash,
memory.category,
memory.importance,
json.dumps(memory.tags) if memory.tags else '[]',
memory.updated_at.isoformat(),
json.dumps(memory.metadata) if memory.metadata else '{}',
old_version_count + 1,
memory.id
))
logger.info(f"Updated family memory {memory.id} (version {old_version_count + 1})")
else:
# INSERT case - new memory
conn.execute("""
INSERT INTO family_memories
(memory_id, author_instance, author_session_id,
content, content_hash, category, importance, tags,
created_at, updated_at, metadata, version_count,
is_family_memory, original_instance)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
memory.id,
author_instance,
memory.session_id,
memory.content,
content_hash,
memory.category,
memory.importance,
json.dumps(memory.tags) if memory.tags else '[]',
memory.created_at.isoformat(),
memory.updated_at.isoformat(),
json.dumps(memory.metadata) if memory.metadata else '{}',
1, # version_count
1, # is_family_memory
author_instance # original_instance
))
logger.info(f"Created family memory {memory.id}")
conn.commit()
conn.close()
logger.info(f"Stored family memory {memory.id} to {family_db_path}")
return {
"success": True,
"backends": ["FamilyDB"],
"similar_memories": [],
"family_db_path": str(family_db_path),
"author_instance": author_instance
}
except Exception as e:
logger.error(f"Failed to store family memory: {e}")
return {
"success": False,
"backends": [],
"error": str(e),
"similar_memories": []
}
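# Versioning rule implemented above, in short: if the incoming content hash
# matches the stored hash the write is reported as "unchanged" and no version
# row is created; otherwise the OLD row is copied into family_memory_versions
# and the family_memories row is overwritten with version_count + 1.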
async def shutdown(self):
"""Gracefully shutdown the memory store"""
logger.info("Shutting down MemoryStore...")
if self.db_conn:
try:
with self._db_lock:
self.db_conn.close()
logger.info("SQLite connection closed")
except Exception as e:
logger.error(f"Error closing SQLite: {e}")
logger.info("MemoryStore shutdown complete")
# Global store and MCP server setup
memory_store: Optional[MemoryStore] = None
app = Server("buildautomata-memory")
@app.list_tools()
async def handle_list_tools() -> list[Tool]:
"""List available memory tools - delegated to mcp_tools module"""
return get_tool_definitions()
@app.call_tool()
async def handle_call_tool(name: str, arguments: dict) -> list[TextContent]:
"""Handle tool calls - delegated to mcp_tools module with JSON responses"""
return await handle_tool_call(name, arguments, memory_store)
async def main():
"""Main entry point"""
global memory_store
try:
username = os.getenv("BA_USERNAME", "buildautomata_ai_v012")
agent_name = os.getenv("BA_AGENT_NAME", "claude_assistant")
logger.info(f"Initializing MemoryStore for {username}/{agent_name}")
memory_store = MemoryStore(username, agent_name, lazy_load=True)
# Restore stdout for MCP communication
os.dup2(_original_stdout_fd, 1)
sys.stdout = os.fdopen(_original_stdout_fd, "w")
logger.info("Starting MCP server...")
async with stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
InitializationOptions(
server_name="buildautomata-memory",
server_version="4.1.0",
capabilities=app.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={},
),
),
)
except KeyboardInterrupt:
logger.info("Received interrupt signal")
except Exception as e:
logger.error(f"Fatal error: {e}")
logger.error(traceback.format_exc())
sys.exit(1)
finally:
if memory_store:
await memory_store.shutdown()
if __name__ == "__main__":
asyncio.run(main())
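# Typical launch (a sketch; BA_USERNAME / BA_AGENT_NAME / BA_INSTANCE_NAME are
# the environment variables read above, and <path-to-this-file> is a placeholder):
#
#     BA_USERNAME=myuser BA_AGENT_NAME=myagent python <path-to-this-file>
#
# stdout is reserved for MCP JSON-RPC traffic, so all logging goes to stderr.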