Skip to main content
Glama
Arize-ai

@arizeai/phoenix-mcp

Official
by Arize-ai
test_ldap.py (73.5 kB)
"""Unit tests for LDAP authentication module.

Tests security-critical functionality:
- Anonymous bind bypass prevention
- Empty credential rejection
- LDAP injection prevention
- Role mapping logic
- DN validation
"""

from typing import Any
from unittest.mock import MagicMock, Mock, patch

import pytest
from ldap3.core.exceptions import LDAPException
from pytest import LogCaptureFixture

from phoenix.config import LDAPConfig
from phoenix.server.ldap import (
    LDAP_CLIENT_ID_MARKER,
    NULL_EMAIL_MARKER_PREFIX,
    LDAPAuthenticator,
    LDAPUserInfo,
    _get_attribute,
    _get_unique_id,
    _is_member_of,
    canonicalize_dn,
    generate_null_email_marker,
    is_ldap_user,
    is_null_email_marker,
)


def _install_mock_connection(mock_establish: MagicMock) -> MagicMock:
    """Wire a patched ``_establish_connection`` to yield a fresh mock connection.

    The return value of the patched method is configured as a context manager:
    ``__enter__`` returns the connection and ``__exit__`` returns ``None``
    (falsy), so an exception raised inside the ``with`` body is not suppressed.

    Args:
        mock_establish: The mock produced by patching ``_establish_connection``.

    Returns:
        The mock connection that ``__enter__`` will hand out.
    """
    mock_conn = MagicMock()
    mock_establish.return_value.__enter__ = Mock(return_value=mock_conn)
    mock_establish.return_value.__exit__ = Mock(return_value=None)
    return mock_conn


def _mock_entry(entry_dn: str, **attributes: Any) -> MagicMock:
    """Build a mock LDAP search entry.

    Args:
        entry_dn: Value exposed as ``entry.entry_dn``.
        **attributes: LDAP attribute name -> list exposed as that attribute's
            ``.values``. Attributes that are not passed are left as
            auto-generated ``MagicMock`` children.

    Returns:
        The configured mock entry.
    """
    entry = MagicMock()
    entry.entry_dn = entry_dn
    for attr_name, values in attributes.items():
        setattr(entry, attr_name, MagicMock(values=values))
    return entry


class TestIsLdapUser:
    """Test is_ldap_user utility function."""

    def test_ldap_user_detected(self) -> None:
        """Test LDAP user marker is correctly detected."""
        ldap_client_id = f"{LDAP_CLIENT_ID_MARKER}:some-unique-id"
        assert is_ldap_user(ldap_client_id) is True

    def test_oauth_user_not_detected(self) -> None:
        """Test OAuth users are not detected as LDAP."""
        assert is_ldap_user("google-oauth2|123456") is False
        assert is_ldap_user("auth0|user123") is False

    def test_none_returns_false(self) -> None:
        """Test None input returns False."""
        assert is_ldap_user(None) is False

    def test_empty_string_returns_false(self) -> None:
        """Test empty string returns False."""
        assert is_ldap_user("") is False

    def test_marker_alone_detected(self) -> None:
        """Test marker without suffix is still detected."""
        assert is_ldap_user(LDAP_CLIENT_ID_MARKER) is True


class TestLDAPSecurityValidation:
    """Test security-critical input validation."""

    @pytest.fixture
    def config(self) -> LDAPConfig:
        """Minimal LDAP configuration for testing."""
        return LDAPConfig(
            hosts=("ldap.example.com",),
            port=389,
            tls_mode="none",
            user_search_base_dns=("ou=users,dc=example,dc=com",),
            user_search_filter="(uid=%s)",
            attr_email="mail",
            attr_display_name="displayName",
            attr_member_of="memberOf",
            group_role_mappings=(
                {"group_dn": "cn=admins,ou=groups,dc=example,dc=com", "role": "ADMIN"},
            ),
        )

    @pytest.fixture
    def authenticator(self, config: LDAPConfig) -> LDAPAuthenticator:
        """Create authenticator instance."""
        return LDAPAuthenticator(config)

    async def test_empty_username_rejected(self, authenticator: LDAPAuthenticator) -> None:
        """SECURITY: Empty username must be rejected to prevent anonymous bind bypass."""
        result = await authenticator.authenticate("", "password123")
        assert result is None

    async def test_whitespace_username_rejected(self, authenticator: LDAPAuthenticator) -> None:
        """SECURITY: Whitespace-only username must be rejected."""
        result = await authenticator.authenticate(" ", "password123")
        assert result is None

    async def test_empty_password_rejected(self, authenticator: LDAPAuthenticator) -> None:
        """SECURITY: Empty password must be rejected to prevent anonymous bind bypass.

        Many LDAP servers (including Active Directory) treat a bind with an empty
        password as an anonymous bind and return success, even with a valid DN.
        This would allow attackers to authenticate as any user.
        """
        result = await authenticator.authenticate("admin", "")
        assert result is None

    async def test_none_password_rejected(self, authenticator: LDAPAuthenticator) -> None:
        """SECURITY: None password must be rejected."""
        # Type ignore since we're testing runtime behavior
        result = await authenticator.authenticate("admin", None)  # type: ignore
        assert result is None

    async def test_oversized_username_rejected(self, authenticator: LDAPAuthenticator) -> None:
        """SECURITY: Oversized username must be rejected to prevent DoS."""
        long_username = "a" * 257  # Exceeds 256 limit
        result = await authenticator.authenticate(long_username, "password123")
        assert result is None

    async def test_oversized_password_rejected(self, authenticator: LDAPAuthenticator) -> None:
        """SECURITY: Oversized password must be rejected to prevent DoS."""
        long_password = "p" * 1025  # Exceeds 1024 limit
        result = await authenticator.authenticate("admin", long_password)
        assert result is None

    async def test_max_length_username_accepted(self, authenticator: LDAPAuthenticator) -> None:
        """Username at exactly max length should be accepted (not rejected)."""
        max_username = "a" * 256  # Exactly at limit
        # Will fail for other reasons (no LDAP server), but shouldn't fail length check
        with patch.object(authenticator, "_establish_connection") as mock_establish:
            mock_conn = _install_mock_connection(mock_establish)
            mock_conn.entries = []  # User not found
            with patch.object(authenticator, "_dummy_bind_for_timing"):
                result = await authenticator.authenticate(max_username, "password123")
            # Should return None due to user not found, not length validation
            assert result is None
            # Verify the connection was attempted (length check passed)
            mock_establish.assert_called_once()

    async def test_max_length_password_accepted(self, authenticator: LDAPAuthenticator) -> None:
        """Password at exactly max length should be accepted (not rejected)."""
        max_password = "p" * 1024  # Exactly at limit
        with patch.object(authenticator, "_establish_connection") as mock_establish:
            mock_conn = _install_mock_connection(mock_establish)
            mock_conn.entries = []
            with patch.object(authenticator, "_dummy_bind_for_timing"):
                result = await authenticator.authenticate("admin", max_password)
            assert result is None
            mock_establish.assert_called_once()


