Skip to main content
Glama

@arizeai/phoenix-mcp

Official
by Arize-ai
test_oauth2.py (33.3 kB)
"""Tests for the OAuth2 router helpers ``_get_existing_oauth2_user`` and ``_parse_user_info``."""

from secrets import token_hex
from typing import Any, Optional

import pytest
from sqlalchemy import insert, select
from starlette.types import ASGIApp

from phoenix.db import models
from phoenix.server.api.routers.oauth2 import (
    InvalidUserInfo,
    MissingEmailScope,
    SignInNotAllowed,
    UserInfo,
    _get_existing_oauth2_user,
    _parse_user_info,
)
from phoenix.server.types import DbSessionFactory

# Client ID / IDP user ID presented at sign-in in (almost) every case, plus a second
# value of each for the "stored credentials differ" cases.
_CLIENT_ID = "123456789012-abcdef.apps.googleusercontent.com"
_OTHER_CLIENT_ID = "987654321098-xyzdef.apps.googleusercontent.com"
_IDP_USER_ID = "118234567890123456789"
_OTHER_IDP_USER_ID = "118234567890987654321"


def _existing_user(**overrides: Any) -> dict[str, Any]:
    """Build the column values of a pre-existing user row.

    The defaults describe a password-less OAuth2 user whose stored credentials match
    ``_CLIENT_ID`` / ``_IDP_USER_ID``; each test case passes only the columns that differ.
    The username is random so rows from different cases never collide.
    """
    values: dict[str, Any] = dict(
        user_role_id=1,
        username=token_hex(8),
        password_hash=None,
        password_salt=None,
        reset_password=False,
        oauth2_client_id=_CLIENT_ID,
        oauth2_user_id=_IDP_USER_ID,
        auth_method="OAUTH2",
    )
    values.update(overrides)
    return values


def _idp_user_info(
    *,
    idp_user_id: str = _IDP_USER_ID,
    username: Optional[str] = None,
    profile_picture_url: Optional[str] = None,
) -> UserInfo:
    """Build the ``UserInfo`` presented by the identity provider, with a random email."""
    return UserInfo(
        idp_user_id=idp_user_id,
        email=f"{token_hex(8)}@example.com",
        username=username,
        profile_picture_url=profile_picture_url,
    )


