Skip to main content
Glama
test_timestamp_preservation.py12.7 kB
""" Comprehensive tests for timestamp preservation during sync operations. This test suite verifies that the fix for the timestamp regression bug (where created_at was being reset during metadata sync) works correctly. """ import pytest import pytest_asyncio import time import tempfile import os from datetime import datetime from pathlib import Path from mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage from mcp_memory_service.models import Memory @pytest_asyncio.fixture async def storage(): """Create a temporary SQLite storage for testing.""" # Create temporary database file temp_dir = tempfile.mkdtemp() db_path = os.path.join(temp_dir, "test.db") storage = SqliteVecMemoryStorage( db_path=db_path, embedding_model="all-MiniLM-L6-v2" ) await storage.initialize() yield storage # Cleanup storage.close() # Not async try: os.remove(db_path) os.rmdir(temp_dir) except: pass @pytest.fixture def old_memory(): """Create a memory with an old timestamp (24 hours ago).""" old_time = time.time() - 86400 # 24 hours ago old_iso = datetime.utcfromtimestamp(old_time).isoformat() + "Z" return Memory( content="This is a test memory from yesterday", content_hash="test_hash_old_memory", tags=["test", "old"], memory_type="note", metadata={"original": True}, created_at=old_time, created_at_iso=old_iso, updated_at=old_time, updated_at_iso=old_iso ) @pytest.mark.asyncio class TestTimestampPreservation: """Test suite for timestamp preservation during metadata updates.""" async def test_preserve_timestamps_true_only_updates_updated_at(self, storage, old_memory): """ Test that preserve_timestamps=True only updates updated_at, leaving created_at unchanged. """ # Store memory with old timestamp await storage.store(old_memory) # Update metadata with preserve_timestamps=True (default) updates = { "tags": ["test", "updated"], "metadata": {"updated": True} } success, _ = await storage.update_memory_metadata( old_memory.content_hash, updates, preserve_timestamps=True ) assert success # Retrieve and verify timestamps cursor = storage.conn.execute(''' SELECT created_at, created_at_iso, updated_at, updated_at_iso FROM memories WHERE content_hash = ? ''', (old_memory.content_hash,)) row = cursor.fetchone() created_at, created_at_iso, updated_at, updated_at_iso = row # created_at should be preserved (within 1 second tolerance) assert abs(created_at - old_memory.created_at) < 1.0, \ f"created_at changed! Expected {old_memory.created_at}, got {created_at}" assert created_at_iso == old_memory.created_at_iso # updated_at should be recent (within last 5 seconds) now = time.time() assert abs(updated_at - now) < 5.0, \ f"updated_at not updated! Expected ~{now}, got {updated_at}" async def test_preserve_timestamps_false_without_source_preserves_created_at(self, storage, old_memory): """ Test that preserve_timestamps=False without providing timestamps still preserves created_at (regression test for the bug). """ # Store memory with old timestamp await storage.store(old_memory) # Update metadata with preserve_timestamps=False but NO timestamps in updates # This simulates the BUGGY behavior that was resetting created_at updates = { "tags": ["test", "synced"], "metadata": {"synced": True} } success, _ = await storage.update_memory_metadata( old_memory.content_hash, updates, preserve_timestamps=False ) assert success # Retrieve and verify timestamps cursor = storage.conn.execute(''' SELECT created_at, created_at_iso, updated_at, updated_at_iso FROM memories WHERE content_hash = ? ''', (old_memory.content_hash,)) row = cursor.fetchone() created_at, created_at_iso, updated_at, updated_at_iso = row # created_at should STILL be preserved (this is the fix!) assert abs(created_at - old_memory.created_at) < 1.0, \ f"BUG: created_at was reset! Expected {old_memory.created_at}, got {created_at}" assert created_at_iso == old_memory.created_at_iso, \ f"BUG: created_at_iso was reset! Expected {old_memory.created_at_iso}, got {created_at_iso}" async def test_preserve_timestamps_false_with_source_uses_source_timestamps(self, storage, old_memory): """ Test that preserve_timestamps=False WITH source timestamps uses the provided timestamps (for drift detection sync). """ # Store memory with old timestamp await storage.store(old_memory) # Simulate a sync from Cloudflare with newer metadata but original created_at cloudflare_updated_at = time.time() - 3600 # 1 hour ago (newer than local) cloudflare_updated_iso = datetime.utcfromtimestamp(cloudflare_updated_at).isoformat() + "Z" updates = { "tags": ["test", "synced-from-cloudflare"], "metadata": {"source": "cloudflare"}, "created_at": old_memory.created_at, # Original creation time "created_at_iso": old_memory.created_at_iso, "updated_at": cloudflare_updated_at, # Newer update time "updated_at_iso": cloudflare_updated_iso } success, _ = await storage.update_memory_metadata( old_memory.content_hash, updates, preserve_timestamps=False ) assert success # Retrieve and verify timestamps cursor = storage.conn.execute(''' SELECT created_at, created_at_iso, updated_at, updated_at_iso FROM memories WHERE content_hash = ? ''', (old_memory.content_hash,)) row = cursor.fetchone() created_at, created_at_iso, updated_at, updated_at_iso = row # created_at should match the source (Cloudflare) assert abs(created_at - old_memory.created_at) < 1.0, \ f"created_at not preserved from source! Expected {old_memory.created_at}, got {created_at}" assert created_at_iso == old_memory.created_at_iso # updated_at should match the source (Cloudflare) assert abs(updated_at - cloudflare_updated_at) < 1.0, \ f"updated_at not from source! Expected {cloudflare_updated_at}, got {updated_at}" assert updated_at_iso == cloudflare_updated_iso async def test_drift_detection_scenario(self, storage, old_memory): """ Test the complete drift detection scenario: 1. Memory exists locally with old metadata 2. Cloudflare has newer metadata 3. Drift detection syncs metadata while preserving created_at """ # Store memory with old timestamp await storage.store(old_memory) # Simulate Cloudflare memory with newer metadata cf_updated_at = time.time() - 1800 # 30 minutes ago cf_updated_iso = datetime.utcfromtimestamp(cf_updated_at).isoformat() + "Z" # This is what hybrid storage does during drift detection cf_updates = { 'tags': ["test", "cloudflare-updated"], 'memory_type': "reference", 'metadata': {"updated_via": "cloudflare"}, 'created_at': old_memory.created_at, # Preserve original 'created_at_iso': old_memory.created_at_iso, 'updated_at': cf_updated_at, # Use Cloudflare's update time 'updated_at_iso': cf_updated_iso, } success, _ = await storage.update_memory_metadata( old_memory.content_hash, cf_updates, preserve_timestamps=False # Use Cloudflare timestamps ) assert success # Verify the sync preserved created_at but updated metadata cursor = storage.conn.execute(''' SELECT created_at, updated_at, tags, memory_type, metadata FROM memories WHERE content_hash = ? ''', (old_memory.content_hash,)) row = cursor.fetchone() created_at, updated_at, tags, memory_type, metadata_str = row # Timestamps assert abs(created_at - old_memory.created_at) < 1.0, \ "Drift detection reset created_at!" assert abs(updated_at - cf_updated_at) < 1.0, \ "Drift detection didn't use Cloudflare updated_at!" # Metadata assert tags == "test,cloudflare-updated" assert memory_type == "reference" async def test_multiple_syncs_preserve_original_created_at(self, storage, old_memory): """ Test that multiple sync operations (as would happen over time) never reset the original created_at timestamp. """ # Store memory with old timestamp await storage.store(old_memory) original_created_at = old_memory.created_at # Simulate 3 sync operations over time for i in range(3): sync_time = time.time() - (3600 * (3 - i)) # 3h, 2h, 1h ago sync_iso = datetime.utcfromtimestamp(sync_time).isoformat() + "Z" updates = { 'tags': ["test", f"sync-{i+1}"], 'metadata': {"sync_count": i + 1}, 'created_at': original_created_at, # Always the original 'created_at_iso': old_memory.created_at_iso, 'updated_at': sync_time, 'updated_at_iso': sync_iso, } success, _ = await storage.update_memory_metadata( old_memory.content_hash, updates, preserve_timestamps=False ) assert success # Verify created_at never changed cursor = storage.conn.execute(''' SELECT created_at, created_at_iso FROM memories WHERE content_hash = ? ''', (old_memory.content_hash,)) row = cursor.fetchone() created_at, created_at_iso = row assert abs(created_at - original_created_at) < 1.0, \ f"After {3} syncs, created_at changed! Expected {original_created_at}, got {created_at}" assert created_at_iso == old_memory.created_at_iso async def test_new_memory_store_sets_timestamps_correctly(self, storage): """ Test that storing a new memory without explicit timestamps sets them correctly (current time). """ now_before = time.time() memory = Memory( content="New memory without explicit timestamps", content_hash="test_hash_new" ) await storage.store(memory) now_after = time.time() # Retrieve and verify timestamps cursor = storage.conn.execute(''' SELECT created_at, updated_at FROM memories WHERE content_hash = ? ''', (memory.content_hash,)) row = cursor.fetchone() created_at, updated_at = row # Both should be recent (between before and after) assert now_before <= created_at <= now_after, \ f"created_at not set to current time: {created_at}" assert now_before <= updated_at <= now_after, \ f"updated_at not set to current time: {updated_at}" async def test_store_memory_with_explicit_timestamps_preserves_them(self, storage, old_memory): """ Test that storing a memory WITH explicit timestamps (e.g., synced from Cloudflare) preserves those timestamps. """ await storage.store(old_memory) # Retrieve and verify timestamps cursor = storage.conn.execute(''' SELECT created_at, created_at_iso, updated_at, updated_at_iso FROM memories WHERE content_hash = ? ''', (old_memory.content_hash,)) row = cursor.fetchone() created_at, created_at_iso, updated_at, updated_at_iso = row # All timestamps should match what was provided assert abs(created_at - old_memory.created_at) < 1.0 assert created_at_iso == old_memory.created_at_iso assert abs(updated_at - old_memory.updated_at) < 1.0 assert updated_at_iso == old_memory.updated_at_iso if __name__ == "__main__": pytest.main([__file__, "-v"])

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/doobidoo/mcp-memory-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server