"""Integration tests for embedding passthrough flow.
This module verifies the full embedding passthrough flow from queue to hybrid store:
1. Worker computes embedding via Ollama (EmbeddingBatcher)
2. Worker stores embedding in queue via update_embedding()
3. Worker calls memory_store with queue_id
4. HybridStore retrieves pre-computed embedding via get_embedding(queue_id)
5. HybridStore uses pre-computed embedding instead of calling Ollama again
The key assertion is that Ollama embed is called exactly ONCE (by the worker/batcher),
NOT by the hybrid store during _sync_memory_to_chroma.
"""
import sys
import uuid
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from recall.embedding.ollama import OllamaClient
from recall.storage.chromadb import ChromaStore
from recall.storage.hybrid import HybridStore
from recall.storage.sqlite import SQLiteStore
def unique_collection_name() -> str:
    """Return a fresh collection name of the form ``test_passthrough_<hex>``.

    The suffix is the first eight hex characters of a random UUID4, so each
    call yields a different name and tests do not share a collection.
    """
    random_suffix = uuid.uuid4().hex[:8]
    return "test_passthrough_" + random_suffix
@pytest.fixture
def mock_embedding_client():
    """Provide an ``AsyncMock`` specced against ``OllamaClient``.

    The mock records every call, letting tests assert whether (and how)
    ``embed`` was invoked. ``embed`` always resolves to the same
    1024-element vector of ``0.1`` (the 1024-dim size used for mxbai).
    """
    fixed_vector = [0.1] * 1024
    mocked_client = AsyncMock(spec=OllamaClient)
    mocked_client.embed.return_value = fixed_vector
    return mocked_client
@pytest.fixture
def ephemeral_hybrid_store(mock_embedding_client):
    """Yield a ``HybridStore`` wired to ephemeral SQLite and Chroma stores.

    Args:
        mock_embedding_client: Mocked embedding client injected into the
            store so tests can track ``embed`` calls.

    Yields:
        A ``HybridStore`` built from a ``SQLiteStore(ephemeral=True)``, a
        ``ChromaStore(ephemeral=True)`` with a unique collection name, and
        the mocked embedding client.
    """
    sqlite = SQLiteStore(ephemeral=True)
    # try/finally guarantees the SQLite store is closed even when building
    # the Chroma or hybrid store raises before the fixture reaches `yield`.
    try:
        chroma = ChromaStore(
            ephemeral=True, collection_name=unique_collection_name()
        )
        store = HybridStore(
            sqlite_store=sqlite,
            chroma_store=chroma,
            embedding_client=mock_embedding_client,
        )
        yield store
    finally:
        sqlite.close()
