Skip to main content
Glama
test_cursor_storage.py (11.4 kB)
"""Unit tests for cursor_storage module. Tests in-memory cursor storage with TTL, cleanup, and thread safety. """ import asyncio import time from src.services.cursor_storage import CursorStorage, get_cursor_storage class TestCursorStorage: """Test cursor storage functionality.""" async def test_store_and_retrieve_cursor(self) -> None: """Test storing and retrieving a cursor.""" storage = CursorStorage() cursor_data = { "offset": 50, "timestamp": time.time(), "order_by": "created_desc", } await storage.store("cursor-1", cursor_data) retrieved = await storage.retrieve("cursor-1") assert retrieved == cursor_data async def test_retrieve_nonexistent_cursor(self) -> None: """Test retrieving nonexistent cursor returns None.""" storage = CursorStorage() retrieved = await storage.retrieve("nonexistent") assert retrieved is None async def test_store_overwrites_existing_cursor(self) -> None: """Test that storing with same cursor_id overwrites existing data.""" storage = CursorStorage() await storage.store("cursor-1", {"offset": 50}) await storage.store("cursor-1", {"offset": 100}) retrieved = await storage.retrieve("cursor-1") assert retrieved == {"offset": 100} async def test_delete_cursor(self) -> None: """Test deleting a cursor.""" storage = CursorStorage() await storage.store("cursor-1", {"offset": 50}) await storage.delete("cursor-1") retrieved = await storage.retrieve("cursor-1") assert retrieved is None async def test_delete_nonexistent_cursor(self) -> None: """Test deleting nonexistent cursor (should not raise error).""" storage = CursorStorage() await storage.delete("nonexistent") # Should not raise async def test_clear_all_cursors(self) -> None: """Test clearing all cursors.""" storage = CursorStorage() await storage.store("cursor-1", {"offset": 50}) await storage.store("cursor-2", {"offset": 100}) await storage.clear_all() assert await storage.retrieve("cursor-1") is None assert await storage.retrieve("cursor-2") is None class TestCursorStorageTTL: """Test cursor 
TTL and expiration.""" async def test_cursor_expires_after_ttl(self) -> None: """Test that cursor expires after TTL.""" storage = CursorStorage(default_ttl=1) # 1 second TTL await storage.store("cursor-1", {"offset": 50}) # Immediately retrievable retrieved = await storage.retrieve("cursor-1") assert retrieved is not None # Wait for expiration await asyncio.sleep(1.5) # Should be expired now retrieved = await storage.retrieve("cursor-1") assert retrieved is None async def test_cursor_with_custom_ttl(self) -> None: """Test storing cursor with custom TTL.""" storage = CursorStorage(default_ttl=10) # Store with 1-second TTL (overrides default) await storage.store("cursor-1", {"offset": 50}, ttl=1) # Wait for expiration await asyncio.sleep(1.5) # Should be expired retrieved = await storage.retrieve("cursor-1") assert retrieved is None async def test_cursor_not_expired_within_ttl(self) -> None: """Test that cursor is available within TTL.""" storage = CursorStorage(default_ttl=5) await storage.store("cursor-1", {"offset": 50}) # Wait half the TTL await asyncio.sleep(2.5) # Should still be available retrieved = await storage.retrieve("cursor-1") assert retrieved == {"offset": 50} class TestCursorStorageCleanup: """Test automatic cleanup task.""" async def test_cleanup_task_starts_and_stops(self) -> None: """Test that cleanup task can be started and stopped.""" storage = CursorStorage() await storage.start() assert storage._running is True assert storage._cleanup_task is not None await storage.stop() assert storage._running is False async def test_cleanup_task_removes_expired_cursors(self) -> None: """Test that cleanup task removes expired cursors.""" storage = CursorStorage(default_ttl=1, cleanup_interval=0.5) await storage.start() # Store cursors await storage.store("cursor-1", {"offset": 50}) await storage.store("cursor-2", {"offset": 100}) # Wait for expiration + cleanup await asyncio.sleep(2) stats = storage.get_stats() assert stats["active_cursors"] == 0 await 
storage.stop() async def test_cleanup_task_preserves_active_cursors(self) -> None: """Test that cleanup preserves non-expired cursors.""" storage = CursorStorage(default_ttl=10, cleanup_interval=0.5) await storage.start() # Store cursors await storage.store("cursor-1", {"offset": 50}) await storage.store("cursor-2", {"offset": 100}) # Wait for cleanup cycle (but not long enough for expiration) await asyncio.sleep(1) stats = storage.get_stats() assert stats["active_cursors"] == 2 await storage.stop() async def test_cleanup_task_can_be_stopped_and_restarted(self) -> None: """Test that cleanup task can be stopped and restarted.""" storage = CursorStorage() # Start await storage.start() assert storage._running is True # Stop await storage.stop() assert storage._running is False # Restart await storage.start() assert storage._running is True await storage.stop() class TestCursorStorageStats: """Test storage statistics.""" async def test_get_stats_empty(self) -> None: """Test stats for empty storage.""" storage = CursorStorage() stats = storage.get_stats() assert stats["total_cursors"] == 0 assert stats["expired_cursors"] == 0 assert stats["active_cursors"] == 0 async def test_get_stats_with_active_cursors(self) -> None: """Test stats with active cursors.""" storage = CursorStorage(default_ttl=10) await storage.store("cursor-1", {"offset": 50}) await storage.store("cursor-2", {"offset": 100}) stats = storage.get_stats() assert stats["total_cursors"] == 2 assert stats["expired_cursors"] == 0 assert stats["active_cursors"] == 2 async def test_get_stats_with_expired_cursors(self) -> None: """Test stats with expired cursors.""" storage = CursorStorage(default_ttl=1) await storage.store("cursor-1", {"offset": 50}) await storage.store("cursor-2", {"offset": 100}) # Wait for expiration await asyncio.sleep(1.5) stats = storage.get_stats() assert stats["total_cursors"] == 2 assert stats["expired_cursors"] == 2 assert stats["active_cursors"] == 0 async def 
test_get_stats_mixed_active_and_expired(self) -> None: """Test stats with mix of active and expired cursors.""" storage = CursorStorage(default_ttl=10) # Store with different TTLs await storage.store("cursor-1", {"offset": 50}, ttl=1) # Will expire await storage.store("cursor-2", {"offset": 100}, ttl=10) # Will not expire # Wait for first to expire await asyncio.sleep(1.5) stats = storage.get_stats() assert stats["total_cursors"] == 2 assert stats["expired_cursors"] == 1 assert stats["active_cursors"] == 1 class TestCursorStorageMultipleCursors: """Test storage with multiple cursors.""" async def test_store_multiple_cursors(self) -> None: """Test storing multiple independent cursors.""" storage = CursorStorage() cursors = {f"cursor-{i}": {"offset": i * 50} for i in range(10)} # Store all cursors for cursor_id, data in cursors.items(): await storage.store(cursor_id, data) # Retrieve and verify all for cursor_id, expected_data in cursors.items(): retrieved = await storage.retrieve(cursor_id) assert retrieved == expected_data async def test_cursors_expire_independently(self) -> None: """Test that cursors with different TTLs expire independently.""" storage = CursorStorage() await storage.store("short-ttl", {"offset": 50}, ttl=1) await storage.store("long-ttl", {"offset": 100}, ttl=10) # Wait for short TTL to expire await asyncio.sleep(1.5) assert await storage.retrieve("short-ttl") is None assert await storage.retrieve("long-ttl") == {"offset": 100} class TestGetCursorStorage: """Test global singleton accessor.""" def test_get_cursor_storage_returns_instance(self) -> None: """Test that get_cursor_storage returns a CursorStorage instance.""" storage = get_cursor_storage() assert isinstance(storage, CursorStorage) def test_get_cursor_storage_returns_same_instance(self) -> None: """Test that get_cursor_storage returns singleton instance.""" storage1 = get_cursor_storage() storage2 = get_cursor_storage() assert storage1 is storage2 class TestCursorStorageEdgeCases: 
"""Test edge cases and error handling.""" async def test_store_with_zero_ttl(self) -> None: """Test storing cursor with zero TTL.""" storage = CursorStorage() await storage.store("cursor-1", {"offset": 50}, ttl=0) # Should be immediately expired retrieved = await storage.retrieve("cursor-1") assert retrieved is None async def test_store_with_negative_ttl(self) -> None: """Test storing cursor with negative TTL.""" storage = CursorStorage() await storage.store("cursor-1", {"offset": 50}, ttl=-1) # Should be immediately expired retrieved = await storage.retrieve("cursor-1") assert retrieved is None async def test_store_large_data(self) -> None: """Test storing large cursor data.""" storage = CursorStorage() large_data = { "offset": 50, "filters": {f"field_{i}": f"value_{i}" for i in range(1000)}, } await storage.store("cursor-1", large_data) retrieved = await storage.retrieve("cursor-1") assert retrieved == large_data async def test_concurrent_operations(self) -> None: """Test concurrent store/retrieve operations.""" storage = CursorStorage() async def store_cursor(i: int) -> None: await storage.store(f"cursor-{i}", {"offset": i * 50}) async def retrieve_cursor(i: int) -> dict | None: return await storage.retrieve(f"cursor-{i}") # Store concurrently await asyncio.gather(*[store_cursor(i) for i in range(100)]) # Retrieve concurrently results = await asyncio.gather(*[retrieve_cursor(i) for i in range(100)]) # All should be retrieved successfully for i, result in enumerate(results): assert result == {"offset": i * 50}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/darrentmorgan/hostaway-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.