class TestAuthenticationFlow:
    """Test complete authentication flow scenarios."""

    @pytest.fixture
    def config(self) -> LDAPConfig:
        """LDAP configuration for authentication tests."""
        return LDAPConfig(
            hosts=("ldap.example.com",),
            port=389,
            tls_mode="none",
            user_search_base_dns=("ou=users,dc=example,dc=com",),
            user_search_filter="(uid=%s)",
            attr_email="mail",
            attr_display_name="displayName",
            attr_member_of="memberOf",
            group_role_mappings=(
                {"group_dn": "cn=admins,ou=groups,dc=example,dc=com", "role": "ADMIN"},
                {"group_dn": "*", "role": "VIEWER"},
            ),
        )

    @pytest.fixture
    def authenticator(self, config: LDAPConfig) -> LDAPAuthenticator:
        """Create authenticator instance."""
        return LDAPAuthenticator(config)

    async def test_successful_authentication_returns_user_info(
        self, authenticator: LDAPAuthenticator
    ) -> None:
        """Happy path: successful auth returns complete LDAPUserInfo."""
        with patch.object(authenticator, "_establish_connection") as mock_establish:
            mock_conn = _install_mock_connection(mock_establish)
            # Mock user search result with email, display name and memberOf
            mock_conn.entries = [
                _mock_entry(
                    "uid=jdoe,ou=users,dc=example,dc=com",
                    mail=["jdoe@example.com"],
                    displayName=["John Doe"],
                    memberOf=["cn=admins,ou=groups,dc=example,dc=com"],
                )
            ]
            with patch.object(authenticator, "_verify_user_password", return_value=True):
                result = await authenticator.authenticate("jdoe", "validpassword")

        assert result is not None
        assert isinstance(result, LDAPUserInfo)
        assert result.email == "jdoe@example.com"
        assert result.display_name == "John Doe"
        assert result.user_dn == "uid=jdoe,ou=users,dc=example,dc=com"
        assert result.ldap_username == "jdoe"
        assert result.role == "ADMIN"
        assert "cn=admins,ou=groups,dc=example,dc=com" in result.groups

    async def test_ambiguous_search_rejected(
        self, authenticator: LDAPAuthenticator, caplog: LogCaptureFixture
    ) -> None:
        """SECURITY: Multiple matching users must be rejected."""
        with patch.object(authenticator, "_establish_connection") as mock_establish:
            mock_conn = _install_mock_connection(mock_establish)
            # Mock search returning multiple users (ambiguous result)
            mock_conn.entries = [
                _mock_entry("uid=jdoe,ou=employees,dc=example,dc=com"),
                _mock_entry("uid=jdoe,ou=contractors,dc=example,dc=com"),
            ]
            with patch.object(authenticator, "_dummy_bind_for_timing"):
                result = await authenticator.authenticate("jdoe", "password")

        assert result is None
        assert "Ambiguous LDAP search" in caplog.text
        assert "found 2 matching entries" in caplog.text

    async def test_missing_email_attribute_rejected(
        self, authenticator: LDAPAuthenticator, caplog: LogCaptureFixture
    ) -> None:
        """User without required email attribute must be rejected."""
        with patch.object(authenticator, "_establish_connection") as mock_establish:
            mock_conn = _install_mock_connection(mock_establish)
            # Mock user entry whose email attribute is empty (= no email)
            mock_conn.entries = [_mock_entry("uid=jdoe,ou=users,dc=example,dc=com", mail=[])]
            with patch.object(authenticator, "_verify_user_password", return_value=True):
                result = await authenticator.authenticate("jdoe", "validpassword")

        assert result is None
        assert "missing required email attribute" in caplog.text

    async def test_missing_unique_id_when_configured_rejected(
        self, caplog: LogCaptureFixture
    ) -> None:
        """User without configured unique_id attribute must be rejected."""
        config = LDAPConfig(
            hosts=("ldap.example.com",),
            port=389,
            tls_mode="none",
            user_search_base_dns=("ou=users,dc=example,dc=com",),
            user_search_filter="(uid=%s)",
            attr_email="mail",
            attr_display_name="displayName",
            attr_member_of="memberOf",
            attr_unique_id="objectGUID",  # Configured but will be missing
            group_role_mappings=({"group_dn": "*", "role": "VIEWER"},),
        )
        authenticator = LDAPAuthenticator(config)

        with patch.object(authenticator, "_establish_connection") as mock_establish:
            mock_conn = _install_mock_connection(mock_establish)
            # Mock user entry with email but missing objectGUID
            mock_entry = _mock_entry(
                "uid=jdoe,ou=users,dc=example,dc=com",
                mail=["jdoe@example.com"],
                displayName=["John Doe"],
                memberOf=[],
            )
            # objectGUID attribute missing (spec=[] means no attributes)
            mock_entry.objectGUID = MagicMock(spec=[])
            mock_conn.entries = [mock_entry]
            with patch.object(authenticator, "_verify_user_password", return_value=True):
                result = await authenticator.authenticate("jdoe", "validpassword")

        assert result is None
        assert "missing configured unique_id attribute" in caplog.text
        assert "objectGUID" in caplog.text

    async def test_no_matching_role_rejected(self) -> None:
        """User with no matching group-role mapping must be rejected."""
        config = LDAPConfig(
            hosts=("ldap.example.com",),
            port=389,
            tls_mode="none",
            user_search_base_dns=("ou=users,dc=example,dc=com",),
            user_search_filter="(uid=%s)",
            attr_email="mail",
            attr_display_name="displayName",
            attr_member_of="memberOf",
            # Only admins allowed - no wildcard fallback
            group_role_mappings=(
                {"group_dn": "cn=admins,ou=groups,dc=example,dc=com", "role": "ADMIN"},
            ),
        )
        authenticator = LDAPAuthenticator(config)

        with patch.object(authenticator, "_establish_connection") as mock_establish:
            mock_conn = _install_mock_connection(mock_establish)
            mock_conn.entries = [
                _mock_entry(
                    "uid=jdoe,ou=users,dc=example,dc=com",
                    mail=["jdoe@example.com"],
                    displayName=["John Doe"],
                    # User is only in developers group, not admins
                    memberOf=["cn=developers,ou=groups,dc=example,dc=com"],
                )
            ]
            with patch.object(authenticator, "_verify_user_password", return_value=True):
                result = await authenticator.authenticate("jdoe", "validpassword")

        # User authenticated but has no role mapping → rejected
        assert result is None

    async def test_password_verification_failure_rejected(
        self, authenticator: LDAPAuthenticator
    ) -> None:
        """Wrong password must be rejected."""
        with patch.object(authenticator, "_establish_connection") as mock_establish:
            mock_conn = _install_mock_connection(mock_establish)
            mock_conn.entries = [_mock_entry("uid=jdoe,ou=users,dc=example,dc=com")]
            # Password verification fails
            with patch.object(authenticator, "_verify_user_password", return_value=False):
                result = await authenticator.authenticate("jdoe", "wrongpassword")

        assert result is None

    async def test_user_not_found_rejected(self, authenticator: LDAPAuthenticator) -> None:
        """Non-existent user must be rejected."""
        with patch.object(authenticator, "_establish_connection") as mock_establish:
            mock_conn = _install_mock_connection(mock_establish)
            # No users found
            mock_conn.entries = []
            with patch.object(authenticator, "_dummy_bind_for_timing") as mock_dummy:
                result = await authenticator.authenticate("nonexistent", "password")
                # Verify timing attack mitigation was performed
                mock_dummy.assert_called_once()

        assert result is None

    async def test_successful_authentication_with_unique_id(self) -> None:
        """Successful auth with unique_id configured returns complete LDAPUserInfo."""
        config = LDAPConfig(
            hosts=("ldap.example.com",),
            port=389,
            tls_mode="none",
            user_search_base_dns=("ou=users,dc=example,dc=com",),
            user_search_filter="(uid=%s)",
            attr_email="mail",
            attr_display_name="displayName",
            attr_member_of="memberOf",
            attr_unique_id="entryUUID",
            group_role_mappings=({"group_dn": "*", "role": "VIEWER"},),
        )
        authenticator = LDAPAuthenticator(config)

        with patch.object(authenticator, "_establish_connection") as mock_establish:
            mock_conn = _install_mock_connection(mock_establish)
            mock_entry = _mock_entry(
                "uid=jdoe,ou=users,dc=example,dc=com",
                mail=["jdoe@example.com"],
                displayName=["John Doe"],
                memberOf=[],
            )
            # Mock entryUUID attribute (OpenLDAP style)
            mock_entry.entryUUID = MagicMock(
                raw_values=[b"550e8400-e29b-41d4-a716-446655440000"]
            )
            mock_conn.entries = [mock_entry]
            with patch.object(authenticator, "_verify_user_password", return_value=True):
                result = await authenticator.authenticate("jdoe", "validpassword")

        assert result is not None
        assert result.unique_id == "550e8400-e29b-41d4-a716-446655440000"

    async def test_display_name_defaults_to_username(
        self, authenticator: LDAPAuthenticator
    ) -> None:
        """Missing display name should default to username."""
        with patch.object(authenticator, "_establish_connection") as mock_establish:
            mock_conn = _install_mock_connection(mock_establish)
            mock_conn.entries = [
                _mock_entry(
                    "uid=jdoe,ou=users,dc=example,dc=com",
                    mail=["john.doe@example.com"],
                    displayName=[],  # Display name missing
                    memberOf=[],
                )
            ]
            with patch.object(authenticator, "_verify_user_password", return_value=True):
                result = await authenticator.authenticate("jdoe", "validpassword")

        assert result is not None
        assert result.display_name == "jdoe"  # Falls back to username


