test_auth.py•105 kB
from __future__ import annotations
import string
from collections import defaultdict
from collections.abc import Iterator, Sequence
from contextlib import AbstractContextManager
from dataclasses import replace
from datetime import datetime, timedelta, timezone
from functools import partial
from random import choice
from secrets import token_hex
from typing import (
Any,
Generic,
Optional,
TypeVar,
)
from urllib.error import URLError
from urllib.request import urlopen
import bs4
import jwt
import pytest
import smtpdfix
from httpx import HTTPStatusError
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExportResult
from strawberry.relay import GlobalID
from phoenix.auth import sanitize_email
from phoenix.server.api.exceptions import Unauthorized
from phoenix.server.api.input_types.UserRoleInput import UserRoleInput
from .._helpers import (
_ADMIN,
_ADMIN_ONLY_ENDPOINTS,
_COMMON_RESOURCE_ENDPOINTS,
_DEFAULT_ADMIN,
_DENIED,
_EXPECTATION_401,
_EXPECTATION_404,
_MEMBER,
_OK,
_OK_OR_DENIED,
_SYSTEM_USER_GID,
_VIEWER,
_VIEWER_BLOCKED_WRITE_OPERATIONS,
_AccessToken,
_AdminSecret,
_ApiKey,
_AppInfo,
_create_api_key,
_create_user,
_delete_users,
_Email,
_ExistingSpan,
_Expectation,
_export_embeddings,
_extract_html,
_GetUser,
_GqlId,
_Headers,
_httpx_client,
_initiate_password_reset,
_log_in,
_log_out,
_LoggedInUser,
_OIDCServer,
_Password,
_patch_user,
_patch_viewer,
_Profile,
_randomize_casing,
_RefreshToken,
_RoleOrUser,
_SpanExporterFactory,
_Username,
_will_be_asked_to_reset_password,
)
NOW = datetime.now(timezone.utc)
_decode_jwt = partial(jwt.decode, options=dict(verify_signature=False))
_TokenT = TypeVar("_TokenT", _AccessToken, _RefreshToken)
class TestOIDC:
"""Tests for OpenID Connect (OIDC) authentication flow.
This class tests the OIDC sign-in and sign-up processes, including:
- User authentication via OIDC
- New user creation during OIDC sign-in
- Token generation and validation
- Handling of conflicts with existing users
- Configuration options like allow_sign_up
- Error handling for invalid credentials
"""
@pytest.mark.parametrize("allow_sign_up", [True, False])
async def test_sign_in(
self,
allow_sign_up: bool,
_oidc_server: _OIDCServer,
_app: _AppInfo,
) -> None:
"""Test the complete OIDC sign-in flow with different allow_sign_up settings.
This test verifies:
1. The OAuth2 flow redirects correctly to the OIDC provider
2. When allow_sign_up is True:
- A new user is created with MEMBER role
- Access and refresh tokens are generated
- Subsequent OIDC flows generate new tokens for the same user
3. When allow_sign_up is False:
- Users are redirected to login with an error message
- No access tokens are granted
- If a user without a password exists, they can still sign in
"""
client = _httpx_client(_app)
url = (
f"oauth2/{_oidc_server}/login"
if allow_sign_up
else f"oauth2/{_oidc_server}_no_sign_up/login"
)
# Start the OAuth2 flow
response = client.post(url)
assert response.status_code == 302
auth_url = response.headers["location"]
# Verify required OAuth2 cookies are set (non-PKCE: state and nonce only)
cookies = dict(response.cookies)
assert "phoenix-oauth2-state" in cookies
assert "phoenix-oauth2-nonce" in cookies
assert "phoenix-oauth2-code-verifier" not in cookies # PKCE not enabled
# Follow the redirect to the OIDC server
response = client.get(auth_url)
assert response.status_code == 302
callback_url = response.headers["location"]
# Verify that the user is not already created
assert _oidc_server.user_email, "Fixture should have initialized a (random) user"
assert (email := sanitize_email(_oidc_server.user_email))
admin = _DEFAULT_ADMIN.log_in(_app)
users = {sanitize_email(u.profile.email): u for u in admin.list_users(_app)}
assert email not in users
# Complete the flow by calling the token endpoint
response = client.get(callback_url)
if not allow_sign_up:
# Verify that user is redirected to /login
assert response.status_code == 307
assert "/login" in response.headers["location"]
# Verify no access is granted
assert not response.cookies.get("phoenix-access-token")
assert not response.cookies.get("phoenix-refresh-token")
# Verify sensitive cookies are cleaned up even on error
set_cookie_headers = response.headers.get_list("set-cookie")
assert any(
"phoenix-oauth2-state=" in h and "Max-Age=0" in h for h in set_cookie_headers
)
assert any(
"phoenix-oauth2-nonce=" in h and "Max-Age=0" in h for h in set_cookie_headers
)
# Create the user without password
# Casing should not matter
case_insensitive_email = _randomize_casing(email)
admin.create_user(
_app, profile=_Profile(case_insensitive_email, "", token_hex(8)), local=False
)
# If user go through OIDC flow again, access should be granted
response = _httpx_client(_app, cookies=cookies).get(callback_url)
# Verify that user is redirected not to /login
assert response.status_code == 302
assert "/login" not in response.headers["location"]
# Verify we got access
assert (access_token := response.cookies.get("phoenix-access-token"))
assert (refresh_token := response.cookies.get("phoenix-refresh-token"))
# Verify that the user was created
users = {u.profile.email: u for u in admin.list_users(_app)}
assert email in users
assert users[email].role is UserRoleInput.MEMBER
# If user go through OIDC flow again, new access token should be created
response = _httpx_client(_app, cookies=cookies).get(callback_url)
assert (new_access_token := response.cookies.get("phoenix-access-token"))
assert (new_refresh_token := response.cookies.get("phoenix-refresh-token"))
assert new_access_token != access_token
assert new_refresh_token != refresh_token
return
# Verify we got access
assert response.status_code == 302
assert (access_token := response.cookies.get("phoenix-access-token"))
assert (refresh_token := response.cookies.get("phoenix-refresh-token"))
# Verify sensitive cookies are cleaned up after successful authentication
set_cookie_headers = response.headers.get_list("set-cookie")
assert any("phoenix-oauth2-state=" in h and "Max-Age=0" in h for h in set_cookie_headers)
assert any("phoenix-oauth2-nonce=" in h and "Max-Age=0" in h for h in set_cookie_headers)
# Verify that the user was created
users = {u.profile.email: u for u in admin.list_users(_app)}
assert email in users
assert users[email].role is UserRoleInput.MEMBER
# If user go through OIDC flow again, new access token should be created
response = _httpx_client(_app, cookies=cookies).get(callback_url)
assert (new_access_token := response.cookies.get("phoenix-access-token"))
assert (new_refresh_token := response.cookies.get("phoenix-refresh-token"))
assert new_access_token != access_token
assert new_refresh_token != refresh_token
@pytest.mark.parametrize("allow_sign_up", [True, False])
async def test_sign_in_conflict_for_local_user_with_password(
self,
allow_sign_up: bool,
_oidc_server: _OIDCServer,
_app: _AppInfo,
) -> None:
"""Test OIDC sign-in when a user with the same email already exists with password authentication.
This test verifies:
1. The system detects the email conflict with an existing user
2. The user is redirected to the login page with an appropriate error message
3. No access tokens are granted to the OIDC user
4. The existing user's credentials remain unchanged
5. This behavior is consistent regardless of the allow_sign_up setting
"""
client = _httpx_client(_app)
url = (
f"oauth2/{_oidc_server}/login"
if allow_sign_up
else f"oauth2/{_oidc_server}_no_sign_up/login"
)
# Start the OAuth2 flow
response = client.post(url)
assert response.status_code == 302
auth_url = response.headers["location"]
# Verify required OAuth2 cookies are set (non-PKCE: state and nonce only)
assert "phoenix-oauth2-state" in response.cookies
assert "phoenix-oauth2-nonce" in response.cookies
assert "phoenix-oauth2-code-verifier" not in response.cookies # PKCE not enabled
# Follow the redirect to the OIDC server
response = client.get(auth_url)
assert response.status_code == 302
callback_url = response.headers["location"]
# Verify that the user is not already created
assert (email := _oidc_server.user_email)
admin = _DEFAULT_ADMIN.log_in(_app)
users = {u.profile.email: u for u in admin.list_users(_app)}
assert email not in users
# Create the user with password
admin.create_user(_app, profile=_Profile(email, token_hex(8), token_hex(8)))
# Verify that user is redirected to /login
response = client.get(callback_url)
assert response.status_code == 307
assert "/login" in response.headers["location"]
# Verify no access is granted
assert not response.cookies.get("phoenix-access-token")
assert not response.cookies.get("phoenix-refresh-token")
# Verify sensitive cookies are cleaned up after conflict error
set_cookie_headers = response.headers.get_list("set-cookie")
assert any("phoenix-oauth2-state=" in h and "Max-Age=0" in h for h in set_cookie_headers)
assert any("phoenix-oauth2-nonce=" in h and "Max-Age=0" in h for h in set_cookie_headers)
async def test_state_mismatch_is_rejected(
self,
_oidc_server: _OIDCServer,
_app: _AppInfo,
) -> None:
"""Test that state parameter mismatch is rejected (CSRF protection).
This test verifies that an attacker cannot intercept the authorization code
and use it with a different state value. The state parameter binds the
authorization request to the token request, preventing CSRF attacks.
"""
client = _httpx_client(_app)
# Start the OAuth2 flow
response = client.post(f"oauth2/{_oidc_server}/login")
assert response.status_code == 302
auth_url = response.headers["location"]
cookies = dict(response.cookies)
# Follow the redirect to the OIDC server
response = client.get(auth_url)
assert response.status_code == 302
callback_url = response.headers["location"]
# Tamper with the state parameter in the callback URL
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
parsed = urlparse(callback_url)
query_params = parse_qs(parsed.query)
query_params["state"] = ["tampered_state_value"]
tampered_url = urlunparse(
(
parsed.scheme,
parsed.netloc,
parsed.path,
parsed.params,
urlencode(query_params, doseq=True),
parsed.fragment,
)
)
# Try to complete the flow with tampered state
response = _httpx_client(_app, cookies=cookies).get(tampered_url)
# Verify that user is redirected to login with error
assert response.status_code == 307
assert "/login" in response.headers["location"]
assert "error=" in response.headers["location"]
# Verify no access is granted
assert not response.cookies.get("phoenix-access-token")
assert not response.cookies.get("phoenix-refresh-token")
async def test_missing_state_cookie_is_rejected(
self,
_oidc_server: _OIDCServer,
_app: _AppInfo,
) -> None:
"""Test that missing state cookie causes proper error handling.
This verifies that if a user's cookies are deleted or expire before
completing the OAuth2 flow, the system handles it gracefully.
"""
client = _httpx_client(_app)
# Start the OAuth2 flow
response = client.post(f"oauth2/{_oidc_server}/login")
assert response.status_code == 302
auth_url = response.headers["location"]
# Follow the redirect to the OIDC server
response = client.get(auth_url)
assert response.status_code == 302
callback_url = response.headers["location"]
# Try to complete the flow WITHOUT cookies (simulating deleted cookies)
response = _httpx_client(_app).get(callback_url)
# Verify proper error handling (422 Unprocessable Entity for missing cookie)
assert response.status_code == 422
async def test_unsafe_return_url_is_rejected(
self,
_oidc_server: _OIDCServer,
_app: _AppInfo,
) -> None:
"""Test that absolute URLs in returnUrl parameter are rejected (open redirect protection).
This test verifies protection against open redirect attacks where an attacker
could trick users into authenticating and then redirect them to a malicious site.
"""
client = _httpx_client(_app)
# Try to start OAuth2 flow with an absolute URL (potential open redirect)
response = client.post(
f"oauth2/{_oidc_server}/login",
params={"returnUrl": "https://evil.com/phishing"},
)
assert response.status_code == 302
auth_url = response.headers["location"]
cookies = dict(response.cookies)
# Follow the redirect to the OIDC server
response = client.get(auth_url)
assert response.status_code == 302
callback_url = response.headers["location"]
# Complete the flow
response = _httpx_client(_app, cookies=cookies).get(callback_url)
# Verify that user is redirected to login with error about unsafe URL
assert response.status_code == 307
assert "/login" in response.headers["location"]
assert "unsafe" in response.headers["location"].lower()
# Verify no access is granted
assert not response.cookies.get("phoenix-access-token")
assert not response.cookies.get("phoenix-refresh-token")
async def test_unknown_idp_is_rejected(
self,
_app: _AppInfo,
) -> None:
"""Test that requests to unknown identity providers are rejected.
This verifies that the system validates the IDP name and returns
a proper redirect to the login page with an error code.
"""
client = _httpx_client(_app)
# Try to start OAuth2 flow with unknown IDP
response = client.post("oauth2/non_existent_idp/login")
# Should redirect to /login with error code
assert response.status_code == 307
assert "/login" in response.headers["location"]
assert "error=unknown_idp" in response.headers["location"]
async def test_cookie_security_attributes(
self,
_oidc_server: _OIDCServer,
_app: _AppInfo,
) -> None:
"""Test that OAuth2 cookies have proper security attributes.
This verifies that cookies are set with HttpOnly, SameSite, and appropriate
Max-Age values to protect against XSS and CSRF attacks.
"""
client = _httpx_client(_app)
# Start the OAuth2 flow
response = client.post(f"oauth2/{_oidc_server}/login")
assert response.status_code == 302
# Verify cookie security attributes
set_cookie_headers = response.headers.get_list("set-cookie")
# Check state cookie
state_cookie = next((h for h in set_cookie_headers if "phoenix-oauth2-state=" in h), None)
assert state_cookie is not None
assert "HttpOnly" in state_cookie
assert "SameSite=lax" in state_cookie or "SameSite=Lax" in state_cookie
assert "Max-Age=900" in state_cookie # 15 minutes
# Check nonce cookie
nonce_cookie = next((h for h in set_cookie_headers if "phoenix-oauth2-nonce=" in h), None)
assert nonce_cookie is not None
assert "HttpOnly" in nonce_cookie
assert "SameSite=lax" in nonce_cookie or "SameSite=Lax" in nonce_cookie
assert "Max-Age=900" in nonce_cookie
async def test_oidc_with_groups_access_granted(
self,
_oidc_server_standard_with_groups: _OIDCServer,
_app: _AppInfo,
) -> None:
"""Test standard OIDC with group-based access control - user HAS matching group.
This test verifies:
1. OIDC server returns groups claim ["engineering", "operations"]
2. Phoenix extracts groups using JMESPath (groups)
3. Phoenix checks against ALLOWED_GROUPS ("engineering,admin")
4. User has "engineering" → access GRANTED
"""
client = _httpx_client(_app)
# Start the OAuth2 OIDC flow with groups
response = client.post(f"oauth2/{_oidc_server_standard_with_groups}_granted/login")
assert response.status_code == 302
auth_url = response.headers["location"]
# Verify OAuth2 state and nonce cookies are set (no code_verifier for standard OIDC)
cookies = dict(response.cookies)
assert "phoenix-oauth2-state" in cookies
assert "phoenix-oauth2-nonce" in cookies
assert "phoenix-oauth2-code-verifier" not in cookies # OIDC doesn't use PKCE
# Follow the redirect to the OIDC server
response = client.get(auth_url)
assert response.status_code == 302
callback_url = response.headers["location"]
# Verify that the user is not already created
assert _oidc_server_standard_with_groups.user_email, (
"Fixture should have initialized a (random) user"
)
assert (email := sanitize_email(_oidc_server_standard_with_groups.user_email))
admin = _DEFAULT_ADMIN.log_in(_app)
users = {sanitize_email(u.profile.email): u for u in admin.list_users(_app)}
assert email not in users
# Complete the flow by calling the token endpoint
response = _httpx_client(_app, cookies=cookies).get(callback_url)
# Verify we got access tokens (user has matching group)
assert response.status_code == 302
assert response.cookies.get("phoenix-access-token")
assert response.cookies.get("phoenix-refresh-token")
# Verify that the user was created
users = {u.profile.email: u for u in admin.list_users(_app)}
assert email in users
assert users[email].role is UserRoleInput.MEMBER
async def test_oidc_with_groups_access_denied(
self,
_oidc_server_standard_with_groups: _OIDCServer,
_app: _AppInfo,
) -> None:
"""Test standard OIDC with group-based access control - user does NOT have matching group.
This test verifies:
1. OIDC server returns groups claim ["engineering", "operations"]
2. Phoenix extracts groups using JMESPath (groups)
3. Phoenix checks against ALLOWED_GROUPS ("admin,sales")
4. User has NO matching groups → access DENIED
"""
client = _httpx_client(_app)
# Start the OAuth2 OIDC flow with groups
response = client.post(f"oauth2/{_oidc_server_standard_with_groups}_denied/login")
assert response.status_code == 302
auth_url = response.headers["location"]
# Verify OAuth2 state and nonce cookies are set (no code_verifier for standard OIDC)
cookies = dict(response.cookies)
assert "phoenix-oauth2-state" in cookies
assert "phoenix-oauth2-nonce" in cookies
assert "phoenix-oauth2-code-verifier" not in cookies # OIDC doesn't use PKCE
# Follow the redirect to the OIDC server
response = client.get(auth_url)
assert response.status_code == 302
callback_url = response.headers["location"]
# Complete the flow by calling the token endpoint
response = _httpx_client(_app, cookies=cookies).get(callback_url)
# Verify that user is redirected to login with error
assert response.status_code == 307
assert "/login" in response.headers["location"]
# Verify no access is granted
assert not response.cookies.get("phoenix-access-token")
assert not response.cookies.get("phoenix-refresh-token")
# Verify that the user was NOT created
assert _oidc_server_standard_with_groups.user_email, (
"Fixture should have initialized a (random) user"
)
assert (email := sanitize_email(_oidc_server_standard_with_groups.user_email))
admin = _DEFAULT_ADMIN.log_in(_app)
users = {sanitize_email(u.profile.email): u for u in admin.list_users(_app)}
assert email not in users
class TestPKCE:
"""Test PKCE (Proof Key for Code Exchange) OAuth2 flow.
These tests verify that Phoenix correctly handles PKCE for both:
- Public clients that cannot securely store a client_secret
- Confidential clients using PKCE for defense-in-depth security
PKCE adds code_challenge/code_verifier validation to protect against
authorization code interception attacks.
"""
async def test_pkce_public_client_flow(
self,
_oidc_server_pkce_public: _OIDCServer,
_app: _AppInfo,
) -> None:
"""Test PKCE flow with a public client (no client_secret).
This test verifies:
1. Phoenix generates code_verifier and code_challenge
2. OIDC server receives code_challenge in authorization request
3. Phoenix sends code_verifier in token request (no client_secret)
4. Server validates code_verifier matches code_challenge
5. User is successfully authenticated and tokens are issued
6. Sensitive cookies (state, nonce, code_verifier) are cleaned up
"""
client = _httpx_client(_app)
# Start the OAuth2 PKCE flow
response = client.post(f"oauth2/{_oidc_server_pkce_public}/login")
assert response.status_code == 302
auth_url = response.headers["location"]
# Verify all required OAuth2 cookies are set
cookies = dict(response.cookies)
assert "phoenix-oauth2-state" in cookies
assert "phoenix-oauth2-nonce" in cookies
assert "phoenix-oauth2-code-verifier" in cookies
# Follow the redirect to the OIDC server
response = client.get(auth_url)
assert response.status_code == 302
callback_url = response.headers["location"]
# Verify that the user is not already created
assert _oidc_server_pkce_public.user_email, (
"Fixture should have initialized a (random) user"
)
assert (email := sanitize_email(_oidc_server_pkce_public.user_email))
admin = _DEFAULT_ADMIN.log_in(_app)
users = {sanitize_email(u.profile.email): u for u in admin.list_users(_app)}
assert email not in users
# Complete the flow by calling the token endpoint
response = _httpx_client(_app, cookies=cookies).get(callback_url)
# Verify we got access tokens
assert response.status_code == 302
assert response.cookies.get("phoenix-access-token")
assert response.cookies.get("phoenix-refresh-token")
# Verify sensitive cookies are cleaned up (security: remove ephemeral crypto material)
# Check Set-Cookie headers for deletion (Max-Age=0)
set_cookie_headers = response.headers.get_list("set-cookie")
assert any("phoenix-oauth2-state=" in h and "Max-Age=0" in h for h in set_cookie_headers)
assert any("phoenix-oauth2-nonce=" in h and "Max-Age=0" in h for h in set_cookie_headers)
assert any(
"phoenix-oauth2-code-verifier=" in h and "Max-Age=0" in h for h in set_cookie_headers
)
# Verify that the user was created
users = {u.profile.email: u for u in admin.list_users(_app)}
assert email in users
assert users[email].role is UserRoleInput.MEMBER
async def test_pkce_confidential_client_flow(
self,
_oidc_server_pkce_confidential: _OIDCServer,
_app: _AppInfo,
) -> None:
"""Test PKCE flow with a confidential client (defense-in-depth).
This test verifies defense-in-depth: both client_secret AND code_verifier
are validated. Even if the client_secret is compromised, the attacker
cannot exchange an intercepted authorization code without the code_verifier.
This test verifies:
1. Phoenix generates code_verifier and code_challenge (PKCE)
2. Phoenix sends client_secret (traditional OAuth2)
3. Server validates BOTH client_secret AND code_verifier
4. User is successfully authenticated and tokens are issued
5. Sensitive cookies (state, nonce, code_verifier) are cleaned up
"""
client = _httpx_client(_app)
# Start the OAuth2 PKCE flow with confidential client
response = client.post(f"oauth2/{_oidc_server_pkce_confidential}/login")
assert response.status_code == 302
auth_url = response.headers["location"]
# Verify all required OAuth2 cookies are set
cookies = dict(response.cookies)
assert "phoenix-oauth2-state" in cookies
assert "phoenix-oauth2-nonce" in cookies
assert "phoenix-oauth2-code-verifier" in cookies
# Follow the redirect to the OIDC server
response = client.get(auth_url)
assert response.status_code == 302
callback_url = response.headers["location"]
# Verify that the user is not already created
assert _oidc_server_pkce_confidential.user_email, (
"Fixture should have initialized a (random) user"
)
assert (email := sanitize_email(_oidc_server_pkce_confidential.user_email))
admin = _DEFAULT_ADMIN.log_in(_app)
users = {sanitize_email(u.profile.email): u for u in admin.list_users(_app)}
assert email not in users
# Complete the flow by calling the token endpoint
response = _httpx_client(_app, cookies=cookies).get(callback_url)
# Verify we got access tokens
assert response.status_code == 302
assert response.cookies.get("phoenix-access-token")
assert response.cookies.get("phoenix-refresh-token")
# Verify sensitive cookies are cleaned up (security: remove ephemeral crypto material)
# Check Set-Cookie headers for deletion (Max-Age=0)
set_cookie_headers = response.headers.get_list("set-cookie")
assert any("phoenix-oauth2-state=" in h and "Max-Age=0" in h for h in set_cookie_headers)
assert any("phoenix-oauth2-nonce=" in h and "Max-Age=0" in h for h in set_cookie_headers)
assert any(
"phoenix-oauth2-code-verifier=" in h and "Max-Age=0" in h for h in set_cookie_headers
)
# Verify that the user was created
users = {u.profile.email: u for u in admin.list_users(_app)}
assert email in users
assert users[email].role is UserRoleInput.MEMBER
async def test_pkce_code_verifier_mismatch_rejected(
self,
_oidc_server_pkce_public: _OIDCServer,
_app: _AppInfo,
) -> None:
"""Test that invalid code_verifier is rejected.
This test verifies that if an attacker intercepts the authorization code
and tries to exchange it with a wrong code_verifier, the server rejects it.
Security scenario: An attacker intercepts the authorization code from the
redirect URL but doesn't have the code_verifier from the cookie. The server
should reject the token exchange request.
Also verifies that sensitive cookies are cleaned up even on error.
"""
client = _httpx_client(_app)
# Start the OAuth2 PKCE flow
response = client.post(f"oauth2/{_oidc_server_pkce_public}/login")
assert response.status_code == 302
auth_url = response.headers["location"]
# Verify all required OAuth2 cookies are set
cookies = dict(response.cookies)
assert "phoenix-oauth2-state" in cookies
assert "phoenix-oauth2-nonce" in cookies
assert "phoenix-oauth2-code-verifier" in cookies
# Follow the redirect to the OIDC server
response = client.get(auth_url)
assert response.status_code == 302
callback_url = response.headers["location"]
# Try to complete the flow with state and nonce but WITHOUT code_verifier
# This simulates an attacker who intercepts the callback URL and has the state/nonce
# cookies (from the same browser session) but doesn't have the code_verifier
cookies_without_verifier = {k: v for k, v in cookies.items() if "code-verifier" not in k}
response = _httpx_client(_app, cookies=cookies_without_verifier).get(callback_url)
# Verify that user is redirected to login with error
assert response.status_code == 307
assert "/login" in response.headers["location"]
# Verify no access is granted
assert not response.cookies.get("phoenix-access-token")
assert not response.cookies.get("phoenix-refresh-token")
# Verify sensitive cookies are cleaned up even on error (security: prevent reuse)
# Check Set-Cookie headers for deletion (Max-Age=0)
set_cookie_headers = response.headers.get_list("set-cookie")
assert any("phoenix-oauth2-state=" in h and "Max-Age=0" in h for h in set_cookie_headers)
assert any("phoenix-oauth2-nonce=" in h and "Max-Age=0" in h for h in set_cookie_headers)
assert any(
"phoenix-oauth2-code-verifier=" in h and "Max-Age=0" in h for h in set_cookie_headers
)
async def test_pkce_with_groups_access_granted(
self,
_oidc_server_pkce_with_groups: _OIDCServer,
_app: _AppInfo,
) -> None:
"""Test PKCE with group-based access control - user HAS matching group.
This test verifies:
1. OIDC server returns groups claim ["engineering", "operations"]
2. Phoenix extracts groups using JMESPath (groups)
3. Phoenix checks against ALLOWED_GROUPS ("engineering,admin")
4. User has "engineering" → access GRANTED
"""
client = _httpx_client(_app)
# Start the OAuth2 PKCE flow with groups
response = client.post(f"oauth2/{_oidc_server_pkce_with_groups}_granted/login")
assert response.status_code == 302
auth_url = response.headers["location"]
# Verify all required OAuth2 cookies are set
cookies = dict(response.cookies)
assert "phoenix-oauth2-state" in cookies
assert "phoenix-oauth2-nonce" in cookies
assert "phoenix-oauth2-code-verifier" in cookies
# Follow the redirect to the OIDC server
response = client.get(auth_url)
assert response.status_code == 302
callback_url = response.headers["location"]
# Verify that the user is not already created
assert _oidc_server_pkce_with_groups.user_email, (
"Fixture should have initialized a (random) user"
)
assert (email := sanitize_email(_oidc_server_pkce_with_groups.user_email))
admin = _DEFAULT_ADMIN.log_in(_app)
users = {sanitize_email(u.profile.email): u for u in admin.list_users(_app)}
assert email not in users
# Complete the flow by calling the token endpoint
response = _httpx_client(_app, cookies=cookies).get(callback_url)
# Verify we got access tokens (user has matching group)
assert response.status_code == 302
assert response.cookies.get("phoenix-access-token")
assert response.cookies.get("phoenix-refresh-token")
# Verify that the user was created
users = {u.profile.email: u for u in admin.list_users(_app)}
assert email in users
assert users[email].role is UserRoleInput.MEMBER
async def test_pkce_with_groups_access_denied(
self,
_oidc_server_pkce_with_groups: _OIDCServer,
_app: _AppInfo,
) -> None:
"""Test PKCE with group-based access control - user does NOT have matching group.
This test verifies:
1. OIDC server returns groups claim ["engineering", "operations"]
2. Phoenix extracts groups using JMESPath (groups)
3. Phoenix checks against ALLOWED_GROUPS ("admin,sales")
4. User has NO matching groups → access DENIED
"""
client = _httpx_client(_app)
# Start the OAuth2 PKCE flow with groups
response = client.post(f"oauth2/{_oidc_server_pkce_with_groups}_denied/login")
assert response.status_code == 302
auth_url = response.headers["location"]
# Verify all required OAuth2 cookies are set
cookies = dict(response.cookies)
assert "phoenix-oauth2-state" in cookies
assert "phoenix-oauth2-nonce" in cookies
assert "phoenix-oauth2-code-verifier" in cookies
# Follow the redirect to the OIDC server
response = client.get(auth_url)
assert response.status_code == 302
callback_url = response.headers["location"]
# Complete the flow by calling the token endpoint
response = _httpx_client(_app, cookies=cookies).get(callback_url)
# Verify that user is redirected to login with error
assert response.status_code == 307
assert "/login" in response.headers["location"]
# Verify no access is granted
assert not response.cookies.get("phoenix-access-token")
assert not response.cookies.get("phoenix-refresh-token")
# Verify that the user was NOT created
assert _oidc_server_pkce_with_groups.user_email, (
"Fixture should have initialized a (random) user"
)
assert (email := sanitize_email(_oidc_server_pkce_with_groups.user_email))
admin = _DEFAULT_ADMIN.log_in(_app)
users = {sanitize_email(u.profile.email): u for u in admin.list_users(_app)}
assert email not in users
async def test_pkce_wrong_code_verifier_is_rejected(
self,
_oidc_server_pkce_public: _OIDCServer,
_app: _AppInfo,
) -> None:
"""Test that wrong code_verifier is rejected (not just missing).
This test verifies that an attacker who intercepts the authorization code
and tries to use their own code_verifier will be rejected by the OIDC server.
"""
client = _httpx_client(_app)
# Start the OAuth2 PKCE flow
response = client.post(f"oauth2/{_oidc_server_pkce_public}/login")
assert response.status_code == 302
auth_url = response.headers["location"]
cookies = dict(response.cookies)
# Follow the redirect to the OIDC server
response = client.get(auth_url)
assert response.status_code == 302
callback_url = response.headers["location"]
# Replace code_verifier with a wrong value
cookies["phoenix-oauth2-code-verifier"] = "wrong_code_verifier_value_12345"
# Try to complete the flow with wrong code_verifier
response = _httpx_client(_app, cookies=cookies).get(callback_url)
# Verify that user is redirected to login with error
assert response.status_code == 307
assert "/login" in response.headers["location"]
assert "error=" in response.headers["location"]
# Verify no access is granted
assert not response.cookies.get("phoenix-access-token")
assert not response.cookies.get("phoenix-refresh-token")
async def test_pkce_missing_code_verifier_cookie_is_rejected(
self,
_oidc_server_pkce_public: _OIDCServer,
_app: _AppInfo,
) -> None:
"""Test that missing code_verifier cookie in PKCE flow causes error.
This verifies that for PKCE-enabled flows, the code_verifier cookie
is required and the system properly rejects requests without it.
"""
client = _httpx_client(_app)
# Start the OAuth2 PKCE flow
response = client.post(f"oauth2/{_oidc_server_pkce_public}/login")
assert response.status_code == 302
auth_url = response.headers["location"]
cookies = dict(response.cookies)
# Follow the redirect to the OIDC server
response = client.get(auth_url)
assert response.status_code == 302
callback_url = response.headers["location"]
# Remove code_verifier cookie (keep state and nonce)
cookies.pop("phoenix-oauth2-code-verifier", None)
# Try to complete the flow without code_verifier
response = _httpx_client(_app, cookies=cookies).get(callback_url)
# Verify that user is redirected to login with error
assert response.status_code == 307
assert "/login" in response.headers["location"]
# Verify no access is granted
assert not response.cookies.get("phoenix-access-token")
assert not response.cookies.get("phoenix-refresh-token")
async def test_pkce_code_verifier_cookie_security_attributes(
self,
_oidc_server_pkce_public: _OIDCServer,
_app: _AppInfo,
) -> None:
"""Test that PKCE code_verifier cookie has proper security attributes.
This verifies that the code_verifier cookie (which contains sensitive
cryptographic material) is set with HttpOnly, SameSite, and appropriate
Max-Age values.
"""
client = _httpx_client(_app)
# Start the OAuth2 PKCE flow
response = client.post(f"oauth2/{_oidc_server_pkce_public}/login")
assert response.status_code == 302
# Verify cookie security attributes
set_cookie_headers = response.headers.get_list("set-cookie")
# Check code_verifier cookie
verifier_cookie = next(
(h for h in set_cookie_headers if "phoenix-oauth2-code-verifier=" in h), None
)
assert verifier_cookie is not None
assert "HttpOnly" in verifier_cookie
assert "SameSite=lax" in verifier_cookie or "SameSite=Lax" in verifier_cookie
assert "Max-Age=900" in verifier_cookie # 15 minutes
class TestMockOIDCServer:
"""Tests for mock _OIDCServer behavior (not Phoenix).
These tests verify that our mock OIDC server correctly simulates the distinction
between PKCE and standard OIDC flows. This ensures that other integration tests
are actually testing Phoenix's behavior correctly by providing realistic mock
server responses.
"""
async def test_pkce_server_rejects_non_pkce_flow(
self,
_oidc_server_pkce_public: _OIDCServer,
) -> None:
"""Test that mock PKCE-enabled server rejects token requests without code_verifier.
This verifies that the mock _OIDCServer, when configured with PKCE enabled,
properly rejects token exchange requests that don't include the PKCE code_verifier,
even if they have a valid authorization code. This ensures the mock server
accurately simulates real OIDC provider behavior for testing purposes.
"""
import httpx
# Get authorization code by doing the auth flow
client = httpx.Client(verify=False)
# Start auth request with code_challenge
from base64 import urlsafe_b64encode
from hashlib import sha256
code_verifier = "test_verifier_1234567890_abcdefghijklmnopqrstuvwxyz"
code_challenge = (
urlsafe_b64encode(sha256(code_verifier.encode()).digest()).decode().rstrip("=")
)
# Get authorization code from server
auth_response = client.get(
f"{_oidc_server_pkce_public.base_url}/auth",
params={
"client_id": _oidc_server_pkce_public.client_id,
"response_type": "code",
"redirect_uri": "http://localhost/callback",
"state": "test_state",
"code_challenge": code_challenge,
"code_challenge_method": "S256",
},
)
assert auth_response.status_code == 302
# Extract authorization code from redirect
from urllib.parse import parse_qs, urlparse
callback_url = auth_response.headers["location"]
query_params = parse_qs(urlparse(callback_url).query)
auth_code = query_params["code"][0]
# Try to exchange code for token WITHOUT code_verifier (standard OIDC flow)
token_response = client.post(
f"{_oidc_server_pkce_public.base_url}/token",
data={
"grant_type": "authorization_code",
"code": auth_code,
"redirect_uri": "http://localhost/callback",
"client_id": _oidc_server_pkce_public.client_id,
# NOT including code_verifier - this should be rejected
},
)
# Server should reject the request
assert token_response.status_code == 400
error_data = token_response.json()
assert error_data.get("error") == "invalid_request"
assert "code_verifier" in error_data.get("error_description", "").lower()
async def test_standard_oidc_server_rejects_pkce_parameters(
self,
_oidc_server_standard: _OIDCServer,
) -> None:
"""Test that mock standard OIDC server rejects token requests with code_verifier.
This verifies that the mock _OIDCServer, when configured for standard OIDC (not PKCE),
properly rejects token exchange requests that include PKCE parameters (code_verifier)
when PKCE is not enabled. This ensures the mock server accurately simulates real
OIDC provider behavior for testing purposes.
"""
import httpx
# Get authorization code by doing standard OAuth flow (no PKCE)
client = httpx.Client(verify=False)
# Get authorization code from server (standard flow, no code_challenge)
auth_response = client.get(
f"{_oidc_server_standard.base_url}/auth",
params={
"client_id": _oidc_server_standard.client_id,
"response_type": "code",
"redirect_uri": "http://localhost/callback",
"state": "test_state",
# NO code_challenge - standard OIDC flow
},
)
assert auth_response.status_code == 302
# Extract authorization code from redirect
from urllib.parse import parse_qs, urlparse
callback_url = auth_response.headers["location"]
query_params = parse_qs(urlparse(callback_url).query)
auth_code = query_params["code"][0]
# Try to exchange code for token WITH code_verifier (PKCE flow on non-PKCE server)
token_response = client.post(
f"{_oidc_server_standard.base_url}/token",
data={
"grant_type": "authorization_code",
"code": auth_code,
"redirect_uri": "http://localhost/callback",
"client_id": _oidc_server_standard.client_id,
"client_secret": _oidc_server_standard.client_secret,
"code_verifier": "some_pkce_verifier_12345", # Should be rejected
},
)
# Server should reject the request
assert token_response.status_code == 400
error_data = token_response.json()
assert error_data.get("error") == "invalid_request"
assert "code_verifier" in error_data.get("error_description", "").lower()
class TestTLS:
def test_non_tls_client_cannot_connect(self, _app: _AppInfo) -> None:
with pytest.raises(URLError) as e:
urlopen(_app.base_url)
assert "SSL" in str(e.value), f"Expected SSL error, got: {e.value}"
class TestOriginAndReferer:
@pytest.mark.parametrize(
"headers,expectation",
[
[dict(), _OK],
[dict(origin="http://localhost"), _OK],
[dict(referer="http://localhost/xyz"), _OK],
[dict(origin="http://xyz.com"), _EXPECTATION_401],
[dict(referer="http://xyz.com/xyz"), _EXPECTATION_401],
[dict(origin="http://xyz.com", referer="http://localhost/xyz"), _EXPECTATION_401],
[dict(origin="http://localhost", referer="http://xyz.com/xyz"), _EXPECTATION_401],
],
)
def test_csrf_origin_validation(
self,
headers: dict[str, str],
expectation: AbstractContextManager[Any],
_app: _AppInfo,
) -> None:
resp = _httpx_client(_app, headers=headers).get("/healthz")
with expectation:
resp.raise_for_status()
class TestLogIn:
@pytest.mark.parametrize("role_or_user", list(UserRoleInput) + [_DEFAULT_ADMIN])
def test_can_log_in(
self,
role_or_user: _RoleOrUser,
_get_user: _GetUser,
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
u.log_in(_app)
@pytest.mark.parametrize("role_or_user", list(UserRoleInput) + [_DEFAULT_ADMIN])
def test_can_log_in_more_than_once_simultaneously(
self,
role_or_user: _RoleOrUser,
_get_user: _GetUser,
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
for _ in range(10):
u.log_in(_app)
@pytest.mark.parametrize("role_or_user", list(UserRoleInput))
def test_can_log_in_with_case_insensitive_email(
self,
role_or_user: _RoleOrUser,
_get_user: _GetUser,
_app: _AppInfo,
) -> None:
username, password = token_hex(8), token_hex(8)
email = _randomize_casing(f"{string.ascii_lowercase}@{token_hex(16)}.com")
profile = _Profile(email=email, password=password, username=username)
u = _get_user(_app, role_or_user, profile=profile)
case_insensitive_email = _randomize_casing(u.email)
_log_in(_app, u.password, email=case_insensitive_email)
@pytest.mark.parametrize("role_or_user", list(UserRoleInput) + [_DEFAULT_ADMIN])
def test_cannot_log_in_with_empty_password(
self,
role_or_user: _RoleOrUser,
_get_user: _GetUser,
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
with _EXPECTATION_401:
_log_in(_app, "", email=u.email)
@pytest.mark.parametrize("role_or_user", list(UserRoleInput) + [_DEFAULT_ADMIN])
def test_cannot_log_in_with_wrong_password(
self,
role_or_user: _RoleOrUser,
_get_user: _GetUser,
_passwords: Iterator[_Password],
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
assert (wrong_password := next(_passwords)) != u.password
with _EXPECTATION_401:
_log_in(_app, wrong_password, email=u.email)
@pytest.mark.parametrize("role_or_user", list(UserRoleInput))
def test_cannot_log_in_with_deleted_user(
self,
role_or_user: _RoleOrUser,
_get_user: _GetUser,
_passwords: Iterator[_Password],
_app: _AppInfo,
) -> None:
user = _get_user(_app, role_or_user)
_delete_users(_app, _app.admin_secret, users=[user])
with _EXPECTATION_401:
user.log_in(_app)
class TestWelcomeEmail:
@pytest.mark.parametrize("role", list(UserRoleInput))
@pytest.mark.parametrize("send_welcome_email", [True, False])
def test_welcome_email_is_sent(
self,
role: UserRoleInput,
send_welcome_email: bool,
_get_user: _GetUser,
_smtpd: smtpdfix.AuthController,
_app: _AppInfo,
) -> None:
email = f"{token_hex(16)}@{token_hex(16)}.com"
profile = _Profile(email=email, password=token_hex(8), username=token_hex(8))
u = _create_user(
_app,
_get_user(_app, _ADMIN),
role=role,
profile=profile,
send_welcome_email=send_welcome_email,
)
if send_welcome_email:
assert _smtpd.messages
assert (msg := _smtpd.messages[-1])["to"] == u.email
assert (soup := _extract_html(msg))
assert isinstance((link := soup.find(id="welcome-url")), bs4.Tag)
assert isinstance((url := link.get("href")), str)
assert url == _app.base_url
else:
assert not _smtpd.messages or _smtpd.messages[-1]["to"] != u.email
class TestPasswordReset:
def test_initiate_password_reset_does_not_reveal_whether_user_exists(
self,
_emails: Iterator[_Email],
_smtpd: smtpdfix.AuthController,
_app: _AppInfo,
) -> None:
email = next(_emails)
assert not _initiate_password_reset(_app, email, _smtpd, should_receive_email=False)
@pytest.mark.parametrize("role_or_user", list(UserRoleInput))
def test_initiate_password_reset_does_not_change_existing_password(
self,
role_or_user: _RoleOrUser,
_get_user: _GetUser,
_smtpd: smtpdfix.AuthController,
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
assert u.initiate_password_reset(_app, _smtpd)
u.log_in(_app)
@pytest.mark.parametrize("role_or_user", list(UserRoleInput))
def test_initiate_password_reset_with_case_insensitive_email(
self,
role_or_user: _RoleOrUser,
_get_user: _GetUser,
_smtpd: smtpdfix.AuthController,
_app: _AppInfo,
) -> None:
username, password = token_hex(8), token_hex(8)
email = _randomize_casing(f"{string.ascii_lowercase}@{token_hex(16)}.com")
profile = _Profile(email=email, password=password, username=username)
u = _get_user(_app, role_or_user, profile=profile)
case_insensitive_email = _randomize_casing(u.email)
assert _initiate_password_reset(_app, case_insensitive_email, _smtpd)
@pytest.mark.parametrize("role_or_user", list(UserRoleInput))
def test_password_reset_can_be_initiated_multiple_times(
self,
role_or_user: _RoleOrUser,
_get_user: _GetUser,
_passwords: Iterator[_Password],
_smtpd: smtpdfix.AuthController,
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
new_password = next(_passwords)
assert new_password != u.password
tokens = [u.initiate_password_reset(_app, _smtpd) for _ in range(2)]
assert sum(map(bool, tokens)) > 1
for i, token in enumerate(tokens):
assert token
if i < len(tokens) - 1:
with _EXPECTATION_401:
token.reset(_app, new_password)
continue
# only the last one works
token.reset(_app, new_password)
@pytest.mark.parametrize("role_or_user", list(UserRoleInput))
def test_password_reset_can_be_initiated_immediately_after_password_reset(
self,
role_or_user: _RoleOrUser,
_get_user: _GetUser,
_passwords: Iterator[_Password],
_smtpd: smtpdfix.AuthController,
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
new_password = next(_passwords)
assert new_password != u.password
assert (token := u.initiate_password_reset(_app, _smtpd))
token.reset(_app, new_password)
assert u.initiate_password_reset(_app, _smtpd)
@pytest.mark.parametrize("role_or_user", list(UserRoleInput))
def test_password_reset_token_is_single_use(
self,
role_or_user: _RoleOrUser,
_get_user: _GetUser,
_passwords: Iterator[_Password],
_smtpd: smtpdfix.AuthController,
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
new_password = next(_passwords)
assert new_password != u.password
newer_password = next(_passwords)
assert newer_password != new_password
assert (token := u.initiate_password_reset(_app, _smtpd))
token.reset(_app, new_password)
with _EXPECTATION_401:
token.reset(_app, newer_password)
@pytest.mark.parametrize("role_or_user", list(UserRoleInput))
def test_initiate_password_reset_and_then_reset_password_using_token_from_email(
self,
role_or_user: _RoleOrUser,
_get_user: _GetUser,
_passwords: Iterator[_Password],
_smtpd: smtpdfix.AuthController,
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
logged_in_user = u.log_in(_app)
logged_in_user.visit(_app)
assert (token := u.initiate_password_reset(_app, _smtpd))
new_password = next(_passwords)
assert new_password != u.password
token.reset(_app, new_password)
with _EXPECTATION_401:
# old password should no longer work
u.log_in(_app)
# old logged-in tokens should no longer work
logged_in_user.visit(_app, 401)
# new password should work
new_profile = replace(u.profile, password=new_password)
new_u = replace(u, profile=new_profile)
new_u.log_in(_app)
assert not _will_be_asked_to_reset_password(_app, new_u)
@pytest.mark.parametrize("role_or_user", list(UserRoleInput))
def test_deleted_user_will_not_receive_email_after_initiating_password_reset(
self,
role_or_user: _RoleOrUser,
_get_user: _GetUser,
_smtpd: smtpdfix.AuthController,
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
logged_in_user = u.log_in(_app)
logged_in_user.visit(_app)
_DEFAULT_ADMIN.delete_users(_app, u)
assert not u.initiate_password_reset(_app, _smtpd, should_receive_email=False)
@pytest.mark.parametrize("role_or_user", list(UserRoleInput))
def test_deleted_user_cannot_reset_password_using_token_from_email(
self,
role_or_user: _RoleOrUser,
_get_user: _GetUser,
_passwords: Iterator[_Password],
_smtpd: smtpdfix.AuthController,
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
logged_in_user = u.log_in(_app)
logged_in_user.visit(_app)
assert (token := u.initiate_password_reset(_app, _smtpd))
new_password = next(_passwords)
assert new_password != u.password
_DEFAULT_ADMIN.delete_users(_app, u)
with _EXPECTATION_401:
token.reset(_app, new_password)
class TestLogOut:
@pytest.mark.parametrize("role_or_user", list(UserRoleInput))
def test_can_log_out(
self,
role_or_user: _RoleOrUser,
_get_user: _GetUser,
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
logged_in_users = [u.log_in(_app) for _ in range(2)]
for logged_in_user in logged_in_users:
logged_in_user.visit(_app)
logged_in_users[0].log_out(_app)
for logged_in_user in logged_in_users:
logged_in_user.visit(_app, 401)
@pytest.mark.parametrize("role_or_user", list(UserRoleInput))
def test_can_log_out_with_only_refresh_token(
self,
role_or_user: _RoleOrUser,
_get_user: _GetUser,
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
refresh_token = u.log_in(_app).tokens.refresh_token
# Explicitly check for 302 response from logout
client = _httpx_client(_app, refresh_token)
response = client.get("auth/logout", follow_redirects=False)
assert response.status_code == 302
# Optionally, check the redirect location
assert response.headers["location"] in ("/login", "/logout")
def test_log_out_does_not_raise_exception(self, _app: _AppInfo) -> None:
_log_out(_app)
class TestLoggedInTokens:
class _JtiSet(Generic[_TokenT]):
def __init__(self) -> None:
self._set: set[str] = set()
def add(self, token: _TokenT) -> None:
assert (jti := _decode_jwt(token)["jti"]) not in self._set
self._set.add(jti)
@pytest.mark.parametrize("role_or_user", list(UserRoleInput))
def test_logged_in_tokens_should_change_after_log_out(
self,
role_or_user: _RoleOrUser,
_get_user: _GetUser,
_app: _AppInfo,
) -> None:
access_tokens = self._JtiSet[_AccessToken]()
refresh_tokens = self._JtiSet[_RefreshToken]()
u = _get_user(_app, role_or_user)
for _ in range(2):
logged_in_user = u.log_in(_app)
access_tokens.add(logged_in_user.tokens.access_token)
refresh_tokens.add(logged_in_user.tokens.refresh_token)
logged_in_user.log_out(_app)
@pytest.mark.parametrize("role_or_user", list(UserRoleInput))
@pytest.mark.parametrize("role", list(UserRoleInput))
def test_logged_in_tokens_should_differ_between_users(
self,
role_or_user: _RoleOrUser,
role: UserRoleInput,
_get_user: _GetUser,
_app: _AppInfo,
) -> None:
access_tokens = self._JtiSet[_AccessToken]()
refresh_tokens = self._JtiSet[_RefreshToken]()
u = _get_user(_app, role_or_user)
logged_in_user = u.log_in(_app)
access_tokens.add(logged_in_user.tokens.access_token)
refresh_tokens.add(logged_in_user.tokens.refresh_token)
logged_in_user.log_out(_app)
other_user = _get_user(_app, role)
logged_in_user = other_user.log_in(_app)
access_tokens.add(logged_in_user.tokens.access_token)
refresh_tokens.add(logged_in_user.tokens.refresh_token)
logged_in_user.log_out(_app)
def test_corrupt_tokens_are_not_accepted(self, _app: _AppInfo) -> None:
parts = _DEFAULT_ADMIN.log_in(_app).tokens.access_token.split(".")
# delete last 3 characters because base64 could have up to 2 padding characters
bad_headers = _AccessToken(f"{parts[0][:-3]}.{parts[1]}.{parts[2]}")
with _EXPECTATION_401:
_create_api_key(_app, bad_headers)
bad_payload = _AccessToken(f"{parts[0]}.{parts[1][:-3]}.{parts[2]}")
with _EXPECTATION_401:
_create_api_key(_app, bad_payload)
class TestRefreshToken:
@pytest.mark.parametrize("role_or_user", list(UserRoleInput))
def test_end_to_end_credentials_flow(
self,
role_or_user: _RoleOrUser,
_get_user: _GetUser,
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
logged_in_users: defaultdict[int, dict[int, _LoggedInUser]] = defaultdict(dict)
# user logs into first browser
logged_in_users[0][0] = u.log_in(_app)
# tokens are refreshed in the first browser
logged_in_users[0][1] = logged_in_users[0][0].refresh(_app)
# user can visit the app
logged_in_users[0][1].visit(_app)
# refresh token is good for one use only
with pytest.raises(HTTPStatusError):
logged_in_users[0][0].refresh(_app)
# original access token is invalid after refresh
logged_in_users[0][0].visit(_app, 401)
# user logs into second browser
logged_in_users[1][0] = u.log_in(_app)
# user can visit the app
logged_in_users[1][0].visit(_app)
# user logs out in first browser
logged_in_users[0][1].log_out(_app)
# user is logged out of both browsers
logged_in_users[0][1].visit(_app, 401)
logged_in_users[1][0].visit(_app, 401)
class TestCreateUser:
@pytest.mark.parametrize("role", list(UserRoleInput))
def test_cannot_create_user_without_access(
self,
role: UserRoleInput,
_profiles: Iterator[_Profile],
_app: _AppInfo,
) -> None:
profile = next(_profiles)
with _EXPECTATION_401:
_create_user(_app, role=role, profile=profile)
@pytest.mark.parametrize(
"role_or_user,expectation",
[
(_VIEWER, _DENIED),
(_MEMBER, _DENIED),
(_ADMIN, _OK),
(_DEFAULT_ADMIN, _OK),
],
)
@pytest.mark.parametrize("role", list(UserRoleInput))
def test_only_admin_can_create_user(
self,
role_or_user: UserRoleInput,
role: UserRoleInput,
expectation: AbstractContextManager[Optional[Unauthorized]],
_get_user: _GetUser,
_profiles: Iterator[_Profile],
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
logged_in_user = u.log_in(_app)
profile = next(_profiles)
with expectation as e:
new_user = logged_in_user.create_user(_app, role, profile=profile)
if not e:
new_user.log_in(_app)
assert _will_be_asked_to_reset_password(_app, new_user)
@pytest.mark.parametrize("role_or_user", [_ADMIN, _DEFAULT_ADMIN])
def test_cannot_create_duplicate_user_with_different_email_case(
self,
role_or_user: _RoleOrUser,
_get_user: _GetUser,
_app: _AppInfo,
) -> None:
admin = _get_user(_app, role_or_user).log_in(_app)
# Create first user
username, password = token_hex(8), token_hex(8)
email = _randomize_casing(f"{string.ascii_lowercase}@{token_hex(16)}.com")
profile = _Profile(email=email, password=password, username=username)
admin.create_user(_app, profile=profile)
# Try to create second user with same email but different case
case_different_profile = replace(profile, email=_randomize_casing(profile.email))
with pytest.raises(Exception): # Should fail due to duplicate email
admin.create_user(_app, profile=case_different_profile)
class TestPatchViewer:
@pytest.mark.parametrize("role_or_user", list(UserRoleInput) + [_DEFAULT_ADMIN])
def test_cannot_patch_viewer_without_access(
self,
role_or_user: _RoleOrUser,
_get_user: _GetUser,
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
with _EXPECTATION_401:
_patch_viewer(_app, None, u.password, new_username="new_username")
@pytest.mark.parametrize("role_or_user", list(UserRoleInput) + [_DEFAULT_ADMIN])
def test_cannot_change_password_without_current_password(
self,
role_or_user: _RoleOrUser,
_get_user: _GetUser,
_passwords: Iterator[_Password],
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
logged_in_user = u.log_in(_app)
new_password = next(_passwords)
with pytest.raises(Exception):
_patch_viewer(_app, logged_in_user, None, new_password=new_password)
@pytest.mark.parametrize("role_or_user", list(UserRoleInput) + [_DEFAULT_ADMIN])
def test_cannot_change_password_with_wrong_current_password(
self,
role_or_user: _RoleOrUser,
_get_user: _GetUser,
_passwords: Iterator[_Password],
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
logged_in_user = u.log_in(_app)
assert (wrong_password := next(_passwords)) != logged_in_user.password
new_password = next(_passwords)
with pytest.raises(Exception):
_patch_viewer(_app, logged_in_user, wrong_password, new_password=new_password)
@pytest.mark.parametrize("role", list(UserRoleInput))
def test_change_password(
self,
role: UserRoleInput,
_get_user: _GetUser,
_passwords: Iterator[_Password],
_app: _AppInfo,
) -> None:
u = _get_user(_app, role)
logged_in_user = u.log_in(_app)
new_password = f"new_password_{next(_passwords)}"
assert new_password != logged_in_user.password
_patch_viewer(
_app,
(old_token := logged_in_user.tokens),
(old_password := logged_in_user.password),
new_password=new_password,
)
another_password = f"another_password_{next(_passwords)}"
with _EXPECTATION_401:
# old tokens should no longer work
_patch_viewer(_app, old_token, new_password, new_password=another_password)
with _EXPECTATION_401:
# old password should no longer work
u.log_in(_app)
new_profile = replace(u.profile, password=new_password)
new_u = replace(u, profile=new_profile)
new_tokens = new_u.log_in(_app)
assert not _will_be_asked_to_reset_password(_app, new_u)
with pytest.raises(Exception):
# old password should no longer work, even with new tokens
_patch_viewer(_app, new_tokens, old_password, new_password=another_password)
@pytest.mark.parametrize("role", list(UserRoleInput))
def test_change_username(
self,
role: UserRoleInput,
_get_user: _GetUser,
_usernames: Iterator[_Username],
_passwords: Iterator[_Password],
_app: _AppInfo,
) -> None:
u = _get_user(_app, role)
logged_in_user = u.log_in(_app)
new_username = f"new_username_{next(_usernames)}"
_patch_viewer(_app, logged_in_user, None, new_username=new_username)
another_username = f"another_username_{next(_usernames)}"
wrong_password = next(_passwords)
assert wrong_password != logged_in_user.password
_patch_viewer(_app, logged_in_user, wrong_password, new_username=another_username)
class TestPatchUser:
@pytest.mark.parametrize("role_or_user", [_ADMIN, _DEFAULT_ADMIN])
@pytest.mark.parametrize("new_role", list(UserRoleInput))
def test_cannot_change_role_of_default_admin(
self,
role_or_user: _RoleOrUser,
new_role: UserRoleInput,
_get_user: _GetUser,
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
logged_in_user = u.log_in(_app)
with pytest.raises(Exception, match="role"):
logged_in_user.patch_user(_app, _DEFAULT_ADMIN, new_role=new_role)
def test_admin_cannot_change_role_for_self(
self,
_get_user: _GetUser,
_app: _AppInfo,
) -> None:
u = _get_user(_app, _ADMIN)
logged_in_user = u.log_in(_app)
with pytest.raises(Exception, match="role"):
logged_in_user.patch_user(_app, u, new_role=_MEMBER)
@pytest.mark.parametrize(
"role_or_user,expectation",
[
(_VIEWER, _DENIED),
(_MEMBER, _DENIED),
(_ADMIN, _OK),
(_DEFAULT_ADMIN, _OK),
],
)
@pytest.mark.parametrize("role", list(UserRoleInput))
@pytest.mark.parametrize("new_role", list(UserRoleInput))
def test_only_admin_can_change_role_for_non_self(
self,
role_or_user: _RoleOrUser,
role: UserRoleInput,
new_role: UserRoleInput,
expectation: AbstractContextManager[Optional[Unauthorized]],
_get_user: _GetUser,
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
logged_in_user = u.log_in(_app)
non_self = _get_user(_app, role)
assert non_self.gid != logged_in_user.gid
with _EXPECTATION_401:
_patch_user(_app, non_self, new_role=new_role)
with expectation:
logged_in_user.patch_user(_app, non_self, new_role=new_role)
@pytest.mark.parametrize(
"role_or_user,expectation",
[
(_VIEWER, _DENIED),
(_MEMBER, _DENIED),
(_ADMIN, _OK),
(_DEFAULT_ADMIN, _OK),
],
)
@pytest.mark.parametrize("role", list(UserRoleInput))
def test_only_admin_can_change_password_for_non_self(
self,
role_or_user: UserRoleInput,
role: UserRoleInput,
expectation: AbstractContextManager[Optional[Unauthorized]],
_get_user: _GetUser,
_passwords: Iterator[_Password],
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
logged_in_user = u.log_in(_app)
non_self = _get_user(_app, role)
assert non_self.gid != logged_in_user.gid
old_password = non_self.password
new_password = f"new_password_{next(_passwords)}"
assert new_password != old_password
with _EXPECTATION_401:
_patch_user(_app, non_self, new_password=new_password)
with expectation as e:
logged_in_user.patch_user(_app, non_self, new_password=new_password)
if e:
# password should still work
non_self.log_in(_app)
return
with _EXPECTATION_401:
# old password should no longer work
non_self.log_in(_app)
new_profile = replace(non_self.profile, password=new_password)
new_non_self = replace(non_self, profile=new_profile)
new_non_self.log_in(_app)
assert _will_be_asked_to_reset_password(_app, new_non_self)
@pytest.mark.parametrize(
"role_or_user,expectation",
[
(_VIEWER, _DENIED),
(_MEMBER, _DENIED),
(_ADMIN, _OK),
(_DEFAULT_ADMIN, _OK),
],
)
@pytest.mark.parametrize("role", list(UserRoleInput))
def test_only_admin_can_change_username_for_non_self(
self,
role_or_user: _RoleOrUser,
role: UserRoleInput,
expectation: AbstractContextManager[Optional[Unauthorized]],
_get_user: _GetUser,
_usernames: Iterator[_Username],
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
logged_in_user = u.log_in(_app)
non_self = _get_user(_app, role)
assert non_self.gid != logged_in_user.gid
old_username = non_self.username
new_username = f"new_username_{next(_usernames)}"
assert new_username != old_username
with _EXPECTATION_401:
_patch_user(_app, non_self, new_username=new_username)
with expectation:
logged_in_user.patch_user(_app, non_self, new_username=new_username)
@pytest.mark.parametrize("role_or_user", [_ADMIN, _DEFAULT_ADMIN])
@pytest.mark.parametrize("old_role", list(UserRoleInput))
@pytest.mark.parametrize("new_role", list(UserRoleInput))
def test_user_is_logged_out_when_role_changes(
self,
role_or_user: _RoleOrUser,
old_role: UserRoleInput,
new_role: UserRoleInput,
_get_user: _GetUser,
_app: _AppInfo,
) -> None:
"""Test that changing a user's role invalidates their existing tokens.
This is a security test to ensure that when a user's role changes,
their old tokens (which contain the old role) are immediately invalidated.
This prevents privilege escalation/retention vulnerabilities.
"""
if old_role == new_role:
pytest.skip("Skipping test when old_role == new_role")
# Create user with old_role and log them in
user = _get_user(_app, old_role)
logged_in_user = user.log_in(_app)
old_tokens = logged_in_user.tokens
# Verify user can access with old tokens
logged_in_user.visit(_app)
# Admin changes user's role
_patch_user(_app, user, _app.admin_secret, new_role=new_role)
# Old tokens should no longer work (user should be logged out)
logged_in_user.visit(_app, 401)
# User needs to log in again
new_logged_in_user = user.log_in(_app)
new_tokens = new_logged_in_user.tokens
# New tokens should be different
assert new_tokens.access_token != old_tokens.access_token
assert new_tokens.refresh_token != old_tokens.refresh_token
# New tokens should work
new_logged_in_user.visit(_app)
class TestDeleteUsers:
@pytest.mark.parametrize(
"role_or_user,expectation",
[
(_MEMBER, _DENIED),
(_ADMIN, pytest.raises(Exception)),
(_DEFAULT_ADMIN, pytest.raises(Exception)),
],
)
def test_cannot_delete_system_user(
self,
role_or_user: UserRoleInput,
expectation: _Expectation,
_get_user: _GetUser,
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
logged_in_user = u.log_in(_app)
with expectation:
logged_in_user.delete_users(_app, _SYSTEM_USER_GID)
@pytest.mark.parametrize(
"role_or_user,expectation",
[
(_MEMBER, _DENIED),
(_ADMIN, pytest.raises(Exception, match="Cannot delete the default admin user")),
(_DEFAULT_ADMIN, pytest.raises(Exception)),
],
)
def test_cannot_delete_default_admin_user(
self,
role_or_user: _RoleOrUser,
expectation: _Expectation,
_get_user: _GetUser,
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
logged_in_user = u.log_in(_app)
with expectation:
logged_in_user.delete_users(_app, _DEFAULT_ADMIN)
_DEFAULT_ADMIN.log_in(_app)
@pytest.mark.parametrize(
"role_or_user,expectation",
[
(_VIEWER, _DENIED),
(_MEMBER, _DENIED),
(_ADMIN, _OK),
(_DEFAULT_ADMIN, _OK),
],
)
@pytest.mark.parametrize("role", list(UserRoleInput))
def test_only_admin_can_delete_users(
self,
role_or_user: _RoleOrUser,
role: UserRoleInput,
expectation: _OK_OR_DENIED,
_get_user: _GetUser,
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
logged_in_user = u.log_in(_app)
non_self = _get_user(_app, role)
with expectation as e:
logged_in_user.delete_users(_app, non_self)
if e:
non_self.log_in(_app)
else:
with _EXPECTATION_401:
non_self.log_in(_app)
@pytest.mark.parametrize("role_or_user", [_ADMIN, _DEFAULT_ADMIN])
def test_error_is_raised_when_deleting_a_non_existent_user_id(
self,
role_or_user: _RoleOrUser,
_get_user: _GetUser,
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
logged_in_user = u.log_in(_app)
phantom = _GqlId(GlobalID(type_name="User", node_id=str(999_999_999)))
user = _get_user(_app)
with pytest.raises(Exception, match="Some user IDs could not be found"):
logged_in_user.delete_users(_app, phantom, user)
user.log_in(_app)
@pytest.mark.parametrize("role", list(UserRoleInput))
def test_user_deletion_deletes_all_tokens(
self,
role: UserRoleInput,
_get_user: _GetUser,
_spans: Sequence[ReadableSpan],
_app: _AppInfo,
) -> None:
user = _get_user(_app, role)
logged_in_user = user.log_in(_app)
tokens = logged_in_user.tokens
logged_in_user.visit(_app)
_delete_users(_app, _app.admin_secret, users=[user])
with _EXPECTATION_401:
tokens.refresh(_app)
logged_in_user.visit(_app, 401)
class TestCreateApiKey:
@pytest.mark.parametrize(
"role_or_user,expectation",
[
(_VIEWER, _OK),
(_MEMBER, _OK),
(_ADMIN, _OK),
(_DEFAULT_ADMIN, _OK),
],
)
def test_create_user_api_key(
self,
role_or_user: _RoleOrUser,
expectation: _OK_OR_DENIED,
_get_user: _GetUser,
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
logged_in_user = u.log_in(_app)
with expectation:
logged_in_user.create_api_key(_app)
@pytest.mark.parametrize(
"role_or_user,expectation",
[
(_VIEWER, _DENIED),
(_MEMBER, _DENIED),
(_ADMIN, _OK),
(_DEFAULT_ADMIN, _OK),
],
)
def test_only_admin_can_create_system_api_key(
self,
role_or_user: _RoleOrUser,
expectation: _OK_OR_DENIED,
_get_user: _GetUser,
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
logged_in_user = u.log_in(_app)
with expectation:
logged_in_user.create_api_key(_app, "System")
class TestDeleteApiKey:
@pytest.mark.parametrize("role_or_user", [_VIEWER, _MEMBER, _ADMIN, _DEFAULT_ADMIN])
def test_delete_user_api_key(
self,
role_or_user: _RoleOrUser,
_get_user: _GetUser,
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
logged_in_user = u.log_in(_app)
api_key = logged_in_user.create_api_key(_app)
logged_in_user.delete_api_key(_app, api_key)
@pytest.mark.parametrize(
"role_or_user,expectation",
[
(_VIEWER, _DENIED),
(_MEMBER, _DENIED),
(_ADMIN, _OK),
(_DEFAULT_ADMIN, _OK),
],
)
@pytest.mark.parametrize("role", [_MEMBER, _ADMIN])
def test_only_admin_can_delete_user_api_key_for_non_self(
self,
role_or_user: _RoleOrUser,
role: UserRoleInput,
expectation: _OK_OR_DENIED,
_get_user: _GetUser,
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
logged_in_user = u.log_in(_app)
non_self = _get_user(_app, role).log_in(_app)
assert non_self.gid != logged_in_user.gid
api_key = non_self.create_api_key(_app)
with expectation:
logged_in_user.delete_api_key(_app, api_key)
@pytest.mark.parametrize(
"role_or_user,expectation",
[
(_VIEWER, _DENIED),
(_MEMBER, _DENIED),
(_ADMIN, _OK),
(_DEFAULT_ADMIN, _OK),
],
)
def test_only_admin_can_delete_system_api_key(
self,
role_or_user: _RoleOrUser,
expectation: _OK_OR_DENIED,
_get_user: _GetUser,
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
logged_in_user = u.log_in(_app)
api_key = _DEFAULT_ADMIN.create_api_key(_app, "System")
with expectation:
logged_in_user.delete_api_key(_app, api_key)
class TestGraphQLQuery:
@pytest.mark.parametrize(
"role_or_user,expectation",
[
(_VIEWER, _DENIED),
(_MEMBER, _DENIED),
(_ADMIN, _OK),
(_DEFAULT_ADMIN, _OK),
],
)
@pytest.mark.parametrize(
"query",
[
"query{users{edges{node{id}}}}",
"query{userApiKeys{id}}",
"query{systemApiKeys{id}}",
],
)
def test_only_admin_can_list_users_and_api_keys(
self,
role_or_user: _RoleOrUser,
query: str,
expectation: _OK_OR_DENIED,
_get_user: _GetUser,
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
logged_in_user = u.log_in(_app)
with expectation:
logged_in_user.gql(_app, query)
@pytest.mark.parametrize("role_or_user", list(UserRoleInput) + [_DEFAULT_ADMIN])
def test_can_query_user_node_for_self(
self,
role_or_user: _RoleOrUser,
_get_user: _GetUser,
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
logged_in_user = u.log_in(_app)
query = 'query{node(id:"' + u.gid + '"){__typename}}'
logged_in_user.gql(_app, query)
@pytest.mark.parametrize(
"role_or_user,expectation",
[
(_VIEWER, _DENIED),
(_MEMBER, _DENIED),
(_ADMIN, _OK),
(_DEFAULT_ADMIN, _OK),
],
)
@pytest.mark.parametrize("role", list(UserRoleInput))
def test_only_admin_can_query_user_node_for_non_self(
self,
role_or_user: _RoleOrUser,
role: UserRoleInput,
expectation: _OK_OR_DENIED,
_get_user: _GetUser,
_app: _AppInfo,
) -> None:
u = _get_user(_app, role_or_user)
logged_in_user = u.log_in(_app)
non_self = _get_user(_app, role)
query = 'query{node(id:"' + non_self.gid + '"){__typename}}'
with expectation:
logged_in_user.gql(_app, query)
class TestSpanExporters:
@pytest.mark.parametrize(
"use_api_key,expires_at,expected",
[
(True, NOW + timedelta(days=1), SpanExportResult.SUCCESS),
(True, None, SpanExportResult.SUCCESS),
(True, NOW, SpanExportResult.FAILURE),
(False, None, SpanExportResult.FAILURE),
],
)
def test_api_key(
self,
use_api_key: bool,
expires_at: Optional[datetime],
expected: SpanExportResult,
_span_exporter: _SpanExporterFactory,
_spans: Sequence[ReadableSpan],
_app: _AppInfo,
) -> None:
headers: Optional[_Headers] = None
api_key: Optional[_ApiKey] = None
if use_api_key:
api_key = _DEFAULT_ADMIN.create_api_key(_app, "System", expires_at=expires_at)
# Must use all lower case for `authorization` because
# otherwise it would crash the gRPC receiver.
headers = dict(authorization=f"Bearer {api_key}")
export = _span_exporter(_app, headers=headers).export
for _ in range(2):
assert export(_spans) is expected
if api_key and expected is SpanExportResult.SUCCESS:
_DEFAULT_ADMIN.delete_api_key(_app, api_key)
assert export(_spans) is SpanExportResult.FAILURE
@pytest.mark.parametrize(
"use_admin_secret,expected",
[
(True, SpanExportResult.SUCCESS),
(False, SpanExportResult.FAILURE),
],
)
def test_admin_secret(
self,
use_admin_secret: bool,
expected: SpanExportResult,
_span_exporter: _SpanExporterFactory,
_spans: Sequence[ReadableSpan],
_app: _AppInfo,
) -> None:
if use_admin_secret:
assert (api_key := _app.admin_secret)
else:
api_key = _AdminSecret("")
# Must use all lower case for `authorization` because
# otherwise it would crash the gRPC receiver.
headers = dict(authorization=f"Bearer {str(api_key)}")
export = _span_exporter(_app, headers=headers).export
assert export(_spans) is expected
class TestEmbeddingsRestApi:
@pytest.mark.parametrize("role_or_user", list(UserRoleInput) + [_DEFAULT_ADMIN])
def test_authenticated_users_can_access_route(
self,
role_or_user: _RoleOrUser,
_get_user: _GetUser,
_app: _AppInfo,
) -> None:
user = _get_user(_app, role_or_user)
logged_in_user = user.log_in(_app)
with _EXPECTATION_404: # no files have been exported
logged_in_user.export_embeddings(_app, "embeddings")
def test_unauthenticated_requests_receive_401(self, _app: _AppInfo) -> None:
with _EXPECTATION_401:
_export_embeddings(_app, None, filename="embeddings")
class TestPrompts:
def test_authenticated_users_are_recorded_in_prompts(
self,
_get_user: _GetUser,
_app: _AppInfo,
) -> None:
u = _get_user(_app, _MEMBER)
logged_in_user = u.log_in(_app)
# Create new prompt
response, _ = logged_in_user.gql(
_app,
query="""
mutation CreateChatPromptMutation($input: CreateChatPromptInput!) {
createChatPrompt(input: $input) {
id
promptVersions {
edges {
promptVersion: node {
user {
id
}
}
}
}
}
}
""",
variables={
"input": {
"name": "prompt-name",
"description": "prompt-description",
"promptVersion": {
"description": "prompt-version-description",
"templateFormat": "MUSTACHE",
"template": {
"messages": [
{
"role": "USER",
"content": [{"text": {"text": "hello world"}}],
}
]
},
"invocationParameters": {"temperature": 0.4},
"modelProvider": "OPENAI",
"modelName": "o1-mini",
},
}
},
)
prompt_id = response["data"]["createChatPrompt"]["id"]
prompt_versions = response["data"]["createChatPrompt"]["promptVersions"]["edges"]
assert len(prompt_versions) == 1
user = prompt_versions[0]["promptVersion"]["user"]
assert user is not None
assert user["id"] == logged_in_user.gid
# Create new version for existing prompt
response, _ = logged_in_user.gql(
_app,
query="""
mutation CreateChatPromptVersionMutation($input: CreateChatPromptVersionInput!) {
createChatPromptVersion(input: $input) {
promptVersions {
edges {
promptVersion: node {
user {
id
}
}
}
}
}
}
""",
variables={
"input": {
"promptId": prompt_id,
"promptVersion": {
"description": "new-version-description",
"templateFormat": "MUSTACHE",
"template": {
"messages": [
{
"role": "USER",
"content": [{"text": {"text": "new version"}}],
}
]
},
"invocationParameters": {"temperature": 0.4},
"modelProvider": "OPENAI",
"modelName": "o1-mini",
},
}
},
)
# Verify both versions record the user
prompt_versions = response["data"]["createChatPromptVersion"]["promptVersions"]["edges"]
assert len(prompt_versions) == 2
for version in prompt_versions:
user = version["promptVersion"]["user"]
assert user is not None
assert user["id"] == logged_in_user.gid
class TestSpanAnnotations:
QUERY = """
mutation CreateSpanAnnotations($input: [CreateSpanAnnotationInput!]!) {
createSpanAnnotations(input: $input) {
spanAnnotations {
...SpanAnnotationFields
}
}
}
mutation PatchSpanAnnotations($input: [PatchAnnotationInput!]!) {
patchSpanAnnotations(input: $input) {
spanAnnotations {
...SpanAnnotationFields
}
}
}
mutation DeleteSpanAnnotations($input: DeleteAnnotationsInput!) {
deleteSpanAnnotations(input: $input) {
spanAnnotations {
...SpanAnnotationFields
}
}
}
query GetSpanAnnotation($annotationId: ID!) {
spanAnnotation: node(id: $annotationId) {
... on SpanAnnotation {
...SpanAnnotationFields
}
}
}
fragment SpanAnnotationFields on SpanAnnotation {
id
name
score
label
explanation
annotatorKind
metadata
source
identifier
spanId
user {
id
email
username
}
}
"""
async def test_other_users_cannot_patch_and_only_creator_or_admin_can_delete(
self,
_existing_spans: Sequence[_ExistingSpan],
_get_user: _GetUser,
_app: _AppInfo,
) -> None:
assert _existing_spans, "At least one existing span is required for this test"
span_gid, *_ = choice(_existing_spans)
annotation_creator = _get_user(_app, _MEMBER)
logged_in_annotation_creator = annotation_creator.log_in(_app)
member = _get_user(_app, _MEMBER)
logged_in_member = member.log_in(_app)
admin = _get_user(_app, _ADMIN)
logged_in_admin = admin.log_in(_app)
# Create span annotation
name = token_hex(8)
response, _ = logged_in_annotation_creator.gql(
_app,
query=self.QUERY,
operation_name="CreateSpanAnnotations",
variables={
"input": {
"spanId": str(span_gid),
"name": name,
"annotatorKind": "HUMAN",
"label": "correct",
"score": 1,
"explanation": "explanation",
"metadata": {},
"identifier": "identifier",
"source": "APP",
}
},
)
span_annotations = response["data"]["createSpanAnnotations"]["spanAnnotations"]
assert len(span_annotations) == 1
original_span_annotation = span_annotations[0]
annotation_id = original_span_annotation["id"]
# Only the user who created the annotation can patch
for user in [logged_in_member, logged_in_admin]:
with pytest.raises(RuntimeError) as exc_info:
response, _ = user.gql(
_app,
query=self.QUERY,
operation_name="PatchSpanAnnotations",
variables={
"input": {
"annotationId": annotation_id,
"name": f"patched-{name}",
"annotatorKind": "LLM",
"label": "incorrect",
"score": 0,
"explanation": "patched-explanation",
"metadata": {"patched": "key"},
"identifier": "patched-identifier",
}
},
)
assert "At least one span annotation is not associated with the current user." in str(
exc_info.value
)
# Check that the annotation remains unchanged
response, _ = user.gql(
_app,
query=self.QUERY,
operation_name="GetSpanAnnotation",
variables={"annotationId": annotation_id},
)
span_annotation = response["data"]["spanAnnotation"]
assert span_annotation == original_span_annotation
# Member who did not create the annotation cannot delete
with pytest.raises(RuntimeError) as exc_info:
logged_in_member.gql(
_app,
query=self.QUERY,
operation_name="DeleteSpanAnnotations",
variables={
"input": {
"annotationIds": [annotation_id],
}
},
)
assert "At least one span annotation is not associated with the current user." in str(
exc_info.value
)
# Check that the annotation remains unchanged
response, _ = user.gql(
_app,
query=self.QUERY,
operation_name="GetSpanAnnotation",
variables={"annotationId": annotation_id},
)
span_annotation = response["data"]["spanAnnotation"]
assert span_annotation == original_span_annotation
# Admin can delete
response, _ = logged_in_admin.gql(
_app,
query=self.QUERY,
operation_name="DeleteSpanAnnotations",
variables={
"input": {
"annotationIds": [annotation_id],
}
},
)
class TestTraceAnnotations:
QUERY = """
mutation CreateTraceAnnotations($input: [CreateTraceAnnotationInput!]!) {
createTraceAnnotations(input: $input) {
traceAnnotations {
...TraceAnnotationFields
}
}
}
mutation PatchTraceAnnotations($input: [PatchAnnotationInput!]!) {
patchTraceAnnotations(input: $input) {
traceAnnotations {
...TraceAnnotationFields
}
}
}
mutation DeleteTraceAnnotations($input: DeleteAnnotationsInput!) {
deleteTraceAnnotations(input: $input) {
traceAnnotations {
...TraceAnnotationFields
}
}
}
query GetTraceAnnotation($annotationId: ID!) {
traceAnnotation: node(id: $annotationId) {
... on TraceAnnotation {
...TraceAnnotationFields
}
}
}
fragment TraceAnnotationFields on TraceAnnotation {
id
name
score
label
explanation
annotatorKind
metadata
source
identifier
trace {
traceId
}
user {
id
email
username
}
}
"""
async def test_other_users_cannot_patch_and_only_creator_or_admin_can_delete(
self,
_existing_spans: Sequence[_ExistingSpan],
_get_user: _GetUser,
_app: _AppInfo,
) -> None:
assert _existing_spans, "At least one existing span is required for this test"
existing_span = choice(_existing_spans)
trace_gid = existing_span.trace.id
annotation_creator = _get_user(_app, _MEMBER)
logged_in_annotation_creator = annotation_creator.log_in(_app)
member = _get_user(_app, _MEMBER)
logged_in_member = member.log_in(_app)
admin = _get_user(_app, _ADMIN)
logged_in_admin = admin.log_in(_app)
# Create trace annotation
response, _ = logged_in_annotation_creator.gql(
_app,
query=self.QUERY,
operation_name="CreateTraceAnnotations",
variables={
"input": {
"traceId": str(trace_gid),
"name": "trace-annotation-name",
"annotatorKind": "HUMAN",
"label": "correct",
"score": 1,
"explanation": "explanation",
"metadata": {},
"identifier": "identifier",
"source": "APP",
}
},
)
trace_annotations = response["data"]["createTraceAnnotations"]["traceAnnotations"]
assert len(trace_annotations) == 1
original_trace_annotation = trace_annotations[0]
annotation_id = original_trace_annotation["id"]
# Only the user who created the annotation can patch
for user in [logged_in_member, logged_in_admin]:
with pytest.raises(RuntimeError) as exc_info:
response, _ = user.gql(
_app,
query=self.QUERY,
operation_name="PatchTraceAnnotations",
variables={
"input": {
"annotationId": annotation_id,
"name": "patched-trace-annotation-name",
"annotatorKind": "LLM",
"label": "incorrect",
"score": 0,
"explanation": "patched-explanation",
"metadata": {"patched": "key"},
"identifier": "patched-identifier",
}
},
)
assert "At least one trace annotation is not associated with the current user." in str(
exc_info.value
)
# Check that the annotation remains unchanged
response, _ = user.gql(
_app,
query=self.QUERY,
operation_name="GetTraceAnnotation",
variables={"annotationId": annotation_id},
)
trace_annotation = response["data"]["traceAnnotation"]
assert trace_annotation == original_trace_annotation
# Member who did not create the annotation cannot delete
with pytest.raises(RuntimeError) as exc_info:
logged_in_member.gql(
_app,
query=self.QUERY,
operation_name="DeleteTraceAnnotations",
variables={
"input": {
"annotationIds": [annotation_id],
}
},
)
assert (
"At least one trace annotation is not associated with the current user "
"and the current user is not an admin." in str(exc_info.value)
)
# Check that the annotation remains unchanged
response, _ = user.gql(
_app,
query=self.QUERY,
operation_name="GetTraceAnnotation",
variables={"annotationId": annotation_id},
)
trace_annotation = response["data"]["traceAnnotation"]
assert trace_annotation == original_trace_annotation
# Admin can delete
response, _ = logged_in_admin.gql(
_app,
query=self.QUERY,
operation_name="DeleteTraceAnnotations",
variables={
"input": {
"annotationIds": [annotation_id],
}
},
)
class TestApiAccessViaCookiesOrApiKeys:
"""Tests REST API v1 access control using both cookie and API key authentication.
This test suite verifies that access restrictions are enforced consistently across
all user roles (Admin, Member, Viewer, Default Admin) at the v1 router level,
regardless of authentication method:
- Cookie-based authentication (access tokens from login)
- API key authentication (Bearer tokens)
The comprehensive test parametrized by role validates 54 total endpoints/operations:
Common Read Resources (28 GET endpoints):
- All roles have identical read access
- Covers: Projects, Datasets, Experiments, Prompts, Annotation Configs,
Evaluations, Spans, Annotations
- Tests both valid responses (200) and error cases (404, 422)
Admin-Only Endpoints (5 operations):
- Admins: Access granted (may get 200, 404, 422 based on request validity)
- Members/Viewers: Always 403 Forbidden
- User management: GET/POST/DELETE on /v1/users
- Project management: PUT/DELETE on /v1/projects
Write Operations (21 POST/PUT/DELETE operations):
- Viewers: Always 403 Forbidden (read-only access)
- Admins/Members: Access granted (may get 400, 404, 415, 422 based on request validity)
Error Handling:
- Invalid ID format: Returns 422 for GlobalID parsing errors
- Missing required headers/body: Returns 400 or 422
- Unsupported media type: Returns 415
- Resource not found: Returns 404
- Dynamic test IDs using token_hex(4) for test isolation
"""
@pytest.mark.parametrize("role_or_user", list(UserRoleInput) + [_DEFAULT_ADMIN])
def test_role_based_access_control(
self,
role_or_user: _RoleOrUser,
_get_user: _GetUser,
_app: _AppInfo,
) -> None:
"""Test role-based access control across all v1 API endpoints with cookies and API keys.
This comprehensive test verifies access control for all user roles (Admin, Member, Viewer,
Default Admin) across three categories of endpoints, using both authentication methods:
1. Cookie-based authentication (session tokens)
2. API key authentication (Bearer tokens)
Test Coverage:
Common Read Resources (28 GET endpoints):
- All roles should have identical read access to common resources
- Covers: Projects, Datasets, Experiments, Prompts, Annotation Configs,
Evaluations, Spans, and Annotations
- Tests both valid responses (200) and error cases (404, 422)
Admin-Only Endpoints (5 operations):
- Admins: Access granted (may get 200, 404, 422 based on request validity)
- Members/Viewers: Always 403 Forbidden (authorization denied)
- User management: GET /v1/users, POST /v1/users, DELETE /v1/users/{id}
- Project management: PUT /v1/projects/{id}, DELETE /v1/projects/{id}
Write Operations (21 POST/PUT/DELETE operations):
- Viewers: Always 403 Forbidden (read-only access)
- Admins/Members: Access granted (may get 400, 404, 415, 422 based on request validity)
- Covers: Datasets, Experiments, Prompts, Annotations, Evaluations,
Spans, Traces, and Project creation (POST /v1/projects)
This verifies that authorization is enforced consistently regardless of
authentication method. Each role maintains the same permissions whether
using cookies or API keys.
"""
user = _get_user(_app, role_or_user)
logged_in_user = user.log_in(_app)
api_key = logged_in_user.create_api_key(_app)
tokens = logged_in_user.tokens
is_admin = user.role is UserRoleInput.ADMIN or role_or_user is _DEFAULT_ADMIN
is_viewer = user.role is UserRoleInput.VIEWER
for client in (_httpx_client(_app, tokens), _httpx_client(_app, api_key)):
# Test 1: Common read resources - all roles should have identical access
for expected_status_code, method, endpoint in _COMMON_RESOURCE_ENDPOINTS:
assert expected_status_code not in (401, 403), (
f"Test misconfiguration: expected_status_code should not be "
f"401 or 403 (got {expected_status_code} for {method} {endpoint})"
)
endpoint = endpoint.format(token_hex(4))
response = client.request(method, endpoint)
assert response.status_code == expected_status_code, (
f"Expected {expected_status_code} but got {response.status_code} for {endpoint}"
)
# Test 2: Admin-only endpoints - only admins should have access
for expected_status_code, method, endpoint in _ADMIN_ONLY_ENDPOINTS:
endpoint = endpoint.format(token_hex(4))
response = client.request(method, endpoint)
if is_admin:
assert response.status_code == expected_status_code, (
f"Admin expected {expected_status_code} but got {response.status_code} "
f"for {method} {endpoint}"
)
else:
assert response.status_code == 403, (
f"Non-admin expected 403 but got {response.status_code} "
f"for {method} {endpoint}"
)
# Test 3: Write operations - viewers blocked, admins/members have access
for expected_status_code, method, endpoint in _VIEWER_BLOCKED_WRITE_OPERATIONS:
endpoint = endpoint.format(token_hex(4))
response = client.request(method, endpoint)
if is_viewer:
assert response.status_code == 403, (
f"Viewer expected 403 but got {response.status_code} for {method} {endpoint}"
)
else:
assert response.status_code == expected_status_code, (
f"Admin/Member expected {expected_status_code} but got {response.status_code} "
f"for {method} {endpoint}"
)