class TestEmbeddingPassthrough:
    """Tests for embedding passthrough from queue to hybrid store.

    Every test calls ``HybridStore._sync_memory_to_chroma`` directly and then
    inspects the mocked queue and the mocked embedding client to determine
    which of the two supplied the embedding.
    """

    @staticmethod
    async def _store_and_sync(
        store, *, memory_id, content, namespace, memory_type, queue_id
    ):
        """Add a memory to SQLite, then sync it to ChromaDB.

        Shared arrange/act step for all tests in this class.

        Args:
            store: The hybrid store under test.
            memory_id: Identifier the memory is stored under.
            content: Memory text.
            namespace: Namespace of the memory.
            memory_type: Memory type label.
            queue_id: Queue entry id forwarded to the sync, or ``None``.

        Returns:
            The value returned by ``store._sync_memory_to_chroma``.
        """
        # Reset first so the caller's assertions only see embed calls that
        # happen during this sync.
        store._embedding_client.embed.reset_mock()

        # The memory must exist in SQLite before syncing (required for
        # outbox processing).
        store._sqlite.add_memory(
            content=content,
            memory_type=memory_type,
            namespace=namespace,
            memory_id=memory_id,
        )

        return await store._sync_memory_to_chroma(
            memory_id=memory_id,
            content=content,
            namespace=namespace,
            memory_type=memory_type,
            queue_id=queue_id,
        )

    @pytest.mark.asyncio
    async def test_sync_memory_uses_precomputed_embedding_from_queue(
        self, ephemeral_hybrid_store
    ):
        """Test that _sync_memory_to_chroma uses pre-computed embedding from queue.

        When queue_id is provided and has a pre-computed embedding, the hybrid
        store should use that embedding instead of calling Ollama.
        """
        store = ephemeral_hybrid_store

        # Mock StoreQueue whose get_embedding returns a pre-computed embedding.
        pre_computed_embedding = [0.5] * 1024
        mock_queue = MagicMock()
        mock_queue.get_embedding.return_value = pre_computed_embedding
        store._store_queue = mock_queue

        memory_id = "test_mem_001"
        queue_id = 42

        result = await self._store_and_sync(
            store,
            memory_id=memory_id,
            content="Test memory content for passthrough",
            namespace="test:passthrough",
            memory_type="preference",
            queue_id=queue_id,
        )

        # Sync succeeded and the queue was consulted with the right id.
        assert result is True
        mock_queue.get_embedding.assert_called_once_with(queue_id)

        # CRITICAL: Ollama embed must NOT be called (passthrough worked).
        store._embedding_client.embed.assert_not_called()

        # The embedding must be present in ChromaDB.
        chroma_result = store._chroma._collection.get(
            ids=[memory_id],
            include=["embeddings"],
        )
        assert len(chroma_result["ids"]) == 1
        assert chroma_result["embeddings"] is not None

        # The stored vector must match the pre-computed one element-wise.
        stored_embedding = chroma_result["embeddings"][0]
        assert len(stored_embedding) == 1024
        for index, (stored, expected) in enumerate(
            zip(stored_embedding, pre_computed_embedding)
        ):
            assert (
                abs(stored - expected) < 1e-6
            ), f"Embedding mismatch at index {index}"

    @pytest.mark.asyncio
    async def test_sync_memory_falls_back_to_ollama_when_no_queue_embedding(
        self, ephemeral_hybrid_store
    ):
        """Test that _sync_memory_to_chroma falls back to Ollama when queue has no embedding.

        When queue_id is provided but get_embedding returns None, the hybrid
        store should fall back to generating the embedding via Ollama.
        """
        store = ephemeral_hybrid_store

        # Mock queue that has no pre-computed embedding for any id.
        mock_queue = MagicMock()
        mock_queue.get_embedding.return_value = None
        store._store_queue = mock_queue

        content = "Test memory without pre-computed embedding"
        queue_id = 99

        result = await self._store_and_sync(
            store,
            memory_id="test_mem_002",
            content=content,
            namespace="test:fallback",
            memory_type="decision",
            queue_id=queue_id,
        )

        assert result is True
        mock_queue.get_embedding.assert_called_once_with(queue_id)

        # Ollama embed WAS called as the fallback.
        store._embedding_client.embed.assert_called_once_with(
            content, is_query=False
        )

    @pytest.mark.asyncio
    async def test_sync_memory_uses_ollama_when_no_queue_id(
        self, ephemeral_hybrid_store
    ):
        """Test that _sync_memory_to_chroma uses Ollama when no queue_id is provided.

        Without queue_id, the hybrid store should generate the embedding via
        Ollama directly without trying to access the queue.
        """
        store = ephemeral_hybrid_store
        content = "Test memory without queue_id"

        result = await self._store_and_sync(
            store,
            memory_id="test_mem_003",
            content=content,
            namespace="test:direct",
            memory_type="pattern",
            queue_id=None,  # No queue_id
        )

        assert result is True

        # Ollama embed WAS called (no passthrough possible).
        store._embedding_client.embed.assert_called_once_with(
            content, is_query=False
        )

    @pytest.mark.asyncio
    async def test_sync_memory_handles_queue_import_failure_gracefully(
        self, ephemeral_hybrid_store
    ):
        """Test that _sync_memory_to_chroma handles StoreQueue import failure gracefully.

        If the StoreQueue cannot be imported (e.g., hooks directory not
        available), the hybrid store should fall back to Ollama embedding
        generation.
        """
        store = ephemeral_hybrid_store

        # Ensure no queue is initialized.
        store._store_queue = None

        content = "Test memory with queue import failure"

        # _get_store_queue returning None simulates the import failure.
        with patch.object(store, "_get_store_queue", return_value=None):
            result = await self._store_and_sync(
                store,
                memory_id="test_mem_004",
                content=content,
                namespace="test:import_fail",
                memory_type="session",
                queue_id=123,
            )

        # Sync succeeded because the fallback to Ollama worked.
        assert result is True
        store._embedding_client.embed.assert_called_once_with(
            content, is_query=False
        )
