"""Tests for daemon memory management features.
Tests the memory management components added to prevent memory leaks:
- MemoryCache LRU eviction (MAX_CACHE_ENTRIES limit)
- StoreQueue cleanup_embedded method
"""
from __future__ import annotations
import sys
import tempfile
import time
from datetime import datetime
from pathlib import Path
from typing import Any
from unittest.mock import patch
# Add hooks directory to path for imports
_hooks_dir = Path(__file__).parent.parent.parent / "hooks"
if str(_hooks_dir) not in sys.path:
sys.path.insert(0, str(_hooks_dir))
class TestMemoryCacheLRUEviction:
"""Test MemoryCache LRU eviction when at capacity."""
def test_cache_evicts_oldest_when_at_capacity(self) -> None:
"""Cache should evict oldest entry when adding beyond MAX_CACHE_ENTRIES."""
# Import here to ensure hooks dir is in path
# We need to patch MAX_CACHE_ENTRIES before importing
with patch.dict("sys.modules", {}): # Clear module cache
# Manually create a minimal MemoryCache for testing
from dataclasses import dataclass
from datetime import datetime
@dataclass(slots=True)
class CacheEntry:
memories: list[dict[str, Any]]
fetched_at: datetime
query: str = ""
MAX_TEST_ENTRIES = 3
class TestableMemoryCache:
def __init__(self) -> None:
self._cache: dict[str, CacheEntry] = {}
def set(self, namespace: str, memories: list[dict[str, Any]], query: str = "") -> None:
if namespace not in self._cache and len(self._cache) >= MAX_TEST_ENTRIES:
oldest_ns = min(self._cache, key=lambda k: self._cache[k].fetched_at)
del self._cache[oldest_ns]
self._cache[namespace] = CacheEntry(
memories=memories,
fetched_at=datetime.now(),
query=query,
)
def get(self, namespace: str) -> list[dict[str, Any]] | None:
entry = self._cache.get(namespace)
return entry.memories if entry else None
def size(self) -> int:
return len(self._cache)
cache = TestableMemoryCache()
# Add entries up to capacity
for i in range(MAX_TEST_ENTRIES):
cache.set(f"ns{i}", [{"content": f"memory {i}"}])
time.sleep(0.01) # Ensure different timestamps
assert cache.size() == MAX_TEST_ENTRIES
assert cache.get("ns0") is not None # Oldest entry exists
# Add one more - should evict oldest (ns0)
cache.set("ns_new", [{"content": "new memory"}])
assert cache.size() == MAX_TEST_ENTRIES # Still at capacity
assert cache.get("ns0") is None # Oldest was evicted
assert cache.get("ns_new") is not None # New one exists
assert cache.get("ns1") is not None # Others still exist
assert cache.get("ns2") is not None
def test_cache_does_not_evict_on_update(self) -> None:
"""Updating an existing namespace should not trigger eviction."""
from dataclasses import dataclass
from datetime import datetime
@dataclass(slots=True)
class CacheEntry:
memories: list[dict[str, Any]]
fetched_at: datetime
query: str = ""
MAX_TEST_ENTRIES = 3
class TestableMemoryCache:
def __init__(self) -> None:
self._cache: dict[str, CacheEntry] = {}
def set(self, namespace: str, memories: list[dict[str, Any]], query: str = "") -> None:
if namespace not in self._cache and len(self._cache) >= MAX_TEST_ENTRIES:
oldest_ns = min(self._cache, key=lambda k: self._cache[k].fetched_at)
del self._cache[oldest_ns]
self._cache[namespace] = CacheEntry(
memories=memories,
fetched_at=datetime.now(),
query=query,
)
def get(self, namespace: str) -> list[dict[str, Any]] | None:
entry = self._cache.get(namespace)
return entry.memories if entry else None
def size(self) -> int:
return len(self._cache)
cache = TestableMemoryCache()
# Fill cache
for i in range(MAX_TEST_ENTRIES):
cache.set(f"ns{i}", [{"content": f"memory {i}"}])
time.sleep(0.01)
# Update existing entry
cache.set("ns1", [{"content": "updated memory"}])
assert cache.size() == MAX_TEST_ENTRIES
# All original namespaces should still exist
assert cache.get("ns0") is not None
assert cache.get("ns1") is not None
assert cache.get("ns2") is not None
# Updated content should be reflected
assert cache.get("ns1") == [{"content": "updated memory"}]
class TestStoreQueueCleanup:
"""Test StoreQueue cleanup_embedded method."""
def test_cleanup_removes_old_embedded_entries(self) -> None:
"""cleanup_embedded should remove entries older than specified hours."""
from recall_queue import QueuedStore, StoreQueue
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "test_queue.db"
queue = StoreQueue(db_path=db_path)
# Enqueue and mark as embedded
entry = QueuedStore(
content="Test memory",
namespace="test",
memory_type="test",
importance=0.5,
)
queue_id = queue.enqueue(entry)
queue.mark_embedded([queue_id])
# Cleanup with 0 hours should remove it immediately
deleted = queue.cleanup_embedded(older_than_hours=0)
assert deleted == 1
assert queue.stats()["completed"] == 0
queue.close()
def test_cleanup_preserves_recent_entries(self) -> None:
"""cleanup_embedded should not remove recent entries."""
from recall_queue import QueuedStore, StoreQueue
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "test_queue.db"
queue = StoreQueue(db_path=db_path)
# Enqueue and mark as embedded
entry = QueuedStore(
content="Test memory",
namespace="test",
memory_type="test",
importance=0.5,
)
queue_id = queue.enqueue(entry)
queue.mark_embedded([queue_id])
# Cleanup with 24 hours should preserve it
deleted = queue.cleanup_embedded(older_than_hours=24)
assert deleted == 0
assert queue.stats()["completed"] == 1
queue.close()
def test_cleanup_preserves_pending_entries(self) -> None:
"""cleanup_embedded should not remove pending (non-embedded) entries."""
from recall_queue import QueuedStore, StoreQueue
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "test_queue.db"
queue = StoreQueue(db_path=db_path)
# Enqueue but don't mark as embedded
entry = QueuedStore(
content="Pending memory",
namespace="test",
memory_type="test",
importance=0.5,
)
queue.enqueue(entry)
# Cleanup should not touch pending entries
deleted = queue.cleanup_embedded(older_than_hours=0)
assert deleted == 0
assert queue.stats()["pending"] == 1
queue.close()