class TestTimingAttackMitigation:
    """Test timing attack prevention for user enumeration."""

    @pytest.fixture
    def config(self) -> LDAPConfig:
        """LDAP configuration for timing tests."""
        return LDAPConfig(
            hosts=("ldap.example.com",),
            port=389,
            tls_mode="none",
            user_search_base_dns=("ou=users,dc=example,dc=com",),
            user_search_filter="(uid=%s)",
            attr_email="mail",
            group_role_mappings=({"group_dn": "*", "role": "VIEWER"},),
        )

    @pytest.fixture
    def authenticator(self, config: LDAPConfig) -> LDAPAuthenticator:
        """Create authenticator instance."""
        return LDAPAuthenticator(config)

    def test_dummy_bind_uses_randomized_dn(self, authenticator: LDAPAuthenticator) -> None:
        """Dummy bind should use randomized DN to prevent caching."""
        with patch.object(authenticator, "_verify_user_password") as mock_verify:
            mock_verify.return_value = False

            # Call dummy bind twice
            authenticator._dummy_bind_for_timing(authenticator.servers[0], "password1")
            authenticator._dummy_bind_for_timing(authenticator.servers[0], "password2")

            # Both calls should use different DNs (randomized)
            assert mock_verify.call_count == 2
            # call_args_list[i][0] is the positional-args tuple; index 1 is the DN
            call1_dn = mock_verify.call_args_list[0][0][1]
            call2_dn = mock_verify.call_args_list[1][0][1]
            assert call1_dn != call2_dn
            assert "dummy-" in call1_dn
            assert "dummy-" in call2_dn

    def test_dummy_bind_swallows_exceptions(self, authenticator: LDAPAuthenticator) -> None:
        """Dummy bind should not raise exceptions (timing only, result ignored)."""
        with patch.object(authenticator, "_verify_user_password") as mock_verify:
            mock_verify.side_effect = LDAPException("Connection failed")

            # Should not raise - exceptions are swallowed
            authenticator._dummy_bind_for_timing(authenticator.servers[0], "password")

            mock_verify.assert_called_once()


class TestMultipleSearchBases:
    """Test user search across multiple base DNs."""

    async def test_user_found_in_second_search_base(self) -> None:
        """User in second search base should be found after first base returns empty."""
        config = LDAPConfig(
            hosts=("ldap.example.com",),
            port=389,
            tls_mode="none",
            user_search_base_dns=(
                "ou=employees,dc=example,dc=com",
                "ou=contractors,dc=example,dc=com",
            ),
            user_search_filter="(uid=%s)",
            attr_email="mail",
            attr_display_name="displayName",
            attr_member_of="memberOf",
            group_role_mappings=({"group_dn": "*", "role": "VIEWER"},),
        )
        authenticator = LDAPAuthenticator(config)

        with patch.object(authenticator, "_establish_connection") as mock_establish:
            mock_conn = _install_mock_connection(mock_establish)

            # Track search calls to verify both bases are searched
            search_call_count = 0

            def search_side_effect(**kwargs: Any) -> None:
                nonlocal search_call_count
                search_call_count += 1
                if "ou=employees" in kwargs.get("search_base", ""):
                    # First base: no results
                    mock_conn.entries = []
                else:
                    # Second base: user found
                    mock_conn.entries = [
                        _mock_entry(
                            "uid=contractor1,ou=contractors,dc=example,dc=com",
                            mail=["contractor1@example.com"],
                            displayName=["Contractor One"],
                            memberOf=[],
                        )
                    ]

            mock_conn.search.side_effect = search_side_effect

            with patch.object(authenticator, "_verify_user_password", return_value=True):
                result = await authenticator.authenticate("contractor1", "validpassword")

        # Both search bases should have been searched
        assert search_call_count == 2
        assert result is not None
        assert result.email == "contractor1@example.com"
        assert "ou=contractors" in result.user_dn

    async def test_user_found_in_first_search_base_stops_search(self) -> None:
        """User found in first search base should not search remaining bases."""
        config = LDAPConfig(
            hosts=("ldap.example.com",),
            port=389,
            tls_mode="none",
            user_search_base_dns=(
                "ou=employees,dc=example,dc=com",
                "ou=contractors,dc=example,dc=com",
            ),
            user_search_filter="(uid=%s)",
            attr_email="mail",
            attr_display_name="displayName",
            attr_member_of="memberOf",
            group_role_mappings=({"group_dn": "*", "role": "VIEWER"},),
        )
        authenticator = LDAPAuthenticator(config)

        with patch.object(authenticator, "_establish_connection") as mock_establish:
            mock_conn = _install_mock_connection(mock_establish)

            search_call_count = 0

            def search_side_effect(**kwargs: Any) -> None:
                nonlocal search_call_count
                search_call_count += 1
                # First base: user found
                mock_conn.entries = [
                    _mock_entry(
                        "uid=employee1,ou=employees,dc=example,dc=com",
                        mail=["employee1@example.com"],
                        displayName=["Employee One"],
                        memberOf=[],
                    )
                ]

            mock_conn.search.side_effect = search_side_effect

            with patch.object(authenticator, "_verify_user_password", return_value=True):
                result = await authenticator.authenticate("employee1", "validpassword")

        # Only first search base should have been searched
        assert search_call_count == 1
        assert result is not None
        assert "ou=employees" in result.user_dn


