Skip to main content
Glama

@arizeai/phoenix-mcp

Official
by Arize-ai
test_annotations.py153 kB
# pyright: reportPrivateUsage=false from __future__ import annotations from asyncio import gather, sleep from random import choice, sample from secrets import token_bytes, token_hex from typing import Any, Literal, NamedTuple, Optional, Sequence, cast import httpx import pandas as pd import pytest from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.sdk.trace.export import SpanExportResult from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter from opentelemetry.trace import format_span_id, format_trace_id from strawberry.relay import GlobalID from typing_extensions import assert_never import phoenix as px from phoenix.client.__generated__ import v1 from phoenix.trace import DocumentEvaluations, SpanEvaluations, TraceEvaluations from .._helpers import ( _ADMIN, _MEMBER, _AppInfo, _await_or_return, _ExistingSpan, _get, _GetUser, _gql, _grpc_span_exporter, _RoleOrUser, _SecurityArtifact, _start_span, ) _ANNOTATOR_KINDS = ["LLM", "CODE", "HUMAN"] class TestClientForSpanAnnotations: """Tests the Phoenix span annotation client functionality. Verifies that the client can: - Create and update single annotations - Handle multiple annotations at once - Work with different user roles - Use DataFrames for annotations - Work in both regular and async mode - Handle special cases like zero scores """ # GraphQL query to retrieve span annotations for a given span ID query = """ query GetSpanAnnotations($id: ID!) { node (id: $id) { ... on Span { spanAnnotations { id name source identifier annotatorKind metadata label score explanation user { id } } } } } """ @pytest.mark.parametrize("sync", [True, False]) # server ingestion path @pytest.mark.parametrize("is_async", [True, False]) # sync/async client @pytest.mark.parametrize( "role_or_user, api_key_kind", [ (_MEMBER, "User"), (_ADMIN, "User"), (_ADMIN, "System"), ], ) async def test_add_span_annotation( self, sync: bool, is_async: bool, role_or_user: _RoleOrUser, api_key_kind: Literal["User", "System"], _existing_spans: Sequence[_ExistingSpan], _get_user: _GetUser, _app: _AppInfo, ) -> None: """Tests creating and updating single annotations. Verifies that: - New annotations can be created with all fields - Existing annotations can be updated - Annotation IDs remain the same when updating - Works in both regular and async mode - User permissions are properly checked """ # ============================================================================ # Setup # ============================================================================ # Extract OTEL span ID and graphql Global ID from the fixture assert _existing_spans, "At least one existing span is required for this test" span_gid1, span_id1, *_ = choice(_existing_spans) # Set up test environment with logged-in user u = _get_user(_app, role_or_user).log_in(_app) api_key = str(u.create_api_key(_app, api_key_kind)) # Import appropriate client based on test parameter from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # ============================================================================ # Single Annotation UPSERT Test # ============================================================================ # Test UPSERT functionality by adding multiple annotations with the same name. # This verifies that the API correctly updates existing annotations rather # than creating new annotations. annotation_name = token_hex(8) existing_gid: Optional[str] = None # First iteration: Create initial annotation for j in range(2): # Generate random test data for the annotation score = int.from_bytes(token_bytes(4), byteorder="big") label = token_hex(8) explanation = token_hex(8) metadata = {token_hex(8): token_hex(8)} # Create the span annotation result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.add_span_annotation( annotation_name=annotation_name, span_id=span_id1, annotator_kind="LLM", # Test non-default annotator_kind label=label, score=score, explanation=explanation, metadata=metadata, sync=sync, ), ) if sync: assert result # Verify the annotation was created correctly by querying the GraphQL API def get_annotation() -> Optional[dict[str, Any]]: res, _ = _gql( _app, u, query=self.query, operation_name="GetSpanAnnotations", variables={"id": str(span_gid1)}, ) annotations = { (anno["label"], anno["score"], anno["explanation"]): anno for anno in res["data"]["node"]["spanAnnotations"] } return annotations.get((label, score, explanation)) anno = await _get( query_fn=get_annotation, error_msg="Created annotation should be present in span annotations", no_wait=sync, ) # Expected user ID for the annotation expected_user_id: str if api_key_kind == "User": expected_user_id = u.gid elif api_key_kind == "System": system_user_gid = str(GlobalID(type_name="User", node_id=str(1))) expected_user_id = system_user_gid else: assert_never(api_key_kind) # Get the annotation and verify all fields match what was provided assert anno["name"] == annotation_name, "Annotation name should match input" assert anno["source"] == "API", "Annotation source should be API" assert anno["annotatorKind"] == "LLM", "Annotation annotator_kind should be LLM" assert anno["metadata"] == metadata, "Annotation metadata should match input" user = anno["user"] assert user is not None, "Annotation should have a user" assert user["id"] == expected_user_id, "Annotation user ID should match expected user" if j == 0: existing_gid = anno["id"] else: assert anno["id"] == existing_gid, ( "Annotation ID should remain the same after update" ) @pytest.mark.parametrize("sync", [True, False]) # server ingestion path @pytest.mark.parametrize("is_async", [True, False]) # sync/async client @pytest.mark.parametrize("role_or_user", [_MEMBER, _ADMIN]) async def test_log_span_annotations( self, sync: bool, is_async: bool, role_or_user: _RoleOrUser, _existing_spans: Sequence[_ExistingSpan], _get_user: _GetUser, _app: _AppInfo, ) -> None: """Tests handling multiple annotations at once. Verifies that: - Multiple annotations can be created in one call - Multiple annotations can be updated at once - Works with annotations across different spans - Annotation IDs remain the same when updating - Works in both regular and async mode - User permissions are properly checked """ # ============================================================================ # Setup # ============================================================================ # Extract OTEL span ID and graphql Global ID from the fixture assert len(_existing_spans) >= 2, "At least two existing spans are required for this test" (span_gid1, span_id1, *_), (span_gid2, span_id2, *_) = sample(_existing_spans, 2) # Set up test environment with logged-in user u = _get_user(_app, role_or_user).log_in(_app) api_key = str(u.create_api_key(_app)) # Import appropriate client based on test parameter from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # ============================================================================ # Batch Annotation Test # ============================================================================ # Test batch annotation creation and updates using log_span_annotations # Create annotations for both spans in a single batch operation # Setup test data for batch operations span_ids = [span_id1, span_id2] span_gids = [span_gid1, span_gid2] annotation_names = [token_hex(8), token_hex(8)] identifiers = [token_hex(8), token_hex(8)] existing_gids: list[Optional[str]] = [None, None] # Two iterations: First creates annotations, second updates them for i in range(2): # Generate new random values for each iteration labels = [token_hex(8), token_hex(8)] scores = [ int.from_bytes(token_bytes(4), byteorder="big"), int.from_bytes(token_bytes(4), byteorder="big"), ] explanations = [token_hex(8), token_hex(8)] metadata = [{token_hex(8): token_hex(8)} for _ in range(2)] # Create annotation data for both spans span_annotations: list[v1.SpanAnnotationData] = [ { "name": annotation_names[i], "span_id": span_ids[i], "annotator_kind": "CODE", # Test non-default annotator_kind "identifier": identifiers[i], "metadata": metadata[i], "result": { "label": labels[i], "score": scores[i], "explanation": explanations[i], }, } for i in range(len(span_ids)) ] # Log the batch annotations result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.log_span_annotations( span_annotations=span_annotations, sync=sync, ), ) if sync: # Verify the batch operation returned the expected number of results assert result assert len(result) == 2, ( "Batch operation should return results for both annotations" ) # Verify each annotation in the batch for j in range(2): def get_batch_annotation() -> Optional[dict[str, Any]]: res, _ = _gql( _app, u, query=self.query, operation_name="GetSpanAnnotations", variables={"id": str(span_gids[j])}, ) annotations = { (anno["label"], anno["score"], anno["explanation"]): anno for anno in res["data"]["node"]["spanAnnotations"] } return annotations.get((labels[j], scores[j], explanations[j])) anno = await _get( query_fn=get_batch_annotation, error_msg=f"Batch annotation {j + 1} should be present in span annotations", no_wait=sync, ) # Verify the batch annotation fields match what was provided assert anno["name"] == annotation_names[j], ( f"Batch annotation {j + 1} name should match input" ) assert anno["source"] == "API", f"Batch annotation {j + 1} source should be API" assert anno["annotatorKind"] == "CODE", ( f"Batch annotation {j + 1} annotator_kind should be CODE" ) assert anno["metadata"] == metadata[j], ( f"Batch annotation {j + 1} metadata should match input" ) assert anno["identifier"] == identifiers[j], ( f"Batch annotation {j + 1} identifier should match input" ) # Verify ID persistence across updates if i == 0: existing_gids[j] = anno["id"] else: assert anno["id"] == existing_gids[j], ( f"Batch annotation {j + 1} ID should remain the same after update" ) @pytest.mark.parametrize("sync", [True, False]) # server ingestion path @pytest.mark.parametrize("is_async", [True, False]) # sync/async client @pytest.mark.parametrize("role_or_user", [_MEMBER, _ADMIN]) async def test_log_span_annotations_dataframe( self, sync: bool, is_async: bool, role_or_user: _RoleOrUser, _existing_spans: Sequence[_ExistingSpan], _get_user: _GetUser, _app: _AppInfo, ) -> None: """Tests using DataFrames for annotations. Tests three ways to use DataFrames: 1. With span_id as a column 2. With span_id as the index 3. With a shared annotator type Verifies that: - Annotations can be read from DataFrames - Different DataFrame layouts are handled correctly - Shared settings work properly - Works in both regular and async mode - User permissions are properly checked """ # ============================================================================ # Setup # ============================================================================ # Extract OTEL span ID and graphql Global ID from the fixture assert len(_existing_spans) >= 2, "At least two existing spans are required for this test" (span_gid1, span_id1, *_), (span_gid2, span_id2, *_) = sample(_existing_spans, 2) # Set up test environment with logged-in user u = _get_user(_app, role_or_user).log_in(_app) api_key = str(u.create_api_key(_app)) # Import appropriate client based on test parameter from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # ============================================================================ # Test Case 1: Using span_id as column # ============================================================================ # This test case demonstrates standard DataFrame usage with span_id as a column # All fields are provided as columns in the DataFrame df1_annotation_names = [token_hex(8), token_hex(8)] df1_span_ids = [span_id1, span_id2] df1_annotator_kinds = ["HUMAN", "LLM"] df1_labels = [token_hex(8), token_hex(8)] df1_scores = [ int.from_bytes(token_bytes(4), byteorder="big"), int.from_bytes(token_bytes(4), byteorder="big"), ] df1_explanations = [token_hex(8), token_hex(8)] df1_metadata = [{token_hex(8): token_hex(8)} for _ in range(2)] df1 = pd.DataFrame( { "name": df1_annotation_names, "span_id": df1_span_ids, "annotator_kind": df1_annotator_kinds, "label": df1_labels, "score": df1_scores, "explanation": df1_explanations, "metadata": df1_metadata, } ) # Log annotations from DataFrame result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.log_span_annotations_dataframe( dataframe=df1, sync=sync, ), ) if sync: assert result # Verify annotations were created correctly for i, span_gid in enumerate([span_gid1, span_gid2]): def get_df_annotation() -> Optional[dict[str, Any]]: res, _ = _gql( _app, u, query=self.query, operation_name="GetSpanAnnotations", variables={"id": str(span_gid)}, ) annotations = { (anno["label"], anno["score"], anno["explanation"]): anno for anno in res["data"]["node"]["spanAnnotations"] } return annotations.get((df1_labels[i], df1_scores[i], df1_explanations[i])) anno = await _get( query_fn=get_df_annotation, error_msg=f"DataFrame annotation {i + 1} should be present in span annotations", no_wait=sync, ) # Verify annotation exists with correct values assert anno["name"] == df1_annotation_names[i], ( f"DataFrame annotation {i + 1} name should match input" ) assert anno["source"] == "API", f"DataFrame annotation {i + 1} source should be API" assert anno["metadata"] == df1_metadata[i], ( f"DataFrame annotation {i + 1} metadata should match input" ) assert anno["annotatorKind"] == df1_annotator_kinds[i], ( f"DataFrame annotation {i + 1} annotator_kind should match input" ) # ============================================================================ # Test Case 2: Using span_id as index # ============================================================================ # This test case demonstrates using span_id as the DataFrame index # This is an alternative way to specify span IDs without a dedicated column df2_annotation_names = [token_hex(8), token_hex(8)] df2_annotator_kinds = ["HUMAN", "LLM"] df2_labels = [token_hex(8), token_hex(8)] df2_scores = [ int.from_bytes(token_bytes(4), byteorder="big"), int.from_bytes(token_bytes(4), byteorder="big"), ] df2_explanations = [token_hex(8), token_hex(8)] df2_metadata = [{token_hex(8): token_hex(8)} for _ in range(2)] df2 = pd.DataFrame( { "name": df2_annotation_names, "annotator_kind": df2_annotator_kinds, "label": df2_labels, "score": df2_scores, "explanation": df2_explanations, "metadata": df2_metadata, }, index=[span_id1, span_id2], ) # Log annotations from DataFrame result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.log_span_annotations_dataframe( dataframe=df2, sync=sync, ), ) if sync: assert result # Verify annotations were created correctly for i, span_gid in enumerate([span_gid1, span_gid2]): def get_df2_annotation() -> Optional[dict[str, Any]]: res, _ = _gql( _app, u, query=self.query, operation_name="GetSpanAnnotations", variables={"id": str(span_gid)}, ) annotations = { (anno["label"], anno["score"], anno["explanation"]): anno for anno in res["data"]["node"]["spanAnnotations"] } return annotations.get((df2_labels[i], df2_scores[i], df2_explanations[i])) anno = await _get( query_fn=get_df2_annotation, error_msg=f"DataFrame annotation {i + 1} should be present in span annotations", no_wait=sync, ) # Verify annotation exists with correct values assert anno["name"] == df2_annotation_names[i], ( f"DataFrame annotation {i + 1} name should match input" ) assert anno["source"] == "API", f"DataFrame annotation {i + 1} source should be API" assert anno["metadata"] == df2_metadata[i], ( f"DataFrame annotation {i + 1} metadata should match input" ) assert anno["annotatorKind"] == df2_annotator_kinds[i], ( f"DataFrame annotation {i + 1} annotator_kind should match input" ) # ============================================================================ # Test Case 3: Using global annotator_kind # ============================================================================ # This test case demonstrates using a global annotator_kind parameter # The DataFrame does not include an annotator_kind column, and the value is # provided as a parameter to the API call. global_annotator_kind: Literal["HUMAN"] = "HUMAN" df3_annotation_names = [token_hex(8), token_hex(8)] df3_span_ids = [span_id1, span_id2] df3_labels = [token_hex(8), token_hex(8)] df3_scores = [ int.from_bytes(token_bytes(4), byteorder="big"), int.from_bytes(token_bytes(4), byteorder="big"), ] df3_explanations = [token_hex(8), token_hex(8)] df3_metadata = [{token_hex(8): token_hex(8)} for _ in range(2)] df3 = pd.DataFrame( { "name": df3_annotation_names, "span_id": df3_span_ids, "label": df3_labels, "score": df3_scores, "explanation": df3_explanations, "metadata": df3_metadata, } ) # Log annotations from DataFrame with global annotator_kind result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.log_span_annotations_dataframe( dataframe=df3, annotator_kind=global_annotator_kind, sync=sync, ), ) if sync: assert result # Verify annotations were created correctly for i, span_gid in enumerate([span_gid1, span_gid2]): def get_df3_annotation() -> Optional[dict[str, Any]]: res, _ = _gql( _app, u, query=self.query, operation_name="GetSpanAnnotations", variables={"id": str(span_gid)}, ) annotations = { (anno["label"], anno["score"], anno["explanation"]): anno for anno in res["data"]["node"]["spanAnnotations"] } return annotations.get((df3_labels[i], df3_scores[i], df3_explanations[i])) anno = await _get( query_fn=get_df3_annotation, error_msg=f"DataFrame annotation {i + 1} should be present in span annotations", no_wait=sync, ) # Verify annotation exists with correct values assert anno["name"] == df3_annotation_names[i], ( f"DataFrame annotation {i + 1} name should match input" ) assert anno["source"] == "API", f"DataFrame annotation {i + 1} source should be API" assert anno["metadata"] == df3_metadata[i], ( f"DataFrame annotation {i + 1} metadata should match input" ) assert anno["annotatorKind"] == global_annotator_kind, ( f"DataFrame annotation {i + 1} annotator_kind should match global value" ) @pytest.mark.parametrize("sync", [True, False]) # server ingestion path @pytest.mark.parametrize("is_async", [True, False]) # sync/async client @pytest.mark.parametrize("role_or_user", [_MEMBER, _ADMIN]) async def test_zero_score_annotation( self, sync: bool, is_async: bool, role_or_user: _RoleOrUser, _existing_spans: Sequence[_ExistingSpan], _get_user: _GetUser, _app: _AppInfo, ) -> None: """Tests handling annotations with zero scores. Verifies that: - Zero scores are saved and loaded correctly - Missing optional fields are handled properly - Works in both regular and async mode - User permissions are properly checked """ # ============================================================================ # Setup # ============================================================================ # Extract OTEL span ID and graphql Global ID from the fixture assert _existing_spans, "At least one existing span is required for this test" span_gid1, span_id1, *_ = choice(_existing_spans) # Set up test environment with logged-in user u = _get_user(_app, role_or_user).log_in(_app) api_key = str(u.create_api_key(_app)) # Import appropriate client based on test parameter from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # ============================================================================ # Test Case: Zero Score # ============================================================================ # Test that a score of 0 is properly recorded and not treated as falsey zero_score_annotation_name = token_hex(8) # Create annotation with score of 0 result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.add_span_annotation( annotation_name=zero_score_annotation_name, span_id=span_id1, annotator_kind="LLM", score=0, # Explicitly test score of 0 sync=sync, ), ) if sync: assert result # Verify the annotation was created correctly by querying the GraphQL API def get_zero_score_annotation() -> Optional[dict[str, Any]]: res, _ = _gql( _app, u, query=self.query, operation_name="GetSpanAnnotations", variables={"id": str(span_gid1)}, ) annotations = {anno["name"]: anno for anno in res["data"]["node"]["spanAnnotations"]} return annotations.get(zero_score_annotation_name) anno = await _get( query_fn=get_zero_score_annotation, error_msg="Annotation with score of 0 should be present in span annotations", no_wait=sync, ) # Verify the annotation exists and has score of 0 assert anno["score"] == 0, "Annotation score should be exactly 0" assert anno["label"] is None, "Annotation label should be None" assert anno["explanation"] is None, "Annotation explanation should be None" assert anno["source"] == "API", "Annotation source should be API" assert anno["annotatorKind"] == "LLM", "Annotation annotator_kind should be LLM" @pytest.mark.parametrize("sync", [True, False]) # server ingestion path @pytest.mark.parametrize("is_async", [True, False]) # sync/async client @pytest.mark.parametrize("role_or_user", [_MEMBER, _ADMIN]) async def test_zero_score_annotation_dataframe( self, sync: bool, is_async: bool, role_or_user: _RoleOrUser, _existing_spans: Sequence[_ExistingSpan], _get_user: _GetUser, _app: _AppInfo, ) -> None: """Tests handling zero scores in DataFrames. Verifies that: - Zero scores can be read from DataFrames - Missing optional fields are handled properly - Works in both regular and async mode - User permissions are properly checked """ # ============================================================================ # Setup # ============================================================================ # Extract OTEL span ID and graphql Global ID from the fixture assert _existing_spans, "At least one existing span is required for this test" span_gid1, span_id1, *_ = choice(_existing_spans) # Set up test environment with logged-in user u = _get_user(_app, role_or_user).log_in(_app) api_key = str(u.create_api_key(_app)) # Import appropriate client based on test parameter from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # ============================================================================ # Test Case: Zero Score in DataFrame # ============================================================================ # Test that a score of 0 is properly recorded and not treated as falsey zero_score_annotation_name = token_hex(8) # Create DataFrame with score of 0 # The DataFrame uses span ID to identify the span df = pd.DataFrame( { "name": [zero_score_annotation_name], "span_id": [span_id1], "annotator_kind": ["LLM"], "score": [0], # Explicitly test score of 0 } ) # Log annotations from DataFrame result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.log_span_annotations_dataframe( dataframe=df, sync=sync, ), ) if sync: assert result # Verify the annotation was created correctly by querying the GraphQL API def get_zero_score_df_annotation() -> Optional[dict[str, Any]]: res, _ = _gql( _app, u, query=self.query, operation_name="GetSpanAnnotations", variables={"id": str(span_gid1)}, ) annotations = {anno["name"]: anno for anno in res["data"]["node"]["spanAnnotations"]} return annotations.get(zero_score_annotation_name) anno = await _get( query_fn=get_zero_score_df_annotation, error_msg="DataFrame annotation with score of 0 should be present in span annotations", no_wait=sync, ) # Verify the annotation exists and has score of 0 assert anno["score"] == 0, "DataFrame annotation score should be exactly 0" assert anno["label"] is None, "DataFrame annotation label should be None" assert anno["explanation"] is None, "DataFrame annotation explanation should be None" assert anno["source"] == "API", "DataFrame annotation source should be API" assert anno["annotatorKind"] == "LLM", "DataFrame annotation annotator_kind should be LLM" async def test_burst_annotations_with_unique_identifiers( self, _existing_spans: Sequence[_ExistingSpan], _get_user: _GetUser, _app: _AppInfo, ) -> None: """Tests sending multiple span annotations in a burst with unique identifiers. Sends the same annotation name with different identifiers in rapid succession. Annotations are deduplicated by (name, identifier) tuple rather than just by name. Verifies that: - Multiple annotations with the same name but different identifiers can be sent in a burst - All 5 unique annotations are properly inserted - Each annotation maintains its unique identifier and all field values - Duplicate batches are correctly deduplicated """ # ============================================================================ # Setup # ============================================================================ # Extract OTEL span ID and graphql Global ID from the fixture assert _existing_spans, "At least one existing span is required for this test" span_gid1, span_id1, *_ = choice(_existing_spans) api_key = _app.admin_secret from phoenix.client import AsyncClient # ============================================================================ # Test Case: Burst Annotations # ============================================================================ annotation_name = token_hex(8) num_annotations = 5 # Generate unique data for each annotation identifiers = [token_hex(8) for _ in range(num_annotations)] labels = [token_hex(8) for _ in range(num_annotations)] scores = [int.from_bytes(token_bytes(4), byteorder="big") for _ in range(num_annotations)] explanations = [token_hex(8) for _ in range(num_annotations)] metadata = [{token_hex(8): token_hex(8)} for _ in range(num_annotations)] # Create annotation data for burst sending span_annotations: list[v1.SpanAnnotationData] = [ { "name": annotation_name, "span_id": span_id1, "annotator_kind": "CODE", "identifier": identifiers[i], "metadata": metadata[i], "result": { "label": labels[i], "score": scores[i], "explanation": explanations[i], }, } for i in range(num_annotations) ] # Send all annotations in a burst task = AsyncClient(base_url=_app.base_url, api_key=api_key).spans.log_span_annotations( span_annotations=span_annotations * 2, sync=False, ) await gather(task, task) # Verify all annotations were created correctly by querying the GraphQL API def get_all_burst_annotations() -> Optional[dict[str, dict[str, Any]]]: res, _ = _gql( _app, api_key, query=self.query, operation_name="GetSpanAnnotations", variables={"id": str(span_gid1)}, ) # Filter to only our test annotations (by name) test_annotations = [ anno for anno in res["data"]["node"]["spanAnnotations"] if anno["name"] == annotation_name ] # Check if we have all expected annotations if len(test_annotations) == num_annotations: # Return indexed by identifier for easy lookup return {anno["identifier"]: anno for anno in test_annotations} return None annotations_by_identifier = await _get( query_fn=get_all_burst_annotations, error_msg=f"All {num_annotations} burst annotations should be present", ) # Verify each annotation exists with correct values for i in range(num_annotations): assert identifiers[i] in annotations_by_identifier, ( f"Annotation with identifier {identifiers[i]} should be present" ) anno = annotations_by_identifier[identifiers[i]] assert anno["name"] == annotation_name, f"Annotation {i} name should match input" assert anno["source"] == "API", f"Annotation {i} source should be API" assert anno["annotatorKind"] == "CODE", f"Annotation {i} annotator_kind should be CODE" assert anno["metadata"] == metadata[i], f"Annotation {i} metadata should match input" assert anno["label"] == labels[i], f"Annotation {i} label should match input" assert anno["score"] == scores[i], f"Annotation {i} score should match input" assert anno["explanation"] == explanations[i], ( f"Annotation {i} explanation should match input" ) class TestClientForSpanDocumentAnnotations: """Tests the Phoenix span document annotation client functionality. Verifies that the client can: - Create and update single span document annotations - Handle multiple span document annotations at once - Work with different user roles - Use DataFrames for span document annotations - Work in both regular and async mode - Handle special cases like zero scores """ # GraphQL query to retrieve span document annotations for a given span ID query = """ query GetSpanDocumentEvaluations($id: ID!) { node (id: $id) { ... on Span { documentEvaluations { id name source identifier annotatorKind metadata label score explanation documentPosition user { id } } } } } """ @pytest.mark.parametrize("sync", [True, False]) # server ingestion path @pytest.mark.parametrize("is_async", [True, False]) # sync/async client @pytest.mark.parametrize( "role_or_user, api_key_kind", [ (_MEMBER, "User"), (_ADMIN, "User"), (_ADMIN, "System"), ], ) async def test_add_document_annotation( self, sync: bool, is_async: bool, role_or_user: _RoleOrUser, api_key_kind: Literal["User", "System"], _existing_spans: Sequence[_ExistingSpan], _get_user: _GetUser, _app: _AppInfo, ) -> None: """Test creating and updating single span document annotations. Verifies UPSERT behavior, user permissions, and field persistence. """ # Setup assert _existing_spans, "At least one span required" span_gid1, span_id1, *_ = choice(_existing_spans) # Setup authenticated user u = _get_user(_app, role_or_user).log_in(_app) api_key = str(u.create_api_key(_app, api_key_kind)) # Setup client from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # Test UPSERT functionality annotation_name = token_hex(8) document_position = 0 existing_gid: Optional[str] = None # Test create then update for j in range(2): score = int.from_bytes(token_bytes(4), byteorder="big") label = token_hex(8) explanation = token_hex(8) metadata = {token_hex(8): token_hex(8)} result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.add_document_annotation( annotation_name=annotation_name, span_id=span_id1, document_position=document_position, annotator_kind="LLM", label=label, score=score, explanation=explanation, metadata=metadata, sync=sync, ), ) if sync: assert result # Verify annotation via GraphQL def get_annotation() -> Optional[dict[str, Any]]: res, _ = _gql( _app, u, query=self.query, operation_name="GetSpanDocumentEvaluations", variables={"id": str(span_gid1)}, ) annotations = { (anno["label"], anno["score"], anno["explanation"]): anno for anno in res["data"]["node"]["documentEvaluations"] } return annotations.get((label, score, explanation)) anno = await _get( query_fn=get_annotation, error_msg="Document annotation should be present", no_wait=sync, ) # Expected user ID for the annotation expected_user_id: str if api_key_kind == "User": expected_user_id = u.gid elif api_key_kind == "System": system_user_gid = str(GlobalID(type_name="User", node_id=str(1))) expected_user_id = system_user_gid else: assert_never(api_key_kind) # Verify annotation fields assert anno["name"] == annotation_name assert anno["source"] == "API" assert anno["annotatorKind"] == "LLM" assert anno["metadata"] == metadata assert anno["documentPosition"] == document_position assert anno["user"]["id"] == expected_user_id if j == 0: existing_gid = anno["id"] else: assert anno["id"] == existing_gid, "ID should persist on update" @pytest.mark.parametrize("sync", [True, False]) # server ingestion path @pytest.mark.parametrize("is_async", [True, False]) # sync/async client @pytest.mark.parametrize("role_or_user", [_MEMBER, _ADMIN]) async def test_log_document_annotations( self, sync: bool, is_async: bool, role_or_user: _RoleOrUser, _existing_spans: Sequence[_ExistingSpan], _get_user: _GetUser, _app: _AppInfo, ) -> None: """Test batch span document annotation operations. Verifies multi-document operations, UPSERT behavior, and consistency. """ # Setup assert len(_existing_spans) >= 2, "Two spans required" (span_gid1, span_id1, *_), (span_gid2, span_id2, *_) = sample(_existing_spans, 2) # Setup authenticated user u = _get_user(_app, role_or_user).log_in(_app) api_key = str(u.create_api_key(_app)) # Setup client from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # Setup batch test data span_ids = [span_id1, span_id2] span_gids = [span_gid1, span_gid2] annotation_names = [token_hex(8), token_hex(8)] identifiers = ["", ""] # Use empty identifiers since non-empty ones are not supported existing_gids: list[Optional[str]] = [None, None] # Test create then update for i in range(2): labels = [token_hex(8), token_hex(8)] scores = [ int.from_bytes(token_bytes(4), byteorder="big"), int.from_bytes(token_bytes(4), byteorder="big"), ] explanations = [token_hex(8), token_hex(8)] metadata = [{token_hex(8): token_hex(8)} for _ in range(2)] document_positions = [0, 0] # Create batch annotation data document_annotations: list[v1.SpanDocumentAnnotationData] = [ { "name": annotation_names[i], "span_id": span_ids[i], "document_position": document_positions[i], "annotator_kind": "CODE", "identifier": identifiers[i], "metadata": metadata[i], "result": { "label": labels[i], "score": scores[i], "explanation": explanations[i], }, } for i in range(len(span_ids)) ] # Submit batch annotations result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.log_document_annotations( document_annotations=document_annotations, sync=sync, ), ) if sync: assert result and len(result) == 2 # Verify annotations for j in range(2): def get_batch_annotation() -> Optional[dict[str, Any]]: res, _ = _gql( _app, u, query=self.query, operation_name="GetSpanDocumentEvaluations", variables={"id": str(span_gids[j])}, ) annotations = { (anno["label"], anno["score"], anno["explanation"]): anno for anno in res["data"]["node"]["documentEvaluations"] } return annotations.get((labels[j], scores[j], explanations[j])) anno = await _get( query_fn=get_batch_annotation, error_msg=f"Batch document annotation {j + 1} should be present", no_wait=sync, ) # Verify annotation fields assert anno["name"] == annotation_names[j] assert anno["source"] == "API" assert anno["annotatorKind"] == "CODE" assert anno["metadata"] == metadata[j] assert anno["identifier"] == identifiers[j] assert anno["documentPosition"] == document_positions[j] # Verify ID persistence across updates if i == 0: existing_gids[j] = anno["id"] else: assert anno["id"] == existing_gids[j], "ID should persist on update" @pytest.mark.parametrize("sync", [True, False]) # server ingestion path @pytest.mark.parametrize("is_async", [True, False]) # sync/async client @pytest.mark.parametrize("role_or_user", [_MEMBER, _ADMIN]) async def test_log_document_annotations_dataframe( self, sync: bool, is_async: bool, role_or_user: _RoleOrUser, _existing_spans: Sequence[_ExistingSpan], _get_user: _GetUser, _app: _AppInfo, ) -> None: """Test DataFrame span document annotations. Verifies column/index layouts and global parameters. """ # Setup assert len(_existing_spans) >= 2, "Two spans required" (span_gid1, span_id1, *_), (span_gid2, span_id2, *_) = sample(_existing_spans, 2) # Setup authenticated user u = _get_user(_app, role_or_user).log_in(_app) api_key = str(u.create_api_key(_app)) # Setup client from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # Test: DataFrame with columns df_annotation_names = [token_hex(8), token_hex(8)] df_span_ids = [span_id1, span_id2] df_annotator_kinds = ["HUMAN", "LLM"] df_labels = [token_hex(8), token_hex(8)] df_scores = [ int.from_bytes(token_bytes(4), byteorder="big"), int.from_bytes(token_bytes(4), byteorder="big"), ] df_explanations = [token_hex(8), token_hex(8)] df_metadata = [{token_hex(8): token_hex(8)} for _ in range(2)] document_positions = [0, 0] df = pd.DataFrame( { "name": df_annotation_names, "span_id": df_span_ids, "document_position": document_positions, "annotator_kind": df_annotator_kinds, "label": df_labels, "score": df_scores, "explanation": df_explanations, "metadata": df_metadata, } ) # Log annotations from DataFrame result = await _await_or_return( Client( base_url=_app.base_url, api_key=api_key ).spans.log_document_annotations_dataframe( dataframe=df, sync=sync, ), ) if sync: assert result # Verify annotations were created correctly for i, span_gid in enumerate([span_gid1, span_gid2]): def get_df_annotation() -> Optional[dict[str, Any]]: res, _ = _gql( _app, u, query=self.query, operation_name="GetSpanDocumentEvaluations", variables={"id": str(span_gid)}, ) annotations = { (anno["label"], anno["score"], anno["explanation"]): anno for anno in res["data"]["node"]["documentEvaluations"] } return annotations.get((df_labels[i], df_scores[i], df_explanations[i])) anno = await _get( query_fn=get_df_annotation, error_msg=f"DataFrame document annotation {i + 1} should be present", no_wait=sync, ) # Verify annotation exists with correct values assert anno["name"] == df_annotation_names[i] assert anno["source"] == "API" assert anno["metadata"] == df_metadata[i] assert anno["annotatorKind"] == df_annotator_kinds[i] assert anno["documentPosition"] == document_positions[i] @pytest.mark.parametrize("sync", [True, False]) # server ingestion path @pytest.mark.parametrize("is_async", [True, False]) # sync/async client @pytest.mark.parametrize("role_or_user", [_MEMBER, _ADMIN]) async def test_zero_score_document_annotation( self, sync: bool, is_async: bool, role_or_user: _RoleOrUser, _existing_spans: Sequence[_ExistingSpan], _get_user: _GetUser, _app: _AppInfo, ) -> None: """Test zero score handling in span document annotations.""" # Setup assert _existing_spans, "At least one span required" span_gid1, span_id1, *_ = choice(_existing_spans) # Setup authenticated user u = _get_user(_app, role_or_user).log_in(_app) api_key = str(u.create_api_key(_app)) # Setup client from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # Test zero score zero_score_annotation_name = token_hex(8) document_position = 0 # Create annotation with score of 0 result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.add_document_annotation( annotation_name=zero_score_annotation_name, span_id=span_id1, document_position=document_position, annotator_kind="LLM", score=0, # Explicitly test score of 0 sync=sync, ), ) if sync: assert result # Verify zero score annotation def get_zero_score_annotation() -> Optional[dict[str, Any]]: res, _ = _gql( _app, u, query=self.query, operation_name="GetSpanDocumentEvaluations", variables={"id": str(span_gid1)}, ) annotations = { anno["name"]: anno for anno in res["data"]["node"]["documentEvaluations"] } return annotations.get(zero_score_annotation_name) anno = await _get( query_fn=get_zero_score_annotation, error_msg="Document annotation with score of 0 should be present", no_wait=sync, ) # Verify the annotation exists and has score of 0 assert anno["score"] == 0 assert anno["label"] is None assert anno["explanation"] is None assert anno["source"] == "API" assert anno["annotatorKind"] == "LLM" assert anno["documentPosition"] == document_position @pytest.mark.parametrize("sync", [True, False]) async def test_document_annotation_identifier_validation( self, sync: bool, _existing_spans: Sequence[_ExistingSpan], _app: _AppInfo, ) -> None: """Test server-side identifier validation via direct HTTP requests. Verifies that: - Non-empty identifiers are rejected with HTTP 422 """ # Setup assert _existing_spans, "At least one span required" _, span_id1, *_ = choice(_existing_spans) # Setup HTTP client for direct requests async with httpx.AsyncClient(base_url=_app.base_url) as client: headers = { "Authorization": f"Bearer {_app.admin_secret}", "Content-Type": "application/json", } # Non-empty identifier should be rejected non_empty_request_body = { "data": [ { "span_id": span_id1, "document_position": 0, "name": token_hex(8), "annotator_kind": "HUMAN", "identifier": "test_identifier", # Non-empty identifier "result": { "label": "True", "score": 0.95, "explanation": "This should be rejected.", }, "metadata": {}, } ] } response = await client.post( f"v1/document_annotations?sync={sync}", json=non_empty_request_body, headers=headers ) assert response.status_code == 422, ( "Non-empty identifier should be rejected with HTTP 422" ) class TestClientForTraceAnnotations: """Tests the Phoenix trace annotation client functionality. Verifies that the client can: - Create and update single trace annotations - Handle multiple trace annotations at once - Work with different user roles - Work in both regular and async mode - Retrieve trace annotations """ # GraphQL query to retrieve trace annotations for a given trace ID query = """ query GetTraceAnnotations($id: ID!) { node (id: $id) { ... on Trace { traceAnnotations { id name source identifier annotatorKind metadata label score explanation user { id } } } } } """ @pytest.mark.parametrize("sync", [True, False]) # server ingestion path @pytest.mark.parametrize("is_async", [True, False]) # sync/async client @pytest.mark.parametrize( "role_or_user, api_key_kind", [ (_MEMBER, "User"), (_ADMIN, "User"), (_ADMIN, "System"), ], ) async def test_add_trace_annotation( self, sync: bool, is_async: bool, role_or_user: _RoleOrUser, api_key_kind: Literal["User", "System"], _existing_spans: Sequence[_ExistingSpan], _get_user: _GetUser, _app: _AppInfo, ) -> None: """Tests creating and updating single trace annotations. Verifies that: - New trace annotations can be created with all fields - Existing trace annotations can be updated - Annotation IDs remain the same when updating - Works in both regular and async mode - User permissions are properly checked """ # Setup assert _existing_spans, "At least one existing span is required for this test" span1, *_ = _existing_spans trace_id1 = span1.trace.trace_id # Set up test environment with logged-in user u = _get_user(_app, role_or_user).log_in(_app) api_key = str(u.create_api_key(_app, api_key_kind)) # Import appropriate client based on test parameter from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # Test UPSERT functionality by adding multiple annotations with the same name annotation_name = token_hex(8) # Create initial annotation score = 0.75 label = "good" explanation = "Test trace annotation" metadata = {"test": "metadata"} result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).traces.add_trace_annotation( annotation_name=annotation_name, trace_id=trace_id1, annotator_kind="HUMAN", label=label, score=score, explanation=explanation, metadata=metadata, sync=sync, ), ) if sync: assert result is not None, "Should receive annotation ID when sync=True" assert result["id"], "Should have valid annotation ID" else: assert result is None, "Should not receive annotation ID when sync=False" # Verify annotation was created via GraphQL # Get trace GID from span data - use the existing trace GID from the span trace_gid = str(span1.trace.id) # Use _get() method to wait for annotation to be created and retrieved def get_trace_annotation() -> Optional[dict[str, Any]]: gql_resp, _ = _gql( _app, _app.admin_secret, query=self.query, variables={"id": trace_gid} ) annotations: list[dict[str, Any]] = gql_resp["data"]["node"]["traceAnnotations"] # Filter to find the annotation we just created our_annotations = [anno for anno in annotations if anno["name"] == annotation_name] if len(our_annotations) == 1: return our_annotations[0] return None anno = await _get( query_fn=get_trace_annotation, error_msg=f"Should have exactly one trace annotation with name {annotation_name}", no_wait=sync, ) assert anno["name"] == annotation_name assert anno["label"] == label assert anno["score"] == score assert anno["explanation"] == explanation assert anno["annotatorKind"] == "HUMAN" assert anno["metadata"] == metadata assert anno["source"] == "API" @pytest.mark.parametrize("sync", [True, False]) # server ingestion path @pytest.mark.parametrize("is_async", [True, False]) # sync/async client @pytest.mark.parametrize("role_or_user", [_MEMBER, _ADMIN]) async def test_log_trace_annotations( self, sync: bool, is_async: bool, role_or_user: _RoleOrUser, _existing_spans: Sequence[_ExistingSpan], _get_user: _GetUser, _app: _AppInfo, ) -> None: """Tests handling multiple trace annotations at once. Verifies that: - Multiple trace annotations can be created in one call - Multiple trace annotations can be updated at once - Works with annotations across different traces - Annotation IDs remain the same when updating - Works in both regular and async mode - User permissions are properly checked """ # ============================================================================ # Setup # ============================================================================ # Extract OTEL trace IDs and graphql Global IDs from the fixture assert len(_existing_spans) >= 2, "At least two existing spans are required for this test" span1, span2, *_ = _existing_spans trace_id1 = span1.trace.trace_id trace_id2 = span2.trace.trace_id trace_gid1 = span1.trace.id trace_gid2 = span2.trace.id # Set up test environment with logged-in user u = _get_user(_app, role_or_user).log_in(_app) api_key = str(u.create_api_key(_app)) # Import appropriate client based on test parameter from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # ============================================================================ # Batch Annotation Test # ============================================================================ # Test batch annotation creation and updates using log_trace_annotations # Create annotations for both traces in a single batch operation # Setup test data for batch operations trace_ids = [trace_id1, trace_id2] trace_gids = [trace_gid1, trace_gid2] annotation_names = [token_hex(8), token_hex(8)] identifiers = [token_hex(8), token_hex(8)] existing_gids: list[Optional[str]] = [None, None] # Two iterations: First creates annotations, second updates them for i in range(2): # Generate new random values for each iteration labels = [token_hex(8), token_hex(8)] scores = [ int.from_bytes(token_bytes(4), byteorder="big"), int.from_bytes(token_bytes(4), byteorder="big"), ] explanations = [token_hex(8), token_hex(8)] metadata = [{token_hex(8): token_hex(8)} for _ in range(2)] # Create annotation data for both traces trace_annotations: list[v1.TraceAnnotationData] = [ { "name": annotation_names[i], "trace_id": trace_ids[i], "annotator_kind": "CODE", # Test non-default annotator_kind "identifier": identifiers[i], "metadata": metadata[i], "result": { "label": labels[i], "score": scores[i], "explanation": explanations[i], }, } for i in range(len(trace_ids)) ] # Log the batch annotations result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).traces.log_trace_annotations( trace_annotations=trace_annotations, sync=sync, ), ) if sync: # Verify the batch operation returned the expected number of results assert result assert len(result) == 2, ( "Batch operation should return results for both annotations" ) # Verify each annotation in the batch for j in range(2): def get_batch_annotation() -> Optional[dict[str, Any]]: res, _ = _gql( _app, u, query=self.query, operation_name="GetTraceAnnotations", variables={"id": str(trace_gids[j])}, ) annotations = { (anno["label"], anno["score"], anno["explanation"]): anno for anno in res["data"]["node"]["traceAnnotations"] } return annotations.get((labels[j], scores[j], explanations[j])) anno = await _get( query_fn=get_batch_annotation, error_msg=f"Batch annotation {j + 1} should be present in trace annotations", no_wait=sync, ) # Verify annotation exists with correct values assert anno["name"] == annotation_names[j], ( f"Batch annotation {j + 1} name should match input" ) assert anno["source"] == "API", f"Batch annotation {j + 1} source should be API" assert anno["annotatorKind"] == "CODE", ( f"Batch annotation {j + 1} annotator_kind should be CODE" ) assert anno["metadata"] == metadata[j], ( f"Batch annotation {j + 1} metadata should match input" ) assert anno["identifier"] == identifiers[j], ( f"Batch annotation {j + 1} identifier should match input" ) # Verify ID persistence across updates if i == 0: existing_gids[j] = anno["id"] else: assert anno["id"] == existing_gids[j], ( f"Batch annotation {j + 1} ID should remain the same after update" ) @pytest.mark.parametrize("sync", [True, False]) # server ingestion path @pytest.mark.parametrize("is_async", [True, False]) # sync/async client @pytest.mark.parametrize("role_or_user", [_MEMBER, _ADMIN]) async def test_log_trace_annotations_dataframe( self, sync: bool, is_async: bool, role_or_user: _RoleOrUser, _existing_spans: Sequence[_ExistingSpan], _get_user: _GetUser, _app: _AppInfo, ) -> None: """Tests using DataFrames for trace annotations. Tests three ways to use DataFrames: 1. With trace_id as a column 2. With trace_id as the index 3. With a shared annotator type Verifies that: - Trace annotations can be read from DataFrames - Different DataFrame layouts are handled correctly - Shared settings work properly - Works in both regular and async mode - User permissions are properly checked """ # ============================================================================ # Setup # ============================================================================ # Extract OTEL trace IDs and graphql Global IDs from the fixture assert len(_existing_spans) >= 2, "At least two existing spans are required for this test" span1, span2, *_ = _existing_spans trace_id1 = span1.trace.trace_id trace_id2 = span2.trace.trace_id trace_gid1 = span1.trace.id trace_gid2 = span2.trace.id # Set up test environment with logged-in user u = _get_user(_app, role_or_user).log_in(_app) api_key = str(u.create_api_key(_app)) # Import appropriate client based on test parameter from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # ============================================================================ # Test Case 1: Using trace_id as column # ============================================================================ # This test case demonstrates standard DataFrame usage with trace_id as a column # All fields are provided as columns in the DataFrame df1_annotation_names = [token_hex(8), token_hex(8)] df1_trace_ids = [trace_id1, trace_id2] df1_annotator_kinds = ["HUMAN", "CODE"] df1_labels = [token_hex(8), token_hex(8)] df1_scores = [ int.from_bytes(token_bytes(4), byteorder="big"), int.from_bytes(token_bytes(4), byteorder="big"), ] df1_explanations = [token_hex(8), token_hex(8)] df1_metadata = [{token_hex(8): token_hex(8)} for _ in range(2)] df1 = pd.DataFrame( { "name": df1_annotation_names, "trace_id": df1_trace_ids, "annotator_kind": df1_annotator_kinds, "label": df1_labels, "score": df1_scores, "explanation": df1_explanations, "metadata": df1_metadata, } ) # Log annotations from DataFrame result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).traces.log_trace_annotations_dataframe( dataframe=df1, sync=sync, ), ) if sync: assert result # Verify annotations were created correctly for i, trace_gid in enumerate([trace_gid1, trace_gid2]): def get_df_annotation() -> Optional[dict[str, Any]]: res, _ = _gql( _app, u, query=self.query, operation_name="GetTraceAnnotations", variables={"id": str(trace_gid)}, ) annotations = { (anno["label"], anno["score"], anno["explanation"]): anno for anno in res["data"]["node"]["traceAnnotations"] } return annotations.get((df1_labels[i], df1_scores[i], df1_explanations[i])) anno = await _get( query_fn=get_df_annotation, error_msg=f"DataFrame annotation {i + 1} should be present in trace annotations", no_wait=sync, ) # Verify annotation exists with correct values assert anno["name"] == df1_annotation_names[i], ( f"DataFrame annotation {i + 1} name should match input" ) assert anno["source"] == "API", f"DataFrame annotation {i + 1} source should be API" assert anno["metadata"] == df1_metadata[i], ( f"DataFrame annotation {i + 1} metadata should match input" ) assert anno["annotatorKind"] == df1_annotator_kinds[i], ( f"DataFrame annotation {i + 1} annotator_kind should match input" ) # ============================================================================ # Test Case 2: Using trace_id as index # ============================================================================ # This test case demonstrates using trace_id as the DataFrame index # This is an alternative way to specify trace IDs without a dedicated column df2_annotation_names = [token_hex(8), token_hex(8)] df2_annotator_kinds = ["HUMAN", "CODE"] df2_labels = [token_hex(8), token_hex(8)] df2_scores = [ int.from_bytes(token_bytes(4), byteorder="big"), int.from_bytes(token_bytes(4), byteorder="big"), ] df2_explanations = [token_hex(8), token_hex(8)] df2_metadata = [{token_hex(8): token_hex(8)} for _ in range(2)] df2 = pd.DataFrame( { "name": df2_annotation_names, "annotator_kind": df2_annotator_kinds, "label": df2_labels, "score": df2_scores, "explanation": df2_explanations, "metadata": df2_metadata, }, index=[trace_id1, trace_id2], ) # Log annotations from DataFrame result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).traces.log_trace_annotations_dataframe( dataframe=df2, sync=sync, ), ) if sync: assert result # Verify annotations were created correctly for i, trace_gid in enumerate([trace_gid1, trace_gid2]): def get_df2_annotation() -> Optional[dict[str, Any]]: res, _ = _gql( _app, u, query=self.query, operation_name="GetTraceAnnotations", variables={"id": str(trace_gid)}, ) annotations = { (anno["label"], anno["score"], anno["explanation"]): anno for anno in res["data"]["node"]["traceAnnotations"] } return annotations.get((df2_labels[i], df2_scores[i], df2_explanations[i])) anno = await _get( query_fn=get_df2_annotation, error_msg=f"DataFrame annotation {i + 1} should be present in trace annotations", no_wait=sync, ) # Verify annotation exists with correct values assert anno["name"] == df2_annotation_names[i], ( f"DataFrame annotation {i + 1} name should match input" ) assert anno["source"] == "API", f"DataFrame annotation {i + 1} source should be API" assert anno["metadata"] == df2_metadata[i], ( f"DataFrame annotation {i + 1} metadata should match input" ) assert anno["annotatorKind"] == df2_annotator_kinds[i], ( f"DataFrame annotation {i + 1} annotator_kind should match input" ) # ============================================================================ # Test Case 3: Using global annotator_kind # ============================================================================ # This test case demonstrates using a global annotator_kind parameter # The DataFrame does not include an annotator_kind column, and the value is # provided as a parameter to the API call. global_annotator_kind: Literal["CODE"] = "CODE" df3_annotation_names = [token_hex(8), token_hex(8)] df3_trace_ids = [trace_id1, trace_id2] df3_labels = [token_hex(8), token_hex(8)] df3_scores = [ int.from_bytes(token_bytes(4), byteorder="big"), int.from_bytes(token_bytes(4), byteorder="big"), ] df3_explanations = [token_hex(8), token_hex(8)] df3_metadata = [{token_hex(8): token_hex(8)} for _ in range(2)] df3 = pd.DataFrame( { "name": df3_annotation_names, "trace_id": df3_trace_ids, "label": df3_labels, "score": df3_scores, "explanation": df3_explanations, "metadata": df3_metadata, } ) # Log annotations from DataFrame with global annotator_kind result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).traces.log_trace_annotations_dataframe( dataframe=df3, annotator_kind=global_annotator_kind, sync=sync, ), ) if sync: assert result # Verify annotations were created correctly for i, trace_gid in enumerate([trace_gid1, trace_gid2]): def get_df3_annotation() -> Optional[dict[str, Any]]: res, _ = _gql( _app, u, query=self.query, operation_name="GetTraceAnnotations", variables={"id": str(trace_gid)}, ) annotations = { (anno["label"], anno["score"], anno["explanation"]): anno for anno in res["data"]["node"]["traceAnnotations"] } return annotations.get((df3_labels[i], df3_scores[i], df3_explanations[i])) anno = await _get( query_fn=get_df3_annotation, error_msg=f"DataFrame annotation {i + 1} should be present in trace annotations", no_wait=sync, ) # Verify annotation exists with correct values assert anno["name"] == df3_annotation_names[i], ( f"DataFrame annotation {i + 1} name should match input" ) assert anno["source"] == "API", f"DataFrame annotation {i + 1} source should be API" assert anno["metadata"] == df3_metadata[i], ( f"DataFrame annotation {i + 1} metadata should match input" ) assert anno["annotatorKind"] == global_annotator_kind, ( f"DataFrame annotation {i + 1} annotator_kind should match global value" ) @pytest.mark.parametrize("sync", [True, False]) # server ingestion path @pytest.mark.parametrize("is_async", [True, False]) # sync/async client @pytest.mark.parametrize("role_or_user", [_MEMBER, _ADMIN]) async def test_zero_score_annotation( self, sync: bool, is_async: bool, role_or_user: _RoleOrUser, _existing_spans: Sequence[_ExistingSpan], _get_user: _GetUser, _app: _AppInfo, ) -> None: """Tests handling trace annotations with zero scores. Verifies that: - Zero scores are saved and loaded correctly - Missing optional fields are handled properly - Works in both regular and async mode - User permissions are properly checked """ # ============================================================================ # Setup # ============================================================================ # Extract OTEL trace ID and graphql Global ID from the fixture assert _existing_spans, "At least one existing span is required for this test" span1, *_ = _existing_spans trace_gid1 = span1.trace.id trace_id1 = span1.trace.trace_id # Set up test environment with logged-in user u = _get_user(_app, role_or_user).log_in(_app) api_key = str(u.create_api_key(_app)) # Import appropriate client based on test parameter from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # ============================================================================ # Test Case: Zero Score # ============================================================================ # Test that a score of 0 is properly recorded and not treated as falsey zero_score_annotation_name = token_hex(8) # Create annotation with score of 0 result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).traces.add_trace_annotation( annotation_name=zero_score_annotation_name, trace_id=trace_id1, annotator_kind="LLM", score=0, # Explicitly test score of 0 sync=sync, ), ) if sync: assert result # Verify the annotation was created correctly by querying the GraphQL API def get_zero_score_annotation() -> Optional[dict[str, Any]]: res, _ = _gql( _app, u, query=self.query, operation_name="GetTraceAnnotations", variables={"id": str(trace_gid1)}, ) annotations = {anno["name"]: anno for anno in res["data"]["node"]["traceAnnotations"]} return annotations.get(zero_score_annotation_name) anno = await _get( query_fn=get_zero_score_annotation, error_msg="Annotation with score of 0 should be present in trace annotations", no_wait=sync, ) # Verify the annotation exists and has score of 0 assert anno["score"] == 0, "Annotation score should be exactly 0" assert anno["label"] is None, "Annotation label should be None" assert anno["explanation"] is None, "Annotation explanation should be None" assert anno["source"] == "API", "Annotation source should be API" assert anno["annotatorKind"] == "LLM", "Annotation annotator_kind should be LLM" @pytest.mark.parametrize("sync", [True, False]) # server ingestion path @pytest.mark.parametrize("is_async", [True, False]) # sync/async client @pytest.mark.parametrize("role_or_user", [_MEMBER, _ADMIN]) async def test_zero_score_annotation_dataframe( self, sync: bool, is_async: bool, role_or_user: _RoleOrUser, _existing_spans: Sequence[_ExistingSpan], _get_user: _GetUser, _app: _AppInfo, ) -> None: """Tests handling zero scores in trace annotation DataFrames. Verifies that: - Zero scores can be read from DataFrames - Zero scores are saved and loaded correctly - Works in both regular and async mode - User permissions are properly checked """ # ============================================================================ # Setup # ============================================================================ # Extract OTEL trace ID and graphql Global ID from the fixture assert _existing_spans, "At least one existing span is required for this test" span1, *_ = _existing_spans trace_gid1 = span1.trace.id trace_id1 = span1.trace.trace_id # Set up test environment with logged-in user u = _get_user(_app, role_or_user).log_in(_app) api_key = str(u.create_api_key(_app)) # Import appropriate client based on test parameter from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # ============================================================================ # Test Case: Zero Score in DataFrame # ============================================================================ # Test that zero scores work correctly when provided via DataFrame import pandas as pd zero_score_annotation_name = token_hex(8) df = pd.DataFrame( { "name": [zero_score_annotation_name], "trace_id": [trace_id1], "annotator_kind": ["CODE"], "score": [0], # Test zero score # Omit label and explanation to test None handling } ) result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).traces.log_trace_annotations_dataframe( dataframe=df, sync=sync ), ) if sync: assert result # Verify the annotation was created correctly def get_zero_score_df_annotation() -> Optional[dict[str, Any]]: res, _ = _gql( _app, u, query=self.query, operation_name="GetTraceAnnotations", variables={"id": str(trace_gid1)}, ) annotations = {anno["name"]: anno for anno in res["data"]["node"]["traceAnnotations"]} return annotations.get(zero_score_annotation_name) anno = await _get( query_fn=get_zero_score_df_annotation, error_msg="Zero score DataFrame annotation should be present in trace annotations", no_wait=sync, ) # Verify the annotation exists and has score of 0 assert anno["score"] == 0, "DataFrame annotation score should be exactly 0" assert anno["label"] is None, "DataFrame annotation label should be None" assert anno["explanation"] is None, "DataFrame annotation explanation should be None" assert anno["source"] == "API", "DataFrame annotation source should be API" assert anno["annotatorKind"] == "CODE", "DataFrame annotation annotator_kind should be CODE" async def test_burst_trace_annotations_with_unique_identifiers( self, _existing_spans: Sequence[_ExistingSpan], _get_user: _GetUser, _app: _AppInfo, ) -> None: """Tests sending multiple trace annotations in a burst with unique identifiers. Sends the same annotation name with different identifiers in rapid succession. Annotations are deduplicated by (name, identifier) tuple rather than just by name. Verifies that: - Multiple annotations with the same name but different identifiers can be sent in a burst - All 5 unique annotations are properly inserted - Each annotation maintains its unique identifier and all field values - Duplicate batches are correctly deduplicated """ # Setup assert _existing_spans, "At least one existing span is required for this test" span1, *_ = _existing_spans trace_gid1 = span1.trace.id trace_id1 = span1.trace.trace_id api_key = _app.admin_secret from phoenix.client import AsyncClient # Test Case: Burst Trace Annotations annotation_name = token_hex(8) num_annotations = 5 # Generate unique data for each annotation identifiers = [token_hex(8) for _ in range(num_annotations)] labels = [token_hex(8) for _ in range(num_annotations)] scores = [int.from_bytes(token_bytes(4), byteorder="big") for _ in range(num_annotations)] explanations = [token_hex(8) for _ in range(num_annotations)] metadata = [{token_hex(8): token_hex(8)} for _ in range(num_annotations)] # Create annotation data for burst sending trace_annotations: list[v1.TraceAnnotationData] = [ { "name": annotation_name, "trace_id": trace_id1, "annotator_kind": "CODE", "identifier": identifiers[i], "metadata": metadata[i], "result": { "label": labels[i], "score": scores[i], "explanation": explanations[i], }, } for i in range(num_annotations) ] # Send all annotations in a burst task = AsyncClient(base_url=_app.base_url, api_key=api_key).traces.log_trace_annotations( trace_annotations=trace_annotations * 2, sync=False, ) await gather(task, task) # Verify all annotations were created correctly by querying the GraphQL API def get_all_burst_annotations() -> Optional[dict[str, dict[str, Any]]]: res, _ = _gql( _app, api_key, query=self.query, operation_name="GetTraceAnnotations", variables={"id": str(trace_gid1)}, ) # Filter to only our test annotations (by name) test_annotations = [ anno for anno in res["data"]["node"]["traceAnnotations"] if anno["name"] == annotation_name ] # Check if we have all expected annotations if len(test_annotations) == num_annotations: # Return indexed by identifier for easy lookup return {anno["identifier"]: anno for anno in test_annotations} return None annotations_by_identifier = await _get( query_fn=get_all_burst_annotations, error_msg=f"All {num_annotations} burst trace annotations should be present", ) # Verify each annotation exists with correct values for i in range(num_annotations): assert identifiers[i] in annotations_by_identifier, ( f"Trace annotation with identifier {identifiers[i]} should be present" ) anno = annotations_by_identifier[identifiers[i]] assert anno["name"] == annotation_name, f"Trace annotation {i} name should match input" assert anno["source"] == "API", f"Trace annotation {i} source should be API" assert anno["annotatorKind"] == "CODE", ( f"Trace annotation {i} annotator_kind should be CODE" ) assert anno["metadata"] == metadata[i], ( f"Trace annotation {i} metadata should match input" ) assert anno["label"] == labels[i], f"Trace annotation {i} label should match input" assert anno["score"] == scores[i], f"Trace annotation {i} score should match input" assert anno["explanation"] == explanations[i], ( f"Trace annotation {i} explanation should match input" ) class TestClientForSessionAnnotations: """Tests the Phoenix session annotation client functionality. Verifies that the client can: - Create and update single session annotations - Handle multiple session annotations at once - Work with different user roles - Work in both regular and async mode """ # GraphQL query to retrieve session annotations query = """ query GetSessionAnnotations($id: ID!) { node(id: $id) { ... on ProjectSession { sessionAnnotations { id name annotatorKind label score explanation metadata identifier source user { id } } } } } """ @pytest.mark.parametrize("sync", [True, False]) # server ingestion path @pytest.mark.parametrize("is_async", [True, False]) # sync/async client @pytest.mark.parametrize( "role_or_user, api_key_kind", [ (_MEMBER, "User"), (_ADMIN, "User"), (_ADMIN, "System"), ], ) async def test_add_session_annotation( self, sync: bool, is_async: bool, role_or_user: _RoleOrUser, api_key_kind: Literal["User", "System"], _existing_spans: Sequence[_ExistingSpan], _get_user: _GetUser, _app: _AppInfo, ) -> None: """Tests creating and updating single session annotations. Verifies that: - New session annotations can be created with all fields - Existing session annotations can be updated - Annotation IDs remain the same when updating - Works in both regular and async mode - User permissions are properly checked """ # Setup assert _existing_spans, "At least one existing span is required for this test" span1, *_ = _existing_spans session1 = span1.trace.session assert session1 is not None, "Session is required for this test" session_id1 = session1.session_id session_gid1 = session1.id # Set up test environment with logged-in user u = _get_user(_app, role_or_user).log_in(_app) api_key = str(u.create_api_key(_app, api_key_kind)) # Import appropriate client based on test parameter from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # Test data annotation_name = token_hex(8) # ============================================================================ # Test Case: Create New Session Annotation # ============================================================================ # Create a new session annotation with all fields result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).sessions.add_session_annotation( annotation_name=annotation_name, session_id=session_id1, annotator_kind="CODE", label="helpful", score=0.9, explanation="This session was very helpful to the user", metadata={"model_name": "gpt-4", "version": "2024-01"}, identifier="test-id-123", sync=sync, ), ) if sync: assert result assert isinstance(result["id"], str) # Verify the annotation was created correctly by querying the GraphQL API def get_session_annotation() -> Optional[dict[str, Any]]: res, _ = _gql( _app, u, query=self.query, operation_name="GetSessionAnnotations", variables={"id": str(session_gid1)}, ) annotations = {anno["name"]: anno for anno in res["data"]["node"]["sessionAnnotations"]} return annotations.get(annotation_name) anno = await _get( query_fn=get_session_annotation, error_msg="Session annotation should be present in session annotations", no_wait=sync, ) # Verify the annotation exists and has the expected values assert anno["name"] == annotation_name assert anno["annotatorKind"] == "CODE" assert anno["label"] == "helpful" assert anno["score"] == 0.9 assert anno["explanation"] == "This session was very helpful to the user" assert anno["metadata"] == {"model_name": "gpt-4", "version": "2024-01"} assert anno["identifier"] == "test-id-123" assert anno["source"] == "API" # Store the annotation ID for the update test original_annotation_id = anno["id"] # ============================================================================ # Test Case: Update Existing Session Annotation # ============================================================================ # Update the annotation by sending it again with the same name, session_id, and identifier updated_result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).sessions.add_session_annotation( annotation_name=annotation_name, session_id=session_id1, annotator_kind="HUMAN", # Changed label="extremely_helpful", # Changed score=1.0, # Changed explanation="This session was extremely helpful after review", # Changed metadata={"model_name": "gpt-4", "version": "2024-02", "reviewed": True}, # Changed identifier="test-id-123", # Same identifier - should update sync=sync, ), ) if sync: assert updated_result assert isinstance(updated_result["id"], str) # Verify the annotation was updated correctly updated_anno = await _get( query_fn=get_session_annotation, error_msg="Updated session annotation should be present in session annotations", no_wait=sync, ) # Verify the annotation was updated (same ID, different values) assert updated_anno["id"] == original_annotation_id # ID should remain the same assert updated_anno["name"] == annotation_name assert updated_anno["annotatorKind"] == "HUMAN" # Updated assert updated_anno["label"] == "extremely_helpful" # Updated assert updated_anno["score"] == 1.0 # Updated assert ( updated_anno["explanation"] == "This session was extremely helpful after review" ) # Updated assert updated_anno["metadata"] == { "model_name": "gpt-4", "version": "2024-02", "reviewed": True, } # Updated assert updated_anno["identifier"] == "test-id-123" assert updated_anno["source"] == "API" @pytest.mark.parametrize("sync", [True, False]) # server ingestion path @pytest.mark.parametrize("is_async", [True, False]) # sync/async client @pytest.mark.parametrize("role_or_user", [_MEMBER, _ADMIN]) async def test_log_session_annotations( self, sync: bool, is_async: bool, role_or_user: _RoleOrUser, _existing_spans: Sequence[_ExistingSpan], _get_user: _GetUser, _app: _AppInfo, ) -> None: """Tests handling multiple session annotations at once. Verifies that: - Multiple session annotations can be created in one call - Multiple session annotations can be updated at once - Works with annotations across different sessions - Annotation IDs remain the same when updating - Works in both regular and async mode - User permissions are properly checked """ # ============================================================================ # Setup # ============================================================================ # Extract session IDs from the fixture assert len(_existing_spans) >= 2, "At least two existing spans are required for this test" span1, span2, *_ = _existing_spans assert span1.trace.session is not None, "Session is required for this test" assert span2.trace.session is not None, "Session is required for this test" session_id1 = span1.trace.session.session_id session_id2 = span2.trace.session.session_id # Ensure we have different sessions for a more robust test unique_sessions = {session_id1, session_id2} if len(unique_sessions) < 2: pytest.skip("Test requires at least two different sessions") session_ids = [session_id1, session_id2] # Set up test environment with logged-in user u = _get_user(_app, role_or_user).log_in(_app) api_key = str(u.create_api_key(_app, "User")) # Import appropriate client based on test parameter from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # ============================================================================ # Test Data Setup # ============================================================================ # Create annotation data for both sessions annotation_names = [token_hex(8), token_hex(8)] labels = ["helpful", "relevant"] scores = [0.8, 0.9] explanations = ["First session annotation", "Second session annotation"] identifiers = ["batch-test-1", "batch-test-2"] metadata = [{"model": "gpt-4", "batch": 1}, {"model": "claude", "batch": 2}] # Create annotation data for both sessions session_annotations: list[v1.SessionAnnotationData] = [ { "name": annotation_names[i], "session_id": session_ids[i], "annotator_kind": "CODE", # Test non-default annotator_kind "identifier": identifiers[i], "metadata": metadata[i], "result": { "label": labels[i], "score": scores[i], "explanation": explanations[i], }, } for i in range(len(session_ids)) ] # ============================================================================ # Test Case: Create Multiple Session Annotations # ============================================================================ result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).sessions.log_session_annotations( session_annotations=session_annotations, sync=sync ), ) if sync: assert result assert len(result) == 2 for r in result: assert isinstance(r["id"], str) # Wait for annotations to be inserted and verify them assert span1.trace.session is not None, "Session is required for this test" assert span2.trace.session is not None, "Session is required for this test" session_gid1 = span1.trace.session.id session_gid2 = span2.trace.session.id # Retrieve and verify first session annotation def get_session_annotation_1() -> Optional[dict[str, Any]]: res, _ = _gql( _app, u, query=self.query, variables={"id": str(session_gid1)}, ) annotations = {anno["name"]: anno for anno in res["data"]["node"]["sessionAnnotations"]} return annotations.get(annotation_names[0]) anno1 = await _get( query_fn=get_session_annotation_1, error_msg="First session annotation should be present", no_wait=sync, ) assert anno1["name"] == annotation_names[0] assert anno1["annotatorKind"] == "CODE" assert anno1["label"] == labels[0] assert anno1["score"] == scores[0] assert anno1["explanation"] == explanations[0] assert anno1["metadata"] == metadata[0] assert anno1["identifier"] == identifiers[0] assert anno1["source"] == "API" # Retrieve and verify second session annotation (if different session) if session_gid1 != session_gid2: def get_session_annotation_2() -> Optional[dict[str, Any]]: res, _ = _gql( _app, u, query=self.query, variables={"id": str(session_gid2)}, ) annotations = { anno["name"]: anno for anno in res["data"]["node"]["sessionAnnotations"] } return annotations.get(annotation_names[1]) anno2 = await _get( query_fn=get_session_annotation_2, error_msg="Second session annotation should be present", no_wait=sync, ) assert anno2["name"] == annotation_names[1] assert anno2["annotatorKind"] == "CODE" assert anno2["label"] == labels[1] assert anno2["score"] == scores[1] assert anno2["explanation"] == explanations[1] assert anno2["metadata"] == metadata[1] assert anno2["identifier"] == identifiers[1] assert anno2["source"] == "API" @pytest.mark.parametrize("sync", [True, False]) # server ingestion path @pytest.mark.parametrize("is_async", [True, False]) # sync/async client @pytest.mark.parametrize("role_or_user", [_MEMBER, _ADMIN]) async def test_log_session_annotations_dataframe( self, sync: bool, is_async: bool, role_or_user: _RoleOrUser, _existing_spans: Sequence[_ExistingSpan], _get_user: _GetUser, _app: _AppInfo, ) -> None: """Tests using DataFrames for session annotations. Tests three ways to use DataFrames: 1. With session_id as a column 2. With session_id as the index 3. With a shared annotator type Verifies that: - DataFrames are processed correctly in chunks - Both individual and global parameters work - Works in both regular and async mode - User permissions are properly checked """ # ============================================================================ # Setup # ============================================================================ # Extract session IDs from fixtures (use the first two) assert len(_existing_spans) >= 2, "At least two existing spans are required for this test" span1, span2, *_ = _existing_spans assert span1.trace.session is not None, "Session is required for this test" assert span2.trace.session is not None, "Session is required for this test" session_id1 = span1.trace.session.session_id session_id2 = span2.trace.session.session_id # Set up test environment with logged-in user u = _get_user(_app, role_or_user).log_in(_app) api_key = str(u.create_api_key(_app, "User")) # Import appropriate client based on test parameter from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # ============================================================================ # Test Case 1: session_id as a column # ============================================================================ import pandas as pd df_with_session_column = pd.DataFrame( { "name": ["helpfulness", "clarity"], "session_id": [session_id1, session_id2], "annotator_kind": ["HUMAN", "LLM"], "label": ["helpful", "clear"], "score": [0.8, 0.9], "explanation": ["Session was helpful", "Session was clear"], "metadata": [{"test": "case1a"}, {"test": "case1b"}], "identifier": ["df-test-1", "df-test-2"], } ) result = await _await_or_return( Client( base_url=_app.base_url, api_key=api_key ).sessions.log_session_annotations_dataframe( dataframe=df_with_session_column, sync=sync ), ) if sync: assert result assert len(result) == 2 # ============================================================================ # Test Case 2: session_id as index # ============================================================================ df_with_session_index = pd.DataFrame( { "name": ["relevance", "accuracy"], "annotator_kind": ["CODE", "HUMAN"], "label": ["relevant", "accurate"], "score": [0.7, 0.95], "explanation": ["Session was relevant", "Session was accurate"], "metadata": [{"test": "case2a"}, {"test": "case2b"}], "identifier": ["df-index-1", "df-index-2"], }, index=[session_id1, session_id2], ) result = await _await_or_return( Client( base_url=_app.base_url, api_key=api_key ).sessions.log_session_annotations_dataframe( dataframe=df_with_session_index, sync=sync ), ) if sync: assert result assert len(result) == 2 # ============================================================================ # Test Case 3: Global annotator_kind # ============================================================================ df_global_annotator = pd.DataFrame( { "name": ["engagement", "satisfaction"], "session_id": [session_id1, session_id2], "label": ["engaged", "satisfied"], "score": [0.6, 0.85], "explanation": ["Session showed engagement", "Session was satisfying"], "identifier": ["global-1", "global-2"], } ) result = await _await_or_return( Client( base_url=_app.base_url, api_key=api_key ).sessions.log_session_annotations_dataframe( dataframe=df_global_annotator, annotator_kind="LLM", # Global parameter sync=sync, ), ) if sync: assert result assert len(result) == 2 @pytest.mark.parametrize("sync", [True, False]) # server ingestion path @pytest.mark.parametrize("is_async", [True, False]) # sync/async client @pytest.mark.parametrize("role_or_user", [_MEMBER, _ADMIN]) async def test_zero_score_annotation( self, sync: bool, is_async: bool, role_or_user: _RoleOrUser, _existing_spans: Sequence[_ExistingSpan], _get_user: _GetUser, _app: _AppInfo, ) -> None: """Tests handling session annotations with zero scores. Verifies that: - Zero scores are saved and loaded correctly - Missing optional fields are handled properly - Works in both regular and async mode - User permissions are properly checked """ # ============================================================================ # Setup # ============================================================================ # Extract session ID from the fixture assert _existing_spans, "At least one existing span is required for this test" span1, *_ = _existing_spans assert span1.trace.session is not None, "Session is required for this test" session_id1 = span1.trace.session.session_id # Set up test environment with logged-in user u = _get_user(_app, role_or_user).log_in(_app) api_key = str(u.create_api_key(_app)) # Import appropriate client based on test parameter from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # ============================================================================ # Test Case: Zero Score # ============================================================================ # Test that a score of 0 is properly recorded and not treated as falsey zero_score_annotation_name = token_hex(8) # Create annotation with score of 0 result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).sessions.add_session_annotation( annotation_name=zero_score_annotation_name, session_id=session_id1, annotator_kind="LLM", score=0, # Explicitly test score of 0 sync=sync, ), ) if sync: assert result # Verify the annotation was created correctly by querying the GraphQL API assert span1.trace.session is not None, "Session is required for this test" session_gid1 = span1.trace.session.id def get_zero_score_annotation() -> Optional[dict[str, Any]]: res, _ = _gql( _app, u, query=self.query, variables={"id": str(session_gid1)}, ) annotations = {anno["name"]: anno for anno in res["data"]["node"]["sessionAnnotations"]} return annotations.get(zero_score_annotation_name) anno = await _get( query_fn=get_zero_score_annotation, error_msg="Annotation with score of 0 should be present in session annotations", no_wait=sync, ) # Verify the annotation exists and has score of 0 assert anno["score"] == 0, "Annotation score should be exactly 0" assert anno["label"] is None, "Annotation label should be None" assert anno["explanation"] is None, "Annotation explanation should be None" assert anno["source"] == "API", "Annotation source should be API" assert anno["annotatorKind"] == "LLM", "Annotation annotator_kind should be LLM" @pytest.mark.parametrize("sync", [True, False]) # server ingestion path @pytest.mark.parametrize("is_async", [True, False]) # sync/async client @pytest.mark.parametrize("role_or_user", [_MEMBER, _ADMIN]) async def test_zero_score_annotation_dataframe( self, sync: bool, is_async: bool, role_or_user: _RoleOrUser, _existing_spans: Sequence[_ExistingSpan], _get_user: _GetUser, _app: _AppInfo, ) -> None: """Tests handling zero scores in session annotation DataFrames. Verifies that: - Zero scores can be read from DataFrames - Zero scores are saved and loaded correctly - Works in both regular and async mode - User permissions are properly checked """ # ============================================================================ # Setup # ============================================================================ # Extract session ID from the fixture assert _existing_spans, "At least one existing span is required for this test" span1, *_ = _existing_spans assert span1.trace.session is not None, "Session is required for this test" session_id1 = span1.trace.session.session_id # Set up test environment with logged-in user u = _get_user(_app, role_or_user).log_in(_app) api_key = str(u.create_api_key(_app)) # Import appropriate client based on test parameter from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # ============================================================================ # Test Case: Zero Score in DataFrame # ============================================================================ # Test that zero scores work correctly when provided via DataFrame import pandas as pd zero_score_annotation_name = token_hex(8) df = pd.DataFrame( { "name": [zero_score_annotation_name], "session_id": [session_id1], "annotator_kind": ["CODE"], "score": [0], # Test zero score # Omit label and explanation to test None handling } ) result = await _await_or_return( Client( base_url=_app.base_url, api_key=api_key ).sessions.log_session_annotations_dataframe(dataframe=df, sync=sync), ) if sync: assert result # Verify the annotation was created correctly assert span1.trace.session is not None, "Session is required for this test" session_gid1 = span1.trace.session.id def get_zero_score_df_annotation() -> Optional[dict[str, Any]]: res, _ = _gql( _app, u, query=self.query, variables={"id": str(session_gid1)}, ) annotations = {anno["name"]: anno for anno in res["data"]["node"]["sessionAnnotations"]} return annotations.get(zero_score_annotation_name) anno = await _get( query_fn=get_zero_score_df_annotation, error_msg="Zero score DataFrame annotation should be present in session annotations", no_wait=sync, ) # Verify the annotation exists and has score of 0 assert anno["score"] == 0, "DataFrame annotation score should be exactly 0" assert anno["label"] is None, "DataFrame annotation label should be None" assert anno["explanation"] is None, "DataFrame annotation explanation should be None" assert anno["source"] == "API", "DataFrame annotation source should be API" assert anno["annotatorKind"] == "CODE", "DataFrame annotation annotator_kind should be CODE" async def test_burst_session_annotations_with_unique_identifiers( self, _existing_spans: Sequence[_ExistingSpan], _get_user: _GetUser, _app: _AppInfo, ) -> None: """Tests sending multiple session annotations in a burst with unique identifiers. Sends the same annotation name with different identifiers in rapid succession. Annotations are deduplicated by (name, identifier) tuple rather than just by name. Verifies that: - Multiple annotations with the same name but different identifiers can be sent in a burst - All 5 unique annotations are properly inserted - Each annotation maintains its unique identifier and all field values - Duplicate batches are correctly deduplicated """ # Setup assert _existing_spans, "At least one existing span is required for this test" span1, *_ = _existing_spans assert span1.trace.session is not None, "Session is required for this test" session_gid1 = span1.trace.session.id session_id1 = span1.trace.session.session_id api_key = _app.admin_secret from phoenix.client import AsyncClient # Test Case: Burst Session Annotations annotation_name = token_hex(8) num_annotations = 5 # Generate unique data for each annotation identifiers = [token_hex(8) for _ in range(num_annotations)] labels = [token_hex(8) for _ in range(num_annotations)] scores = [int.from_bytes(token_bytes(4), byteorder="big") for _ in range(num_annotations)] explanations = [token_hex(8) for _ in range(num_annotations)] metadata = [{token_hex(8): token_hex(8)} for _ in range(num_annotations)] # Create annotation data for burst sending session_annotations: list[v1.SessionAnnotationData] = [ { "name": annotation_name, "session_id": session_id1, "annotator_kind": "CODE", "identifier": identifiers[i], "metadata": metadata[i], "result": { "label": labels[i], "score": scores[i], "explanation": explanations[i], }, } for i in range(num_annotations) ] # Send all annotations in a burst task = AsyncClient( base_url=_app.base_url, api_key=api_key ).sessions.log_session_annotations( session_annotations=session_annotations * 2, sync=False, ) await gather(task, task) # Verify all annotations were created correctly by querying the GraphQL API def get_all_burst_annotations() -> Optional[dict[str, dict[str, Any]]]: res, _ = _gql( _app, api_key, query=self.query, operation_name="GetSessionAnnotations", variables={"id": str(session_gid1)}, ) # Filter to only our test annotations (by name) test_annotations = [ anno for anno in res["data"]["node"]["sessionAnnotations"] if anno["name"] == annotation_name ] # Check if we have all expected annotations if len(test_annotations) == num_annotations: # Return indexed by identifier for easy lookup return {anno["identifier"]: anno for anno in test_annotations} return None annotations_by_identifier = await _get( query_fn=get_all_burst_annotations, error_msg=f"All {num_annotations} burst session annotations should be present", ) # Verify each annotation exists with correct values for i in range(num_annotations): assert identifiers[i] in annotations_by_identifier, ( f"Session annotation with identifier {identifiers[i]} should be present" ) anno = annotations_by_identifier[identifiers[i]] assert anno["name"] == annotation_name, ( f"Session annotation {i} name should match input" ) assert anno["source"] == "API", f"Session annotation {i} source should be API" assert anno["annotatorKind"] == "CODE", ( f"Session annotation {i} annotator_kind should be CODE" ) assert anno["metadata"] == metadata[i], ( f"Session annotation {i} metadata should match input" ) assert anno["label"] == labels[i], f"Session annotation {i} label should match input" assert anno["score"] == scores[i], f"Session annotation {i} score should match input" assert anno["explanation"] == explanations[i], ( f"Session annotation {i} explanation should match input" ) class TestSendingAnnotationsBeforeSpan: """Tests sending annotations before spans exist. *CAVEAT* This is a single consolidated because we want to reduce flakiness and total wait time for server operations that are asynchronous. Verifies that: - Annotations can be sent before spans are created - Annotations are properly linked to spans after creation - Multiple annotations per span are handled correctly - Many evaluations can be processed at once - Data remains consistent - Span, trace, and session annotations work correctly - Existing annotations can be updated (UPSERT) """ # GraphQL queries for retrieving annotations and evaluations query = """ query GetSpanAnnotations($id: ID!) { node (id: $id) { ... on Span { spanAnnotations { id name source identifier annotatorKind metadata label score explanation } } } } query GetTraceAnnotations($id: ID!) { node (id: $id) { ... on Trace { traceAnnotations { id name source identifier annotatorKind metadata label score explanation } } } } query GetSessionAnnotations($id: ID!) { node (id: $id) { ... on ProjectSession { sessionAnnotations { id name source identifier annotatorKind metadata label score explanation } } } } query GetDocumentAnnotations($id: ID!) { node (id: $id) { ... on Span { documentEvaluations { id name source identifier annotatorKind metadata label score explanation documentPosition } } } } query GetSpanTraceGlobalID($traceId: ID!) { projects(first: 1000) { edges { node { trace(traceId: $traceId) { id traceId session { id } spans { edges { node { id spanId } } } } } } } } """ @pytest.fixture def _span(self) -> ReadableSpan: """Creates test span with document retrieval attributes.""" memory = InMemorySpanExporter() attributes = { "session.id": token_hex(8), "retrieval.documents.0.document.id": token_hex(8), "retrieval.documents.1.document.id": token_hex(8), "retrieval.documents.2.document.id": token_hex(8), } _start_span(project_name=token_hex(8), attributes=attributes, exporter=memory).end() assert (spans := memory.get_finished_spans()) return spans[0] def _get_span_trace_gid( self, app: _AppInfo, auth: _SecurityArtifact, *, span_id: str, trace_id: str, ) -> Optional[_SpanTraceGlobalID]: """Gets global ID for span_id and trace_id.""" res, _ = _gql( app, auth, query=self.query, variables={"traceId": trace_id}, operation_name="GetSpanTraceGlobalID", ) for project in res["data"]["projects"]["edges"]: if (trace := project["node"]["trace"]) and trace["traceId"] == trace_id: for edge in trace["spans"]["edges"]: if edge["node"]["spanId"] == span_id: return _SpanTraceGlobalID( span_gid=cast(str, edge["node"]["id"]), trace_gid=cast(str, trace["id"]), session_gid=cast(str, trace["session"]["id"]), ) return None @pytest.mark.filterwarnings("ignore:.*:DeprecationWarning") async def test_annotations_and_evaluations( self, _span: ReadableSpan, _app: _AppInfo, ) -> None: """Tests sending annotations and evaluations before spans exist. Verifies that: - Span and trace annotations can be sent before spans exist - Evaluations can be sent before spans exist - Data is properly linked to spans after creation - Multiple annotations per span are handled - Many evaluations can be processed at once - Data remains consistent - Existing annotations can be updated (UPSERT) """ # Get IDs from the span assert (span_context := _span.get_span_context()) # type: ignore[no-untyped-call] span_id = format_span_id(span_context.span_id) trace_id = format_trace_id(span_context.trace_id) assert ( self._get_span_trace_gid(_app, _app.admin_secret, span_id=span_id, trace_id=trace_id) is None ) assert (session_id := str((_span.attributes or {})["session.id"])) # Set up the client api_key = str(_app.admin_secret) from phoenix.client import Client client = Client(base_url=_app.base_url, api_key=api_key) # Make test data span_annotation_name = token_hex(8) trace_annotation_name = token_hex(8) session_annotation_name = token_hex(8) document_annotation_name = token_hex(8) span_eval_name = token_hex(8) trace_eval_name = token_hex(8) doc_eval_name = token_hex(8) span_annotator_kind: Any = choice(_ANNOTATOR_KINDS) trace_annotator_kind: Any = choice(_ANNOTATOR_KINDS) session_annotator_kind: Any = choice(_ANNOTATOR_KINDS) document_annotator_kind: Any = choice(_ANNOTATOR_KINDS) # Set up initial test data span_anno_scores: list[int] = [] span_anno_labels: list[str] = [] span_anno_explanations: list[str] = [] span_anno_metadatas: list[dict[str, Any]] = [] trace_anno_scores: list[int] = [] trace_anno_labels: list[str] = [] trace_anno_explanations: list[str] = [] trace_anno_metadatas: list[dict[str, Any]] = [] session_anno_scores: list[int] = [] session_anno_labels: list[str] = [] session_anno_explanations: list[str] = [] session_anno_metadatas: list[dict[str, Any]] = [] document_anno_scores: list[int] = [] document_anno_labels: list[str] = [] document_anno_explanations: list[str] = [] document_anno_metadatas: list[dict[str, Any]] = [] span_eval_scores: list[int] = [] span_eval_labels: list[str] = [] span_eval_explanations: list[str] = [] trace_eval_scores: list[int] = [] trace_eval_labels: list[str] = [] trace_eval_explanations: list[str] = [] doc_eval_scores: list[int] = [] doc_eval_labels: list[str] = [] doc_eval_explanations: list[str] = [] # Run multiple test rounds num_iterations = 2 assert num_iterations >= 2 document_position = 1 for _ in range(num_iterations): # Make new test data for this round span_anno_scores.append(int.from_bytes(token_bytes(4), byteorder="big")) span_anno_labels.append(token_hex(8)) span_anno_explanations.append(token_hex(8)) span_anno_metadatas.append({token_hex(8): token_hex(8)}) trace_anno_scores.append(int.from_bytes(token_bytes(4), byteorder="big")) trace_anno_labels.append(token_hex(8)) trace_anno_explanations.append(token_hex(8)) trace_anno_metadatas.append({token_hex(8): token_hex(8)}) session_anno_scores.append(int.from_bytes(token_bytes(4), byteorder="big")) session_anno_labels.append(token_hex(8)) session_anno_explanations.append(token_hex(8)) session_anno_metadatas.append({token_hex(8): token_hex(8)}) document_anno_scores.append(int.from_bytes(token_bytes(4), byteorder="big")) document_anno_labels.append(token_hex(8)) document_anno_explanations.append(token_hex(8)) document_anno_metadatas.append({token_hex(8): token_hex(8)}) span_eval_scores.append(int.from_bytes(token_bytes(4), byteorder="big")) span_eval_labels.append(token_hex(8)) span_eval_explanations.append(token_hex(8)) trace_eval_scores.append(int.from_bytes(token_bytes(4), byteorder="big")) trace_eval_labels.append(token_hex(8)) trace_eval_explanations.append(token_hex(8)) doc_eval_scores.append(int.from_bytes(token_bytes(4), byteorder="big")) doc_eval_labels.append(token_hex(8)) doc_eval_explanations.append(token_hex(8)) # Add annotations and evaluations client.spans.add_span_annotation( annotation_name=span_annotation_name, span_id=span_id, annotator_kind=span_annotator_kind, label=span_anno_labels[-1], score=span_anno_scores[-1], explanation=span_anno_explanations[-1], metadata=span_anno_metadatas[-1], sync=False, ) client.traces.add_trace_annotation( annotation_name=trace_annotation_name, trace_id=trace_id, annotator_kind=trace_annotator_kind, label=trace_anno_labels[-1], score=trace_anno_scores[-1], explanation=trace_anno_explanations[-1], metadata=trace_anno_metadatas[-1], sync=False, ) client.sessions.add_session_annotation( annotation_name=session_annotation_name, session_id=session_id, annotator_kind=session_annotator_kind, label=session_anno_labels[-1], score=session_anno_scores[-1], explanation=session_anno_explanations[-1], metadata=session_anno_metadatas[-1], sync=False, ) client.spans.add_document_annotation( annotation_name=document_annotation_name, span_id=span_id, document_position=document_position, annotator_kind=document_annotator_kind, label=document_anno_labels[-1], score=document_anno_scores[-1], explanation=document_anno_explanations[-1], metadata=document_anno_metadatas[-1], sync=False, ) px.Client(endpoint=_app.base_url, api_key=api_key).log_evaluations( SpanEvaluations( eval_name=span_eval_name, dataframe=pd.DataFrame( { "span_id": [span_id], "label": [span_eval_labels[-1]], "score": [span_eval_scores[-1]], "explanation": [span_eval_explanations[-1]], } ), ), TraceEvaluations( eval_name=trace_eval_name, dataframe=pd.DataFrame( { "trace_id": [trace_id], "label": [trace_eval_labels[-1]], "score": [trace_eval_scores[-1]], "explanation": [trace_eval_explanations[-1]], } ), ), DocumentEvaluations( eval_name=doc_eval_name, dataframe=pd.DataFrame( { "span_id": [span_id], "document_position": [document_position], "label": [doc_eval_labels[-1]], "score": [doc_eval_scores[-1]], "explanation": [doc_eval_explanations[-1]], } ), ), ) # Use a significant sleep duration to verify that annotations sent before # span ingestion are properly processed in chronological order await sleep(0.5) # Send the span and wait headers = {"authorization": f"Bearer {_app.admin_secret}"} assert ( _grpc_span_exporter(_app, headers=headers).export([_span]) is SpanExportResult.SUCCESS ) # Get the global IDs span_gid, trace_gid, session_gid = await _get( query_fn=lambda: self._get_span_trace_gid( _app, _app.admin_secret, span_id=span_id, trace_id=trace_id ), error_msg="Span and trace IDs should be present", ) # Helper functions for querying annotations def get_span_anno(key: tuple[str, int, str]) -> Optional[dict[str, Any]]: res, _ = _gql( _app, _app.admin_secret, query=self.query, variables={"id": str(span_gid)}, operation_name="GetSpanAnnotations", ) annos = { (result["label"], result["score"], result["explanation"]): result for result in res["data"]["node"]["spanAnnotations"] } return annos.get(key) def get_trace_anno(key: tuple[str, int, str]) -> Optional[dict[str, Any]]: res, _ = _gql( _app, _app.admin_secret, query=self.query, variables={"id": str(trace_gid)}, operation_name="GetTraceAnnotations", ) annos = { (result["label"], result["score"], result["explanation"]): result for result in res["data"]["node"]["traceAnnotations"] } return annos.get(key) def get_session_anno(key: tuple[str, int, str]) -> Optional[dict[str, Any]]: res, _ = _gql( _app, _app.admin_secret, query=self.query, variables={"id": str(session_gid)}, operation_name="GetSessionAnnotations", ) annos = { (result["label"], result["score"], result["explanation"]): result for result in res["data"]["node"]["sessionAnnotations"] } return annos.get(key) def get_document_anno(key: tuple[str, int, str]) -> Optional[dict[str, Any]]: res, _ = _gql( _app, _app.admin_secret, query=self.query, variables={"id": str(span_gid)}, operation_name="GetDocumentAnnotations", ) annos = { (result["label"], result["score"], result["explanation"]): result for result in res["data"]["node"]["documentEvaluations"] } return annos.get(key) # Check the initial annotations span_anno_label = span_anno_labels[-1] span_anno_score = span_anno_scores[-1] span_anno_explanation = span_anno_explanations[-1] anno = await _get( query_fn=get_span_anno, args=((span_anno_label, span_anno_score, span_anno_explanation),), error_msg="Span annotation should be present", ) assert anno["name"] == span_annotation_name assert anno["source"] == "API" assert anno["annotatorKind"] == span_annotator_kind assert anno["metadata"] == span_anno_metadatas[-1] # Retain the gid for the UPSERT test span_anno_gid = anno["id"] # Check the span evaluations span_eval_label = span_eval_labels[-1] span_eval_score = span_eval_scores[-1] span_eval_explanation = span_eval_explanations[-1] anno = await _get( query_fn=get_span_anno, args=((span_eval_label, span_eval_score, span_eval_explanation),), error_msg="Span evaluation should be present", ) assert anno["name"] == span_eval_name assert anno["source"] == "API" assert anno["annotatorKind"] == "LLM" # Evaluation annotator kind is always "LLM" # Retain the gid for the UPSERT test span_eval_gid = anno["id"] # Check the trace annotations trace_anno_label = trace_anno_labels[-1] trace_anno_score = trace_anno_scores[-1] trace_anno_explanation = trace_anno_explanations[-1] anno = await _get( query_fn=get_trace_anno, args=((trace_anno_label, trace_anno_score, trace_anno_explanation),), error_msg="Trace annotation should be present", ) assert anno["name"] == trace_annotation_name assert anno["source"] == "API" assert anno["annotatorKind"] == trace_annotator_kind assert anno["metadata"] == trace_anno_metadatas[-1] # Retain the gid for the UPSERT test trace_anno_gid = anno["id"] # Check the session annotations session_anno_label = session_anno_labels[-1] session_anno_score = session_anno_scores[-1] session_anno_explanation = session_anno_explanations[-1] anno = await _get( query_fn=get_session_anno, args=((session_anno_label, session_anno_score, session_anno_explanation),), error_msg="Session annotation should be present", ) assert anno["name"] == session_annotation_name assert anno["source"] == "API" assert anno["annotatorKind"] == session_annotator_kind assert anno["metadata"] == session_anno_metadatas[-1] # Retain the gid for the UPSERT test session_anno_gid = anno["id"] # Check the trace evaluations trace_eval_label = trace_eval_labels[-1] trace_eval_score = trace_eval_scores[-1] trace_eval_explanation = trace_eval_explanations[-1] anno = await _get( query_fn=get_trace_anno, args=((trace_eval_label, trace_eval_score, trace_eval_explanation),), error_msg="Trace evaluation should be present", ) assert anno["name"] == trace_eval_name assert anno["source"] == "API" assert anno["annotatorKind"] == "LLM" # Evaluation annotator kind is always "LLM" # Retain the gid for the UPSERT test trace_eval_gid = anno["id"] # Check the document evaluations doc_eval_label = doc_eval_labels[-1] doc_eval_score = doc_eval_scores[-1] doc_eval_explanation = doc_eval_explanations[-1] anno = await _get( query_fn=get_document_anno, args=((doc_eval_label, doc_eval_score, doc_eval_explanation),), error_msg="Document evaluation should be present", ) assert anno["name"] == doc_eval_name assert anno["documentPosition"] == document_position # Check the document annotations document_anno_label = document_anno_labels[-1] document_anno_score = document_anno_scores[-1] document_anno_explanation = document_anno_explanations[-1] anno = await _get( query_fn=get_document_anno, args=((document_anno_label, document_anno_score, document_anno_explanation),), error_msg="Document annotation should be present", ) assert anno["name"] == document_annotation_name assert anno["source"] == "API" assert anno["annotatorKind"] == document_annotator_kind assert anno["metadata"] == document_anno_metadatas[-1] assert anno["documentPosition"] == document_position # Retain the gid for the UPSERT test document_anno_gid = anno["id"] # Test UPSERT by updating existing annotations # Make new test data for updates new_span_annotator_kind: Any = choice(_ANNOTATOR_KINDS) new_span_anno_score = int.from_bytes(token_bytes(4), byteorder="big") new_span_anno_label = token_hex(8) new_span_anno_explanation = token_hex(8) new_span_anno_metadata = {token_hex(8): token_hex(8)} new_trace_annotator_kind: Any = choice(_ANNOTATOR_KINDS) new_trace_anno_score = int.from_bytes(token_bytes(4), byteorder="big") new_trace_anno_label = token_hex(8) new_trace_anno_explanation = token_hex(8) new_trace_anno_metadata = {token_hex(8): token_hex(8)} new_session_annotator_kind: Any = choice(_ANNOTATOR_KINDS) new_session_anno_score = int.from_bytes(token_bytes(4), byteorder="big") new_session_anno_label = token_hex(8) new_session_anno_explanation = token_hex(8) new_session_anno_metadata = {token_hex(8): token_hex(8)} new_document_annotator_kind: Any = choice(_ANNOTATOR_KINDS) new_document_anno_score = int.from_bytes(token_bytes(4), byteorder="big") new_document_anno_label = token_hex(8) new_document_anno_explanation = token_hex(8) new_document_anno_metadata = {token_hex(8): token_hex(8)} new_span_eval_score = int.from_bytes(token_bytes(4), byteorder="big") new_span_eval_label = token_hex(8) new_span_eval_explanation = token_hex(8) new_trace_eval_score = int.from_bytes(token_bytes(4), byteorder="big") new_trace_eval_label = token_hex(8) new_trace_eval_explanation = token_hex(8) new_doc_eval_score = int.from_bytes(token_bytes(4), byteorder="big") new_doc_eval_label = token_hex(8) new_doc_eval_explanation = token_hex(8) # Update annotations and evaluations client.spans.add_span_annotation( annotation_name=span_annotation_name, span_id=span_id, annotator_kind=new_span_annotator_kind, label=new_span_anno_label, score=new_span_anno_score, explanation=new_span_anno_explanation, metadata=new_span_anno_metadata, sync=False, ) client.traces.add_trace_annotation( annotation_name=trace_annotation_name, trace_id=trace_id, annotator_kind=new_trace_annotator_kind, label=new_trace_anno_label, score=new_trace_anno_score, explanation=new_trace_anno_explanation, metadata=new_trace_anno_metadata, sync=False, ) client.sessions.add_session_annotation( annotation_name=session_annotation_name, session_id=session_id, annotator_kind=new_session_annotator_kind, label=new_session_anno_label, score=new_session_anno_score, explanation=new_session_anno_explanation, metadata=new_session_anno_metadata, sync=False, ) client.spans.add_document_annotation( annotation_name=document_annotation_name, span_id=span_id, document_position=document_position, annotator_kind=new_document_annotator_kind, label=new_document_anno_label, score=new_document_anno_score, explanation=new_document_anno_explanation, metadata=new_document_anno_metadata, sync=False, ) px.Client(endpoint=_app.base_url, api_key=api_key).log_evaluations( SpanEvaluations( eval_name=span_eval_name, dataframe=pd.DataFrame( { "span_id": [span_id], "label": [new_span_eval_label], "score": [new_span_eval_score], "explanation": [new_span_eval_explanation], } ), ), TraceEvaluations( eval_name=trace_eval_name, dataframe=pd.DataFrame( { "trace_id": [trace_id], "label": [new_trace_eval_label], "score": [new_trace_eval_score], "explanation": [new_trace_eval_explanation], } ), ), DocumentEvaluations( eval_name=doc_eval_name, dataframe=pd.DataFrame( { "span_id": [span_id], "document_position": [document_position], "label": [new_doc_eval_label], "score": [new_doc_eval_score], "explanation": [new_doc_eval_explanation], } ), ), ) # Check updated annotations anno = await _get( query_fn=get_span_anno, args=((new_span_anno_label, new_span_anno_score, new_span_anno_explanation),), error_msg="Updated span annotation should be present", ) assert anno["name"] == span_annotation_name assert anno["source"] == "API" assert anno["annotatorKind"] == new_span_annotator_kind assert anno["metadata"] == new_span_anno_metadata assert anno["id"] == span_anno_gid # Old version should no longer exist assert get_span_anno((span_anno_label, span_anno_score, span_anno_explanation)) is None # Check updated span evaluations anno = await _get( query_fn=get_span_anno, args=((new_span_eval_label, new_span_eval_score, new_span_eval_explanation),), error_msg="Updated span evaluation should be present", ) assert anno["name"] == span_eval_name assert anno["source"] == "API" assert anno["annotatorKind"] == "LLM" # Evaluation annotator kind is always "LLM" assert anno["id"] == span_eval_gid # Old version should no longer exist assert get_span_anno((span_eval_label, span_eval_score, span_eval_explanation)) is None # Check updated trace annotations anno = await _get( query_fn=get_trace_anno, args=((new_trace_anno_label, new_trace_anno_score, new_trace_anno_explanation),), error_msg="Updated trace annotation should be present", ) assert anno["name"] == trace_annotation_name assert anno["source"] == "API" assert anno["annotatorKind"] == new_trace_annotator_kind assert anno["metadata"] == new_trace_anno_metadata assert anno["id"] == trace_anno_gid # Old version should no longer exist assert get_trace_anno((trace_anno_label, trace_anno_score, trace_anno_explanation)) is None # Check updated session annotations anno = await _get( query_fn=get_session_anno, args=((new_session_anno_label, new_session_anno_score, new_session_anno_explanation),), error_msg="Updated session annotation should be present", ) assert anno["name"] == session_annotation_name assert anno["source"] == "API" assert anno["annotatorKind"] == new_session_annotator_kind assert anno["metadata"] == new_session_anno_metadata assert anno["id"] == session_anno_gid # Old version should no longer exist assert ( get_session_anno((session_anno_label, session_anno_score, session_anno_explanation)) is None ) # Check updated document annotations anno = await _get( query_fn=get_document_anno, args=( (new_document_anno_label, new_document_anno_score, new_document_anno_explanation), ), error_msg="Updated document annotation should be present", ) assert anno["name"] == document_annotation_name assert anno["source"] == "API" assert anno["annotatorKind"] == new_document_annotator_kind assert anno["metadata"] == new_document_anno_metadata assert anno["documentPosition"] == document_position assert anno["id"] == document_anno_gid # Old version should no longer exist assert ( get_document_anno((document_anno_label, document_anno_score, document_anno_explanation)) is None ) # Check updated trace evaluations anno = await _get( query_fn=get_trace_anno, args=((new_trace_eval_label, new_trace_eval_score, new_trace_eval_explanation),), error_msg="Updated trace evaluation should be present", ) assert anno["name"] == trace_eval_name assert anno["source"] == "API" assert anno["annotatorKind"] == "LLM" # Evaluation annotator kind is always "LLM" assert anno["id"] == trace_eval_gid # Old version should no longer exist assert get_trace_anno((trace_eval_label, trace_eval_score, trace_eval_explanation)) is None # Check updated document evaluations anno = await _get( query_fn=get_document_anno, args=((new_doc_eval_label, new_doc_eval_score, new_doc_eval_explanation),), error_msg="Updated document evaluation should be present", ) assert anno["name"] == doc_eval_name assert anno["source"] == "API" assert anno["annotatorKind"] == "LLM" # Evaluation annotator kind is always "LLM" assert anno["documentPosition"] == document_position # Old version should no longer exist assert get_document_anno((doc_eval_label, doc_eval_score, doc_eval_explanation)) is None class _SpanTraceGlobalID(NamedTuple): """Helper class for storing span and trace global IDs.""" span_gid: str trace_gid: str session_gid: str

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Arize-ai/phoenix'

If you have feedback or need assistance with the MCP directory API, please join our Discord server