Skip to main content
Glama

@arizeai/phoenix-mcp

Official
by Arize-ai
test_auth.py62.6 kB
from __future__ import annotations import string from collections import defaultdict from collections.abc import Iterator, Sequence from contextlib import AbstractContextManager from dataclasses import replace from datetime import datetime, timedelta, timezone from functools import partial from random import choice from secrets import token_hex from typing import ( Any, Generic, Optional, TypeVar, ) from urllib.error import URLError from urllib.request import urlopen import bs4 import jwt import pytest import smtpdfix from httpx import HTTPStatusError from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.sdk.trace.export import SpanExportResult from strawberry.relay import GlobalID from phoenix.server.api.exceptions import Unauthorized from phoenix.server.api.input_types.UserRoleInput import UserRoleInput from .._helpers import ( _ADMIN, _ADMIN_ONLY_ENDPOINTS, _COMMON_RESOURCE_ENDPOINTS, _DEFAULT_ADMIN, _DENIED, _EXPECTATION_401, _EXPECTATION_404, _MEMBER, _OK, _OK_OR_DENIED, _SYSTEM_USER_GID, _VIEWER, _VIEWER_BLOCKED_WRITE_OPERATIONS, _AccessToken, _AdminSecret, _ApiKey, _AppInfo, _create_api_key, _create_user, _delete_users, _Email, _ExistingSpan, _Expectation, _export_embeddings, _extract_html, _GetUser, _GqlId, _Headers, _httpx_client, _initiate_password_reset, _log_in, _log_out, _LoggedInUser, _Password, _patch_user, _patch_viewer, _Profile, _randomize_casing, _RefreshToken, _RoleOrUser, _SpanExporterFactory, _Username, _will_be_asked_to_reset_password, ) NOW = datetime.now(timezone.utc) _decode_jwt = partial(jwt.decode, options=dict(verify_signature=False)) _TokenT = TypeVar("_TokenT", _AccessToken, _RefreshToken) class TestTLS: def test_non_tls_client_cannot_connect(self, _app: _AppInfo) -> None: with pytest.raises(URLError) as e: urlopen(_app.base_url) assert "SSL" in str(e.value), f"Expected SSL error, got: {e.value}" class TestOriginAndReferer: @pytest.mark.parametrize( "headers,expectation", [ [dict(), _OK], [dict(origin="http://localhost"), _OK], [dict(referer="http://localhost/xyz"), _OK], [dict(origin="http://xyz.com"), _EXPECTATION_401], [dict(referer="http://xyz.com/xyz"), _EXPECTATION_401], [dict(origin="http://xyz.com", referer="http://localhost/xyz"), _EXPECTATION_401], [dict(origin="http://localhost", referer="http://xyz.com/xyz"), _EXPECTATION_401], ], ) def test_csrf_origin_validation( self, headers: dict[str, str], expectation: AbstractContextManager[Any], _app: _AppInfo, ) -> None: resp = _httpx_client(_app, headers=headers).get("/healthz") with expectation: resp.raise_for_status() class TestLogIn: @pytest.mark.parametrize("role_or_user", list(UserRoleInput) + [_DEFAULT_ADMIN]) def test_can_log_in( self, role_or_user: _RoleOrUser, _get_user: _GetUser, _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) u.log_in(_app) @pytest.mark.parametrize("role_or_user", list(UserRoleInput) + [_DEFAULT_ADMIN]) def test_can_log_in_more_than_once_simultaneously( self, role_or_user: _RoleOrUser, _get_user: _GetUser, _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) for _ in range(10): u.log_in(_app) @pytest.mark.parametrize("role_or_user", list(UserRoleInput)) def test_can_log_in_with_case_insensitive_email( self, role_or_user: _RoleOrUser, _get_user: _GetUser, _app: _AppInfo, ) -> None: username, password = token_hex(8), token_hex(8) email = _randomize_casing(f"{string.ascii_lowercase}@{token_hex(16)}.com") profile = _Profile(email=email, password=password, username=username) u = _get_user(_app, role_or_user, profile=profile) case_insensitive_email = _randomize_casing(u.email) _log_in(_app, u.password, email=case_insensitive_email) @pytest.mark.parametrize("role_or_user", list(UserRoleInput) + [_DEFAULT_ADMIN]) def test_cannot_log_in_with_empty_password( self, role_or_user: _RoleOrUser, _get_user: _GetUser, _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) with _EXPECTATION_401: _log_in(_app, "", email=u.email) @pytest.mark.parametrize("role_or_user", list(UserRoleInput) + [_DEFAULT_ADMIN]) def test_cannot_log_in_with_wrong_password( self, role_or_user: _RoleOrUser, _get_user: _GetUser, _passwords: Iterator[_Password], _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) assert (wrong_password := next(_passwords)) != u.password with _EXPECTATION_401: _log_in(_app, wrong_password, email=u.email) @pytest.mark.parametrize("role_or_user", list(UserRoleInput)) def test_cannot_log_in_with_deleted_user( self, role_or_user: _RoleOrUser, _get_user: _GetUser, _passwords: Iterator[_Password], _app: _AppInfo, ) -> None: user = _get_user(_app, role_or_user) _delete_users(_app, _app.admin_secret, users=[user]) with _EXPECTATION_401: user.log_in(_app) class TestWelcomeEmail: @pytest.mark.parametrize("role", list(UserRoleInput)) @pytest.mark.parametrize("send_welcome_email", [True, False]) def test_welcome_email_is_sent( self, role: UserRoleInput, send_welcome_email: bool, _get_user: _GetUser, _smtpd: smtpdfix.AuthController, _app: _AppInfo, ) -> None: email = f"{token_hex(16)}@{token_hex(16)}.com" profile = _Profile(email=email, password=token_hex(8), username=token_hex(8)) u = _create_user( _app, _get_user(_app, _ADMIN), role=role, profile=profile, send_welcome_email=send_welcome_email, ) if send_welcome_email: assert _smtpd.messages assert (msg := _smtpd.messages[-1])["to"] == u.email assert (soup := _extract_html(msg)) assert isinstance((link := soup.find(id="welcome-url")), bs4.Tag) assert isinstance((url := link.get("href")), str) assert url == _app.base_url else: assert not _smtpd.messages or _smtpd.messages[-1]["to"] != u.email class TestPasswordReset: def test_initiate_password_reset_does_not_reveal_whether_user_exists( self, _emails: Iterator[_Email], _smtpd: smtpdfix.AuthController, _app: _AppInfo, ) -> None: email = next(_emails) assert not _initiate_password_reset(_app, email, _smtpd, should_receive_email=False) @pytest.mark.parametrize("role_or_user", list(UserRoleInput)) def test_initiate_password_reset_does_not_change_existing_password( self, role_or_user: _RoleOrUser, _get_user: _GetUser, _smtpd: smtpdfix.AuthController, _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) assert u.initiate_password_reset(_app, _smtpd) u.log_in(_app) @pytest.mark.parametrize("role_or_user", list(UserRoleInput)) def test_initiate_password_reset_with_case_insensitive_email( self, role_or_user: _RoleOrUser, _get_user: _GetUser, _smtpd: smtpdfix.AuthController, _app: _AppInfo, ) -> None: username, password = token_hex(8), token_hex(8) email = _randomize_casing(f"{string.ascii_lowercase}@{token_hex(16)}.com") profile = _Profile(email=email, password=password, username=username) u = _get_user(_app, role_or_user, profile=profile) case_insensitive_email = _randomize_casing(u.email) assert _initiate_password_reset(_app, case_insensitive_email, _smtpd) @pytest.mark.parametrize("role_or_user", list(UserRoleInput)) def test_password_reset_can_be_initiated_multiple_times( self, role_or_user: _RoleOrUser, _get_user: _GetUser, _passwords: Iterator[_Password], _smtpd: smtpdfix.AuthController, _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) new_password = next(_passwords) assert new_password != u.password tokens = [u.initiate_password_reset(_app, _smtpd) for _ in range(2)] assert sum(map(bool, tokens)) > 1 for i, token in enumerate(tokens): assert token if i < len(tokens) - 1: with _EXPECTATION_401: token.reset(_app, new_password) continue # only the last one works token.reset(_app, new_password) @pytest.mark.parametrize("role_or_user", list(UserRoleInput)) def test_password_reset_can_be_initiated_immediately_after_password_reset( self, role_or_user: _RoleOrUser, _get_user: _GetUser, _passwords: Iterator[_Password], _smtpd: smtpdfix.AuthController, _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) new_password = next(_passwords) assert new_password != u.password assert (token := u.initiate_password_reset(_app, _smtpd)) token.reset(_app, new_password) assert u.initiate_password_reset(_app, _smtpd) @pytest.mark.parametrize("role_or_user", list(UserRoleInput)) def test_password_reset_token_is_single_use( self, role_or_user: _RoleOrUser, _get_user: _GetUser, _passwords: Iterator[_Password], _smtpd: smtpdfix.AuthController, _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) new_password = next(_passwords) assert new_password != u.password newer_password = next(_passwords) assert newer_password != new_password assert (token := u.initiate_password_reset(_app, _smtpd)) token.reset(_app, new_password) with _EXPECTATION_401: token.reset(_app, newer_password) @pytest.mark.parametrize("role_or_user", list(UserRoleInput)) def test_initiate_password_reset_and_then_reset_password_using_token_from_email( self, role_or_user: _RoleOrUser, _get_user: _GetUser, _passwords: Iterator[_Password], _smtpd: smtpdfix.AuthController, _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) logged_in_user = u.log_in(_app) logged_in_user.visit(_app) assert (token := u.initiate_password_reset(_app, _smtpd)) new_password = next(_passwords) assert new_password != u.password token.reset(_app, new_password) with _EXPECTATION_401: # old password should no longer work u.log_in(_app) # old logged-in tokens should no longer work logged_in_user.visit(_app, 401) # new password should work new_profile = replace(u.profile, password=new_password) new_u = replace(u, profile=new_profile) new_u.log_in(_app) assert not _will_be_asked_to_reset_password(_app, new_u) @pytest.mark.parametrize("role_or_user", list(UserRoleInput)) def test_deleted_user_will_not_receive_email_after_initiating_password_reset( self, role_or_user: _RoleOrUser, _get_user: _GetUser, _smtpd: smtpdfix.AuthController, _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) logged_in_user = u.log_in(_app) logged_in_user.visit(_app) _DEFAULT_ADMIN.delete_users(_app, u) assert not u.initiate_password_reset(_app, _smtpd, should_receive_email=False) @pytest.mark.parametrize("role_or_user", list(UserRoleInput)) def test_deleted_user_cannot_reset_password_using_token_from_email( self, role_or_user: _RoleOrUser, _get_user: _GetUser, _passwords: Iterator[_Password], _smtpd: smtpdfix.AuthController, _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) logged_in_user = u.log_in(_app) logged_in_user.visit(_app) assert (token := u.initiate_password_reset(_app, _smtpd)) new_password = next(_passwords) assert new_password != u.password _DEFAULT_ADMIN.delete_users(_app, u) with _EXPECTATION_401: token.reset(_app, new_password) class TestLogOut: @pytest.mark.parametrize("role_or_user", list(UserRoleInput)) def test_can_log_out( self, role_or_user: _RoleOrUser, _get_user: _GetUser, _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) logged_in_users = [u.log_in(_app) for _ in range(2)] for logged_in_user in logged_in_users: logged_in_user.visit(_app) logged_in_users[0].log_out(_app) for logged_in_user in logged_in_users: logged_in_user.visit(_app, 401) @pytest.mark.parametrize("role_or_user", list(UserRoleInput)) def test_can_log_out_with_only_refresh_token( self, role_or_user: _RoleOrUser, _get_user: _GetUser, _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) refresh_token = u.log_in(_app).tokens.refresh_token # Explicitly check for 302 response from logout client = _httpx_client(_app, refresh_token) response = client.get("auth/logout", follow_redirects=False) assert response.status_code == 302 # Optionally, check the redirect location assert response.headers["location"] in ("/login", "/logout") def test_log_out_does_not_raise_exception(self, _app: _AppInfo) -> None: _log_out(_app) class TestLoggedInTokens: class _JtiSet(Generic[_TokenT]): def __init__(self) -> None: self._set: set[str] = set() def add(self, token: _TokenT) -> None: assert (jti := _decode_jwt(token)["jti"]) not in self._set self._set.add(jti) @pytest.mark.parametrize("role_or_user", list(UserRoleInput)) def test_logged_in_tokens_should_change_after_log_out( self, role_or_user: _RoleOrUser, _get_user: _GetUser, _app: _AppInfo, ) -> None: access_tokens = self._JtiSet[_AccessToken]() refresh_tokens = self._JtiSet[_RefreshToken]() u = _get_user(_app, role_or_user) for _ in range(2): logged_in_user = u.log_in(_app) access_tokens.add(logged_in_user.tokens.access_token) refresh_tokens.add(logged_in_user.tokens.refresh_token) logged_in_user.log_out(_app) @pytest.mark.parametrize("role_or_user", list(UserRoleInput)) @pytest.mark.parametrize("role", list(UserRoleInput)) def test_logged_in_tokens_should_differ_between_users( self, role_or_user: _RoleOrUser, role: UserRoleInput, _get_user: _GetUser, _app: _AppInfo, ) -> None: access_tokens = self._JtiSet[_AccessToken]() refresh_tokens = self._JtiSet[_RefreshToken]() u = _get_user(_app, role_or_user) logged_in_user = u.log_in(_app) access_tokens.add(logged_in_user.tokens.access_token) refresh_tokens.add(logged_in_user.tokens.refresh_token) logged_in_user.log_out(_app) other_user = _get_user(_app, role) logged_in_user = other_user.log_in(_app) access_tokens.add(logged_in_user.tokens.access_token) refresh_tokens.add(logged_in_user.tokens.refresh_token) logged_in_user.log_out(_app) def test_corrupt_tokens_are_not_accepted(self, _app: _AppInfo) -> None: parts = _DEFAULT_ADMIN.log_in(_app).tokens.access_token.split(".") # delete last 3 characters because base64 could have up to 2 padding characters bad_headers = _AccessToken(f"{parts[0][:-3]}.{parts[1]}.{parts[2]}") with _EXPECTATION_401: _create_api_key(_app, bad_headers) bad_payload = _AccessToken(f"{parts[0]}.{parts[1][:-3]}.{parts[2]}") with _EXPECTATION_401: _create_api_key(_app, bad_payload) class TestRefreshToken: @pytest.mark.parametrize("role_or_user", list(UserRoleInput)) def test_end_to_end_credentials_flow( self, role_or_user: _RoleOrUser, _get_user: _GetUser, _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) logged_in_users: defaultdict[int, dict[int, _LoggedInUser]] = defaultdict(dict) # user logs into first browser logged_in_users[0][0] = u.log_in(_app) # tokens are refreshed in the first browser logged_in_users[0][1] = logged_in_users[0][0].refresh(_app) # user can visit the app logged_in_users[0][1].visit(_app) # refresh token is good for one use only with pytest.raises(HTTPStatusError): logged_in_users[0][0].refresh(_app) # original access token is invalid after refresh logged_in_users[0][0].visit(_app, 401) # user logs into second browser logged_in_users[1][0] = u.log_in(_app) # user can visit the app logged_in_users[1][0].visit(_app) # user logs out in first browser logged_in_users[0][1].log_out(_app) # user is logged out of both browsers logged_in_users[0][1].visit(_app, 401) logged_in_users[1][0].visit(_app, 401) class TestCreateUser: @pytest.mark.parametrize("role", list(UserRoleInput)) def test_cannot_create_user_without_access( self, role: UserRoleInput, _profiles: Iterator[_Profile], _app: _AppInfo, ) -> None: profile = next(_profiles) with _EXPECTATION_401: _create_user(_app, role=role, profile=profile) @pytest.mark.parametrize( "role_or_user,expectation", [ (_VIEWER, _DENIED), (_MEMBER, _DENIED), (_ADMIN, _OK), (_DEFAULT_ADMIN, _OK), ], ) @pytest.mark.parametrize("role", list(UserRoleInput)) def test_only_admin_can_create_user( self, role_or_user: UserRoleInput, role: UserRoleInput, expectation: AbstractContextManager[Optional[Unauthorized]], _get_user: _GetUser, _profiles: Iterator[_Profile], _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) logged_in_user = u.log_in(_app) profile = next(_profiles) with expectation as e: new_user = logged_in_user.create_user(_app, role, profile=profile) if not e: new_user.log_in(_app) assert _will_be_asked_to_reset_password(_app, new_user) @pytest.mark.parametrize("role_or_user", [_ADMIN, _DEFAULT_ADMIN]) def test_cannot_create_duplicate_user_with_different_email_case( self, role_or_user: _RoleOrUser, _get_user: _GetUser, _app: _AppInfo, ) -> None: admin = _get_user(_app, role_or_user).log_in(_app) # Create first user username, password = token_hex(8), token_hex(8) email = _randomize_casing(f"{string.ascii_lowercase}@{token_hex(16)}.com") profile = _Profile(email=email, password=password, username=username) admin.create_user(_app, profile=profile) # Try to create second user with same email but different case case_different_profile = replace(profile, email=_randomize_casing(profile.email)) with pytest.raises(Exception): # Should fail due to duplicate email admin.create_user(_app, profile=case_different_profile) class TestPatchViewer: @pytest.mark.parametrize("role_or_user", list(UserRoleInput) + [_DEFAULT_ADMIN]) def test_cannot_patch_viewer_without_access( self, role_or_user: _RoleOrUser, _get_user: _GetUser, _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) with _EXPECTATION_401: _patch_viewer(_app, None, u.password, new_username="new_username") @pytest.mark.parametrize("role_or_user", list(UserRoleInput) + [_DEFAULT_ADMIN]) def test_cannot_change_password_without_current_password( self, role_or_user: _RoleOrUser, _get_user: _GetUser, _passwords: Iterator[_Password], _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) logged_in_user = u.log_in(_app) new_password = next(_passwords) with pytest.raises(Exception): _patch_viewer(_app, logged_in_user, None, new_password=new_password) @pytest.mark.parametrize("role_or_user", list(UserRoleInput) + [_DEFAULT_ADMIN]) def test_cannot_change_password_with_wrong_current_password( self, role_or_user: _RoleOrUser, _get_user: _GetUser, _passwords: Iterator[_Password], _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) logged_in_user = u.log_in(_app) assert (wrong_password := next(_passwords)) != logged_in_user.password new_password = next(_passwords) with pytest.raises(Exception): _patch_viewer(_app, logged_in_user, wrong_password, new_password=new_password) @pytest.mark.parametrize("role", list(UserRoleInput)) def test_change_password( self, role: UserRoleInput, _get_user: _GetUser, _passwords: Iterator[_Password], _app: _AppInfo, ) -> None: u = _get_user(_app, role) logged_in_user = u.log_in(_app) new_password = f"new_password_{next(_passwords)}" assert new_password != logged_in_user.password _patch_viewer( _app, (old_token := logged_in_user.tokens), (old_password := logged_in_user.password), new_password=new_password, ) another_password = f"another_password_{next(_passwords)}" with _EXPECTATION_401: # old tokens should no longer work _patch_viewer(_app, old_token, new_password, new_password=another_password) with _EXPECTATION_401: # old password should no longer work u.log_in(_app) new_profile = replace(u.profile, password=new_password) new_u = replace(u, profile=new_profile) new_tokens = new_u.log_in(_app) assert not _will_be_asked_to_reset_password(_app, new_u) with pytest.raises(Exception): # old password should no longer work, even with new tokens _patch_viewer(_app, new_tokens, old_password, new_password=another_password) @pytest.mark.parametrize("role", list(UserRoleInput)) def test_change_username( self, role: UserRoleInput, _get_user: _GetUser, _usernames: Iterator[_Username], _passwords: Iterator[_Password], _app: _AppInfo, ) -> None: u = _get_user(_app, role) logged_in_user = u.log_in(_app) new_username = f"new_username_{next(_usernames)}" _patch_viewer(_app, logged_in_user, None, new_username=new_username) another_username = f"another_username_{next(_usernames)}" wrong_password = next(_passwords) assert wrong_password != logged_in_user.password _patch_viewer(_app, logged_in_user, wrong_password, new_username=another_username) class TestPatchUser: @pytest.mark.parametrize("role_or_user", [_ADMIN, _DEFAULT_ADMIN]) @pytest.mark.parametrize("new_role", list(UserRoleInput)) def test_cannot_change_role_of_default_admin( self, role_or_user: _RoleOrUser, new_role: UserRoleInput, _get_user: _GetUser, _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) logged_in_user = u.log_in(_app) with pytest.raises(Exception, match="role"): logged_in_user.patch_user(_app, _DEFAULT_ADMIN, new_role=new_role) def test_admin_cannot_change_role_for_self( self, _get_user: _GetUser, _app: _AppInfo, ) -> None: u = _get_user(_app, _ADMIN) logged_in_user = u.log_in(_app) with pytest.raises(Exception, match="role"): logged_in_user.patch_user(_app, u, new_role=_MEMBER) @pytest.mark.parametrize( "role_or_user,expectation", [ (_VIEWER, _DENIED), (_MEMBER, _DENIED), (_ADMIN, _OK), (_DEFAULT_ADMIN, _OK), ], ) @pytest.mark.parametrize("role", list(UserRoleInput)) @pytest.mark.parametrize("new_role", list(UserRoleInput)) def test_only_admin_can_change_role_for_non_self( self, role_or_user: _RoleOrUser, role: UserRoleInput, new_role: UserRoleInput, expectation: AbstractContextManager[Optional[Unauthorized]], _get_user: _GetUser, _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) logged_in_user = u.log_in(_app) non_self = _get_user(_app, role) assert non_self.gid != logged_in_user.gid with _EXPECTATION_401: _patch_user(_app, non_self, new_role=new_role) with expectation: logged_in_user.patch_user(_app, non_self, new_role=new_role) @pytest.mark.parametrize( "role_or_user,expectation", [ (_VIEWER, _DENIED), (_MEMBER, _DENIED), (_ADMIN, _OK), (_DEFAULT_ADMIN, _OK), ], ) @pytest.mark.parametrize("role", list(UserRoleInput)) def test_only_admin_can_change_password_for_non_self( self, role_or_user: UserRoleInput, role: UserRoleInput, expectation: AbstractContextManager[Optional[Unauthorized]], _get_user: _GetUser, _passwords: Iterator[_Password], _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) logged_in_user = u.log_in(_app) non_self = _get_user(_app, role) assert non_self.gid != logged_in_user.gid old_password = non_self.password new_password = f"new_password_{next(_passwords)}" assert new_password != old_password with _EXPECTATION_401: _patch_user(_app, non_self, new_password=new_password) with expectation as e: logged_in_user.patch_user(_app, non_self, new_password=new_password) if e: # password should still work non_self.log_in(_app) return with _EXPECTATION_401: # old password should no longer work non_self.log_in(_app) new_profile = replace(non_self.profile, password=new_password) new_non_self = replace(non_self, profile=new_profile) new_non_self.log_in(_app) assert _will_be_asked_to_reset_password(_app, new_non_self) @pytest.mark.parametrize( "role_or_user,expectation", [ (_VIEWER, _DENIED), (_MEMBER, _DENIED), (_ADMIN, _OK), (_DEFAULT_ADMIN, _OK), ], ) @pytest.mark.parametrize("role", list(UserRoleInput)) def test_only_admin_can_change_username_for_non_self( self, role_or_user: _RoleOrUser, role: UserRoleInput, expectation: AbstractContextManager[Optional[Unauthorized]], _get_user: _GetUser, _usernames: Iterator[_Username], _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) logged_in_user = u.log_in(_app) non_self = _get_user(_app, role) assert non_self.gid != logged_in_user.gid old_username = non_self.username new_username = f"new_username_{next(_usernames)}" assert new_username != old_username with _EXPECTATION_401: _patch_user(_app, non_self, new_username=new_username) with expectation: logged_in_user.patch_user(_app, non_self, new_username=new_username) @pytest.mark.parametrize("role_or_user", [_ADMIN, _DEFAULT_ADMIN]) @pytest.mark.parametrize("old_role", list(UserRoleInput)) @pytest.mark.parametrize("new_role", list(UserRoleInput)) def test_user_is_logged_out_when_role_changes( self, role_or_user: _RoleOrUser, old_role: UserRoleInput, new_role: UserRoleInput, _get_user: _GetUser, _app: _AppInfo, ) -> None: """Test that changing a user's role invalidates their existing tokens. This is a security test to ensure that when a user's role changes, their old tokens (which contain the old role) are immediately invalidated. This prevents privilege escalation/retention vulnerabilities. """ if old_role == new_role: pytest.skip("Skipping test when old_role == new_role") # Create user with old_role and log them in user = _get_user(_app, old_role) logged_in_user = user.log_in(_app) old_tokens = logged_in_user.tokens # Verify user can access with old tokens logged_in_user.visit(_app) # Admin changes user's role _patch_user(_app, user, _app.admin_secret, new_role=new_role) # Old tokens should no longer work (user should be logged out) logged_in_user.visit(_app, 401) # User needs to log in again new_logged_in_user = user.log_in(_app) new_tokens = new_logged_in_user.tokens # New tokens should be different assert new_tokens.access_token != old_tokens.access_token assert new_tokens.refresh_token != old_tokens.refresh_token # New tokens should work new_logged_in_user.visit(_app) class TestDeleteUsers: @pytest.mark.parametrize( "role_or_user,expectation", [ (_MEMBER, _DENIED), (_ADMIN, pytest.raises(Exception)), (_DEFAULT_ADMIN, pytest.raises(Exception)), ], ) def test_cannot_delete_system_user( self, role_or_user: UserRoleInput, expectation: _Expectation, _get_user: _GetUser, _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) logged_in_user = u.log_in(_app) with expectation: logged_in_user.delete_users(_app, _SYSTEM_USER_GID) @pytest.mark.parametrize( "role_or_user,expectation", [ (_MEMBER, _DENIED), (_ADMIN, pytest.raises(Exception, match="Cannot delete the default admin user")), (_DEFAULT_ADMIN, pytest.raises(Exception)), ], ) def test_cannot_delete_default_admin_user( self, role_or_user: _RoleOrUser, expectation: _Expectation, _get_user: _GetUser, _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) logged_in_user = u.log_in(_app) with expectation: logged_in_user.delete_users(_app, _DEFAULT_ADMIN) _DEFAULT_ADMIN.log_in(_app) @pytest.mark.parametrize( "role_or_user,expectation", [ (_VIEWER, _DENIED), (_MEMBER, _DENIED), (_ADMIN, _OK), (_DEFAULT_ADMIN, _OK), ], ) @pytest.mark.parametrize("role", list(UserRoleInput)) def test_only_admin_can_delete_users( self, role_or_user: _RoleOrUser, role: UserRoleInput, expectation: _OK_OR_DENIED, _get_user: _GetUser, _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) logged_in_user = u.log_in(_app) non_self = _get_user(_app, role) with expectation as e: logged_in_user.delete_users(_app, non_self) if e: non_self.log_in(_app) else: with _EXPECTATION_401: non_self.log_in(_app) @pytest.mark.parametrize("role_or_user", [_ADMIN, _DEFAULT_ADMIN]) def test_error_is_raised_when_deleting_a_non_existent_user_id( self, role_or_user: _RoleOrUser, _get_user: _GetUser, _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) logged_in_user = u.log_in(_app) phantom = _GqlId(GlobalID(type_name="User", node_id=str(999_999_999))) user = _get_user(_app) with pytest.raises(Exception, match="Some user IDs could not be found"): logged_in_user.delete_users(_app, phantom, user) user.log_in(_app) @pytest.mark.parametrize("role", list(UserRoleInput)) def test_user_deletion_deletes_all_tokens( self, role: UserRoleInput, _get_user: _GetUser, _spans: Sequence[ReadableSpan], _app: _AppInfo, ) -> None: user = _get_user(_app, role) logged_in_user = user.log_in(_app) tokens = logged_in_user.tokens logged_in_user.visit(_app) _delete_users(_app, _app.admin_secret, users=[user]) with _EXPECTATION_401: tokens.refresh(_app) logged_in_user.visit(_app, 401) class TestCreateApiKey: @pytest.mark.parametrize( "role_or_user,expectation", [ (_VIEWER, _OK), (_MEMBER, _OK), (_ADMIN, _OK), (_DEFAULT_ADMIN, _OK), ], ) def test_create_user_api_key( self, role_or_user: _RoleOrUser, expectation: _OK_OR_DENIED, _get_user: _GetUser, _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) logged_in_user = u.log_in(_app) with expectation: logged_in_user.create_api_key(_app) @pytest.mark.parametrize( "role_or_user,expectation", [ (_VIEWER, _DENIED), (_MEMBER, _DENIED), (_ADMIN, _OK), (_DEFAULT_ADMIN, _OK), ], ) def test_only_admin_can_create_system_api_key( self, role_or_user: _RoleOrUser, expectation: _OK_OR_DENIED, _get_user: _GetUser, _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) logged_in_user = u.log_in(_app) with expectation: logged_in_user.create_api_key(_app, "System") class TestDeleteApiKey: @pytest.mark.parametrize("role_or_user", [_VIEWER, _MEMBER, _ADMIN, _DEFAULT_ADMIN]) def test_delete_user_api_key( self, role_or_user: _RoleOrUser, _get_user: _GetUser, _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) logged_in_user = u.log_in(_app) api_key = logged_in_user.create_api_key(_app) logged_in_user.delete_api_key(_app, api_key) @pytest.mark.parametrize( "role_or_user,expectation", [ (_VIEWER, _DENIED), (_MEMBER, _DENIED), (_ADMIN, _OK), (_DEFAULT_ADMIN, _OK), ], ) @pytest.mark.parametrize("role", [_MEMBER, _ADMIN]) def test_only_admin_can_delete_user_api_key_for_non_self( self, role_or_user: _RoleOrUser, role: UserRoleInput, expectation: _OK_OR_DENIED, _get_user: _GetUser, _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) logged_in_user = u.log_in(_app) non_self = _get_user(_app, role).log_in(_app) assert non_self.gid != logged_in_user.gid api_key = non_self.create_api_key(_app) with expectation: logged_in_user.delete_api_key(_app, api_key) @pytest.mark.parametrize( "role_or_user,expectation", [ (_VIEWER, _DENIED), (_MEMBER, _DENIED), (_ADMIN, _OK), (_DEFAULT_ADMIN, _OK), ], ) def test_only_admin_can_delete_system_api_key( self, role_or_user: _RoleOrUser, expectation: _OK_OR_DENIED, _get_user: _GetUser, _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) logged_in_user = u.log_in(_app) api_key = _DEFAULT_ADMIN.create_api_key(_app, "System") with expectation: logged_in_user.delete_api_key(_app, api_key) class TestGraphQLQuery: @pytest.mark.parametrize( "role_or_user,expectation", [ (_VIEWER, _DENIED), (_MEMBER, _DENIED), (_ADMIN, _OK), (_DEFAULT_ADMIN, _OK), ], ) @pytest.mark.parametrize( "query", [ "query{users{edges{node{id}}}}", "query{userApiKeys{id}}", "query{systemApiKeys{id}}", ], ) def test_only_admin_can_list_users_and_api_keys( self, role_or_user: _RoleOrUser, query: str, expectation: _OK_OR_DENIED, _get_user: _GetUser, _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) logged_in_user = u.log_in(_app) with expectation: logged_in_user.gql(_app, query) @pytest.mark.parametrize("role_or_user", list(UserRoleInput) + [_DEFAULT_ADMIN]) def test_can_query_user_node_for_self( self, role_or_user: _RoleOrUser, _get_user: _GetUser, _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) logged_in_user = u.log_in(_app) query = 'query{node(id:"' + u.gid + '"){__typename}}' logged_in_user.gql(_app, query) @pytest.mark.parametrize( "role_or_user,expectation", [ (_VIEWER, _DENIED), (_MEMBER, _DENIED), (_ADMIN, _OK), (_DEFAULT_ADMIN, _OK), ], ) @pytest.mark.parametrize("role", list(UserRoleInput)) def test_only_admin_can_query_user_node_for_non_self( self, role_or_user: _RoleOrUser, role: UserRoleInput, expectation: _OK_OR_DENIED, _get_user: _GetUser, _app: _AppInfo, ) -> None: u = _get_user(_app, role_or_user) logged_in_user = u.log_in(_app) non_self = _get_user(_app, role) query = 'query{node(id:"' + non_self.gid + '"){__typename}}' with expectation: logged_in_user.gql(_app, query) class TestSpanExporters: @pytest.mark.parametrize( "use_api_key,expires_at,expected", [ (True, NOW + timedelta(days=1), SpanExportResult.SUCCESS), (True, None, SpanExportResult.SUCCESS), (True, NOW, SpanExportResult.FAILURE), (False, None, SpanExportResult.FAILURE), ], ) def test_api_key( self, use_api_key: bool, expires_at: Optional[datetime], expected: SpanExportResult, _span_exporter: _SpanExporterFactory, _spans: Sequence[ReadableSpan], _app: _AppInfo, ) -> None: headers: Optional[_Headers] = None api_key: Optional[_ApiKey] = None if use_api_key: api_key = _DEFAULT_ADMIN.create_api_key(_app, "System", expires_at=expires_at) # Must use all lower case for `authorization` because # otherwise it would crash the gRPC receiver. headers = dict(authorization=f"Bearer {api_key}") export = _span_exporter(_app, headers=headers).export for _ in range(2): assert export(_spans) is expected if api_key and expected is SpanExportResult.SUCCESS: _DEFAULT_ADMIN.delete_api_key(_app, api_key) assert export(_spans) is SpanExportResult.FAILURE @pytest.mark.parametrize( "use_admin_secret,expected", [ (True, SpanExportResult.SUCCESS), (False, SpanExportResult.FAILURE), ], ) def test_admin_secret( self, use_admin_secret: bool, expected: SpanExportResult, _span_exporter: _SpanExporterFactory, _spans: Sequence[ReadableSpan], _app: _AppInfo, ) -> None: if use_admin_secret: assert (api_key := _app.admin_secret) else: api_key = _AdminSecret("") # Must use all lower case for `authorization` because # otherwise it would crash the gRPC receiver. headers = dict(authorization=f"Bearer {str(api_key)}") export = _span_exporter(_app, headers=headers).export assert export(_spans) is expected class TestEmbeddingsRestApi: @pytest.mark.parametrize("role_or_user", list(UserRoleInput) + [_DEFAULT_ADMIN]) def test_authenticated_users_can_access_route( self, role_or_user: _RoleOrUser, _get_user: _GetUser, _app: _AppInfo, ) -> None: user = _get_user(_app, role_or_user) logged_in_user = user.log_in(_app) with _EXPECTATION_404: # no files have been exported logged_in_user.export_embeddings(_app, "embeddings") def test_unauthenticated_requests_receive_401(self, _app: _AppInfo) -> None: with _EXPECTATION_401: _export_embeddings(_app, None, filename="embeddings") class TestPrompts: def test_authenticated_users_are_recorded_in_prompts( self, _get_user: _GetUser, _app: _AppInfo, ) -> None: u = _get_user(_app, _MEMBER) logged_in_user = u.log_in(_app) # Create new prompt response, _ = logged_in_user.gql( _app, query=""" mutation CreateChatPromptMutation($input: CreateChatPromptInput!) { createChatPrompt(input: $input) { id promptVersions { edges { promptVersion: node { user { id } } } } } } """, variables={ "input": { "name": "prompt-name", "description": "prompt-description", "promptVersion": { "description": "prompt-version-description", "templateFormat": "MUSTACHE", "template": { "messages": [ { "role": "USER", "content": [{"text": {"text": "hello world"}}], } ] }, "invocationParameters": {"temperature": 0.4}, "modelProvider": "OPENAI", "modelName": "o1-mini", }, } }, ) prompt_id = response["data"]["createChatPrompt"]["id"] prompt_versions = response["data"]["createChatPrompt"]["promptVersions"]["edges"] assert len(prompt_versions) == 1 user = prompt_versions[0]["promptVersion"]["user"] assert user is not None assert user["id"] == logged_in_user.gid # Create new version for existing prompt response, _ = logged_in_user.gql( _app, query=""" mutation CreateChatPromptVersionMutation($input: CreateChatPromptVersionInput!) { createChatPromptVersion(input: $input) { promptVersions { edges { promptVersion: node { user { id } } } } } } """, variables={ "input": { "promptId": prompt_id, "promptVersion": { "description": "new-version-description", "templateFormat": "MUSTACHE", "template": { "messages": [ { "role": "USER", "content": [{"text": {"text": "new version"}}], } ] }, "invocationParameters": {"temperature": 0.4}, "modelProvider": "OPENAI", "modelName": "o1-mini", }, } }, ) # Verify both versions record the user prompt_versions = response["data"]["createChatPromptVersion"]["promptVersions"]["edges"] assert len(prompt_versions) == 2 for version in prompt_versions: user = version["promptVersion"]["user"] assert user is not None assert user["id"] == logged_in_user.gid class TestSpanAnnotations: QUERY = """ mutation CreateSpanAnnotations($input: [CreateSpanAnnotationInput!]!) { createSpanAnnotations(input: $input) { spanAnnotations { ...SpanAnnotationFields } } } mutation PatchSpanAnnotations($input: [PatchAnnotationInput!]!) { patchSpanAnnotations(input: $input) { spanAnnotations { ...SpanAnnotationFields } } } mutation DeleteSpanAnnotations($input: DeleteAnnotationsInput!) { deleteSpanAnnotations(input: $input) { spanAnnotations { ...SpanAnnotationFields } } } query GetSpanAnnotation($annotationId: ID!) { spanAnnotation: node(id: $annotationId) { ... on SpanAnnotation { ...SpanAnnotationFields } } } fragment SpanAnnotationFields on SpanAnnotation { id name score label explanation annotatorKind metadata source identifier spanId user { id email username } } """ async def test_other_users_cannot_patch_and_only_creator_or_admin_can_delete( self, _existing_spans: Sequence[_ExistingSpan], _get_user: _GetUser, _app: _AppInfo, ) -> None: assert _existing_spans, "At least one existing span is required for this test" span_gid, *_ = choice(_existing_spans) annotation_creator = _get_user(_app, _MEMBER) logged_in_annotation_creator = annotation_creator.log_in(_app) member = _get_user(_app, _MEMBER) logged_in_member = member.log_in(_app) admin = _get_user(_app, _ADMIN) logged_in_admin = admin.log_in(_app) # Create span annotation name = token_hex(8) response, _ = logged_in_annotation_creator.gql( _app, query=self.QUERY, operation_name="CreateSpanAnnotations", variables={ "input": { "spanId": str(span_gid), "name": name, "annotatorKind": "HUMAN", "label": "correct", "score": 1, "explanation": "explanation", "metadata": {}, "identifier": "identifier", "source": "APP", } }, ) span_annotations = response["data"]["createSpanAnnotations"]["spanAnnotations"] assert len(span_annotations) == 1 original_span_annotation = span_annotations[0] annotation_id = original_span_annotation["id"] # Only the user who created the annotation can patch for user in [logged_in_member, logged_in_admin]: with pytest.raises(RuntimeError) as exc_info: response, _ = user.gql( _app, query=self.QUERY, operation_name="PatchSpanAnnotations", variables={ "input": { "annotationId": annotation_id, "name": f"patched-{name}", "annotatorKind": "LLM", "label": "incorrect", "score": 0, "explanation": "patched-explanation", "metadata": {"patched": "key"}, "identifier": "patched-identifier", } }, ) assert "At least one span annotation is not associated with the current user." in str( exc_info.value ) # Check that the annotation remains unchanged response, _ = user.gql( _app, query=self.QUERY, operation_name="GetSpanAnnotation", variables={"annotationId": annotation_id}, ) span_annotation = response["data"]["spanAnnotation"] assert span_annotation == original_span_annotation # Member who did not create the annotation cannot delete with pytest.raises(RuntimeError) as exc_info: logged_in_member.gql( _app, query=self.QUERY, operation_name="DeleteSpanAnnotations", variables={ "input": { "annotationIds": [annotation_id], } }, ) assert "At least one span annotation is not associated with the current user." in str( exc_info.value ) # Check that the annotation remains unchanged response, _ = user.gql( _app, query=self.QUERY, operation_name="GetSpanAnnotation", variables={"annotationId": annotation_id}, ) span_annotation = response["data"]["spanAnnotation"] assert span_annotation == original_span_annotation # Admin can delete response, _ = logged_in_admin.gql( _app, query=self.QUERY, operation_name="DeleteSpanAnnotations", variables={ "input": { "annotationIds": [annotation_id], } }, ) class TestTraceAnnotations: QUERY = """ mutation CreateTraceAnnotations($input: [CreateTraceAnnotationInput!]!) { createTraceAnnotations(input: $input) { traceAnnotations { ...TraceAnnotationFields } } } mutation PatchTraceAnnotations($input: [PatchAnnotationInput!]!) { patchTraceAnnotations(input: $input) { traceAnnotations { ...TraceAnnotationFields } } } mutation DeleteTraceAnnotations($input: DeleteAnnotationsInput!) { deleteTraceAnnotations(input: $input) { traceAnnotations { ...TraceAnnotationFields } } } query GetTraceAnnotation($annotationId: ID!) { traceAnnotation: node(id: $annotationId) { ... on TraceAnnotation { ...TraceAnnotationFields } } } fragment TraceAnnotationFields on TraceAnnotation { id name score label explanation annotatorKind metadata source identifier trace { traceId } user { id email username } } """ async def test_other_users_cannot_patch_and_only_creator_or_admin_can_delete( self, _existing_spans: Sequence[_ExistingSpan], _get_user: _GetUser, _app: _AppInfo, ) -> None: assert _existing_spans, "At least one existing span is required for this test" existing_span = choice(_existing_spans) trace_gid = existing_span.trace.id annotation_creator = _get_user(_app, _MEMBER) logged_in_annotation_creator = annotation_creator.log_in(_app) member = _get_user(_app, _MEMBER) logged_in_member = member.log_in(_app) admin = _get_user(_app, _ADMIN) logged_in_admin = admin.log_in(_app) # Create trace annotation response, _ = logged_in_annotation_creator.gql( _app, query=self.QUERY, operation_name="CreateTraceAnnotations", variables={ "input": { "traceId": str(trace_gid), "name": "trace-annotation-name", "annotatorKind": "HUMAN", "label": "correct", "score": 1, "explanation": "explanation", "metadata": {}, "identifier": "identifier", "source": "APP", } }, ) trace_annotations = response["data"]["createTraceAnnotations"]["traceAnnotations"] assert len(trace_annotations) == 1 original_trace_annotation = trace_annotations[0] annotation_id = original_trace_annotation["id"] # Only the user who created the annotation can patch for user in [logged_in_member, logged_in_admin]: with pytest.raises(RuntimeError) as exc_info: response, _ = user.gql( _app, query=self.QUERY, operation_name="PatchTraceAnnotations", variables={ "input": { "annotationId": annotation_id, "name": "patched-trace-annotation-name", "annotatorKind": "LLM", "label": "incorrect", "score": 0, "explanation": "patched-explanation", "metadata": {"patched": "key"}, "identifier": "patched-identifier", } }, ) assert "At least one trace annotation is not associated with the current user." in str( exc_info.value ) # Check that the annotation remains unchanged response, _ = user.gql( _app, query=self.QUERY, operation_name="GetTraceAnnotation", variables={"annotationId": annotation_id}, ) trace_annotation = response["data"]["traceAnnotation"] assert trace_annotation == original_trace_annotation # Member who did not create the annotation cannot delete with pytest.raises(RuntimeError) as exc_info: logged_in_member.gql( _app, query=self.QUERY, operation_name="DeleteTraceAnnotations", variables={ "input": { "annotationIds": [annotation_id], } }, ) assert ( "At least one trace annotation is not associated with the current user " "and the current user is not an admin." in str(exc_info.value) ) # Check that the annotation remains unchanged response, _ = user.gql( _app, query=self.QUERY, operation_name="GetTraceAnnotation", variables={"annotationId": annotation_id}, ) trace_annotation = response["data"]["traceAnnotation"] assert trace_annotation == original_trace_annotation # Admin can delete response, _ = logged_in_admin.gql( _app, query=self.QUERY, operation_name="DeleteTraceAnnotations", variables={ "input": { "annotationIds": [annotation_id], } }, ) class TestApiAccessViaCookiesOrApiKeys: """Tests REST API v1 access control using both cookie and API key authentication. This test suite verifies that access restrictions are enforced consistently across all user roles (Admin, Member, Viewer, Default Admin) at the v1 router level, regardless of authentication method: - Cookie-based authentication (access tokens from login) - API key authentication (Bearer tokens) The comprehensive test parametrized by role validates 54 total endpoints/operations: Common Read Resources (28 GET endpoints): - All roles have identical read access - Covers: Projects, Datasets, Experiments, Prompts, Annotation Configs, Evaluations, Spans, Annotations - Tests both valid responses (200) and error cases (404, 422) Admin-Only Endpoints (5 operations): - Admins: Access granted (may get 200, 404, 422 based on request validity) - Members/Viewers: Always 403 Forbidden - User management: GET/POST/DELETE on /v1/users - Project management: PUT/DELETE on /v1/projects Write Operations (21 POST/PUT/DELETE operations): - Viewers: Always 403 Forbidden (read-only access) - Admins/Members: Access granted (may get 400, 404, 415, 422 based on request validity) Error Handling: - Invalid ID format: Returns 422 for GlobalID parsing errors - Missing required headers/body: Returns 400 or 422 - Unsupported media type: Returns 415 - Resource not found: Returns 404 - Dynamic test IDs using token_hex(4) for test isolation """ @pytest.mark.parametrize("role_or_user", list(UserRoleInput) + [_DEFAULT_ADMIN]) def test_role_based_access_control( self, role_or_user: _RoleOrUser, _get_user: _GetUser, _app: _AppInfo, ) -> None: """Test role-based access control across all v1 API endpoints with cookies and API keys. This comprehensive test verifies access control for all user roles (Admin, Member, Viewer, Default Admin) across three categories of endpoints, using both authentication methods: 1. Cookie-based authentication (session tokens) 2. API key authentication (Bearer tokens) Test Coverage: Common Read Resources (28 GET endpoints): - All roles should have identical read access to common resources - Covers: Projects, Datasets, Experiments, Prompts, Annotation Configs, Evaluations, Spans, and Annotations - Tests both valid responses (200) and error cases (404, 422) Admin-Only Endpoints (5 operations): - Admins: Access granted (may get 200, 404, 422 based on request validity) - Members/Viewers: Always 403 Forbidden (authorization denied) - User management: GET /v1/users, POST /v1/users, DELETE /v1/users/{id} - Project management: PUT /v1/projects/{id}, DELETE /v1/projects/{id} Write Operations (21 POST/PUT/DELETE operations): - Viewers: Always 403 Forbidden (read-only access) - Admins/Members: Access granted (may get 400, 404, 415, 422 based on request validity) - Covers: Datasets, Experiments, Prompts, Annotations, Evaluations, Spans, Traces, and Project creation (POST /v1/projects) This verifies that authorization is enforced consistently regardless of authentication method. Each role maintains the same permissions whether using cookies or API keys. """ user = _get_user(_app, role_or_user) logged_in_user = user.log_in(_app) api_key = logged_in_user.create_api_key(_app) tokens = logged_in_user.tokens is_admin = user.role is UserRoleInput.ADMIN or role_or_user is _DEFAULT_ADMIN is_viewer = user.role is UserRoleInput.VIEWER for client in (_httpx_client(_app, tokens), _httpx_client(_app, api_key)): # Test 1: Common read resources - all roles should have identical access for expected_status_code, method, endpoint in _COMMON_RESOURCE_ENDPOINTS: assert expected_status_code not in (401, 403), ( f"Test misconfiguration: expected_status_code should not be " f"401 or 403 (got {expected_status_code} for {method} {endpoint})" ) endpoint = endpoint.format(token_hex(4)) response = client.request(method, endpoint) assert response.status_code == expected_status_code, ( f"Expected {expected_status_code} but got {response.status_code} for {endpoint}" ) # Test 2: Admin-only endpoints - only admins should have access for expected_status_code, method, endpoint in _ADMIN_ONLY_ENDPOINTS: endpoint = endpoint.format(token_hex(4)) response = client.request(method, endpoint) if is_admin: assert response.status_code == expected_status_code, ( f"Admin expected {expected_status_code} but got {response.status_code} " f"for {method} {endpoint}" ) else: assert response.status_code == 403, ( f"Non-admin expected 403 but got {response.status_code} " f"for {method} {endpoint}" ) # Test 3: Write operations - viewers blocked, admins/members have access for expected_status_code, method, endpoint in _VIEWER_BLOCKED_WRITE_OPERATIONS: endpoint = endpoint.format(token_hex(4)) response = client.request(method, endpoint) if is_viewer: assert response.status_code == 403, ( f"Viewer expected 403 but got {response.status_code} for {method} {endpoint}" ) else: assert response.status_code == expected_status_code, ( f"Admin/Member expected {expected_status_code} but got {response.status_code} " f"for {method} {endpoint}" )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Arize-ai/phoenix'

If you have feedback or need assistance with the MCP directory API, please join our Discord server