class TestRoleMapping:
    """Test role mapping logic."""

    @pytest.fixture
    def authenticator(self) -> LDAPAuthenticator:
        """Create authenticator with role mappings."""
        config = LDAPConfig(
            hosts=("ldap.example.com",),
            user_search_base_dns=("ou=users,dc=example,dc=com",),
            user_search_filter="(uid=%s)",
            attr_email="mail",
            group_role_mappings=(
                {"group_dn": "cn=admins,ou=groups,dc=example,dc=com", "role": "ADMIN"},
                {"group_dn": "cn=members,ou=groups,dc=example,dc=com", "role": "MEMBER"},
                {"group_dn": "*", "role": "VIEWER"},  # Wildcard default
            ),
        )
        return LDAPAuthenticator(config)

    def test_admin_role_mapping(self, authenticator: LDAPAuthenticator) -> None:
        """Test admin group maps to ADMIN role."""
        groups = ["cn=admins,ou=groups,dc=example,dc=com"]
        role = authenticator.map_groups_to_role(groups)
        assert role == "ADMIN"

    def test_member_role_mapping(self, authenticator: LDAPAuthenticator) -> None:
        """Test member group maps to MEMBER role."""
        groups = ["cn=members,ou=groups,dc=example,dc=com"]
        role = authenticator.map_groups_to_role(groups)
        assert role == "MEMBER"

    def test_wildcard_role_mapping(self, authenticator: LDAPAuthenticator) -> None:
        """Test wildcard matches any user."""
        groups = ["cn=unknown,ou=groups,dc=example,dc=com"]
        role = authenticator.map_groups_to_role(groups)
        assert role == "VIEWER"

    def test_first_match_wins(self, authenticator: LDAPAuthenticator) -> None:
        """Test first matching group determines role (priority order)."""
        # User in both admin and member groups - admin should win
        groups = [
            "cn=admins,ou=groups,dc=example,dc=com",
            "cn=members,ou=groups,dc=example,dc=com",
        ]
        role = authenticator.map_groups_to_role(groups)
        assert role == "ADMIN"

    def test_config_order_determines_role(self) -> None:
        """Test that config order (not user group order) determines role."""
        # Config has MEMBER before ADMIN
        config = LDAPConfig(
            hosts=("ldap.example.com",),
            user_search_base_dns=("ou=users,dc=example,dc=com",),
            user_search_filter="(uid=%s)",
            attr_email="mail",
            group_role_mappings=(
                {"group_dn": "cn=members,ou=groups,dc=example,dc=com", "role": "MEMBER"},
                {"group_dn": "cn=admins,ou=groups,dc=example,dc=com", "role": "ADMIN"},
            ),
        )
        authenticator = LDAPAuthenticator(config)

        # User is in both groups - MEMBER wins because it's first in config
        groups = [
            "cn=admins,ou=groups,dc=example,dc=com",
            "cn=members,ou=groups,dc=example,dc=com",
        ]
        role = authenticator.map_groups_to_role(groups)
        assert role == "MEMBER"

    def test_case_insensitive_matching(self, authenticator: LDAPAuthenticator) -> None:
        """Test DN matching is case-insensitive per RFC 4514."""
        groups = ["CN=ADMINS,OU=GROUPS,DC=EXAMPLE,DC=COM"]  # Uppercase
        role = authenticator.map_groups_to_role(groups)
        assert role == "ADMIN"

    def test_dn_normalization_matching(self) -> None:
        """Group matching should handle spacing/order differences via canonicalization."""
        config = LDAPConfig(
            hosts=("ldap.example.com",),
            user_search_base_dns=("ou=users,dc=example,dc=com",),
            user_search_filter="(uid=%s)",
            attr_email="mail",
            group_role_mappings=(
                {
                    "group_dn": "cn=Admins+email=admins@example.com,ou=Groups,dc=Example,dc=Com",
                    "role": "ADMIN",
                },
            ),
        )
        authenticator = LDAPAuthenticator(config)

        # Same group, different ordering/casing/spacing
        groups = ["EMAIL=admins@example.com+CN=ADMINS, OU=Groups , DC=example , DC=com"]
        role = authenticator.map_groups_to_role(groups)
        assert role == "ADMIN"

    def test_no_groups_no_wildcard(self) -> None:
        """Test user with no matching groups is denied when no wildcard."""
        config = LDAPConfig(
            hosts=("ldap.example.com",),
            user_search_base_dns=("ou=users,dc=example,dc=com",),
            user_search_filter="(uid=%s)",
            attr_email="mail",
            group_role_mappings=(
                {"group_dn": "cn=admins,ou=groups,dc=example,dc=com", "role": "ADMIN"},
            ),
        )
        authenticator = LDAPAuthenticator(config)

        groups = ["cn=unknown,ou=groups,dc=example,dc=com"]
        role = authenticator.map_groups_to_role(groups)
        assert role is None

    def test_empty_groups_with_wildcard(self, authenticator: LDAPAuthenticator) -> None:
        """Test user with no groups still matches wildcard."""
        groups: list[str] = []
        role = authenticator.map_groups_to_role(groups)
        assert role == "VIEWER"


class TestExceptionSanitization:
    """Test exception messages don't leak sensitive information."""

    @pytest.fixture
    def config(self) -> LDAPConfig:
        """Create test config."""
        return LDAPConfig(
            hosts=("ldap.example.com",),
            user_search_base_dns=("ou=users,dc=example,dc=com",),
            user_search_filter="(uid=%s)",
            attr_email="mail",
            group_role_mappings=(),
        )

    async def test_ldap_exception_not_leaked(
        self, config: LDAPConfig, caplog: LogCaptureFixture
    ) -> None:
        """SECURITY: LDAP exception details must not be logged."""
        authenticator = LDAPAuthenticator(config)

        with patch.object(authenticator, "_establish_connection") as mock_establish:
            mock_conn = _install_mock_connection(mock_establish)
            # Simulate LDAP exception with sensitive info
            mock_conn.search.side_effect = LDAPException(
                "Connection to ldap://internal-ldap.corp.local:389 failed: "
                "Invalid credentials for cn=admin,dc=corp,dc=local"
            )

            result = await authenticator.authenticate("testuser", "password")

        assert result is None

        # Check logs don't contain the detailed exception message
        log_text = caplog.text
        assert "Invalid credentials for cn=admin" not in log_text
        assert "internal-ldap.corp.local" not in log_text
        # Should only contain error type
        assert "LDAPException" in log_text


class TestAttributeExtraction:
    """Test safe attribute extraction from LDAP entries."""

    def test_missing_attribute_returns_none(self) -> None:
        """Test missing attribute returns None (not exception)."""
        # Create minimal entry without the requested attribute
        # Using type() instead of MagicMock because MagicMock auto-generates attributes
        entry = type("Entry", (), {})()
        result = _get_attribute(entry, "nonexistent")
        assert result is None

    def test_none_attribute_returns_none(self) -> None:
        """Test attribute set to None returns None."""
        entry = type("Entry", (), {"mail": None})()
        result = _get_attribute(entry, "mail")
        assert result is None

    def test_empty_attribute_returns_none(self) -> None:
        """Test empty attribute returns None."""
        entry = MagicMock()
        entry.mail = MagicMock(values=[])
        result = _get_attribute(entry, "mail")
        assert result is None

    def test_single_value_extraction(self) -> None:
        """Test extracting single attribute value."""
        entry = MagicMock()
        entry.mail = MagicMock(values=["user@example.com"])
        result = _get_attribute(entry, "mail")
        assert result == "user@example.com"

    def test_multiple_value_extraction(self) -> None:
        """Test extracting multiple attribute values."""
        entry = MagicMock()
        entry.memberOf = MagicMock(
            values=[
                "cn=admins,ou=groups,dc=example,dc=com",
                "cn=members,ou=groups,dc=example,dc=com",
            ]
        )
        result = _get_attribute(entry, "memberOf", multiple=True)
        assert result == [
            "cn=admins,ou=groups,dc=example,dc=com",
            "cn=members,ou=groups,dc=example,dc=com",
        ]