@pytest.mark.parametrize(
    "user,oauth2_client_id,user_info,allowed",
    [
        # Test Case: User with password hash cannot sign in with OAuth2
        # Verifies that users who have set a password must use password authentication
        # and cannot switch to OAuth2 authentication.
        pytest.param(
            _existing_user(
                password_hash=b"password_hash",
                password_salt=b"password_salt",
                oauth2_client_id=None,
                oauth2_user_id=None,
                auth_method="LOCAL",
            ),
            _CLIENT_ID,
            _idp_user_info(),
            False,
            id="user_with_password_hash",
        ),
        # Test Case: User with matching OAuth2 credentials can sign in
        # Verifies that users with matching OAuth2 credentials can successfully sign in
        # without any credential updates needed.
        pytest.param(
            _existing_user(),
            _CLIENT_ID,
            _idp_user_info(),
            True,
            id="user_with_matching_oauth2_credentials",
        ),
        # Test Case: User with different OAuth2 client ID can sign in
        # Verifies that users found by email can have their OAuth2 client ID updated
        # when signing in with a different client ID.
        pytest.param(
            _existing_user(oauth2_client_id=_OTHER_CLIENT_ID),
            _CLIENT_ID,
            _idp_user_info(),
            True,
            id="user_with_different_oauth2_client_id",
        ),
        # Test Case: User with different OAuth2 user ID cannot sign in
        # Verifies that users cannot sign in when their OAuth2 user ID doesn't match,
        # even if the client ID matches.
        pytest.param(
            _existing_user(),
            _CLIENT_ID,
            _idp_user_info(idp_user_id=_OTHER_IDP_USER_ID),
            False,
            id="user_with_different_oauth2_user_id",
        ),
        # Test Case: User with missing OAuth2 client ID can sign in
        # Verifies that users found by email can have their OAuth2 client ID set
        # when signing in for the first time with OAuth2.
        pytest.param(
            _existing_user(oauth2_client_id=None),
            _CLIENT_ID,
            _idp_user_info(),
            True,
            id="user_with_missing_oauth2_client_id",
        ),
        # Test Case: User with missing OAuth2 user ID can sign in
        # Verifies that users can sign in when their OAuth2 user ID is missing,
        # if the client ID matches.
        pytest.param(
            _existing_user(oauth2_user_id=None),
            _CLIENT_ID,
            _idp_user_info(),
            True,
            id="user_with_missing_oauth2_user_id",
        ),
        # Test Case: User with missing OAuth2 client ID but different user ID can sign in
        # Verifies that users found by email can have both their OAuth2 client ID and user ID
        # updated when signing in for the first time with OAuth2.
        pytest.param(
            _existing_user(oauth2_client_id=None),
            _CLIENT_ID,
            _idp_user_info(idp_user_id=_OTHER_IDP_USER_ID),
            True,
            id="user_with_missing_oauth2_client_id_and_different_user_id",
        ),
        # Test Case: User with missing OAuth2 user ID but different client ID can sign in
        # Verifies that users found by email can have both their OAuth2 client ID and user ID
        # updated when signing in with different credentials.
        pytest.param(
            _existing_user(oauth2_client_id=_OTHER_CLIENT_ID, oauth2_user_id=None),
            _CLIENT_ID,
            _idp_user_info(),
            True,
            id="user_with_missing_oauth2_user_id_and_different_client_id",
        ),
        # Test Case: User found by email with no OAuth2 credentials can sign in
        # Verifies that users found by email can have their OAuth2 credentials set
        # when signing in for the first time with OAuth2.
        pytest.param(
            _existing_user(oauth2_client_id=None, oauth2_user_id=None),
            _CLIENT_ID,
            _idp_user_info(),
            True,
            id="user_found_by_email_no_oauth2_credentials",
        ),
        # Test Case: User found by email with matching OAuth2 credentials can sign in
        # Verifies that users found by email with matching OAuth2 credentials
        # can sign in without any credential updates.
        pytest.param(
            _existing_user(),
            _CLIENT_ID,
            _idp_user_info(),
            True,
            id="user_found_by_email_matching_oauth2_credentials",
        ),
        # Test Case: User found by email with different OAuth2 credentials can sign in
        # Verifies that users found by email can have their OAuth2 credentials updated
        # when signing in with different credentials.
        pytest.param(
            _existing_user(
                oauth2_client_id=_OTHER_CLIENT_ID,
                oauth2_user_id=_OTHER_IDP_USER_ID,
            ),
            _CLIENT_ID,
            _idp_user_info(),
            True,
            id="user_found_by_email_different_oauth2_credentials",
        ),
        # Test Case: User with updated profile picture can sign in
        # Verifies that users can have their profile picture URL updated
        # when signing in with a new URL.
        pytest.param(
            _existing_user(profile_picture_url="https://old-picture.com/avatar.jpg"),
            _CLIENT_ID,
            _idp_user_info(profile_picture_url="https://new-picture.com/avatar.jpg"),
            True,
            id="user_with_updated_profile_picture",
        ),
        # Test Case: User with changed OAuth2 client ID and profile picture can sign in
        # Verifies that users can have both their OAuth2 client ID and profile picture
        # updated simultaneously when signing in.
        pytest.param(
            _existing_user(
                oauth2_client_id=_OTHER_CLIENT_ID,
                profile_picture_url="https://old-picture.com/avatar.jpg",
            ),
            _CLIENT_ID,
            _idp_user_info(profile_picture_url="https://new-picture.com/avatar.jpg"),
            True,
            id="user_with_changed_client_id_and_profile_picture",
        ),
        # Test Case: User with removed profile picture can sign in
        # Verifies that users can have their profile picture URL removed (set to None)
        # when signing in with no profile picture URL provided.
        pytest.param(
            _existing_user(
                username=f"test_username{token_hex(8)}",
                profile_picture_url="https://example.com/avatar.jpg",
            ),
            _CLIENT_ID,
            _idp_user_info(username=f"test_username{token_hex(8)}"),
            True,
            id="user_with_removed_profile_picture",
        ),
        # Test Case: Non-existent user cannot sign in
        # Verifies that sign-in is rejected when no user is found
        # by either OAuth2 credentials or email.
        pytest.param(
            None,
            _CLIENT_ID,
            _idp_user_info(),
            False,
            id="non_existent_user",
        ),
    ],
)
async def test_get_existing_oauth2_user(
    app: ASGIApp,
    db: DbSessionFactory,
    user: Optional[dict[str, Any]],
    oauth2_client_id: str,
    user_info: UserInfo,
    allowed: bool,
) -> None:
    """Test the OAuth2 user sign-in and update process.

    This test verifies the behavior of _get_existing_oauth2_user function, which handles:
    1. OAuth2 user authentication
    2. User profile updates (profile picture)
    3. OAuth2 credential updates (client_id, user_id)
    4. Error cases and rejections

    The test covers various scenarios:
    - Users with/without passwords
    - Users with matching/different OAuth2 credentials
    - Users found by OAuth2 credentials or email
    - Profile information updates
    - Error conditions

    Args:
        app: The FastAPI application instance
        db: Database session factory
        user: Optional existing user in the database
        oauth2_client_id: OAuth2 client ID to use for sign-in
        user_info: User information from the OAuth2 provider
        allowed: Whether the sign-in should be allowed
    """
    if user:
        async with db() as session:
            # For some strange reason PostgreSQL insists on UPDATE instead of
            # INSERT when using session.add(user), so we INSERT manually.
            await session.execute(
                insert(models.User).values(
                    # The stored email always equals the IDP email, so every existing
                    # user is discoverable by email at sign-in.
                    email=user_info.email,
                    user_role_id=user.get("user_role_id"),
                    username=user.get("username"),
                    password_hash=user.get("password_hash"),
                    password_salt=user.get("password_salt"),
                    reset_password=user.get("reset_password"),
                    oauth2_client_id=user.get("oauth2_client_id"),
                    oauth2_user_id=user.get("oauth2_user_id"),
                    auth_method=user.get("auth_method"),
                    # Persist the pre-existing picture so the "updated"/"removed" picture
                    # cases really start from a user that has one (None for other cases).
                    profile_picture_url=user.get("profile_picture_url"),
                )
            )

    async with db() as session:
        if not user or not allowed:
            with pytest.raises(SignInNotAllowed):
                await _get_existing_oauth2_user(
                    session,
                    oauth2_client_id=oauth2_client_id,
                    user_info=user_info,
                )
            return
        oauth2_user = await _get_existing_oauth2_user(
            session,
            oauth2_client_id=oauth2_client_id,
            user_info=user_info,
        )
        # Verify the returned user object has the correct OAuth2 credentials
        assert oauth2_user
        assert oauth2_user.oauth2_client_id == oauth2_client_id
        assert oauth2_user.oauth2_user_id == user_info.idp_user_id

    # Verify the database state after the update, using a fresh session so the
    # values are read back from the database.
    async with db() as session:
        db_user = await session.scalar(select(models.User).filter_by(email=user_info.email))
        assert db_user
        # Verify OAuth2 credentials are correctly updated
        assert db_user.oauth2_client_id == oauth2_client_id
        assert db_user.oauth2_user_id == user_info.idp_user_id
        # Verify profile picture URL is updated if provided in user_info
        if user_info.profile_picture_url is not None:
            assert db_user.profile_picture_url == user_info.profile_picture_url
        # Verify username remains unchanged
        assert db_user.username == user.get("username")


