Skip to main content
Glama

@arizeai/phoenix-mcp

Official
by Arize-ai
test_oauth2.py28.1 kB
"""Unit tests for OAuth2Client.""" from typing import Any import pytest from phoenix.config import OAuth2ClientConfig from phoenix.server.oauth2 import OAuth2Client, OAuth2Clients # Common test configuration constants _OAUTH2_CONFIG_DEFAULTS: dict[str, Any] = { "idp_name": "test", "idp_display_name": "Test IDP", "client_id": "test_client_id", "client_secret": "test_secret", "oidc_config_url": "https://test.example.com/.well-known/openid-configuration", "allow_sign_up": False, "auto_login": False, "use_pkce": False, "token_endpoint_auth_method": None, "scopes": "openid email profile", } _OAUTH2_CLIENT_DEFAULTS: dict[str, Any] = { "name": "test", "client_id": "test_id", "client_secret": "test_secret", "server_metadata_url": "https://test.example.com/.well-known/openid-configuration", "display_name": "Test IDP", "allow_sign_up": False, "auto_login": False, } class TestOAuth2ClientJMESPathValidation: """Test that JMESPath expressions are validated at startup.""" @pytest.mark.parametrize( "groups_attribute_path", [ pytest.param("groups", id="simple_path"), pytest.param("resource_access.phoenix.roles", id="nested_path"), pytest.param('"cognito:groups"', id="quoted_identifier_colon"), pytest.param('"https://myapp.com/groups"', id="quoted_identifier_url"), pytest.param("teams[*].name", id="array_projection"), pytest.param('resource_access."my-app".roles', id="nested_with_quoted_segment"), ], ) def test_valid_jmespath_expressions(self, groups_attribute_path: str) -> None: """Test that valid JMESPath expressions are accepted at startup.""" config = OAuth2ClientConfig( **_OAUTH2_CONFIG_DEFAULTS, groups_attribute_path=groups_attribute_path, allowed_groups=["admin"], ) clients = OAuth2Clients() clients.add_client(config) # Should not raise @pytest.mark.parametrize( "groups_attribute_path", [ pytest.param("cognito:groups", id="missing_quotes_colon"), pytest.param("https://myapp.com/groups", id="missing_quotes_url"), pytest.param("[invalid syntax!", id="malformed_bracket"), pytest.param("groups[*", id="unclosed_bracket"), ], ) def test_invalid_jmespath_expressions(self, groups_attribute_path: str) -> None: """Test that invalid JMESPath expressions are rejected at startup.""" config = OAuth2ClientConfig( **_OAUTH2_CONFIG_DEFAULTS, groups_attribute_path=groups_attribute_path, allowed_groups=["admin"], ) clients = OAuth2Clients() with pytest.raises(ValueError, match="Invalid JMESPath expression"): clients.add_client(config) def test_helpful_error_message_on_invalid_jmespath(self) -> None: """Test that error message includes helpful hints about quoting special characters.""" config = OAuth2ClientConfig( **{**_OAUTH2_CONFIG_DEFAULTS, "idp_name": "auth0", "idp_display_name": "Auth0"}, groups_attribute_path="https://myapp.com/groups", # Invalid - needs quotes allowed_groups=["admin"], ) clients = OAuth2Clients() with pytest.raises(ValueError) as exc_info: clients.add_client(config) error_message = str(exc_info.value) assert "Invalid JMESPath expression" in error_message assert "https://myapp.com/groups" in error_message assert "double quotes" in error_message assert '"cognito:groups"' in error_message # Example in hint assert '"https://myapp.com/groups"' in error_message # Example in hint def test_no_validation_when_groups_attribute_path_is_none(self) -> None: """Test that client creation works when groups_attribute_path is not configured.""" config = OAuth2ClientConfig( **_OAUTH2_CONFIG_DEFAULTS, groups_attribute_path=None, # No group-based access control allowed_groups=[], ) clients = OAuth2Clients() clients.add_client(config) # Should not raise class TestHasSufficientClaims: """Test has_sufficient_claims method for determining if UserInfo call is needed.""" def test_sufficient_when_email_present_no_group_control(self) -> None: """Test that claims are sufficient when email is present and no group control.""" client = OAuth2Client( **_OAUTH2_CLIENT_DEFAULTS, groups_attribute_path=None, allowed_groups=[], ) claims = {"sub": "user123", "email": "user@example.com"} assert client.has_sufficient_claims(claims) is True def test_sufficient_when_email_and_groups_present(self) -> None: """Test that claims are sufficient when both email and required groups present.""" client = OAuth2Client( **_OAUTH2_CLIENT_DEFAULTS, groups_attribute_path="groups", allowed_groups=["admin", "users"], ) claims = {"sub": "user123", "email": "user@example.com", "groups": ["users"]} assert client.has_sufficient_claims(claims) is True def test_insufficient_when_email_missing(self) -> None: """Test that claims are insufficient when email is missing.""" client = OAuth2Client( **_OAUTH2_CLIENT_DEFAULTS, groups_attribute_path=None, allowed_groups=[], ) claims = {"sub": "user123"} # No email assert client.has_sufficient_claims(claims) is False def test_insufficient_when_email_is_none(self) -> None: """Test that claims are insufficient when email is None.""" client = OAuth2Client( **_OAUTH2_CLIENT_DEFAULTS, groups_attribute_path=None, allowed_groups=[], ) claims = {"sub": "user123", "email": None} assert client.has_sufficient_claims(claims) is False def test_insufficient_when_email_is_empty_string(self) -> None: """Test that claims are insufficient when email is empty string.""" client = OAuth2Client( **_OAUTH2_CLIENT_DEFAULTS, groups_attribute_path=None, allowed_groups=[], ) claims = {"sub": "user123", "email": ""} assert client.has_sufficient_claims(claims) is False def test_insufficient_when_email_is_whitespace(self) -> None: """Test that claims are insufficient when email is whitespace only.""" client = OAuth2Client( **_OAUTH2_CLIENT_DEFAULTS, groups_attribute_path=None, allowed_groups=[], ) claims = {"sub": "user123", "email": " "} assert client.has_sufficient_claims(claims) is False def test_insufficient_when_email_is_not_string(self) -> None: """Test that claims are insufficient when email is not a string.""" client = OAuth2Client( **_OAUTH2_CLIENT_DEFAULTS, groups_attribute_path=None, allowed_groups=[], ) claims = {"sub": "user123", "email": 12345} assert client.has_sufficient_claims(claims) is False def test_insufficient_when_groups_missing_and_required(self) -> None: """Test that claims are insufficient when groups are missing but required.""" client = OAuth2Client( **_OAUTH2_CLIENT_DEFAULTS, groups_attribute_path="groups", allowed_groups=["admin", "users"], ) claims = {"sub": "user123", "email": "user@example.com"} # No groups assert client.has_sufficient_claims(claims) is False def test_insufficient_when_groups_empty_and_required(self) -> None: """Test that claims are insufficient when groups array is empty but required.""" client = OAuth2Client( **_OAUTH2_CLIENT_DEFAULTS, groups_attribute_path="groups", allowed_groups=["admin", "users"], ) claims = {"sub": "user123", "email": "user@example.com", "groups": []} assert client.has_sufficient_claims(claims) is False def test_insufficient_when_email_missing_even_with_groups(self) -> None: """Test that email is required even when groups are present.""" client = OAuth2Client( **_OAUTH2_CLIENT_DEFAULTS, groups_attribute_path="groups", allowed_groups=["admin"], ) claims = {"sub": "user123", "groups": ["admin"]} # Groups present but no email assert client.has_sufficient_claims(claims) is False def test_sufficient_with_nested_groups(self) -> None: """Test that claims are sufficient with nested group paths.""" client = OAuth2Client( **_OAUTH2_CLIENT_DEFAULTS, groups_attribute_path="resource_access.phoenix.roles", allowed_groups=["admin"], ) claims = { "sub": "user123", "email": "user@example.com", "resource_access": {"phoenix": {"roles": ["admin", "developer"]}}, } assert client.has_sufficient_claims(claims) is True def test_insufficient_with_nested_groups_missing(self) -> None: """Test that claims are insufficient when nested groups are missing.""" client = OAuth2Client( **_OAUTH2_CLIENT_DEFAULTS, groups_attribute_path="resource_access.phoenix.roles", allowed_groups=["admin"], ) claims = { "sub": "user123", "email": "user@example.com", "resource_access": {}, # Missing phoenix key } assert client.has_sufficient_claims(claims) is False def test_sufficient_with_quoted_jmespath(self) -> None: """Test that claims are sufficient with quoted JMESPath (special characters).""" client = OAuth2Client( **_OAUTH2_CLIENT_DEFAULTS, groups_attribute_path='"cognito:groups"', allowed_groups=["Administrators"], ) claims = { "sub": "user123", "email": "user@example.com", "cognito:groups": ["Administrators", "PowerUsers"], } assert client.has_sufficient_claims(claims) is True class TestOAuth2ClientAccessValidation: """Test group-based access control validation.""" @pytest.mark.parametrize( ("groups_attribute_path", "allowed_groups", "user_claims"), [ pytest.param( "groups", ["admin", "users"], {"sub": "user123", "email": "user@example.com", "groups": ["users", "developers"]}, id="single_matching_group", ), pytest.param( "groups", ["admin", "powerusers"], { "sub": "user123", "email": "admin@example.com", "groups": ["admin", "powerusers", "support"], }, id="multiple_matching_groups", ), pytest.param( "groups", ["1", "True"], {"sub": "user123", "email": "user@example.com", "groups": [1, True, "other"]}, id="type_normalization", ), ], ) def test_access_granted( self, groups_attribute_path: str, allowed_groups: list[str], user_claims: dict[str, Any] ) -> None: """Test that access is granted when user belongs to at least one allowed group.""" client = OAuth2Client( **_OAUTH2_CLIENT_DEFAULTS, groups_attribute_path=groups_attribute_path, allowed_groups=allowed_groups, ) # Should not raise client.validate_access(user_claims) @pytest.mark.parametrize( ("groups_attribute_path", "allowed_groups", "user_claims"), [ pytest.param( "groups", ["admin", "powerusers"], {"sub": "user123", "email": "user@example.com", "groups": ["guest", "readonly"]}, id="no_matching_groups", ), pytest.param( "groups", ["admin"], {"sub": "user123", "email": "user@example.com"}, # No groups claim id="missing_groups_claim", ), pytest.param( "groups", ["admin"], {"sub": "user123", "email": "user@example.com", "groups": []}, id="empty_groups_array", ), pytest.param( "groups", ["Admin"], {"sub": "user123", "email": "user@example.com", "groups": ["admin"]}, id="case_sensitive_mismatch", ), pytest.param( "teams[*].name", ["engineering", "product"], { "sub": "user123", "email": "support@example.com", "teams": [{"id": "3", "name": "support"}, {"id": "4", "name": "sales"}], }, id="array_projection_no_match", ), pytest.param( "nonexistent.path.to.groups", ["admin"], {"sub": "user123", "email": "user@example.com", "other": "data"}, id="nonexistent_jmespath_path", ), ], ) def test_access_denied( self, groups_attribute_path: str, allowed_groups: list[str], user_claims: dict[str, Any] ) -> None: """Test that access is denied when user doesn't meet group requirements.""" client = OAuth2Client( **_OAUTH2_CLIENT_DEFAULTS, groups_attribute_path=groups_attribute_path, allowed_groups=allowed_groups, ) with pytest.raises(PermissionError, match="Access denied"): client.validate_access(user_claims) def test_no_validation_when_disabled_no_allowed_groups(self) -> None: """Test that validation is skipped when group-based access control is disabled.""" client = OAuth2Client( **_OAUTH2_CLIENT_DEFAULTS, groups_attribute_path=None, # Disabled - no path configured allowed_groups=[], ) user_claims = { "sub": "user123", "email": "user@example.com", "groups": ["guest"], # Groups present but validation is disabled } # Should not raise - validation is disabled client.validate_access(user_claims) def test_invalid_configuration_raises_error(self) -> None: """Test that invalid configuration raises ValueError at initialization.""" with pytest.raises( ValueError, match="groups_attribute_path must be specified when allowed_groups is configured", ): OAuth2Client( **_OAUTH2_CLIENT_DEFAULTS, groups_attribute_path=None, allowed_groups=["admin"], ) def test_groups_attribute_path_without_allowed_groups_raises_error(self) -> None: """Test that groups_attribute_path without allowed_groups raises ValueError (fail-closed).""" with pytest.raises( ValueError, match="allowed_groups must be specified when groups_attribute_path is configured", ): OAuth2Client( **_OAUTH2_CLIENT_DEFAULTS, groups_attribute_path="groups", allowed_groups=[], ) def test_groups_attribute_path_with_empty_allowed_groups_raises_error(self) -> None: """Test that groups_attribute_path with list of empty strings raises ValueError.""" with pytest.raises( ValueError, match="allowed_groups must be specified when groups_attribute_path is configured", ): OAuth2Client( **_OAUTH2_CLIENT_DEFAULTS, groups_attribute_path="groups", allowed_groups=["", " ", ""], # All empty/whitespace - will be filtered out ) def test_empty_string_groups_attribute_path_normalized(self) -> None: """Test that empty string groups_attribute_path is normalized to None.""" client = OAuth2Client( **_OAUTH2_CLIENT_DEFAULTS, groups_attribute_path="", allowed_groups=[], ) # Test that validation is skipped when groups_attribute_path is None user_claims = {"groups": ["admin"]} client.validate_access(user_claims) # Should not raise def test_whitespace_groups_attribute_path_normalized(self) -> None: """Test that whitespace-only groups_attribute_path is normalized to None.""" client = OAuth2Client( **_OAUTH2_CLIENT_DEFAULTS, groups_attribute_path=" ", allowed_groups=[], ) # Test that validation is skipped when groups_attribute_path is None user_claims = {"groups": ["admin"]} client.validate_access(user_claims) # Should not raise def test_empty_strings_in_allowed_groups_removed(self) -> None: """Test that empty strings in allowed_groups are removed.""" client = OAuth2Client( **_OAUTH2_CLIENT_DEFAULTS, groups_attribute_path="groups", allowed_groups=["admin", "", "user", " "], ) # Test that validation works with cleaned groups user_claims = {"groups": ["admin"]} client.validate_access(user_claims) # Should not raise def test_duplicate_groups_removed(self) -> None: """Test that duplicate groups in allowed_groups are removed.""" client = OAuth2Client( **_OAUTH2_CLIENT_DEFAULTS, groups_attribute_path="groups", allowed_groups=["admin", "user", "admin", "user", "admin"], ) # Test that validation works with deduplicated groups user_claims = {"groups": ["admin"]} client.validate_access(user_claims) # Should not raise def test_normalization_removes_duplicates(self) -> None: """Test that group normalization removes duplicates.""" client = OAuth2Client( **_OAUTH2_CLIENT_DEFAULTS, groups_attribute_path="groups", allowed_groups=["z", "a", "z", "b", "a", "c"], ) # Test that validation works with deduplicated groups user_claims = {"groups": ["a"]} client.validate_access(user_claims) # Should not raise @pytest.mark.parametrize( ("groups_attribute_path", "allowed_groups", "user_claims"), [ pytest.param( "resource_access.phoenix.roles", ["admin", "user"], { "sub": "user123", "email": "user@example.com", "resource_access": {"phoenix": {"roles": ["admin", "developer"]}}, }, id="keycloak_nested_structure", ), pytest.param( '"cognito:groups"', ["Administrators", "PowerUsers"], { "sub": "user123", "email": "admin@example.com", "cognito:groups": ["Administrators", "Developers"], }, id="cognito_special_chars_in_key", ), ], ) def test_complex_jmespath_integration( self, groups_attribute_path: str, allowed_groups: list[str], user_claims: dict[str, Any], ) -> None: """Integration test: Complex JMESPath patterns work end-to-end with real IDP claim structures.""" client = OAuth2Client( **_OAUTH2_CLIENT_DEFAULTS, groups_attribute_path=groups_attribute_path, allowed_groups=allowed_groups, ) # Should not raise - validates JMESPath extraction + access control together client.validate_access(user_claims) @pytest.mark.parametrize( ("groups_attribute_path", "allowed_groups", "user_claims"), [ pytest.param( "groups", ["admin"], {"sub": "user123", "email": "user@example.com", "groups": ["admin", None, "user"]}, id="null_values_in_array", ), pytest.param( "groups", ["管理者"], {"sub": "user123", "email": "user@example.com", "groups": ["管理者", "ユーザー"]}, id="unicode_group_names", ), pytest.param( "groups", ["admin"], { "sub": "user123", "email": "user@example.com", "groups": [{"nested": "value"}, "admin", {"other": "obj"}], }, id="mixed_valid_invalid_items", ), ], ) def test_edge_case_claim_values_granted( self, groups_attribute_path: str, allowed_groups: list[str], user_claims: dict[str, Any], ) -> None: """Test that edge case claim values are handled gracefully (null, empty, unicode, nested objects).""" client = OAuth2Client( **_OAUTH2_CLIENT_DEFAULTS, groups_attribute_path=groups_attribute_path, allowed_groups=allowed_groups, ) # Should not raise - normalization filters out invalid items, keeps valid ones client.validate_access(user_claims) def test_empty_strings_do_not_grant_access(self) -> None: """Test that empty strings in groups array don't match any allowed groups.""" client = OAuth2Client( **_OAUTH2_CLIENT_DEFAULTS, groups_attribute_path="groups", allowed_groups=["admin"], ) user_claims = { "sub": "user123", "email": "user@example.com", "groups": ["", "guest"], # Empty string preserved but shouldn't match 'admin' } with pytest.raises(PermissionError, match="Access denied"): client.validate_access(user_claims) @pytest.mark.parametrize( ("user_claims", "should_grant_access"), [ pytest.param( {"sub": "user123", "email": "user@example.com", "groups": "admin"}, True, id="string_instead_of_array", ), pytest.param( {"sub": "user123", "email": "user@example.com", "groups": 123}, False, id="number_instead_of_array", ), pytest.param( {"sub": "user123", "email": "user@example.com", "groups": True}, False, id="boolean_instead_of_array", ), pytest.param( {"sub": "user123", "email": "user@example.com", "groups": {"admin": True}}, False, id="dict_instead_of_array", ), ], ) def test_type_confusion_from_jmespath( self, user_claims: dict[str, Any], should_grant_access: bool ) -> None: """ SECURITY: Test that non-array types returned by JMESPath are handled safely. Malicious or misconfigured IDPs might return unexpected types. The normalization logic must handle these safely to prevent authorization bypass. """ client = OAuth2Client( **_OAUTH2_CLIENT_DEFAULTS, groups_attribute_path="groups", allowed_groups=["admin"], ) if should_grant_access: # Should not raise - string "admin" normalized to ["admin"] client.validate_access(user_claims) else: # Should deny - cannot normalize to matching groups with pytest.raises(PermissionError, match="Access denied"): client.validate_access(user_claims) def test_whitespace_requires_exact_match(self) -> None: """ SECURITY: Whitespace is NOT trimmed - exact match required. This prevents potential confusion attacks where " admin " could be used to bypass "admin" restrictions. Trade-off: legitimate users with whitespace in group names from IDP will be denied. """ client = OAuth2Client( **_OAUTH2_CLIENT_DEFAULTS, groups_attribute_path="groups", allowed_groups=["admin"], ) # User group with leading/trailing whitespace should NOT match user_claims = { "sub": "user123", "email": "user@example.com", "groups": [" admin ", "user"], } with pytest.raises(PermissionError, match="Access denied"): client.validate_access(user_claims) @pytest.mark.parametrize( "malicious_group", [ pytest.param("admin\x00", id="null_byte"), pytest.param("admin\n", id="newline"), pytest.param("admin\t", id="tab"), pytest.param("admin\r", id="carriage_return"), pytest.param("аdmin", id="cyrillic_lookalike"), # Cyrillic 'а' U+0430 ], ) def test_special_characters_do_not_match(self, malicious_group: str) -> None: """ SECURITY: Control characters and lookalike Unicode should NOT match ASCII group names. Prevents injection attacks and homograph attacks where visually similar characters could be used to bypass authorization. """ client = OAuth2Client( **_OAUTH2_CLIENT_DEFAULTS, groups_attribute_path="groups", allowed_groups=["admin"], # ASCII "admin" ) user_claims = { "sub": "user123", "email": "user@example.com", "groups": [malicious_group], } # Should deny - exact byte-level match required with pytest.raises(PermissionError, match="Access denied"): client.validate_access(user_claims) def test_nested_arrays_are_skipped(self) -> None: """ SECURITY: Nested arrays in JMESPath results are ignored (not flattened). Prevents exploitation via deeply nested structures that could cause unexpected behavior or DoS. """ client = OAuth2Client( **_OAUTH2_CLIENT_DEFAULTS, groups_attribute_path="groups", allowed_groups=["admin"], ) # Simulates JMESPath returning nested structure (rare but possible) # Since we can't easily make JMESPath return nested arrays, we test # that the normalization handles them correctly user_claims = { "sub": "user123", "email": "user@example.com", "groups": [["admin"], ["user"]], # Nested arrays } # Should deny - nested arrays are skipped, results in empty user_groups with pytest.raises(PermissionError, match="Access denied"): client.validate_access(user_claims) def test_large_number_of_groups_handled(self) -> None: """ SECURITY: Verify large number of groups doesn't cause DoS. While not a strict security test, ensures the implementation can handle realistic loads (e.g., enterprise users in many groups). """ client = OAuth2Client( **_OAUTH2_CLIENT_DEFAULTS, groups_attribute_path="groups", allowed_groups=["admin"], ) # Simulate user in 1000 groups (realistic for enterprise AD/LDAP) user_claims = { "sub": "user123", "email": "user@example.com", "groups": [f"group{i}" for i in range(1000)] + ["admin"], } # Should grant access - "admin" is in the list client.validate_access(user_claims)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Arize-ai/phoenix'

If you have feedback or need assistance with the MCP directory API, please join our Discord server