class TestUniqueIdExtraction:
    """Test unique identifier extraction from LDAP entries.

    Tests the _get_unique_id helper function which handles:
    - Active Directory objectGUID (binary, mixed-endian)
    - OpenLDAP entryUUID (string)
    - 389 DS nsUniqueId (string)
    """

    # Known test case from Microsoft documentation
    # UUID: 2212e4c7-051e-4d0c-9a5b-12770a9bb7ab
    # Binary (little-endian for first 3 components):
    _AD_GUID_BYTES = bytes(
        [
            0xC7, 0xE4, 0x12, 0x22,  # Data1: 2212e4c7 reversed
            0x1E, 0x05,  # Data2: 051e reversed
            0x0C, 0x4D,  # Data3: 4d0c reversed
            0x9A, 0x5B, 0x12, 0x77, 0x0A, 0x9B, 0xB7, 0xAB,  # Data4: as-is
        ]
    )
    _AD_GUID_STR = "2212e4c7-051e-4d0c-9a5b-12770a9bb7ab"

    def test_missing_attribute_returns_none(self) -> None:
        """Test missing unique_id attribute returns None."""
        entry = type("Entry", (), {})()
        result = _get_unique_id(entry, "objectGUID")
        assert result is None

    def test_empty_attribute_returns_none(self) -> None:
        """Test empty raw_values returns None."""
        entry = MagicMock()
        entry.objectGUID = MagicMock(raw_values=[])
        result = _get_unique_id(entry, "objectGUID")
        assert result is None

    def test_string_uuid_as_bytes(self) -> None:
        """Test OpenLDAP entryUUID (string format stored as bytes).

        OpenLDAP stores entryUUID as a string like "550e8400-e29b-41d4-a716-446655440000".
        ldap3 returns this as bytes: b"550e8400-e29b-41d4-a716-446655440000" (36 bytes).
        This should be decoded as UTF-8, not treated as binary.
        """
        entry = MagicMock()
        # ldap3 always returns bytes - even for string UUIDs
        entry.entryUUID = MagicMock(raw_values=[b"550e8400-e29b-41d4-a716-446655440000"])
        result = _get_unique_id(entry, "entryUUID")
        assert result == "550e8400-e29b-41d4-a716-446655440000"

    def test_uppercase_uuid_normalized_to_lowercase(self) -> None:
        """Test uppercase entryUUID is normalized to lowercase.

        UUIDs are case-insensitive per RFC 4122. We normalize to lowercase
        to ensure consistent database lookups. Existing DB entries with
        different casing will be updated via email fallback on next login.
        """
        entry = MagicMock()
        entry.entryUUID = MagicMock(raw_values=[b"550E8400-E29B-41D4-A716-446655440000"])
        result = _get_unique_id(entry, "entryUUID")
        assert result == "550e8400-e29b-41d4-a716-446655440000"

    def test_whitespace_stripped_from_uuid(self) -> None:
        """Test whitespace is stripped from string UUIDs."""
        entry = MagicMock()
        entry.entryUUID = MagicMock(raw_values=[b" 550e8400-e29b-41d4-a716-446655440000 "])
        result = _get_unique_id(entry, "entryUUID")
        assert result == "550e8400-e29b-41d4-a716-446655440000"

    def test_empty_bytes_returns_none(self) -> None:
        """Test empty bytes returns None, not empty string."""
        entry = MagicMock()
        entry.entryUUID = MagicMock(raw_values=[b""])
        result = _get_unique_id(entry, "entryUUID")
        assert result is None

    def test_whitespace_only_returns_none(self) -> None:
        """Test whitespace-only value returns None after stripping."""
        entry = MagicMock()
        entry.entryUUID = MagicMock(raw_values=[b" "])
        result = _get_unique_id(entry, "entryUUID")
        assert result is None

    def test_binary_objectguid_conversion(self) -> None:
        """Test AD objectGUID binary to UUID string conversion (MS-DTYP §2.3.4).

        Active Directory stores objectGUID in mixed-endian format:
        - Data1 (4 bytes): little-endian
        - Data2 (2 bytes): little-endian
        - Data3 (2 bytes): little-endian
        - Data4 (8 bytes): big-endian

        Python's uuid.UUID(bytes_le=...) handles this correctly.
        """
        entry = MagicMock()
        entry.objectGUID = MagicMock(raw_values=[self._AD_GUID_BYTES])
        result = _get_unique_id(entry, "objectGUID")
        assert result == self._AD_GUID_STR

    def test_bytearray_objectguid_conversion(self) -> None:
        """Test AD objectGUID as bytearray for defensive coding.

        Note: ldap3 always returns bytes (see ldap3/operation/search.py decode_raw_vals),
        but we test bytearray to ensure robustness if this ever changes.
        """
        entry = MagicMock()
        # Same GUID as above, but as bytearray
        entry.objectGUID = MagicMock(raw_values=[bytearray(self._AD_GUID_BYTES)])
        result = _get_unique_id(entry, "objectGUID")
        assert result == self._AD_GUID_STR

    def test_binary_non_utf8_hex_encoded(self) -> None:
        """Test non-UTF-8 binary format falls back to hex encoding.

        If the value is not 16 bytes (binary UUID) and not valid UTF-8 (string UUID),
        it's hex-encoded as a safe fallback.
        """
        entry = MagicMock()
        # Invalid UTF-8 byte sequence (0x80-0xBF are continuation bytes, invalid as start)
        entry.customId = MagicMock(raw_values=[b"\x80\x81\x82\x83\x84"])
        result = _get_unique_id(entry, "customId")
        assert result == "8081828384"

    def test_attribute_without_raw_values(self) -> None:
        """Test attribute object without raw_values property returns None."""
        entry = MagicMock()
        entry.objectGUID = MagicMock(spec=[])  # No raw_values attribute
        result = _get_unique_id(entry, "objectGUID")
        assert result is None


class TestGroupMembershipCheck:
    """Test _is_member_of helper function."""

    def test_wildcard_matches_all(self) -> None:
        """Test wildcard '*' matches any user."""
        user_groups: set[str] = set()
        assert _is_member_of(user_groups, "*") is True

        user_groups = {"cn=admins,ou=groups,dc=example,dc=com"}
        assert _is_member_of(user_groups, "*") is True

    def test_exact_match(self) -> None:
        """Test exact DN match."""
        canonical = canonicalize_dn("cn=admins,ou=groups,dc=example,dc=com")
        assert canonical is not None
        user_groups: set[str] = {canonical}
        assert _is_member_of(user_groups, "cn=admins,ou=groups,dc=example,dc=com") is True

    def test_case_insensitive_match(self) -> None:
        """Test DN matching is case-insensitive per RFC 4514."""
        canonical = canonicalize_dn("cn=admins,ou=groups,dc=example,dc=com")
        assert canonical is not None
        user_groups: set[str] = {canonical}
        # Different casing in target
        assert _is_member_of(user_groups, "CN=ADMINS,OU=GROUPS,DC=EXAMPLE,DC=COM") is True

    def test_no_match(self) -> None:
        """Test non-matching group returns False."""
        canonical = canonicalize_dn("cn=members,ou=groups,dc=example,dc=com")
        assert canonical is not None
        user_groups: set[str] = {canonical}
        assert _is_member_of(user_groups, "cn=admins,ou=groups,dc=example,dc=com") is False

    def test_empty_user_groups_no_wildcard(self) -> None:
        """Test empty user groups with non-wildcard target returns False."""
        user_groups: set[str] = set()
