Skip to main content
Glama
Arize-ai

@arizeai/phoenix-mcp

Official
by Arize-ai
test_ldap.py39.2 kB
"""Integration tests for LDAP authentication. These tests use a real in-process mock LDAP server that implements the LDAP protocol, similar to how OIDC tests use _OIDCServer. The mock server runs in a separate thread and listens on a dynamically allocated port. """ from __future__ import annotations from dataclasses import dataclass from secrets import token_hex from typing import Optional from phoenix.server.api.input_types.UserRoleInput import UserRoleInput from tests.integration._mock_ldap_server import _LDAPServer from .._helpers import ( _AppInfo, _delete_users, _httpx_client, _list_users, _User, ) # Test constants _ADMIN_GROUP = "cn=admins,ou=groups,dc=example,dc=com" _MEMBER_GROUP = "cn=members,ou=groups,dc=example,dc=com" _DEFAULT_PASSWORD = "password123" @dataclass class LDAPTestUser: """Test user specification for LDAP tests.""" username: str password: str email: str display_name: str groups: list[str] expected_role: UserRoleInput def _create_test_user( ldap_server: _LDAPServer, suffix: str, base_username: str, role: UserRoleInput, groups: Optional[list[str]] = None, ) -> LDAPTestUser: """Create a test user with unique identifiers for isolation. Args: ldap_server: Mock LDAP server to add user to suffix: Unique suffix (e.g., token_hex(4)) base_username: Base username before suffix role: Expected Phoenix role groups: LDAP groups (defaults to role-appropriate groups) Returns: LDAPTestUser specification """ if groups is None: role_to_groups = { UserRoleInput.ADMIN: [_ADMIN_GROUP], UserRoleInput.MEMBER: [_MEMBER_GROUP], UserRoleInput.VIEWER: [], # Wildcard } groups = role_to_groups.get(role, []) username = f"{base_username}_{suffix}" email = f"{base_username}_{suffix}@example.com" display_name = base_username.replace("_", " ").title() ldap_server.add_user( username=username, password=_DEFAULT_PASSWORD, email=email, display_name=display_name, groups=groups, ) return LDAPTestUser( username=username, password=_DEFAULT_PASSWORD, email=email, display_name=display_name, groups=groups, expected_role=role, ) def _get_user_by_email(app: _AppInfo, email: str) -> Optional[_User]: """Get user by email from the user list.""" users = {u.profile.email: u for u in _list_users(app, app.admin_secret)} return users.get(email) def _verify_ldap_login_success( status_code: int, access_token: Optional[str], refresh_token: Optional[str], ) -> None: """Verify LDAP login was successful and tokens were issued. Checks: - HTTP 204 status - Access token present and non-empty - Refresh token present and non-empty """ assert status_code == 204, f"Expected 204, got {status_code}" assert access_token is not None, "Access token must be present" assert refresh_token is not None, "Refresh token must be present" assert len(access_token) > 10, f"Access token too short: {len(access_token)} chars" assert len(refresh_token) > 10, f"Refresh token too short: {len(refresh_token)} chars" def _verify_user_created( app: _AppInfo, test_user: LDAPTestUser, expected_username: Optional[str] = None, ) -> _User: """Verify user was created in Phoenix DB with correct attributes. Args: app: Application info test_user: Test user specification expected_username: Expected username (defaults to test_user.display_name) Returns: The created user object """ user = _get_user_by_email(app, test_user.email) assert user is not None, f"User not found: {test_user.email}" assert user.role == test_user.expected_role, ( f"Expected role {test_user.expected_role}, got {user.role}" ) expected_username = expected_username or test_user.display_name assert user.profile.username == expected_username, ( f"Expected username '{expected_username}', got '{user.profile.username}'" ) return user def _ldap_login( app: _AppInfo, username: str, password: str ) -> tuple[int, Optional[str], Optional[str]]: """Perform LDAP login and return response details. Returns: Tuple of (status_code, access_token, refresh_token) """ client = _httpx_client(app) response = client.post( "/auth/ldap/login", json={"username": username, "password": password}, ) return ( response.status_code, response.cookies.get("phoenix-access-token"), response.cookies.get("phoenix-refresh-token"), ) class TestLDAPAuthentication: """Test LDAP authentication - core flows and security.""" async def test_authentication_and_role_mapping( self, _app: _AppInfo, _ldap_server: _LDAPServer ) -> None: """Test successful login assigns role from group membership.""" suffix = token_hex(4) # Admin user (in admins group) admin = _create_test_user(_ldap_server, suffix, "admin", UserRoleInput.ADMIN) status, access_token, refresh_token = _ldap_login(_app, admin.username, admin.password) _verify_ldap_login_success(status, access_token, refresh_token) admin_user = _verify_user_created(_app, admin) # Member user (in members group) member = _create_test_user(_ldap_server, suffix, "member", UserRoleInput.MEMBER) status, access_token, refresh_token = _ldap_login(_app, member.username, member.password) _verify_ldap_login_success(status, access_token, refresh_token) member_user = _verify_user_created(_app, member) # Viewer user (no groups → wildcard) viewer = _create_test_user(_ldap_server, suffix, "viewer", UserRoleInput.VIEWER, groups=[]) status, access_token, refresh_token = _ldap_login(_app, viewer.username, viewer.password) _verify_ldap_login_success(status, access_token, refresh_token) viewer_user = _verify_user_created(_app, viewer) _delete_users( _app, _app.admin_secret, users=[admin_user.gid, member_user.gid, viewer_user.gid] ) async def test_invalid_credentials_rejected( self, _app: _AppInfo, _ldap_server: _LDAPServer ) -> None: """Test login fails for wrong password, nonexistent user, and empty credentials.""" user = _create_test_user(_ldap_server, token_hex(4), "user", UserRoleInput.ADMIN) # Wrong password assert _ldap_login(_app, user.username, "wrong")[0] == 401 # Nonexistent user assert _ldap_login(_app, "nonexistent", "pass")[0] == 401 # Empty credentials assert _ldap_login(_app, "", "")[0] == 401 async def test_role_syncs_on_subsequent_login( self, _app: _AppInfo, _ldap_server: _LDAPServer ) -> None: """Test role updates from LDAP groups but username remains stable.""" suffix = token_hex(4) username, email = f"sync_{suffix}", f"sync_{suffix}@example.com" # First login with members group (mapped to MEMBER) _ldap_server.add_user( username=username, password=_DEFAULT_PASSWORD, email=email, display_name="Original Name", groups=[_MEMBER_GROUP], ) status, _, _ = _ldap_login(_app, username, _DEFAULT_PASSWORD) assert status == 204 user = _get_user_by_email(_app, email) assert user is not None and user.role == UserRoleInput.MEMBER # LDAP changes: promoted to admin, display name changed _ldap_server.add_user( username=username, password=_DEFAULT_PASSWORD, email=email, display_name="New Name", groups=[_ADMIN_GROUP], ) status, _, _ = _ldap_login(_app, username, _DEFAULT_PASSWORD) assert status == 204 # Verify: role updated, username stable updated = _get_user_by_email(_app, email) assert updated is not None assert updated.gid == user.gid # Same user assert updated.role == UserRoleInput.ADMIN # Role synced assert updated.profile.username == "Original Name" # Username stable _delete_users(_app, _app.admin_secret, users=[user.gid]) async def test_injection_prevention(self, _app: _AppInfo, _ldap_server: _LDAPServer) -> None: """Test LDAP injection attempts are rejected.""" payloads = [ "*", # Wildcard "admin*", # Wildcard suffix "*(objectClass=*)", # Filter injection "admin)(|(objectClass=*", # Filter escape "admin\x00injected", # Null byte injection "admin\ninjected", # Newline injection "admin\r\ninjected", # CRLF injection ")(cn=*", # DN injection ] for payload in payloads: assert _ldap_login(_app, payload, _DEFAULT_PASSWORD)[0] == 401 async def test_unicode_credentials(self, _app: _AppInfo, _ldap_server: _LDAPServer) -> None: """Test login with Unicode username/password.""" suffix = token_hex(4) email = f"unicode_{suffix}@example.com" _ldap_server.add_user( username="用户名", password="密码123", email=email, display_name="Unicode User", groups=[_MEMBER_GROUP], ) status, access_token, refresh_token = _ldap_login(_app, "用户名", "密码123") _verify_ldap_login_success(status, access_token, refresh_token) user = _get_user_by_email(_app, email) assert user is not None _delete_users(_app, _app.admin_secret, users=[user.gid]) async def test_special_characters_in_password( self, _app: _AppInfo, _ldap_server: _LDAPServer ) -> None: """Test login with special characters in password (quotes, backslashes, etc.).""" suffix = token_hex(4) email = f"special_{suffix}@example.com" special_password = r'p@ss"word\with\'special<chars>&more!' _ldap_server.add_user( username=f"special_{suffix}", password=special_password, email=email, display_name="Special User", groups=[_MEMBER_GROUP], ) status, access_token, refresh_token = _ldap_login( _app, f"special_{suffix}", special_password ) _verify_ldap_login_success(status, access_token, refresh_token) user = _get_user_by_email(_app, email) assert user is not None _delete_users(_app, _app.admin_secret, users=[user.gid]) async def test_missing_email_rejected(self, _app: _AppInfo, _ldap_server: _LDAPServer) -> None: """Test login fails when LDAP user has no email.""" suffix = token_hex(4) _ldap_server.add_user( username=f"noemail_{suffix}", password=_DEFAULT_PASSWORD, email="", display_name="No Email", groups=[_ADMIN_GROUP], ) assert _ldap_login(_app, f"noemail_{suffix}", _DEFAULT_PASSWORD)[0] == 401 async def test_missing_display_name_uses_fallback( self, _app: _AppInfo, _ldap_server: _LDAPServer ) -> None: """Test missing displayName falls back to email prefix.""" suffix = token_hex(4) email = f"noname_{suffix}@example.com" _ldap_server.add_user( username=f"noname_{suffix}", password=_DEFAULT_PASSWORD, email=email, display_name="", groups=[_MEMBER_GROUP], ) status, _, _ = _ldap_login(_app, f"noname_{suffix}", _DEFAULT_PASSWORD) assert status == 204 user = _get_user_by_email(_app, email) assert user is not None assert user.profile.username == f"noname_{suffix}" # Fallback to email prefix _delete_users(_app, _app.admin_secret, users=[user.gid]) async def test_multiple_groups_uses_first_match( self, _app: _AppInfo, _ldap_server: _LDAPServer ) -> None: """Test user in multiple groups gets role from first matching mapping. group_role_mappings is evaluated in order: ADMIN, MEMBER, VIEWER (wildcard). User is in both MEMBER and ADMIN groups, but ADMIN mapping is checked first. """ suffix = token_hex(4) email = f"multi_{suffix}@example.com" _ldap_server.add_user( username=f"multi_{suffix}", password=_DEFAULT_PASSWORD, email=email, display_name="Multi", groups=[_MEMBER_GROUP, _ADMIN_GROUP], # Both groups ) status, _, _ = _ldap_login(_app, f"multi_{suffix}", _DEFAULT_PASSWORD) assert status == 204 user = _get_user_by_email(_app, email) assert user is not None assert user.role == UserRoleInput.ADMIN # ADMIN mapping evaluated before MEMBER _delete_users(_app, _app.admin_secret, users=[user.gid]) class TestLDAPUserIdentificationStrategies: """Test LDAP user identification strategies. Phoenix supports two modes: 1. Simple Mode (default): Email is the identifier. Email changes create new users. 2. Enterprise Mode (PHOENIX_LDAP_ATTR_UNIQUE_ID): Stable ID is the identifier. Email changes preserve identity. """ def test_email_change_creates_new_user_without_unique_id( self, _app: _AppInfo, _ldap_server: _LDAPServer ) -> None: """Simple mode: email change creates a new user (email is the identifier).""" suffix = token_hex(4) username = f"simple_{suffix}" email_v1 = f"simple_v1_{suffix}@example.com" email_v2 = f"simple_v2_{suffix}@example.com" # First login with email_v1 _ldap_server.add_user( username=username, password=_DEFAULT_PASSWORD, email=email_v1, display_name="User", groups=[_ADMIN_GROUP], ) status, access_token, refresh_token = _ldap_login(_app, username, _DEFAULT_PASSWORD) _verify_ldap_login_success(status, access_token, refresh_token) user_v1 = _get_user_by_email(_app, email_v1) assert user_v1 is not None # Email changes in LDAP, login again _ldap_server.add_user( username=username, password=_DEFAULT_PASSWORD, email=email_v2, display_name="User", groups=[_ADMIN_GROUP], ) status, access_token, refresh_token = _ldap_login(_app, username, _DEFAULT_PASSWORD) _verify_ldap_login_success(status, access_token, refresh_token) # Result: NEW user created (email is the identifier) user_v2 = _get_user_by_email(_app, email_v2) assert user_v2 is not None assert user_v2.gid != user_v1.gid, "Different user when email changes" # Cleanup both users _delete_users(_app, _app.admin_secret, users=[user_v1.gid, user_v2.gid]) def test_email_change_preserves_identity_with_unique_id( self, _app_ldap_unique_id: _AppInfo, _ldap_server: _LDAPServer ) -> None: """Enterprise mode: email change preserves identity (unique_id is the identifier). Also tests migration: admin pre-provisions user, first login migrates to unique_id, subsequent email change still preserves identity. """ app = _app_ldap_unique_id suffix = token_hex(4) username = f"enterprise_{suffix}" email_v1 = f"enterprise_v1_{suffix}@example.com" email_v2 = f"enterprise_v2_{suffix}@example.com" # Admin pre-provisions user (no unique_id in DB yet) graphql_client = _httpx_client(app, app.admin_secret) username = f"Pre-Provisioned {suffix}" response = graphql_client.post( "/graphql", json={ "query": """ mutation($email: String!, $username: String!, $role: UserRoleInput!) { createUser(input: { email: $email, username: $username, role: $role, authMethod: LDAP }) { user { id } } } """, "variables": {"email": email_v1, "username": username, "role": "MEMBER"}, }, ) assert response.status_code == 200 response_json = response.json() assert not response_json.get("errors") # First login (migrates pre-provisioned user to unique_id) _ldap_server.add_user( username=username, password=_DEFAULT_PASSWORD, email=email_v1, display_name=f"User {token_hex(4)}", groups=[_ADMIN_GROUP], ) status, access_token, refresh_token = _ldap_login(app, username, _DEFAULT_PASSWORD) _verify_ldap_login_success(status, access_token, refresh_token) user_v1 = _get_user_by_email(app, email_v1) assert user_v1 is not None assert user_v1.role == UserRoleInput.ADMIN, "Role updated from LDAP groups" assert user_v1.username == username, "Username stable from pre-provisioning" # Email changes in LDAP, login again _ldap_server.add_user( username=username, # Same username = same entryUUID password=_DEFAULT_PASSWORD, email=email_v2, display_name=f"User {token_hex(4)}", groups=[_MEMBER_GROUP], ) status, access_token, refresh_token = _ldap_login(app, username, _DEFAULT_PASSWORD) _verify_ldap_login_success(status, access_token, refresh_token) # Result: SAME user (unique_id is the identifier) user_v2 = _get_user_by_email(app, email_v2) assert user_v2 is not None assert user_v2.gid == user_v1.gid, "Same user when unique_id is configured" assert user_v2.role == UserRoleInput.MEMBER assert user_v2.username == username # Old email no longer exists assert _get_user_by_email(app, email_v1) is None # Cleanup _delete_users(app, app.admin_secret, users=[user_v2.gid]) class TestLDAPGraphQLIntegration: """Test GraphQL integration for LDAP users.""" async def test_ldap_user_graphql_auth_method( self, _app: _AppInfo, _ldap_server: _LDAPServer ) -> None: """Test that GraphQL correctly exposes LDAP users with authMethod='LDAP'. Verifies the GraphQL resolver translates the database storage (OAuth2 with marker) to the semantic AuthMethod.LDAP for the frontend. """ test_user = _create_test_user( _ldap_server, token_hex(4), "graphql_user", UserRoleInput.ADMIN ) # Step 1: Create LDAP user via login status, access_token, refresh_token = _ldap_login( _app, test_user.username, test_user.password ) _verify_ldap_login_success(status, access_token, refresh_token) # Step 2: Query GraphQL to verify authMethod is 'LDAP' graphql_query = """ query { users { edges { user: node { email authMethod } } } } """ graphql_client = _httpx_client(_app, _app.admin_secret) graphql_response = graphql_client.post( "/graphql", json={"query": graphql_query}, ) assert graphql_response.status_code == 200 graphql_data = graphql_response.json() assert not graphql_data.get("errors"), graphql_data.get("errors") data = graphql_data # Find our LDAP user in the response users = data["data"]["users"]["edges"] ldap_user_data = next((u for u in users if u["user"]["email"] == test_user.email), None) assert ldap_user_data is not None, f"User {test_user.email} not found in GraphQL response" # The key assertion: GraphQL should expose authMethod as 'LDAP' (not 'OAUTH2') assert ldap_user_data["user"]["authMethod"] == "LDAP", ( f"Expected authMethod='LDAP', got '{ldap_user_data['user']['authMethod']}'" ) # Cleanup user = _get_user_by_email(_app, test_user.email) if user: _delete_users(_app, _app.admin_secret, users=[user.gid]) class TestLDAPSecurityIsolation: """Test that LDAP, OAuth2, and LOCAL auth methods are isolated from each other.""" async def test_local_user_protected_from_ldap_login( self, _app: _AppInfo, _ldap_server: _LDAPServer ) -> None: """LOCAL user cannot be hijacked via LDAP login with same email.""" suffix = token_hex(4) email = f"local_{suffix}@example.com" # Create LOCAL user graphql_client = _httpx_client(_app, _app.admin_secret) resp = graphql_client.post( "/graphql", json={ "query": """mutation($i: CreateUserInput!) { createUser(input: $i) { user { id } } }""", "variables": { "i": { "email": email, "username": "Local", "role": "MEMBER", "authMethod": "LOCAL", "password": "pass123", } }, }, ) local_user_id = resp.json()["data"]["createUser"]["user"]["id"] # Add same email to LDAP _ldap_server.add_user( username=f"local_{suffix}", password="ldappass", email=email, display_name="LDAP", groups=[_ADMIN_GROUP], ) # LDAP login should fail (LOCAL user protected) assert _ldap_login(_app, f"local_{suffix}", "ldappass")[0] == 401 _delete_users(_app, _app.admin_secret, users=[local_user_id]) async def test_ldap_user_protected_from_password_login( self, _app: _AppInfo, _ldap_server: _LDAPServer ) -> None: """LDAP user cannot login via password endpoint (no password_hash).""" user = _create_test_user(_ldap_server, token_hex(4), "ldap", UserRoleInput.ADMIN) status, _, _ = _ldap_login(_app, user.username, user.password) assert status == 204 # Password login should fail client = _httpx_client(_app) resp = client.post("/auth/login", json={"email": user.email, "password": user.password}) assert resp.status_code == 401 db_user = _get_user_by_email(_app, user.email) assert db_user is not None _delete_users(_app, _app.admin_secret, users=[db_user.gid]) async def test_oauth2_user_protected_from_ldap_login( self, _app: _AppInfo, _ldap_server: _LDAPServer ) -> None: """OAuth2 user cannot be hijacked via LDAP login with same email.""" suffix = token_hex(4) email = f"oauth_{suffix}@example.com" # Create OAuth2 user via REST API client = _httpx_client(_app, _app.admin_secret) resp = client.post( "/v1/users", json={ "user": { "email": email, "username": f"oauth_{suffix}", "role": "VIEWER", "auth_method": "OAUTH2", "oauth2_client_id": "google", "oauth2_user_id": f"google-{suffix}", }, "send_welcome_email": False, }, ) assert resp.status_code == 201 oauth_user_id = resp.json()["data"]["id"] # Add same email to LDAP _ldap_server.add_user( username=f"oauth_{suffix}", password=_DEFAULT_PASSWORD, email=email, display_name="LDAP", groups=[_ADMIN_GROUP], ) # LDAP login should fail (OAuth2 user protected) assert _ldap_login(_app, f"oauth_{suffix}", _DEFAULT_PASSWORD)[0] == 401 client.delete(f"/v1/users/{oauth_user_id}") class TestLDAPConfiguration: """Test LDAP configuration-specific behaviors.""" def test_ldap_allow_sign_up_false_with_email_lookup( self, _app_ldap_no_sign_up: _AppInfo, _ldap_server: _LDAPServer ) -> None: """Test email-based user lookup when allow_sign_up=false. Critical behavior for allow_sign_up=false: 1. Admin creates LDAP user with email "john@example.com" 2. User logs in via LDAP with any username 3. Phoenix gets email from LDAP authentication 4. Phoenix looks up user by email → finds user 5. Login succeeds and attributes synced from LDAP """ # Step 1: Admin pre-creates LDAP user (only needs email) suffix = token_hex(4) ldap_username = f"jdoe_{suffix}" # What user will login with email = f"john_{suffix}@example.com" # Admin creates user via GraphQL with wrong username graphql_client = _httpx_client(_app_ldap_no_sign_up, _app_ldap_no_sign_up.admin_secret) create_response = graphql_client.post( "/graphql", json={ "query": """ mutation CreateUser($input: CreateUserInput!) { createUser(input: $input) { user { id email username authMethod } } } """, "variables": { "input": { "email": email, "username": "John Doe", # Display name "role": "MEMBER", "authMethod": "LDAP", }, }, }, ) assert create_response.status_code == 200 create_json = create_response.json() assert not create_json.get("errors"), create_json.get("errors") user_data = create_json["data"]["createUser"]["user"] assert user_data["authMethod"] == "LDAP" created_user_gid = user_data["id"] # Step 2: Add user to LDAP with matching email _ldap_server.add_user( username=ldap_username, password=_DEFAULT_PASSWORD, email=email, display_name="John Smith, Ph.D.", groups=[_ADMIN_GROUP], ) # Step 3: User logs in via LDAP (email lookup finds pre-created user) status, access_token, refresh_token = _ldap_login( _app_ldap_no_sign_up, ldap_username, _DEFAULT_PASSWORD ) _verify_ldap_login_success(status, access_token, refresh_token) # Step 4: Verify username remains stable (not synced from LDAP) users = _list_users(_app_ldap_no_sign_up, _app_ldap_no_sign_up.admin_secret) updated_user = next((u for u in users if u.profile.email == email), None) assert updated_user is not None # Username stays stable from admin creation (prevents collisions on displayName changes) assert updated_user.profile.username == "John Doe" assert updated_user.role == UserRoleInput.ADMIN # Step 5: Verify subsequent logins work status, access_token, refresh_token = _ldap_login( _app_ldap_no_sign_up, ldap_username, _DEFAULT_PASSWORD ) _verify_ldap_login_success(status, access_token, refresh_token) _delete_users( _app_ldap_no_sign_up, _app_ldap_no_sign_up.admin_secret, users=[created_user_gid] ) class TestLDAPPosixGroupSearch: """Test LDAP group search (no memberOf attribute). Tests group lookup via search filter instead of reading memberOf from user entry. Uses (member=%s) where %s is replaced with the login username directly (no GROUP_SEARCH_FILTER_USER_ATTR configured). Note: For true POSIX RFC 2307 memberUid testing with GROUP_SEARCH_FILTER_USER_ATTR, see TestLDAPPosixMemberUidGroupSearch. """ def test_posix_role_from_group_search( self, _app_ldap_posix: _AppInfo, _ldap_server: _LDAPServer ) -> None: """Test role assignment via POSIX group search (no memberOf attribute).""" suffix = token_hex(4) username = f"posix_{suffix}" email = f"posix_{suffix}@example.com" # User without memberOf _ldap_server.add_user( username=username, password=_DEFAULT_PASSWORD, email=email, display_name="POSIX", groups=[], ) # Add to admins group with username as member _ldap_server.add_group(cn="admins", members=[username]) status, _, _ = _ldap_login(_app_ldap_posix, username, _DEFAULT_PASSWORD) assert status == 204 user = _get_user_by_email(_app_ldap_posix, email) assert user is not None assert user.role == UserRoleInput.ADMIN _delete_users(_app_ldap_posix, _app_ldap_posix.admin_secret, users=[user.gid]) def test_posix_wildcard_when_no_groups( self, _app_ldap_posix: _AppInfo, _ldap_server: _LDAPServer ) -> None: """Test wildcard role when user is in no groups.""" suffix = token_hex(4) username = f"posix_none_{suffix}" email = f"posix_none_{suffix}@example.com" _ldap_server.add_user( username=username, password=_DEFAULT_PASSWORD, email=email, display_name="No Groups", groups=[], ) # Don't add to any group status, _, _ = _ldap_login(_app_ldap_posix, username, _DEFAULT_PASSWORD) assert status == 204 user = _get_user_by_email(_app_ldap_posix, email) assert user is not None assert user.role == UserRoleInput.VIEWER # Wildcard _delete_users(_app_ldap_posix, _app_ldap_posix.admin_secret, users=[user.gid]) def test_posix_username_case_insensitive( self, _app_ldap_posix: _AppInfo, _ldap_server: _LDAPServer ) -> None: """Test username matching in group search is case-insensitive.""" suffix = token_hex(4) username = f"posix_case_{suffix}" email = f"posix_case_{suffix}@example.com" _ldap_server.add_user( username=username, password=_DEFAULT_PASSWORD, email=email, display_name="Case", groups=[], ) # Group has UPPERCASE username, user logs in with lowercase _ldap_server.add_group(cn="admins", members=[username.upper()]) status, _, _ = _ldap_login(_app_ldap_posix, username, _DEFAULT_PASSWORD) assert status == 204 user = _get_user_by_email(_app_ldap_posix, email) assert user is not None assert user.role == UserRoleInput.ADMIN # Should match despite case _delete_users(_app_ldap_posix, _app_ldap_posix.admin_secret, users=[user.gid]) class TestLDAPPosixMemberUidGroupSearch: """Test LDAP POSIX memberUid group search with GROUP_SEARCH_FILTER_USER_ATTR. This tests the code path where Phoenix must read a specific attribute (e.g., uid) from the user entry to substitute into the group search filter. Configuration: - GROUP_SEARCH_FILTER: (memberUid=%s) - GROUP_SEARCH_FILTER_USER_ATTR: uid The mock LDAP server respects the requested attributes list, so these tests properly catch bugs where required attributes (like uid) aren't being requested. This caught a real bug where group_search_filter_user_attr wasn't included in the LDAP search attribute list. """ def test_memberuid_role_from_group_search( self, _app_ldap_posix_memberuid: _AppInfo, _ldap_server: _LDAPServer ) -> None: """Test role assignment when GROUP_SEARCH_FILTER_USER_ATTR is configured.""" suffix = token_hex(4) username = f"memberuid_{suffix}" email = f"memberuid_{suffix}@example.com" _ldap_server.add_user( username=username, password=_DEFAULT_PASSWORD, email=email, display_name="MemberUid User", groups=[], # No memberOf - relies on group search ) # Add to admins group with username (not DN) _ldap_server.add_group(cn="admins", members=[username]) status, access_token, refresh_token = _ldap_login( _app_ldap_posix_memberuid, username, _DEFAULT_PASSWORD ) _verify_ldap_login_success(status, access_token, refresh_token) user = _get_user_by_email(_app_ldap_posix_memberuid, email) assert user is not None assert user.role == UserRoleInput.ADMIN, ( f"Expected ADMIN role from memberUid group search, got {user.role}. " "This indicates GROUP_SEARCH_FILTER_USER_ATTR is not working correctly." ) _delete_users( _app_ldap_posix_memberuid, _app_ldap_posix_memberuid.admin_secret, users=[user.gid] ) def test_memberuid_member_role( self, _app_ldap_posix_memberuid: _AppInfo, _ldap_server: _LDAPServer ) -> None: """Test MEMBER role assignment via memberUid group search.""" suffix = token_hex(4) username = f"memberuid_member_{suffix}" email = f"memberuid_member_{suffix}@example.com" _ldap_server.add_user( username=username, password=_DEFAULT_PASSWORD, email=email, display_name="Member User", groups=[], ) _ldap_server.add_group(cn="members", members=[username]) status, _, _ = _ldap_login(_app_ldap_posix_memberuid, username, _DEFAULT_PASSWORD) assert status == 204 user = _get_user_by_email(_app_ldap_posix_memberuid, email) assert user is not None assert user.role == UserRoleInput.MEMBER _delete_users( _app_ldap_posix_memberuid, _app_ldap_posix_memberuid.admin_secret, users=[user.gid] ) def test_memberuid_wildcard_when_no_groups( self, _app_ldap_posix_memberuid: _AppInfo, _ldap_server: _LDAPServer ) -> None: """Test wildcard role when user is in no groups (memberUid mode).""" suffix = token_hex(4) username = f"memberuid_none_{suffix}" email = f"memberuid_none_{suffix}@example.com" _ldap_server.add_user( username=username, password=_DEFAULT_PASSWORD, email=email, display_name="No Groups", groups=[], ) # Don't add to any group status, _, _ = _ldap_login(_app_ldap_posix_memberuid, username, _DEFAULT_PASSWORD) assert status == 204 user = _get_user_by_email(_app_ldap_posix_memberuid, email) assert user is not None assert user.role == UserRoleInput.VIEWER # Wildcard _delete_users( _app_ldap_posix_memberuid, _app_ldap_posix_memberuid.admin_secret, users=[user.gid] ) class TestLDAPNoEmailMode: """Test LDAP no-email mode (null email markers). When PHOENIX_LDAP_ATTR_EMAIL="" is configured, Phoenix generates null email markers for users instead of reading email from LDAP. Users are identified by their unique_id (entryUUID). This mode is useful for LDAP directories that don't have email attributes. """ def _get_user_with_username(self, app: _AppInfo, username: str) -> Optional[_User]: """Find user by checking for null email marker containing username hash.""" users = _list_users(app, app.admin_secret) for user in users: if user.profile.username == username: return user return None def test_login_creates_user_with_null_email_marker( self, _app_ldap_no_email: _AppInfo, _ldap_server: _LDAPServer ) -> None: """Test login in no-email mode creates user with null email marker.""" suffix = token_hex(4) username = f"noemail_{suffix}" display_name = f"No Email User {suffix}" _ldap_server.add_user( username=username, password=_DEFAULT_PASSWORD, email="ignored@example.com", # This email is ignored in no-email mode display_name=display_name, groups=[_ADMIN_GROUP], ) status, access_token, refresh_token = _ldap_login( _app_ldap_no_email, username, _DEFAULT_PASSWORD ) _verify_ldap_login_success(status, access_token, refresh_token) # Find the user - should have null email marker user = self._get_user_with_username(_app_ldap_no_email, display_name) assert user is not None, "User with null email marker should be created" assert user.profile.email == "", "User should have null email on first login" assert user.role == UserRoleInput.ADMIN _delete_users(_app_ldap_no_email, _app_ldap_no_email.admin_secret, users=[user.gid]) def test_subsequent_login_finds_same_user( self, _app_ldap_no_email: _AppInfo, _ldap_server: _LDAPServer ) -> None: """Test subsequent login in no-email mode finds the same user by unique_id.""" suffix = token_hex(4) username = f"noemail_same_{suffix}" display_name = f"Same User {suffix}" _ldap_server.add_user( username=username, password=_DEFAULT_PASSWORD, email="ignored@example.com", display_name=display_name, groups=[_MEMBER_GROUP], ) # First login status, access_token, refresh_token = _ldap_login( _app_ldap_no_email, username, _DEFAULT_PASSWORD ) _verify_ldap_login_success(status, access_token, refresh_token) user1 = self._get_user_with_username(_app_ldap_no_email, display_name) assert user1 is not None, "User with same username should be found on first login" assert user1.profile.email == "", "User should have null email on first login" assert user1.role is UserRoleInput.MEMBER, "User should have MEMBER role on first login" # Role changes in LDAP, login again _ldap_server.add_user( username=username, password=_DEFAULT_PASSWORD, email="ignored@example.com", display_name=display_name, groups=[_ADMIN_GROUP], ) # Second login - should find same user status, access_token, refresh_token = _ldap_login( _app_ldap_no_email, username, _DEFAULT_PASSWORD ) _verify_ldap_login_success(status, access_token, refresh_token) user2 = self._get_user_with_username(_app_ldap_no_email, display_name) assert user2 is not None, "User with same username should be found on subsequent login" assert user2.profile.email == "", "User should have null email on subsequent login" assert user2.role is UserRoleInput.ADMIN, "User should have ADMIN role on subsequent login" assert user2.gid == user1.gid, "Same user should be found on subsequent login" _delete_users(_app_ldap_no_email, _app_ldap_no_email.admin_secret, users=[user1.gid])

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Arize-ai/phoenix'

If you have feedback or need assistance with the MCP directory API, please join our Discord server