Skip to main content
Glama
test_cursor_codec.py11.8 kB
"""Unit tests for cursor_codec module. Tests cursor encoding/decoding, signature verification, TTL validation, and error handling for malformed/tampered/expired cursors. """ import time import pytest from src.utils.cursor_codec import ( CursorCodecError, CursorExpiredError, CursorTamperedError, decode_cursor, encode_cursor, ) class TestEncodeCursor: """Test cursor encoding functionality.""" def test_encode_cursor_basic(self) -> None: """Test basic cursor encoding with minimal parameters.""" cursor = encode_cursor(offset=0, secret="test-secret") # Cursor should be non-empty Base64 string assert cursor assert isinstance(cursor, str) # Base64 URL-safe characters assert all( c in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_=" for c in cursor ) def test_encode_cursor_with_all_parameters(self) -> None: """Test cursor encoding with all optional parameters.""" timestamp = time.time() # Use current time to avoid expiration cursor = encode_cursor( offset=100, secret="test-secret", timestamp=timestamp, order_by="created_desc", filters={"status": "confirmed"}, ) assert cursor # Decode to verify all fields present decoded = decode_cursor(cursor, secret="test-secret") assert decoded["offset"] == 100 assert decoded["ts"] == timestamp assert decoded["order_by"] == "created_desc" assert decoded["filters"] == {"status": "confirmed"} def test_encode_cursor_deterministic(self) -> None: """Test that encoding with same parameters produces same cursor.""" timestamp = 1697452800.0 cursor1 = encode_cursor(offset=50, secret="test-secret", timestamp=timestamp) cursor2 = encode_cursor(offset=50, secret="test-secret", timestamp=timestamp) assert cursor1 == cursor2 def test_encode_cursor_different_secrets_produce_different_cursors(self) -> None: """Test that different secrets produce different cursors.""" timestamp = 1697452800.0 cursor1 = encode_cursor(offset=50, secret="secret-1", timestamp=timestamp) cursor2 = encode_cursor(offset=50, secret="secret-2", timestamp=timestamp) assert cursor1 != cursor2 def test_encode_cursor_negative_offset_raises_error(self) -> None: """Test that negative offset raises ValueError.""" with pytest.raises(ValueError, match="offset must be >= 0"): encode_cursor(offset=-1, secret="test-secret") def test_encode_cursor_size_approximately_100_bytes(self) -> None: """Test that cursor size is approximately 100 bytes as per spec.""" cursor = encode_cursor( offset=100, secret="test-secret", timestamp=1697452800.0, ) # Cursor should be around 100 bytes (allow some variance) cursor_size = len(cursor.encode("utf-8")) assert 50 <= cursor_size <= 200 class TestDecodeCursor: """Test cursor decoding functionality.""" def test_decode_cursor_basic(self) -> None: """Test basic cursor decoding.""" timestamp = time.time() cursor = encode_cursor(offset=50, secret="test-secret", timestamp=timestamp) decoded = decode_cursor(cursor, secret="test-secret") assert decoded["offset"] == 50 assert decoded["ts"] == timestamp def test_decode_cursor_with_all_fields(self) -> None: """Test decoding cursor with all optional fields.""" timestamp = time.time() cursor = encode_cursor( offset=100, secret="test-secret", timestamp=timestamp, order_by="created_desc", filters={"status": "confirmed", "minPrice": 100}, ) decoded = decode_cursor(cursor, secret="test-secret") assert decoded["offset"] == 100 assert decoded["ts"] == timestamp assert decoded["order_by"] == "created_desc" assert decoded["filters"] == {"status": "confirmed", "minPrice": 100} def test_decode_cursor_wrong_secret_raises_tampered_error(self) -> None: """Test that wrong secret raises CursorTamperedError.""" timestamp = time.time() cursor = encode_cursor(offset=50, secret="correct-secret", timestamp=timestamp) with pytest.raises(CursorTamperedError, match="signature verification failed"): decode_cursor(cursor, secret="wrong-secret") def test_decode_cursor_expired_raises_expired_error(self) -> None: """Test that expired cursor raises CursorExpiredError.""" old_timestamp = time.time() - 700 # 11+ minutes ago (exceeds 10min TTL) cursor = encode_cursor(offset=50, secret="test-secret", timestamp=old_timestamp) with pytest.raises(CursorExpiredError, match="Cursor expired"): decode_cursor(cursor, secret="test-secret", ttl_seconds=600) def test_decode_cursor_custom_ttl(self) -> None: """Test decoding with custom TTL.""" old_timestamp = time.time() - 100 # 100 seconds ago cursor = encode_cursor(offset=50, secret="test-secret", timestamp=old_timestamp) # Should fail with 60-second TTL with pytest.raises(CursorExpiredError): decode_cursor(cursor, secret="test-secret", ttl_seconds=60) # Should succeed with 200-second TTL decoded = decode_cursor(cursor, secret="test-secret", ttl_seconds=200) assert decoded["offset"] == 50 def test_decode_cursor_invalid_base64_raises_error(self) -> None: """Test that invalid Base64 raises CursorCodecError.""" with pytest.raises(CursorCodecError, match="Invalid cursor format"): decode_cursor("not-valid-base64!!!", secret="test-secret") def test_decode_cursor_missing_payload_raises_error(self) -> None: """Test that cursor missing payload raises CursorCodecError.""" import base64 import json # Create cursor without payload field malformed_data = {"sig": "abc123"} malformed_json = json.dumps(malformed_data) malformed_cursor = base64.urlsafe_b64encode(malformed_json.encode("utf-8")).decode("utf-8") with pytest.raises(CursorCodecError, match="missing required field"): decode_cursor(malformed_cursor, secret="test-secret") def test_decode_cursor_missing_timestamp_raises_error(self) -> None: """Test that cursor missing timestamp raises CursorCodecError.""" import base64 import hashlib import hmac import json # Create cursor without timestamp but with valid signature payload = {"offset": 50} # Missing "ts" payload_json = json.dumps(payload, separators=(",", ":"), sort_keys=True) signature = hmac.new( b"test-secret", payload_json.encode("utf-8"), hashlib.sha256, ).hexdigest() malformed_data = { "payload": payload, "sig": signature, } malformed_json = json.dumps(malformed_data) malformed_cursor = base64.urlsafe_b64encode(malformed_json.encode("utf-8")).decode("utf-8") with pytest.raises(CursorCodecError, match="missing timestamp"): decode_cursor(malformed_cursor, secret="test-secret") def test_decode_cursor_invalid_offset_raises_error(self) -> None: """Test that invalid offset raises CursorCodecError.""" import base64 import hashlib import hmac import json # Create cursor with invalid offset but valid signature timestamp = time.time() payload = {"offset": -10, "ts": timestamp} payload_json = json.dumps(payload, separators=(",", ":"), sort_keys=True) signature = hmac.new( b"test-secret", payload_json.encode("utf-8"), hashlib.sha256, ).hexdigest() malformed_data = { "payload": payload, "sig": signature, } malformed_json = json.dumps(malformed_data) malformed_cursor = base64.urlsafe_b64encode(malformed_json.encode("utf-8")).decode("utf-8") with pytest.raises(CursorCodecError, match="Invalid offset"): decode_cursor(malformed_cursor, secret="test-secret") class TestCursorCodecRoundTrip: """Test encode/decode round-trip scenarios.""" def test_round_trip_preserves_data(self) -> None: """Test that encode -> decode preserves all data.""" timestamp = time.time() original_data = { "offset": 123, "order_by": "price_asc", "filters": {"minGuests": 2, "city": "NYC"}, } cursor = encode_cursor( offset=original_data["offset"], secret="test-secret", timestamp=timestamp, order_by=original_data["order_by"], filters=original_data["filters"], ) decoded = decode_cursor(cursor, secret="test-secret") assert decoded["offset"] == original_data["offset"] assert decoded["order_by"] == original_data["order_by"] assert decoded["filters"] == original_data["filters"] assert decoded["ts"] == timestamp def test_round_trip_multiple_cursors(self) -> None: """Test encoding/decoding multiple cursors in sequence.""" timestamp = time.time() cursors = [] # Encode multiple cursors for i in range(5): cursor = encode_cursor( offset=i * 50, secret="test-secret", timestamp=timestamp, ) cursors.append(cursor) # Decode and verify for i, cursor in enumerate(cursors): decoded = decode_cursor(cursor, secret="test-secret") assert decoded["offset"] == i * 50 def test_round_trip_with_special_characters_in_filters(self) -> None: """Test that special characters in filters are preserved.""" timestamp = time.time() cursor = encode_cursor( offset=50, secret="test-secret", timestamp=timestamp, filters={"name": "O'Brien's Place", "description": "A/C & WiFi"}, ) decoded = decode_cursor(cursor, secret="test-secret") assert decoded["filters"]["name"] == "O'Brien's Place" assert decoded["filters"]["description"] == "A/C & WiFi" class TestCursorCodecPerformance: """Test cursor codec performance requirements.""" def test_encode_performance_under_1ms(self) -> None: """Test that encoding completes in under 1ms (spec requirement).""" import timeit def encode_test() -> None: encode_cursor(offset=100, secret="test-secret", timestamp=1697452800.0) # Run 1000 iterations and measure average time execution_time = timeit.timeit(encode_test, number=1000) avg_time_ms = (execution_time / 1000) * 1000 # Should be well under 1ms per operation assert avg_time_ms < 1.0 def test_decode_performance_under_1ms(self) -> None: """Test that decoding completes in under 1ms (spec requirement).""" import timeit cursor = encode_cursor(offset=100, secret="test-secret", timestamp=time.time()) def decode_test() -> None: decode_cursor(cursor, secret="test-secret") # Run 1000 iterations and measure average time execution_time = timeit.timeit(decode_test, number=1000) avg_time_ms = (execution_time / 1000) * 1000 # Should be well under 1ms per operation assert avg_time_ms < 1.0

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/darrentmorgan/hostaway-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server