assert _is_member_of(user_groups, "cn=admins,ou=groups,dc=example,dc=com") is False def test_whitespace_normalization(self) -> None: """Test DN whitespace is normalized for matching.""" canonical = canonicalize_dn("cn=admins,ou=groups,dc=example,dc=com") assert canonical is not None user_groups: set[str] = {canonical} # Target with extra whitespace assert ( _is_member_of(user_groups, "cn = admins , ou = groups , dc = example , dc = com") is True ) class TestGetUserGroups: """Test _get_user_groups method for AD and POSIX modes.""" def test_ad_mode_returns_member_of_attribute(self) -> None: """AD mode: returns groups from memberOf attribute.""" config = LDAPConfig( hosts=("ldap.example.com",), user_search_base_dns=("ou=users,dc=example,dc=com",), user_search_filter="(uid=%s)", attr_email="mail", attr_member_of="memberOf", group_role_mappings=({"group_dn": "*", "role": "VIEWER"},), # No group_search_filter = AD mode ) authenticator = LDAPAuthenticator(config) # Mock user entry with memberOf attribute user_entry = MagicMock() user_entry.memberOf.values = [ "cn=admins,ou=groups,dc=example,dc=com", "cn=developers,ou=groups,dc=example,dc=com", ] conn = MagicMock() groups = authenticator._get_user_groups( conn, user_entry, "uid=test,ou=users,dc=example,dc=com" ) assert groups == [ "cn=admins,ou=groups,dc=example,dc=com", "cn=developers,ou=groups,dc=example,dc=com", ] # AD mode should not search conn.search.assert_not_called() def test_ad_mode_empty_member_of(self) -> None: """AD mode: returns empty list when memberOf is empty.""" config = LDAPConfig( hosts=("ldap.example.com",), user_search_base_dns=("ou=users,dc=example,dc=com",), user_search_filter="(uid=%s)", attr_email="mail", group_role_mappings=({"group_dn": "*", "role": "VIEWER"},), ) authenticator = LDAPAuthenticator(config) user_entry = MagicMock() user_entry.memberOf.values = [] conn = MagicMock() groups = authenticator._get_user_groups( conn, user_entry, "uid=test,ou=users,dc=example,dc=com" ) assert groups == 
[] def test_ad_mode_missing_member_of(self) -> None: """AD mode: returns empty list when memberOf attribute is missing.""" config = LDAPConfig( hosts=("ldap.example.com",), user_search_base_dns=("ou=users,dc=example,dc=com",), user_search_filter="(uid=%s)", attr_email="mail", group_role_mappings=({"group_dn": "*", "role": "VIEWER"},), ) authenticator = LDAPAuthenticator(config) # User entry without memberOf attribute user_entry = MagicMock(spec=[]) # No attributes conn = MagicMock() groups = authenticator._get_user_groups( conn, user_entry, "uid=test,ou=users,dc=example,dc=com" ) assert groups == [] def test_posix_mode_searches_for_groups(self) -> None: """POSIX mode: searches group base DNs for membership.""" config = LDAPConfig( hosts=("ldap.example.com",), user_search_base_dns=("ou=users,dc=example,dc=com",), user_search_filter="(uid=%s)", attr_email="mail", group_search_base_dns=("ou=groups,dc=example,dc=com",), group_search_filter="(&(objectClass=posixGroup)(memberUid=%s))", group_role_mappings=({"group_dn": "*", "role": "VIEWER"},), ) authenticator = LDAPAuthenticator(config) # Mock connection with search results conn = MagicMock() group_entry = MagicMock() group_entry.entry_dn = "cn=developers,ou=groups,dc=example,dc=com" conn.entries = [group_entry] user_entry = MagicMock() groups = authenticator._get_user_groups( conn, user_entry, "uid=test,ou=users,dc=example,dc=com" ) assert groups == ["cn=developers,ou=groups,dc=example,dc=com"] conn.search.assert_called_once() def test_posix_mode_no_groups_found(self) -> None: """POSIX mode: returns empty list when no groups found.""" config = LDAPConfig( hosts=("ldap.example.com",), user_search_base_dns=("ou=users,dc=example,dc=com",), user_search_filter="(uid=%s)", attr_email="mail", group_search_base_dns=("ou=groups,dc=example,dc=com",), group_search_filter="(&(objectClass=posixGroup)(memberUid=%s))", group_role_mappings=({"group_dn": "*", "role": "VIEWER"},), ) authenticator = LDAPAuthenticator(config) conn = 
MagicMock() conn.entries = [] user_entry = MagicMock() groups = authenticator._get_user_groups( conn, user_entry, "uid=test,ou=users,dc=example,dc=com" ) assert groups == [] def test_posix_mode_search_error_logged(self, caplog: LogCaptureFixture) -> None: """POSIX mode: logs warning on search error, returns empty list.""" config = LDAPConfig( hosts=("ldap.example.com",), user_search_base_dns=("ou=users,dc=example,dc=com",), user_search_filter="(uid=%s)", attr_email="mail", group_search_base_dns=("ou=groups,dc=example,dc=com",), group_search_filter="(&(objectClass=posixGroup)(memberUid=%s))", group_role_mappings=({"group_dn": "*", "role": "VIEWER"},), ) authenticator = LDAPAuthenticator(config) conn = MagicMock() conn.search.side_effect = LDAPException("Connection error") user_entry = MagicMock() groups = authenticator._get_user_groups( conn, user_entry, "uid=test,ou=users,dc=example,dc=com" ) assert groups == [] assert "LDAP group search failed" in caplog.text def test_posix_mode_aggregates_from_multiple_bases(self) -> None: """POSIX mode: aggregates groups from multiple search bases.""" config = LDAPConfig( hosts=("ldap.example.com",), user_search_base_dns=("ou=users,dc=example,dc=com",), user_search_filter="(uid=%s)", attr_email="mail", group_search_base_dns=( "ou=engineering,dc=example,dc=com", "ou=projects,dc=example,dc=com", ), group_search_filter="(&(objectClass=posixGroup)(memberUid=%s))", group_role_mappings=({"group_dn": "*", "role": "VIEWER"},), ) authenticator = LDAPAuthenticator(config) conn = MagicMock() # Simulate different groups returned per search base group1 = MagicMock() group1.entry_dn = "cn=developers,ou=engineering,dc=example,dc=com" group2 = MagicMock() group2.entry_dn = "cn=phoenix,ou=projects,dc=example,dc=com" call_count = 0 def search_side_effect(**kwargs: Any) -> None: nonlocal call_count if call_count == 0: conn.entries = [group1] else: conn.entries = [group2] call_count += 1 conn.search.side_effect = search_side_effect user_entry = 
MagicMock() groups = authenticator._get_user_groups( conn, user_entry, "uid=test,ou=users,dc=example,dc=com" ) assert conn.search.call_count == 2 assert len(groups) == 2 assert "cn=developers,ou=engineering,dc=example,dc=com" in groups assert "cn=phoenix,ou=projects,dc=example,dc=com" in groups def test_posix_mode_partial_failure_returns_successful_results( self, caplog: LogCaptureFixture ) -> None: """POSIX mode: returns groups from successful searches even if some bases fail.""" config = LDAPConfig( hosts=("ldap.example.com",), user_search_base_dns=("ou=users,dc=example,dc=com",), user_search_filter="(uid=%s)", attr_email="mail", group_search_base_dns=( "ou=unreachable,dc=example,dc=com", "ou=groups,dc=example,dc=com", ), group_search_filter="(&(objectClass=posixGroup)(memberUid=%s))", group_role_mappings=({"group_dn": "*", "role": "VIEWER"},), ) authenticator = LDAPAuthenticator(config) conn = MagicMock() group_entry = MagicMock() group_entry.entry_dn = "cn=developers,ou=groups,dc=example,dc=com" call_count = 0 def search_side_effect(**kwargs: Any) -> None: nonlocal call_count if call_count == 0: call_count += 1 raise LDAPException("Connection to unreachable base failed") conn.entries = [group_entry] call_count += 1 conn.search.side_effect = search_side_effect user_entry = MagicMock() groups = authenticator._get_user_groups( conn, user_entry, "uid=test,ou=users,dc=example,dc=com" ) # Should return groups from the successful search assert groups == ["cn=developers,ou=groups,dc=example,dc=com"] assert conn.search.call_count == 2 assert "LDAP group search failed" in caplog.text def test_posix_mode_escapes_user_dn_in_filter(self) -> None: """SECURITY: User DN with special chars is escaped in group search filter.""" config = LDAPConfig( hosts=("ldap.example.com",), user_search_base_dns=("ou=users,dc=example,dc=com",), user_search_filter="(uid=%s)", attr_email="mail", group_search_base_dns=("ou=groups,dc=example,dc=com",), 
group_search_filter="(&(objectClass=posixGroup)(member=%s))", group_role_mappings=({"group_dn": "*", "role": "VIEWER"},), ) authenticator = LDAPAuthenticator(config) conn = MagicMock() conn.entries = [] user_entry = MagicMock() # DN with LDAP filter special characters that could cause injection malicious_dn = "cn=user(admin)*,ou=users,dc=example,dc=com" authenticator._get_user_groups(conn, user_entry, malicious_dn) # Verify search was called with escaped DN conn.search.assert_called_once() call_kwargs = conn.search.call_args[1] search_filter = call_kwargs["search_filter"] # Parentheses and asterisk should be escaped per RFC 4515 assert "\\28" in search_filter # ( escaped assert "\\29" in search_filter # ) escaped assert "\\2a" in search_filter # * escaped # Original unescaped chars should NOT appear in filter context assert "(admin)" not in search_filter class TestTLSConfiguration: """Test TLS mode configuration.""" def test_ldaps_mode_sets_use_ssl(self) -> None: """Test LDAPS mode enables use_ssl on Server.""" config = LDAPConfig( hosts=("ldap.example.com",), port=636, tls_mode="ldaps", user_search_base_dns=("ou=users,dc=example,dc=com",), user_search_filter="(uid=%s)", attr_email="mail", group_role_mappings=(), ) authenticator = LDAPAuthenticator(config) # Check server configuration server = authenticator.servers[0] assert server.ssl is True # use_ssl=True for LDAPS def test_starttls_mode_no_use_ssl(self) -> None: """Test STARTTLS mode does NOT enable use_ssl.""" config = LDAPConfig( hosts=("ldap.example.com",), port=389, tls_mode="starttls", user_search_base_dns=("ou=users,dc=example,dc=com",), user_search_filter="(uid=%s)", attr_email="mail", group_role_mappings=(), ) authenticator = LDAPAuthenticator(config) # Check server configuration server = authenticator.servers[0] assert server.ssl is False # use_ssl=False for STARTTLS def test_plaintext_mode_no_tls_config(self) -> None: """Test plaintext mode has no TLS configuration.""" config = LDAPConfig( 
hosts=("ldap.example.com",), port=389, tls_mode="none", user_search_base_dns=("ou=users,dc=example,dc=com",), user_search_filter="(uid=%s)", attr_email="mail", group_role_mappings=(), ) authenticator = LDAPAuthenticator(config) # Check server configuration server = authenticator.servers[0] assert server.ssl is False assert server.tls is None class TestMultiServerFailover: """Test failover behavior with multiple LDAP servers.""" def test_multiple_servers_created(self) -> None: """Test multiple servers are created from comma-separated hosts.""" config = LDAPConfig( hosts=("ldap1.example.com", "ldap2.example.com", "ldap3.example.com"), user_search_base_dns=("ou=users,dc=example,dc=com",), user_search_filter="(uid=%s)", attr_email="mail", group_role_mappings=(), ) authenticator = LDAPAuthenticator(config) assert len(authenticator.servers) == 3 assert authenticator.servers[0].host == "ldap1.example.com" assert authenticator.servers[1].host == "ldap2.example.com" assert authenticator.servers[2].host == "ldap3.example.com" def test_whitespace_stripped_from_hosts(self) -> None: """Test whitespace is stripped from host entries.""" config = LDAPConfig( hosts=("ldap1.example.com", "ldap2.example.com", "ldap3.example.com"), user_search_base_dns=("ou=users,dc=example,dc=com",), user_search_filter="(uid=%s)", attr_email="mail", group_role_mappings=(), ) authenticator = LDAPAuthenticator(config) assert len(authenticator.servers) == 3 assert all(not server.host.startswith(" ") for server in authenticator.servers) assert all(not server.host.endswith(" ") for server in authenticator.servers) class TestDNCanonicalization: """Test RFC 4514-compliant DN canonicalization.""" def test_case_normalization(self) -> None: """DNs are case-insensitive per RFC 4514.""" dn1 = "cn=John,ou=Users,dc=Example,dc=com" dn2 = "CN=john,OU=users,DC=example,DC=com" dn3 = "Cn=JOHN,Ou=USERS,Dc=EXAMPLE,Dc=COM" canonical1 = canonicalize_dn(dn1) canonical2 = canonicalize_dn(dn2) canonical3 = 
canonicalize_dn(dn3) # All variations should map to same canonical form assert canonical1 == canonical2 == canonical3 # Should be fully lowercase assert canonical1 == "cn=john,ou=users,dc=example,dc=com" def test_whitespace_normalization(self) -> None: """Whitespace around = and , should be stripped.""" dn_compact = "cn=John,ou=Users,dc=Example,dc=com" dn_spaces = "cn = John , ou = Users , dc = Example , dc = com" dn_mixed_spaces = "cn= John ,ou =Users,dc=Example, dc=com" canonical_compact = canonicalize_dn(dn_compact) canonical_spaces = canonicalize_dn(dn_spaces) canonical_mixed = canonicalize_dn(dn_mixed_spaces) # All should normalize to same form assert canonical_compact == canonical_spaces == canonical_mixed assert canonical_compact == "cn=john,ou=users,dc=example,dc=com" def test_multi_valued_rdn_ordering(self) -> None: """Multi-valued RDN components should be sorted alphabetically. RFC 4514 allows multi-valued RDNs (e.g., cn=John+email=john@corp.com). Different LDAP servers may return components in different orders, so we sort them for deterministic comparison. 
""" # Same RDN, different component ordering dn1 = "cn=John Smith+email=john@corp.com,ou=Users,dc=Example,dc=com" dn2 = "email=john@corp.com+cn=John Smith,ou=Users,dc=Example,dc=com" canonical1 = canonicalize_dn(dn1) canonical2 = canonicalize_dn(dn2) # Should normalize to same form with sorted attributes (cn before email) assert canonical1 == canonical2 assert canonical1 == "cn=john smith+email=john@corp.com,ou=users,dc=example,dc=com" def test_multi_valued_rdn_three_components(self) -> None: """Multi-valued RDN with 3+ components should be sorted.""" dn1 = "cn=John+sn=Smith+uid=jsmith,ou=Users,dc=Example,dc=com" dn2 = "uid=jsmith+cn=John+sn=Smith,ou=Users,dc=Example,dc=com" dn3 = "sn=Smith+uid=jsmith+cn=John,ou=Users,dc=Example,dc=com" canonical1 = canonicalize_dn(dn1) canonical2 = canonicalize_dn(dn2) canonical3 = canonicalize_dn(dn3) # All should normalize to alphabetically sorted form: cn, sn, uid assert canonical1 == canonical2 == canonical3 assert canonical1 == "cn=john+sn=smith+uid=jsmith,ou=users,dc=example,dc=com" def test_escaped_characters_preserved(self) -> None: """Escaped special characters should be preserved.""" # DN with escaped comma in CN dn = "cn=Smith\\, John,ou=Users,dc=Example,dc=com" canonical = canonicalize_dn(dn) # Escaped comma should be preserved (but lowercased) assert canonical == "cn=smith\\, john,ou=users,dc=example,dc=com" def test_multiple_escaped_characters(self) -> None: """Multiple types of escaped characters.""" dn = "cn=user\\+name\\,test,ou=us\\=ers,dc=br\\\\anch,dc=com" canonical = canonicalize_dn(dn) assert canonical is not None # All escapes should be preserved assert "\\+" in canonical assert "\\," in canonical assert "\\=" in canonical assert "\\\\" in canonical def test_empty_dn(self) -> None: """Empty DN (root DSE) should be handled gracefully.""" canonical = canonicalize_dn("") assert canonical == "" def test_single_rdn(self) -> None: """Single RDN without domain components.""" dn = "cn=John" canonical = 
canonicalize_dn(dn) assert canonical == "cn=john" def test_real_world_active_directory_dn(self) -> None: """Typical Active Directory DN format.""" dn = "CN=John Smith,OU=Engineering,OU=Employees,DC=corp,DC=example,DC=com" canonical = canonicalize_dn(dn) assert canonical == "cn=john smith,ou=engineering,ou=employees,dc=corp,dc=example,dc=com" def test_posix_ldap_dn(self) -> None: """Typical POSIX LDAP DN format with uid.""" dn = "uid=jsmith,ou=people,dc=example,dc=com" canonical = canonicalize_dn(dn) assert canonical == "uid=jsmith,ou=people,dc=example,dc=com" def test_email_in_dn(self) -> None: """DN containing email address.""" dn = "email=john.smith@corp.com,ou=users,dc=example,dc=com" canonical = canonicalize_dn(dn) # Email should be lowercased assert canonical == "email=john.smith@corp.com,ou=users,dc=example,dc=com" def test_special_characters_in_value(self) -> None: """Values with spaces, hyphens, dots should be preserved.""" dn = "cn=John Q. Smith-Johnson,ou=IT Dept,dc=example,dc=com" canonical = canonicalize_dn(dn) assert canonical == "cn=john q. 
smith-johnson,ou=it dept,dc=example,dc=com" def test_unicode_dn(self) -> None: """DN with non-ASCII characters.""" dn = "cn=José García,ou=Users,dc=example,dc=com" canonical = canonicalize_dn(dn) # Unicode should be lowercased assert canonical == "cn=josé garcía,ou=users,dc=example,dc=com" def test_idempotency(self) -> None: """Canonicalizing a canonical DN should be idempotent.""" dn = "CN=John,OU=Users,DC=Example,DC=com" canonical1 = canonicalize_dn(dn) assert canonical1 is not None canonical2 = canonicalize_dn(canonical1) assert canonical2 is not None canonical3 = canonicalize_dn(canonical2) assert canonical1 == canonical2 == canonical3 def test_invalid_dn_returns_none(self) -> None: """Invalid DN syntax should return None to avoid inconsistent canonicalization.""" invalid_dn = "not_a_valid_dn" canonical = canonicalize_dn(invalid_dn) # Should return None instead of falling back to lowercase # This ensures consistent behavior and forces callers to handle the error assert canonical is None def test_duplicate_prevention_scenario(self) -> None: """Real-world scenario: Same user, different DN formatting from AD.""" # First login: standard format dn1 = "CN=John Smith,OU=Engineering,DC=corp,DC=example,DC=com" # Second login: different casing from replica DC dn2 = "cn=john smith,ou=engineering,dc=corp,dc=example,dc=com" # Third login: extra whitespace dn3 = "CN = John Smith , OU = Engineering , DC = corp , DC = example , DC = com" canonical1 = canonicalize_dn(dn1) canonical2 = canonicalize_dn(dn2) canonical3 = canonicalize_dn(dn3) # All should map to same canonical form → same database row assert canonical1 == canonical2 == canonical3 class TestSocketLeakPrevention: """Test that LDAP connections are properly closed to prevent file descriptor leaks.""" @pytest.fixture def config(self) -> LDAPConfig: """Minimal LDAP configuration for testing.""" return LDAPConfig( hosts=("ldap.example.com",), port=389, tls_mode="starttls", 
user_search_base_dns=("ou=users,dc=example,dc=com",), user_search_filter="(uid=%s)", attr_email="mail", attr_display_name="displayName", attr_member_of="memberOf", group_role_mappings=( {"group_dn": "cn=admins,ou=groups,dc=example,dc=com", "role": "ADMIN"}, ), ) @pytest.fixture def authenticator(self, config: LDAPConfig) -> LDAPAuthenticator: """Create authenticator instance.""" return LDAPAuthenticator(config) def test_verify_user_password_closes_socket_on_bind_failure( self, authenticator: LDAPAuthenticator ) -> None: """Socket must be closed even when bind() fails (wrong password).""" mock_server = MagicMock() mock_conn = MagicMock() mock_conn.bound = False # Simulate bind failure mock_conn.open = MagicMock() mock_conn.start_tls = MagicMock() mock_conn.bind = MagicMock() mock_conn.unbind = MagicMock() with patch("phoenix.server.ldap.Connection", return_value=mock_conn): result = authenticator._verify_user_password( mock_server, "uid=test,dc=example,dc=com", "wrong_password" ) # Verify bind was attempted assert not result mock_conn.bind.assert_called_once() # CRITICAL: Socket must be closed to prevent FD leak (via unbind) mock_conn.unbind.assert_called_once() def test_verify_user_password_closes_socket_on_start_tls_failure( self, authenticator: LDAPAuthenticator ) -> None: """Socket must be closed when start_tls() raises.""" mock_server = MagicMock() mock_conn = MagicMock() mock_conn.bound = False mock_conn.open = MagicMock() mock_conn.start_tls = MagicMock(side_effect=LDAPException("TLS handshake failed")) mock_conn.unbind = MagicMock() with patch("phoenix.server.ldap.Connection", return_value=mock_conn): with pytest.raises(LDAPException): authenticator._verify_user_password( mock_server, "uid=test,dc=example,dc=com", "password123" ) # CRITICAL: Socket must be closed even though start_tls() raised (via unbind) mock_conn.unbind.assert_called_once() def test_verify_user_password_closes_socket_on_open_failure( self, authenticator: LDAPAuthenticator ) -> None: 
"""Socket must be closed when open() raises.""" mock_server = MagicMock() mock_conn = MagicMock() mock_conn.bound = False mock_conn.open = MagicMock(side_effect=LDAPException("Connection refused")) mock_conn.unbind = MagicMock() with patch("phoenix.server.ldap.Connection", return_value=mock_conn): with pytest.raises(LDAPException): authenticator._verify_user_password( mock_server, "uid=test,dc=example,dc=com", "password123" ) # CRITICAL: Socket must be closed even though open() raised (via unbind) mock_conn.unbind.assert_called_once() def test_establish_connection_closes_socket_on_anonymous_bind_failure( self, authenticator: LDAPAuthenticator ) -> None: """Anonymous bind must close socket when start_tls() fails.""" # Remove bind credentials to force anonymous bind path authenticator.config = LDAPConfig( hosts=("ldap.example.com",), port=389, tls_mode="starttls", user_search_base_dns=("ou=users,dc=example,dc=com",), user_search_filter="(uid=%s)", attr_email="mail", attr_display_name="displayName", attr_member_of="memberOf", group_role_mappings=( {"group_dn": "cn=admins,ou=groups,dc=example,dc=com", "role": "ADMIN"}, ), ) mock_server = MagicMock() mock_conn = MagicMock() mock_conn.open = MagicMock() mock_conn.start_tls = MagicMock(side_effect=LDAPException("TLS handshake failed")) mock_conn.unbind = MagicMock() with patch("phoenix.server.ldap.Connection", return_value=mock_conn): with pytest.raises(LDAPException): authenticator._establish_connection(mock_server) # CRITICAL: Socket must be closed when start_tls() fails (via unbind) mock_conn.unbind.assert_called_once() class TestNullEmailMarker: """Test null email marker helpers for LDAP users without email attributes.""" def test_marker_prefix_format(self) -> None: """NULL_EMAIL_MARKER_PREFIX uses PUA character U+E000.""" assert NULL_EMAIL_MARKER_PREFIX == "\ue000NULL(stopgap)" def test_generate_marker_format_and_determinism(self) -> None: """Generated marker: prefix + 32-char MD5 hash, deterministic, 
case-insensitive.""" # Different casings of same UUID should produce identical marker ids = ["550e8400-e29b-41d4-a716-446655440000", "550E8400-E29B-41D4-A716-446655440000"] results = [generate_null_email_marker(uid) for uid in ids] assert results[0] == results[1] # Case-insensitive assert results[0].startswith(NULL_EMAIL_MARKER_PREFIX) assert len(results[0]) == len(NULL_EMAIL_MARKER_PREFIX) + 32 # prefix + MD5 # Different IDs produce different markers other = generate_null_email_marker("6ba7b810-9dad-11d1-80b4-00c04fd430c8") assert other != results[0] @pytest.mark.parametrize("invalid_id", ["", None]) def test_generate_marker_rejects_empty(self, invalid_id: str | None) -> None: """Empty or None unique_id raises ValueError.""" with pytest.raises(ValueError, match="unique_id is required"): generate_null_email_marker(invalid_id) # type: ignore[arg-type] @pytest.mark.parametrize( "email,expected", [ # Markers → True (NULL_EMAIL_MARKER_PREFIX, True), (NULL_EMAIL_MARKER_PREFIX + "abc123", True), # Real emails → False ("user@example.com", False), ("admin@corp.local", False), # Edge cases → False ("", False), # Similar but not markers → False ("NULL123456789abcdef", False), # Missing PUA ("\ue001NULL123456", False), # Wrong PUA ("prefix\ue000NULL(stopgap)", False), # Not at start ], ) def test_is_null_email_marker(self, email: str, expected: bool) -> None: """is_null_email_marker correctly identifies markers vs real emails.""" assert is_null_email_marker(email) is expected

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Arize-ai/phoenix'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.