class TestParseUserInfo:
    """Test suite for _parse_user_info with real-world IDP examples."""

    def test_google_id_token(self) -> None:
        """Test parsing a typical Google ID token."""
        google_token = {
            "sub": "118234567890123456789",
            "email": "user@example.com",
            "email_verified": True,
            "name": "John Doe",
            "picture": "https://lh3.googleusercontent.com/a/default-user",
            "given_name": "John",
            "family_name": "Doe",
            "locale": "en",
            "iat": 1700000000,
            "exp": 1700003600,
        }
        result = _parse_user_info(google_token)
        assert result.idp_user_id == "118234567890123456789"
        assert result.email == "user@example.com"
        assert result.username == "John Doe"
        assert result.profile_picture_url == "https://lh3.googleusercontent.com/a/default-user"
        assert result.claims["email_verified"] is True
        assert result.claims["given_name"] == "John"

    def test_auth0_id_token(self) -> None:
        """Test parsing an Auth0 ID token with custom namespace claims."""
        auth0_token = {
            "sub": "auth0|507f1f77bcf86cd799439011",
            "email": "user@example.com",
            "email_verified": True,
            "name": "Jane Smith",
            "picture": "https://s.gravatar.com/avatar/example.png",
            "nickname": "jane",
            "https://myapp.com/groups": ["admin", "users"],
            "https://myapp.com/roles": ["editor"],
            "iss": "https://tenant.auth0.com/",
            "aud": "client_id_here",
        }
        result = _parse_user_info(auth0_token)
        assert result.idp_user_id == "auth0|507f1f77bcf86cd799439011"
        assert result.email == "user@example.com"
        assert result.username == "Jane Smith"
        assert result.claims["https://myapp.com/groups"] == ["admin", "users"]
        assert result.claims["nickname"] == "jane"

    def test_okta_id_token(self) -> None:
        """Test parsing an Okta ID token."""
        okta_token = {
            "sub": "00u1234567890abcdef",
            "email": "employee@company.com",
            "email_verified": True,
            "name": "Alice Johnson",
            "preferred_username": "alice.johnson@company.com",
            "given_name": "Alice",
            "family_name": "Johnson",
            "zoneinfo": "America/Los_Angeles",
            "locale": "en-US",
            "groups": ["Everyone", "Developers", "Engineering"],
        }
        result = _parse_user_info(okta_token)
        assert result.idp_user_id == "00u1234567890abcdef"
        assert result.email == "employee@company.com"
        assert result.username == "Alice Johnson"
        assert result.claims["groups"] == ["Everyone", "Developers", "Engineering"]
        assert result.claims["preferred_username"] == "alice.johnson@company.com"

    def test_azure_ad_id_token(self) -> None:
        """Test parsing a Microsoft Azure AD ID token."""
        azure_token = {
            "sub": "AAAAAAAAAAAAAAAAAAAAAIkzqFVrSaSaFHy782bbtaQ",
            "email": "user@contoso.com",
            "name": "Bob Williams",
            "oid": "00000000-0000-0000-66f3-3332eca7ea81",
            "preferred_username": "user@contoso.com",
            "tid": "9188040d-6c67-4c5b-b112-36a304b66dad",
            "unique_name": "user@contoso.com",
            "roles": ["Admin", "User"],
        }
        result = _parse_user_info(azure_token)
        assert result.idp_user_id == "AAAAAAAAAAAAAAAAAAAAAIkzqFVrSaSaFHy782bbtaQ"
        assert result.email == "user@contoso.com"
        assert result.username == "Bob Williams"
        assert result.claims["oid"] == "00000000-0000-0000-66f3-3332eca7ea81"
        assert result.claims["roles"] == ["Admin", "User"]

    def test_keycloak_id_token(self) -> None:
        """Test parsing a Keycloak ID token."""
        keycloak_token = {
            "sub": "f:1234abcd-56ef-78gh-90ij-klmnopqrstuv:john",
            "email": "john@keycloak.local",
            "email_verified": True,
            "name": "John Keycloak",
            "preferred_username": "john",
            "given_name": "John",
            "family_name": "Keycloak",
            "resource_access": {
                "phoenix": {"roles": ["admin", "developer"]},
                "account": {"roles": ["view-profile"]},
            },
            "realm_access": {"roles": ["offline_access", "uma_authorization"]},
        }
        result = _parse_user_info(keycloak_token)
        assert result.idp_user_id == "f:1234abcd-56ef-78gh-90ij-klmnopqrstuv:john"
        assert result.email == "john@keycloak.local"
        assert result.username == "John Keycloak"
        assert result.claims["resource_access"]["phoenix"]["roles"] == ["admin", "developer"]

    def test_aws_cognito_id_token(self) -> None:
        """Test parsing an AWS Cognito ID token."""
        cognito_token = {
            "sub": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
            "email": "user@cognito.local",
            "email_verified": True,
            "cognito:username": "user123",
            "cognito:groups": ["Administrators", "PowerUsers"],
            "given_name": "Cognito",
            "family_name": "User",
            "name": "Cognito User",
        }
        result = _parse_user_info(cognito_token)
        assert result.idp_user_id == "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
        assert result.email == "user@cognito.local"
        assert result.username == "Cognito User"
        assert result.claims["cognito:groups"] == ["Administrators", "PowerUsers"]

    def test_gitlab_id_token(self) -> None:
        """Test parsing a GitLab ID token."""
        gitlab_token = {
            "sub": "1234567",
            "email": "developer@gitlab.com",
            "email_verified": True,
            "name": "GitLab Developer",
            "nickname": "gitdev",
            "picture": "https://gitlab.com/uploads/-/system/user/avatar/1234567/avatar.png",
            "groups_direct": ["engineering/platform", "engineering/backend"],
        }
        result = _parse_user_info(gitlab_token)
        assert result.idp_user_id == "1234567"
        assert result.email == "developer@gitlab.com"
        assert result.username == "GitLab Developer"
        assert result.claims["groups_direct"] == ["engineering/platform", "engineering/backend"]

    def test_integer_sub_claim(self) -> None:
        """Test that integer sub claims are converted to strings (pragmatic compatibility)."""
        token_with_int_sub = {
            "sub": 123456789,  # Some IDPs might send this as an integer
            "email": "user@example.com",
            "name": "Test User",
        }
        result = _parse_user_info(token_with_int_sub)
        assert result.idp_user_id == "123456789"
        assert isinstance(result.idp_user_id, str)

    def test_minimal_valid_token(self) -> None:
        """Test parsing a minimal valid token (only required fields)."""
        minimal_token = {
            "sub": "minimal-user-id",
            "email": "minimal@example.com",
        }
        result = _parse_user_info(minimal_token)
        assert result.idp_user_id == "minimal-user-id"
        assert result.email == "minimal@example.com"
        assert result.username is None
        assert result.profile_picture_url is None

    def test_missing_sub_claim(self) -> None:
        """Test that missing sub claim raises InvalidUserInfo."""
        token_without_sub = {
            "email": "user@example.com",
            "name": "Test User",
        }
        with pytest.raises(InvalidUserInfo, match="Missing required 'sub' claim"):
            _parse_user_info(token_without_sub)

    def test_none_sub_claim(self) -> None:
        """Test that None sub claim raises InvalidUserInfo."""
        token_with_none_sub = {
            "sub": None,
            "email": "user@example.com",
        }
        with pytest.raises(InvalidUserInfo, match="Missing required 'sub' claim"):
            _parse_user_info(token_with_none_sub)

    def test_empty_sub_claim(self) -> None:
        """Test that empty/whitespace sub claim raises InvalidUserInfo."""
        token_with_empty_sub = {
            "sub": " ",
            "email": "user@example.com",
        }
        with pytest.raises(InvalidUserInfo, match="'sub' claim cannot be empty"):
            _parse_user_info(token_with_empty_sub)

    def test_invalid_sub_type(self) -> None:
        """Test that invalid sub claim type raises InvalidUserInfo."""
        token_with_dict_sub = {
            "sub": {"id": "12345"},  # Invalid: should be string or int
            "email": "user@example.com",
        }
        with pytest.raises(InvalidUserInfo, match="Invalid 'sub' claim type"):
            _parse_user_info(token_with_dict_sub)

    def test_missing_email(self) -> None:
        """Test that missing email raises MissingEmailScope."""
        token_without_email = {
            "sub": "user-123",
            "name": "Test User",
        }
        with pytest.raises(MissingEmailScope, match="Missing or invalid 'email' claim"):
            _parse_user_info(token_without_email)

    def test_none_email(self) -> None:
        """Test that None email raises MissingEmailScope."""
        token_with_none_email = {
            "sub": "user-123",
            "email": None,
        }
        with pytest.raises(MissingEmailScope, match="Missing or invalid 'email' claim"):
            _parse_user_info(token_with_none_email)

    def test_empty_email(self) -> None:
        """Test that empty/whitespace email raises MissingEmailScope."""
        token_with_empty_email = {
            "sub": "user-123",
            "email": " ",
        }
        with pytest.raises(MissingEmailScope, match="Missing or invalid 'email' claim"):
            _parse_user_info(token_with_empty_email)

    def test_invalid_email_type(self) -> None:
        """Test that non-string email raises MissingEmailScope."""
        token_with_int_email = {
            "sub": "user-123",
            "email": 12345,
        }
        with pytest.raises(MissingEmailScope, match="Missing or invalid 'email' claim"):
            _parse_user_info(token_with_int_email)

    def test_non_string_name_gracefully_ignored(self) -> None:
        """Test that non-string name values are gracefully ignored."""
        token_with_int_name = {
            "sub": "user-123",
            "email": "user@example.com",
            "name": 12345,  # Invalid type
        }
        result = _parse_user_info(token_with_int_name)
        assert result.username is None  # Should be ignored, not crash

    def test_non_string_picture_gracefully_ignored(self) -> None:
        """Test that non-string picture values are gracefully ignored."""
        token_with_dict_picture = {
            "sub": "user-123",
            "email": "user@example.com",
            "picture": {"url": "https://example.com"},  # Invalid type
        }
        result = _parse_user_info(token_with_dict_picture)
        assert result.profile_picture_url is None  # Should be ignored, not crash

    def test_claims_filtering_removes_empty_values(self) -> None:
        """Test that empty/None values are filtered from claims."""
        token_with_empty_values: dict[str, Any] = {
            "sub": "user-123",
            "email": "user@example.com",
            "name": "Test User",
            "empty_string": "",
            "whitespace_string": " ",
            "none_value": None,
            "empty_list": [],
            "empty_dict": {},
            "valid_number": 42,
            "valid_bool": True,
            "valid_list": ["item"],
        }
        result = _parse_user_info(token_with_empty_values)
        # Empty values should be filtered out
        assert "empty_string" not in result.claims
        assert "whitespace_string" not in result.claims
        assert "none_value" not in result.claims
        assert "empty_list" not in result.claims
        assert "empty_dict" not in result.claims
        # Valid values should be preserved
        assert result.claims["valid_number"] == 42
        assert result.claims["valid_bool"] is True
        assert result.claims["valid_list"] == ["item"]

    def test_complex_nested_claims_preserved(self) -> None:
        """Test that complex nested structures in claims are preserved."""
        token_with_nested = {
            "sub": "user-123",
            "email": "user@example.com",
            "roles": {
                "organization": {
                    "teams": [
                        {"name": "engineering", "role": "member"},
                        {"name": "platform", "role": "admin"},
                    ]
                }
            },
        }
        result = _parse_user_info(token_with_nested)
        assert "roles" in result.claims
        assert result.claims["roles"]["organization"]["teams"][0]["name"] == "engineering"

    def test_whitespace_handling_in_sub(self) -> None:
        """Test that leading/trailing whitespace in sub is trimmed."""
        token_with_whitespace_sub = {
            "sub": " user-123 ",
            "email": "user@example.com",
        }
        result = _parse_user_info(token_with_whitespace_sub)
        assert result.idp_user_id == "user-123"  # Whitespace trimmed

    def test_unicode_and_special_characters(self) -> None:
        """Test handling of Unicode and special characters in claims."""
        token_with_unicode = {
            "sub": "user-123",
            "email": "用户@example.com",
            "name": "José García-Müller",
            "locale": "zh-CN",
        }
        result = _parse_user_info(token_with_unicode)
        assert result.email == "用户@example.com"
        assert result.username == "José García-Müller"
        assert result.claims["locale"] == "zh-CN"

    def test_special_characters_in_claim_keys(self) -> None:
        """
        Test that claims with special characters in keys are preserved.

        These require quoted identifiers in JMESPath (e.g., "cognito:groups").
        """
        token_with_special_keys = {
            "sub": "user-123",
            "email": "user@example.com",
            # Auth0 style - URL namespace
            "https://myapp.com/groups": ["admin", "users"],
            # AWS Cognito style - colon separator
            "cognito:groups": ["Administrators"],
            # Keycloak style - nested with special chars
            "resource_access": {"my-app": {"roles": ["developer"]}},
        }
        result = _parse_user_info(token_with_special_keys)
        # Verify special character keys are preserved in claims
        assert result.claims["https://myapp.com/groups"] == ["admin", "users"]
        assert result.claims["cognito:groups"] == ["Administrators"]
        assert result.claims["resource_access"]["my-app"]["roles"] == ["developer"]

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Arize-ai/phoenix'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.