class TestEmbeddingPassthroughEndToEnd:
    """End-to-end tests for embedding passthrough flow.

    These tests simulate the full worker flow:
    1. Queue entry is created
    2. Worker computes embedding
    3. Worker stores embedding in queue
    4. memory_store is called with queue_id
    5. Embedding is passed through to ChromaDB

    The queue is a ``MagicMock``; only the vector returned by its
    ``get_embedding`` is meaningful to the tests.
    """

    @pytest.mark.asyncio
    async def test_full_passthrough_flow(self, ephemeral_hybrid_store):
        """Test the complete embedding passthrough flow from queue to ChromaDB.

        This test simulates what the embed_worker does:
        1. Pre-compute embedding (simulated by pre-defined vector)
        2. Store embedding in queue
        3. Call memory store operation which calls _sync_memory_to_chroma
        4. Verify embedding in ChromaDB matches pre-computed embedding
        5. Verify Ollama was NOT called by hybrid store
        """
        store = ephemeral_hybrid_store

        # Simulate the pre-computed embedding from worker/batcher.
        # In the real flow this comes from EmbeddingBatcher.flush().
        worker_computed_embedding = [float(i) / 1024.0 for i in range(1024)]

        # Mock queue that hands back the worker-computed embedding.
        mock_queue = MagicMock()
        mock_queue.get_embedding.return_value = worker_computed_embedding
        store._store_queue = mock_queue

        # Start from clean call tracking on the embed client.
        store._embedding_client.embed.reset_mock()

        memory_id = "test_e2e_001"
        content = "End-to-end passthrough test memory"
        namespace = "test:e2e"
        memory_type = "preference"
        queue_id = 500

        # Add to SQLite (this happens in add_memory).
        store._sqlite.add_memory(
            content=content,
            memory_type=memory_type,
            namespace=namespace,
            memory_id=memory_id,
        )

        # Sync to ChromaDB (this is the key passthrough step).
        result = await store._sync_memory_to_chroma(
            memory_id=memory_id,
            content=content,
            namespace=namespace,
            memory_type=memory_type,
            queue_id=queue_id,
        )

        assert result is True

        # Key assertion for passthrough: Ollama was NOT called.
        store._embedding_client.embed.assert_not_called()

        chroma_result = store._chroma._collection.get(
            ids=[memory_id],
            include=["embeddings", "metadatas", "documents"],
        )

        # The document exists under the expected id.
        assert len(chroma_result["ids"]) == 1
        assert chroma_result["ids"][0] == memory_id

        # Document content and metadata round-trip unchanged.
        assert chroma_result["documents"][0] == content
        assert chroma_result["metadatas"][0]["namespace"] == namespace
        assert chroma_result["metadatas"][0]["type"] == memory_type

        # The stored embedding matches the worker-computed one element-wise.
        stored_embedding = chroma_result["embeddings"][0]
        assert len(stored_embedding) == 1024
        for index, (stored, expected) in enumerate(
            zip(stored_embedding, worker_computed_embedding)
        ):
            assert (
                abs(stored - expected) < 1e-6
            ), f"Embedding mismatch at index {index}"

    @pytest.mark.asyncio
    async def test_passthrough_preserves_embedding_precision(
        self, ephemeral_hybrid_store
    ):
        """Test that embedding passthrough preserves floating point precision.

        The embedding should be stored and retrieved with high precision,
        as embeddings are sensitive to numerical differences.
        """
        store = ephemeral_hybrid_store

        # Leading values chosen to stress precision; the rest is padding.
        high_precision_values = [
            0.123456789012345,  # Many decimal places
            -0.987654321098765,
            1e-10,  # Very small
            1.0 - 1e-10,  # Close to 1
            0.0,
            -0.0,
            0.333333333333333,
            0.666666666666666,
        ]
        # Pad with 0.5 up to 1024 dimensions; the padding length is derived
        # from the list above so the two cannot drift apart.
        padding = [0.5] * (1024 - len(high_precision_values))
        high_precision_embedding = high_precision_values + padding

        mock_queue = MagicMock()
        mock_queue.get_embedding.return_value = high_precision_embedding
        store._store_queue = mock_queue
        store._embedding_client.embed.reset_mock()

        memory_id = "test_precision_001"
        content = "Precision test memory"
        namespace = "test:precision"
        memory_type = "pattern"
        queue_id = 777

        store._sqlite.add_memory(
            content=content,
            memory_type=memory_type,
            namespace=namespace,
            memory_id=memory_id,
        )

        result = await store._sync_memory_to_chroma(
            memory_id=memory_id,
            content=content,
            namespace=namespace,
            memory_type=memory_type,
            queue_id=queue_id,
        )

        assert result is True
        store._embedding_client.embed.assert_not_called()

        # Retrieve and verify precision.
        chroma_result = store._chroma._collection.get(
            ids=[memory_id],
            include=["embeddings"],
        )
        stored_embedding = chroma_result["embeddings"][0]

        # Guard against zip() silently truncating on a short stored vector.
        assert len(stored_embedding) == len(high_precision_embedding)

        # Check high-precision values are preserved (within float32 precision).
        # ChromaDB uses float32, so precision is limited to ~7 decimal places.
        for index, (stored, expected) in enumerate(
            zip(stored_embedding, high_precision_values)
        ):
            assert (
                abs(stored - expected) < 1e-5
            ), f"Precision loss at index {index}: expected {expected}, got {stored}"